Migrating A Large ASCII Data Set From CSV To Oracle To DSE/Cassandra – A look at storage impact

The objective of this exercise is to demonstrate how the migration of data from CSV to Oracle to Cassandra can change the required underlying storage volume of the data. How do data volumes change between pure ASCII source records versus records stored in tables in an Oracle database versus records stored in tables in a DSE/Cassandra database?

Here is the high-level plan:

  • Start out with a 1.4 GB CSV data file containing 6.1 million crime records from the Chicago Police Department for the period 2001-2016
  • Define this file as an external data file in Oracle 12c
  • Read the external data file into a physical Oracle database table
  • Use the Spark DataFrame capability introduced in Apache Spark 1.3 to load data from tables in the Oracle database via Oracle’s JDBC thin driver
  • Save the data in the dataframe to Cassandra (a condensed sketch of these two Spark steps follows this list)
  • Compare the volumes used by the data in Oracle versus DSE/Cassandra
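
For orientation, here is a condensed sketch of the Spark part of that plan. It uses the Spark 1.6 DataFrame reader/writer APIs, and the connection details, keyspace and table names are the ones created step by step later in this post, so treat it as a preview rather than something to run yet:

// From the DSE Spark REPL (dse spark), with the Oracle JDBC driver on the classpath:
// 1. Load the Oracle table into a DataFrame over JDBC.
val crimes = sqlContext.read
  .format("jdbc")
  .options(Map(
    "url"     -> "jdbc:oracle:thin:bulk_load/bulk_load@localhost:1521/orcl",
    "dbtable" -> "crime_data"))
  .load()

// 2. Lower-case the column names to match the Cassandra table, then save.
val crimes_lc = crimes.toDF(crimes.columns.map(_.toLowerCase): _*)
crimes_lc.write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "crimes", "keyspace" -> "bulk_load"))
  .save()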

Pre-requisites

DataStax Enterprise (release 5.0.2 at time of publication)

You’ll need a working installation of DataStax Enterprise.

– Ubuntu/Debian – https://docs.datastax.com/en/datastax_enterprise/5.0/datastax_enterprise/install/installDEBdse.html
– Red Hat/Fedora/CentOS/Oracle Linux – https://docs.datastax.com/en/datastax_enterprise/5.0/datastax_enterprise/install/installRHELdse.html

To set up your environment, you’ll also need the following resources:

– Python 2.7
– Java 8
– For Red Hat, CentOS and Fedora, install EPEL (Extra Packages for Enterprise Linux).
– An Oracle database:
– In my example the database name is orcl
– A running Oracle tns listener. In my example I’m using the default port of 1521.
– You are able to make a tns connection to the Oracle database e.g. “sqlplus user/password@service_name”.
– The Oracle thin JDBC driver. You can download the ojdbc JAR file from:
http://www.oracle.com/technetwork/database/features/jdbc/jdbc-drivers-12c-download-1958347.html
I’ve used ojdbc7.jar, which is certified for use with both JDK 7 and JDK 8.
In my Oracle 12c VM, Firefox downloaded it to /app/oracle/downloads/, so you’ll see that path referenced in the instructions below.

As we will connect from Spark, using the Oracle jdbc driver, to the “orcl” database on TNS port 1521, all these components must be working correctly.
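
If you want to confirm the JDBC pieces independently of Spark, a minimal check like the one below can be pasted into a Scala REPL started with the driver on the classpath, e.g. scala -cp /app/oracle/downloads/ojdbc7.jar (the dse spark shell installed later works too). This is only a sketch: your_user and your_password are placeholders for any account that can log into your database.

// Open a JDBC connection to the orcl service on port 1521 and run a trivial query.
Class.forName("oracle.jdbc.OracleDriver")
val conn = java.sql.DriverManager.getConnection(
  "jdbc:oracle:thin:@localhost:1521/orcl", "your_user", "your_password")   // replace with a real account
val rs = conn.createStatement().executeQuery("SELECT 1 FROM dual")
rs.next()
println(rs.getInt(1))   // prints 1 if the listener, service name and driver all line up
conn.close()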

Now on to installing DataStax Enterprise and playing with some data!


Set Up DataStax Components

Installation instructions for DSE are provided at the top of this doc. I’ll show the instructions for Red Hat/CentOS/Fedora here. I’m using an Oracle Enterprise Linux VirtualBox instance.

Add The DataStax Repo


As root, create /etc/yum.repos.d/datastax.repo:

# vi /etc/yum.repos.d/datastax.repo

Paste in these lines:

[datastax] 
name = DataStax Repo for DataStax Enterprise
baseurl=https://datastaxrepo_gmail.com:utJVKEg4lKeaWTX@rpm.datastax.com/enterprise
enabled=1
gpgcheck=0

Import The DataStax Repo Key

# rpm --import http://rpm.datastax.com/rpm/repo_key

Install DSE Components

DSE Platform

# yum install dse-full-5.0.1-1

DataStax OpsCenter (this installed version 6.0.2.1 for me)

# yum install opscenter

DataStax OpsCenter Agent (also 6.0.2.1)

# yum install datastax-agent

Enable Search & Analytics

We want to use Search (Solr) and Analytics (Spark), so we need to delete the default (Cassandra-only) datacentre’s data and restart the cluster (if it’s already running) in SearchAnalytics mode.

Stop the service if it’s running.

#  service dse stop
Stopping DSE daemon : dse                                  [  OK  ]

Enable Solr and Spark by changing the flag from “0” to “1” in:

# vi /etc/default/dse

e.g.:

# Start the node in DSE Search mode
SOLR_ENABLED=1

# Start the node in Spark mode
SPARK_ENABLED=1

Delete the old (Cassandra-only) datacentre’s data files if they exist:

# rm -rf /var/lib/cassandra/data/*
# rm -rf /var/lib/cassandra/saved_caches/*
# rm -rf /var/lib/cassandra/commitlog/*
# rm -rf /var/lib/cassandra/hints/*

Remove the old system.log if it exists:

# rm -rf /var/log/cassandra/system.log 

Now restart DSE:

$ sudo service dse restart

After a few minutes use nodetool to check that all is up and running (check for “UN” next to the IP address):

$ nodetool status
Datacenter: SearchAnalytics
===========================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address    Load       Owns    Host ID                               Token                                    Rack
UN  127.0.0.1  346.89 KB  ?       8e6fa3db-9018-47f0-96df-8c78067fddaa  6840808785095143619                      rack1

Note: Non-system keyspaces don't have the same replication settings, effective ownership information is meaningless

You should also check that you can log into cqlsh:

$ cqlsh
Connected to Test Cluster at 127.0.0.1:9042.
[cqlsh 5.0.1 | Cassandra 3.0.7.1159 | DSE 5.0.1 | CQL spec 3.4.0 | Native protocol v4]
Use HELP for help.
cqlsh> 

Type exit in cqlsh to return to the shell prompt.

Identify Spark Master

We use the new DSE 5.0 format for the dse tool to get the address of the Spark Master in our cluster.
As we are using a single node for our cluster it will be no surprise that the Spark Master is also on our single node!

$ dse client-tool spark master-address
spark://127.0.0.1:7077
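
If you already have a Spark REPL open (dse spark), you can also sanity-check this from inside it: the SparkContext created for you exposes the master URL, which should match the address reported above.

scala> sc.master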

OK, let’s go get some data.

Load CSV Data Into Oracle

Our source file is an extract of the Chicago crime database, 2001 to 2016, described here: http://catalog.data.gov/dataset/crimes-2001-to-present-398a4 and downloadable here: https://data.cityofchicago.org/api/views/ijzp-q8t2/rows.csv?accessType=DOWNLOAD

The dataset is structured with the following fields:

– ID
– Case Number
– Date
– Block
– IUCR
– Primary Type
– Description
– Location Description
– Arrest
– Domestic
– Beat
– District
– Ward
– Community Area
– FBI Code
– X Coordinate
– Y Coordinate
– Year
– Updated On
– Latitude
– Longitude
– Location

The records in the data file look like this, with a header record:

ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,Beat,District,Ward,Community Area,FBI Code,X Coordinate,Y Coordinate,Year,Updated On,Latitude,Longitude,Location

5784095,HN594666,09/17/2007 05:10:00 PM,021XX W WARREN BLVD,0486,BATTERY,DOMESTIC BATTERY SIMPLE,CHA APARTMENT,false,true,1332,012,2,28,08B,1162213,1900335,2007,04/15/2016 08:55:02 AM,41.882182894,-87.679812546,"(41.882182894, -87.679812546)"
5784096,HN594881,09/17/2007 06:35:00 PM,013XX S RACINE AVE,0460,BATTERY,SIMPLE,STREET,false,false,1231,012,2,28,08B,1168579,1894058,2007,04/15/2016 08:55:02 AM,41.864822916,-87.656618405,"(41.864822916, -87.656618405)"

We will have to build a table in Oracle for the first import of the data. When we have the data in Oracle we can assess the volume of space occupied by database tables.
We then migrate the data to Cassandra from Oracle using Spark and see how much space is occupied in Cassandra.

A quick wc tells me there are just over 6 million records in the roughly 1.4 GB CSV file:

$ cat Crimes_-_2001_to_present.csv | wc
 6156888 69665305 1449194734
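
As a cross-check, Spark can count the lines too. This sketch assumes the CSV was downloaded to /app/oracle/downloads/ and is run from the DSE Spark REPL (dse spark) on this single node; the count includes the header row.

scala> sc.textFile("file:///app/oracle/downloads/Crimes_-_2001_to_present.csv").count()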

Create An Oracle User

We need an owner of the tables we’re going to create. We’ll create a user called BULK_LOAD.
To do this we need to log into Oracle SQLPlus as a privileged user (SYS or SYSTEM) to get to the SQL prompt:

  • depending on your privileges you may be asked for a password when you log in as SYS or SYSTEM
  • If you’re already logged into SQLPlus as a non-system user, just type “connect / as sysdba” at the SQL> prompt
  • Log in to SQLPlus:

    $ sqlplus / AS sysdba
     
    Connected.

    Run this command to allow local user IDs to be created:

    SQL> ALTER SESSION SET "_ORACLE_SCRIPT"=TRUE;

    Create the user BULK_LOAD and set the password to be the same as the user name:

    SQL> CREATE USER bulk_load IDENTIFIED BY bulk_load;
     
    USER created.

    Grant everything to BULK_LOAD (the old-fashioned way):

    SQL> GRANT CONNECT, resource, dba TO bulk_load;
     
    GRANT succeeded.

    Set default user and temporary tablespaces:

    SQL> ALTER USER bulk_load DEFAULT tablespace "USERS";
     
    USER altered.
     
    SQL> ALTER USER bulk_load TEMPORARY tablespace "TEMP";
     
    USER altered.

    Define An External File Directory

    This is how Oracle reads and manages flat files, allowing them to be manipulated as virtual tables. Great for loading data!

    SQL> CREATE OR REPLACE directory xtern_data_dir AS '/app/oracle/downloads';
     
    Directory created.

    Grant access to our BULK_LOAD user:

    SQL> GRANT READ,WRITE ON directory xtern_data_dir TO bulk_load;
     
    GRANT succeeded.

    Define An External File Table

    This is how Oracle reads a text file and manages it as an external table.
    Connect as our user:

    SQL> CONNECT bulk_load/bulk_load;
    Connected.

    Create the ‘virtual’ table based on the source data file.

    > The data is rather inconsistent in format and type. I don’t want to spend a lifetime data-cleansing all 6.1 million records before starting this project, so the CSV load into Oracle is done in pure text format.

    In the first test we will see what happens to data when it is transferred into Oracle and then to DSE/Cassandra, all in text format. This will make an interesting baseline comparison.

    The second part of the exercise will be to build a more representative test by storing the data using a wider variety of datatypes, e.g. decimal, binary, date etc.

    DROP TABLE xternal_crime_data;
    CREATE TABLE xternal_crime_data
    (ID             			varchar2(30),
    Case_Number				varchar2(30),
    Incident_Date				varchar2(30),
    Block					varchar2(60),
    IUCR					varchar2(30),
    Primary_Type				varchar2(60),
    Description				varchar2(120),
    Location_Description		varchar2(60),
    Arrest					varchar2(60),
    Domestic				varchar2(60),
    Beat					varchar2(30),
    District				varchar2(30),
    Ward					varchar2(30),
    Community_Area				varchar2(30),
    FBI_Code				varchar2(10),
    X_Coordinate				varchar2(30),
    Y_Coordinate				varchar2(30),
    YEAR					varchar2(30),
    Updated_On				varchar2(30),
    Latitude				varchar2(30),
    Longitude				varchar2(30),
    Location				varchar2(60)
    )
    organization external
    ( DEFAULT directory xtern_data_dir
    access parameters
    ( records delimited BY newline
    FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '(' AND ')' MISSING FIELD VALUES ARE NULL)
    location ('Crimes_-_2001_to_present.csv')  
    ) REJECT LIMIT UNLIMITED;
     
    TABLE created.

    If you created the external table without the REJECT LIMIT UNLIMITED clause shown above, add it now:

    SQL> ALTER TABLE xternal_crime_data reject LIMIT unlimited;

    To avoid this error:

    SQL> SELECT COUNT(*) FROM xternal_crime_data;
    SELECT COUNT(*) FROM xternal_crime_data
    *
    ERROR at line 1:
    ORA-29913: error IN executing ODCIEXTTABLEFETCH callout
    ORA-30653: reject LIMIT reached

    If we describe the ‘table’ it matches the format of the file it’s based on:

    SQL> DESC xternal_crime_data
     Name					   NULL?    TYPE
     ----------------------------------------- -------- ----------------------------
     Name					   NULL?    TYPE
     ----------------------------------------- -------- ----------------------------
     ID						    VARCHAR2(30)
     CASE_NUMBER					    VARCHAR2(30)
     INCIDENT_DATE					    VARCHAR2(30)
     BLOCK						    VARCHAR2(60)
     IUCR						    VARCHAR2(30)
     PRIMARY_TYPE					    VARCHAR2(60)
     DESCRIPTION					    VARCHAR2(120)
     LOCATION_DESCRIPTION				    VARCHAR2(60)
     ARREST 					    VARCHAR2(60)
     DOMESTIC					    VARCHAR2(60)
     BEAT						    VARCHAR2(30)
     DISTRICT					    VARCHAR2(30)
     WARD						    VARCHAR2(30)
     COMMUNITY_AREA 				    VARCHAR2(30)
     FBI_CODE					    VARCHAR2(10)
     X_COORDINATE					    VARCHAR2(30)
     Y_COORDINATE					    VARCHAR2(30)
     YEAR						    VARCHAR2(30)
     UPDATED_ON					    VARCHAR2(30)
     LATITUDE					    VARCHAR2(30)
     LONGITUDE					    VARCHAR2(30)
     LOCATION					    VARCHAR2(60)

    So far Oracle hasn’t actually read the data. The records are only read and validated when we query the table.

    Check how many records there are in the original (source) external ‘table’ – just over 6 million:

    SQL> SELECT COUNT(*) FROM xternal_crime_data;
     
      COUNT(*)
    ----------
       6156888

    Oracle writes its log and bad-record files to the default directory specified in the external table definition. Clear out any old ones first:

    $ rm /app/oracle/downloads/*.bad
    $ rm /app/oracle/downloads/*.log


    Nothing happens until you attempt to read data from the table; while a query is running you can watch for errors or rejected records by using tail -f or less on the log and bad files.

    Create A Table, Physically Load Data Into Oracle

    Create a physical table in the Oracle database using the “…as select” SQL syntax:

    SQL> CREATE TABLE crime_data AS SELECT * FROM xternal_crime_data;

    Let’s put a primary key and index on that table:

    SQL> CREATE INDEX crime_id ON crime_data(id);
     
    INDEX created.
    SQL> ALTER TABLE crime_data ADD CONSTRAINT crime_data_pk PRIMARY KEY(id);
     
    TABLE altered.

    We now have an index on our primary key, and the ID column now shows as NOT NULL.
    Of course, in the real world we would make the crime ID a unique index, but for this exercise I wanted to avoid having to worry about duplicate keys.

    SQL> DESC crime_data
     Name				   NULL?    TYPE
     --------------------- -------- ----------------------------
     ID					   NOT NULL VARCHAR2(30)
     CASE_NUMBER					VARCHAR2(30)
     INCIDENT_DATE					VARCHAR2(30)
     BLOCK						    VARCHAR2(60)
     IUCR						    VARCHAR2(30)
     PRIMARY_TYPE					VARCHAR2(60)
     DESCRIPTION					VARCHAR2(120)
     LOCATION_DESCRIPTION			VARCHAR2(60)
     ARREST 					    VARCHAR2(60)
     DOMESTIC					    VARCHAR2(60)
     BEAT						    VARCHAR2(30)
     DISTRICT					    VARCHAR2(30)
     WARD						    VARCHAR2(30)
     COMMUNITY_AREA 				VARCHAR2(30)
     FBI_CODE					    VARCHAR2(10)
     X_COORDINATE					VARCHAR2(30)
     Y_COORDINATE					VARCHAR2(30)
     YEAR						    VARCHAR2(30)
     UPDATED_ON					    VARCHAR2(30)
     LATITUDE					    VARCHAR2(30)
     LONGITUDE					    VARCHAR2(30)
     LOCATION					    VARCHAR2(60)

    Check Volumes In Oracle

    At this point we have loaded the CSV source data (1.4 GB, 6.1 million rows) into an Oracle table.
    Now we need to find out how big it is in the database.

    What Objects Have We Created?

    What objects does bulk_load own – the system dictionary table DBA_TABLES will tell us:

    SQL> CONNECT / AS sysdba
    SQL> SELECT TABLE_NAME FROM dba_tables WHERE owner = 'BULK_LOAD';
     
    TABLE_NAME
    --------------------------------------------------------------------------------
    CRIME_DATA
    XTERNAL_CRIME_DATA

    How Much Space Is Oracle Using?

    Run this next query to determine total storage for a user including indexes, blobs etc. You will be prompted for the Oracle user to check, in uppercase – in my case BULK_LOAD:

    COLUMN TABLE_NAME FORMAT A32
    COLUMN OBJECT_NAME FORMAT A32
    COLUMN OWNER FORMAT A10
     
    SELECT
       owner, 
       TABLE_NAME, 
       TRUNC(SUM(bytes)/1024/1024) Meg,
       ROUND( ratio_to_report( SUM(bytes) ) OVER () * 100) Percent
    FROM
    (SELECT segment_name TABLE_NAME, owner, bytes
     FROM dba_segments
     WHERE segment_type IN ('TABLE', 'TABLE PARTITION', 'TABLE SUBPARTITION')
     UNION ALL
     SELECT i.table_name, i.owner, s.bytes
     FROM dba_indexes i, dba_segments s
     WHERE s.segment_name = i.index_name
     AND   s.owner = i.owner
     AND   s.segment_type IN ('INDEX', 'INDEX PARTITION', 'INDEX SUBPARTITION')
     UNION ALL
     SELECT l.table_name, l.owner, s.bytes
     FROM dba_lobs l, dba_segments s
     WHERE s.segment_name = l.segment_name
     AND   s.owner = l.owner
     AND   s.segment_type IN ('LOBSEGMENT', 'LOB PARTITION')
     UNION ALL
     SELECT l.table_name, l.owner, s.bytes
     FROM dba_lobs l, dba_segments s
     WHERE s.segment_name = l.index_name
     AND   s.owner = l.owner
     AND   s.segment_type = 'LOBINDEX')
    WHERE owner IN UPPER('&owner')
    GROUP BY TABLE_NAME, owner
    HAVING SUM(bytes)/1024/1024 > 10  /* Ignore really small tables */
    ORDER BY SUM(bytes) DESC
    ;
    OWNER	   TABLE_NAME				   MEG	  PERCENT
    ---------- -------------------------------- ---------- ----------
    BULK_LOAD  CRIME_DATA				  1672	      100
    

    So a 1.4GB CSV file has become 1.7GB of storage in Oracle.


    Copy Data From Oracle To Cassandra


    We now have roughly 1.7 GB (1,672 MB) of data in Oracle.
    The next objective is to see how much space that data occupies when it is copied to Cassandra, and what impact Cassandra’s on-disk compression and the new Cassandra 3.0 storage engine will have on those numbers.

    So all is looking good on the Oracle side. Now time to turn to Cassandra and Spark.

    Download Oracle ojdbc7.jar

    We have to add the Oracle JDBC jar file to our Spark classpath so that Spark knows how to talk to the Oracle database.

    Using The Oracle JDBC Driver

    For this test we only need the jdbc driver file on our single (SparkMaster) node.
    In a bigger cluster we would need to distribute it to the slave nodes too.

    Update Executor Path For ojdbc7.jar In spark-defaults.conf

    Add the classpath for the ojdbc7.jar file for the executors (the path for the driver seems to be required on the command line at run time as well, see below).

    # vi /etc/dse/spark/spark-defaults.conf
    

    Add the following lines pointing to the location of your ojdbc7.jar file:

    spark.driver.extraClassPath = /app/oracle/downloads/ojdbc7.jar
    spark.executor.extraClassPath = /app/oracle/downloads/ojdbc7.jar
    

    Restart DSE

    $ sudo service dse stop
    $ sudo service dse start
    

    Start The Spark REPL

    I’m passing the path to the ojdbc7.jar file on the command line (this shouldn’t be needed now that the driver path is defined in spark-defaults.conf, but it seems not to work without it).

    $ dse spark --driver-class-path /app/oracle/downloads/ojdbc7.jar -deprecation
    
    scala> val crimes = sqlContext.load("jdbc", Map("url" -> "jdbc:oracle:thin:bulk_load/bulk_load@localhost:1521/orcl", "dbtable" -> "crime_data"))
    67: warning: method load in class SQLContext is deprecated: Use read.format(source).options(options).load(). This will be removed in Spark 2.0.
             val crimes = sqlContext.load("jdbc", Map("url" -> "jdbc:oracle:thin:bulk_load/bulk_load@localhost:1521/orcl", "dbtable" -> "crime_data"))

    There’s a deprecation warning message that you can ignore.
    Here’s the output we’re looking for, confirming that we established a valid connection to the remote database.

    crimes: org.apache.spark.sql.DataFrame = [ID: string, CASE_NUMBER: string, INCIDENT_DATE: string, BLOCK: string, IUCR: string, PRIMARY_TYPE: string, DESCRIPTION: string, LOCATION_DESCRIPTION: string, ARREST: string, DOMESTIC: string, BEAT: string, DISTRICT: string, WARD: string, COMMUNITY_AREA: string, FBI_CODE: string, X_COORDINATE: string, Y_COORDINATE: string, YEAR: string, UPDATED_ON: string, LATITUDE: string, LONGITUDE: string, LOCATION: string]
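
    The deprecation warning itself tells us the preferred form. If you’d rather avoid the deprecated load() call, the same DataFrame can be built with the newer reader API (a sketch using the identical connection details):

    scala> val crimes = sqlContext.read.format("jdbc").options(Map("url" -> "jdbc:oracle:thin:bulk_load/bulk_load@localhost:1521/orcl", "dbtable" -> "crime_data")).load()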

    Now that we’ve created a dataframe using the jdbc method shown above, we can use printSchema() to look at the dataframe schema. You’ll notice that it looks a lot like a table. That’s deliberate because it means that we can use it to manipulate large volumes of tabular data:

    scala> crimes.printSchema()
    root
     |-- ID: string (nullable = false)
     |-- CASE_NUMBER: string (nullable = true)
     |-- INCIDENT_DATE: string (nullable = true)
     |-- BLOCK: string (nullable = true)
     |-- IUCR: string (nullable = true)
     |-- PRIMARY_TYPE: string (nullable = true)
     |-- DESCRIPTION: string (nullable = true)
     |-- LOCATION_DESCRIPTION: string (nullable = true)
     |-- ARREST: string (nullable = true)
     |-- DOMESTIC: string (nullable = true)
     |-- BEAT: string (nullable = true)
     |-- DISTRICT: string (nullable = true)
     |-- WARD: string (nullable = true)
     |-- COMMUNITY_AREA: string (nullable = true)
     |-- FBI_CODE: string (nullable = true)
     |-- X_COORDINATE: string (nullable = true)
     |-- Y_COORDINATE: string (nullable = true)
     |-- YEAR: string (nullable = true)
     |-- UPDATED_ON: string (nullable = true)
     |-- LATITUDE: string (nullable = true)
     |-- LONGITUDE: string (nullable = true)
     |-- LOCATION: string (nullable = true)

    We can use the dataframe .show() method to display the first 20 rows in the table:

    scala> crimes.show()
    +--------+-----------+--------------------+--------------------+----+-------------------+--------------------+--------------------+------+---------+-----+--------+----+--------------+--------+------------+------------+-------+--------------------+------------+--------------------+--------------+
    |      ID|CASE_NUMBER|       INCIDENT_DATE|               BLOCK|IUCR|       PRIMARY_TYPE|         DESCRIPTION|LOCATION_DESCRIPTION|ARREST| DOMESTIC| BEAT|DISTRICT|WARD|COMMUNITY_AREA|FBI_CODE|X_COORDINATE|Y_COORDINATE|   YEAR|          UPDATED_ON|    LATITUDE|           LONGITUDE|      LOCATION|
    +--------+-----------+--------------------+--------------------+----+-------------------+--------------------+--------------------+------+---------+-----+--------+----+--------------+--------+------------+------------+-------+--------------------+------------+--------------------+--------------+
    |      ID|Case Number|                Date|               Block|IUCR|       Primary Type|         Description|Location Description|Arrest| Domestic| Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|   Year|          Updated On|    Latitude|           Longitude|      Location|
    | 3666345|   HK764692|11/18/2004 12:00:...|    118XX S LOWE AVE|0890|              THEFT|       FROM BUILDING|           RESIDENCE| false|    false| 0524|     005|  34|            53|      06|     1174110|     1826325|   2004|04/15/2016 08:55:...| 41.67883435|       -87.638323571| "(41.67883435|
    | 3666346|   HK757211|11/17/2004 08:00:...|014XX N MASSASOIT...|0560|            ASSAULT|              SIMPLE|              STREET| false|    false| 2531|     025|  29|            25|     08A|     1137744|     1909068|   2004|04/15/2016 08:55:...|41.906623511|       -87.769453017|"(41.906623511|
    | 3666347|   HK764653|11/21/2004 10:00:...|  130XX S DREXEL AVE|0910|MOTOR VEHICLE THEFT|          AUTOMOBILE|CHA PARKING LOT/G...| false|    false| 0533|     005|   9|            54|      07|     1184304|     1818785|   2004|04/15/2016 08:55:...|41.657911807|       -87.601244288|"(41.657911807|
    | 3666348|   HK761590|11/19/2004 07:45:...|063XX S HAMILTON AVE|0430|            BATTERY|AGGRAVATED: OTHER...|            SIDEWALK| false|    false| 0726|     007|  15|            67|     04B|     1163120|     1862567|   2004|04/15/2016 08:55:...|41.778524374|       -87.677540732|"(41.778524374|
    | 3666350|   HK765337|11/21/2004 06:19:...| 061XX N LINCOLN AVE|1330|  CRIMINAL TRESPASS|             TO LAND|  SMALL RETAIL STORE|  true|    false| 1711|     017|  50|            13|      26|     1152804|     1941021|   2004|04/15/2016 08:55:...|41.994019837|       -87.713281946|"(41.994019837|
    |10504143|   HZ245011|04/29/2016 01:34:...|    002XX W 104TH ST|2820|      OTHER OFFENSE|    TELEPHONE THREAT|           RESIDENCE| false|    false| 0512|     005|  34|            49|      26|     1176442|     1835975|   2016|05/06/2016 03:50:...|41.705263447|       -87.629498947|"(41.705263447|
    | 3666351|   HK766200|11/21/2004 09:00:...|  014XX N STATE PKWY|0820|              THEFT|      $500 AND UNDER|              STREET| false|    false| 1824|     018|  43|             8|      06|     1176025|     1910070|   2004|04/15/2016 08:55:...| 41.90859641|       -87.628801757| "(41.90859641|
    | 3666352|   HK756872|11/17/2004 05:00:...|   045XX W WILCOX ST|1320|    CRIMINAL DAMAGE|          TO VEHICLE|              STREET| false|    false| 1113|     011|  28|            26|      14|     1146197|     1898954|   2004|04/15/2016 08:55:...|41.878712886|       -87.738658886|"(41.878712886|
    | 3666354|   HK765887|11/19/2004 10:30:...|  035XX S PAULINA ST|0610|           BURGLARY|      FORCIBLE ENTRY|             "SCHOOL|PUBLIC|BUILDING"|false|   false|0922|           009|      11|          59|          05|1165596|             1881229|        2004|04/15/2016 08:55:...|  41.829682911|
    | 3666356|   HK766293|11/21/2004 06:00:...|     006XX W 74TH ST|1310|    CRIMINAL DAMAGE|         TO PROPERTY|           APARTMENT| false|    false| 0732|     007|  17|            68|      14|     1173139|     1855820|   2004|04/15/2016 08:55:...|41.759794269|       -87.641009607|"(41.759794269|
    | 3666357|   HK766287|11/21/2004 05:00:...|  083XX S COLFAX AVE|1320|    CRIMINAL DAMAGE|          TO VEHICLE|           RESIDENCE| false|    false| 0423|     004|   7|            46|      14|     1194953|     1850154|   2004|04/15/2016 08:55:...|41.743736757|       -87.561249019|"(41.743736757|
    | 3666358|   HK763951|11/20/2004 11:05:...|082XX S DR MARTIN...|0860|              THEFT|        RETAIL THEFT|         GAS STATION| false|    false| 0631|     006|   6|            44|      06|     1180338|     1850357|   2004|04/15/2016 08:55:...|41.744641174|       -87.614792711|"(41.744641174|
    | 3666359|   HK764785|11/20/2004 09:00:...|   056XX N MOZART ST|1320|    CRIMINAL DAMAGE|          TO VEHICLE|              STREET| false|    false| 2011|     020|  40|             2|      14|     1156332|     1937421|   2004|04/15/2016 08:55:...|41.984070441|       -87.700402289|"(41.984070441|
    | 3666360|   HK755931|11/17/2004 10:00:...|  030XX N MOBILE AVE|0484|            BATTERY|PRO EMP HANDS NO/...|             "SCHOOL|PUBLIC|BUILDING"| true|   false|2511|           025|      36|          19|         08B|1133881|             1919540|        2004|04/15/2016 08:55:...|  41.935428738|
    | 3666362|   HK765213|11/21/2004 04:00:...|  046XX N ELSTON AVE|0486|            BATTERY|DOMESTIC BATTERY ...|           APARTMENT|  true|     true| 1722|     017|  39|            14|     08B|     1146545|     1930640|   2004|04/15/2016 08:55:...|41.965655711|        -87.73657143|"(41.965655711|
    | 3666363|   HK716111|10/29/2004 02:15:...|0000X N LAVERGNE AVE|2095|          NARCOTICS|ATTEMPT POSSESSIO...|            SIDEWALK|  true|    false| 1533|     015|  28|            25|      18|     1143052|     1899700|   2004|04/15/2016 08:55:...|41.880819219|       -87.750188229|"(41.880819219|
    | 3666364|   HK764786|11/21/2004 12:10:...|     005XX N RUSH ST|0810|              THEFT|           OVER $500|PARKING LOT/GARAG...| false|    false| 1834|     018|  42|             8|      06|     1177011|     1903813|   2004|04/15/2016 08:55:...|41.891404632|       -87.625369395|"(41.891404632|
    | 3666365|   HK762241|11/20/2004 03:45:...|  061XX S JUSTINE ST|0486|            BATTERY|DOMESTIC BATTERY ...|           APARTMENT| false|     true| 0713|     007|  15|            67|     08B|     1167059|     1864042|   2004|04/15/2016 08:55:...| 41.78248862|       -87.663057935| "(41.78248862|
    | 3666366|   HK765012|11/21/2004 02:20:...|016XX N SPAULDING...|0486|            BATTERY|DOMESTIC BATTERY ...|           APARTMENT| false|     true| 1422|     014|  26|            23|     08B|     1153891|     1910739|   2004|04/15/2016 08:55:...|41.910902295|       -87.710093524|"(41.910902295|
    +--------+-----------+--------------------+--------------------+----+-------------------+--------------------+--------------------+------+---------+-----+--------+----+--------------+--------+------------+------------+-------+--------------------+------------+--------------------+--------------+
    only showing top 20 rows
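
    Before moving the data it’s also worth checking the DataFrame’s row count against the source. Note that this triggers a full read of the Oracle table over JDBC, so it takes a little while:

    scala> crimes.count()

    The result should be close to the 6.1 million rows we counted in the external table earlier.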

    Create A Cassandra Keyspace

    The first thing that we need to do in Cassandra is create a keyspace to contain the table that we will create. I’m using a replication factor of 1 because I have one node in my development cluster. For most production deployments we recommend a multi-datacenter Active-Active HA setup across geographical regions using NetworkTopologyStrategy with RF=3:

    Log into cqlsh
    > If you didn’t change the IP defaults in cassandra.yaml then just type ‘cqlsh’ – if you changed the IP to be the host IP then you may need to supply the hostname e.g. ‘cqlsh [hostname]’.

    From the cqlsh prompt, create the keyspace:

    CREATE KEYSPACE IF NOT EXISTS bulk_load WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 };
    USE bulk_load;
    

    Create A Cassandra Table

    The source data had some inconsistencies that made it simpler just to load all the columns as text. To make the comparison accurate we will store the data in Cassandra in text format too.

    We will create a Cassandra table that matches the Oracle columns, with a partition key based on the crime ID.

    In cqlsh:

    cqlsh:bulk_load> CREATE TABLE crimes (
    id                      text,
    case_number             text,
    incident_date           text,
    block                   text,
    iucr                    text,
    primary_type            text,
    description             text,
    location_description    text,
    arrest                  text,
    domestic                text,
    beat                    text,
    district                text,
    ward                    text,
    community_area          text,
    fbi_code                text,
    x_coordinate            text,
    y_coordinate            text,
    YEAR                    text,
    updated_on              text,
    latitude                text,
    longitude               text, 
    location                text, 
    PRIMARY KEY (id));

    Check it’s all there:

    cqlsh:bulk_load> DESC TABLE crimes;
     
    CREATE TABLE bulk_load.crimes (
        id text PRIMARY KEY,
        arrest text,
        beat text,
        block text,
        case_number text,
        community_area text,
        description text,
        district text,
        domestic text,
        fbi_code text,
        incident_date text,
        iucr text,
        latitude text,
        location text,
        location_description text,
        longitude text,
        primary_type text,
        updated_on text,
        ward text,
        x_coordinate text,
        y_coordinate text,
        YEAR text
    ) WITH bloom_filter_fp_chance = 0.01
        AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
        AND comment = ''
        AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
        AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
        AND crc_check_chance = 1.0
        AND dclocal_read_repair_chance = 0.1
        AND default_time_to_live = 0
        AND gc_grace_seconds = 864000
        AND max_index_interval = 2048
        AND memtable_flush_period_in_ms = 0
        AND min_index_interval = 128
        AND read_repair_chance = 0.0
        AND speculative_retry = '99PERCENTILE';

    Back to Spark…

    Change DataFrame Columns To Lower Case

    If you try to save your dataframe to Cassandra now (using the Spark-Cassandra connector) it will fail with an error message saying that columns don’t exist e.g.

    java.util.NoSuchElementException: Columns not found in table bulk_load.crimes: ID, CASE_NUMBER, INCIDENT_DATE

    It does this even though the columns exist in both tables. The problem is that the connector expects the column names in the dataframe to match the case of the column names in the Cassandra table. Unquoted Cassandra column names are folded to lower case, so the dataframe column names need to be lower case too.

    So we need to modify our dataframe schema. Here’s a reminder of our raw crimes dataframe schema:

    scala> crimes.printSchema()
    root
     |-- ID: string (nullable = false)
     |-- CASE_NUMBER: string (nullable = true)
     |-- INCIDENT_DATE: string (nullable = true)
     |-- BLOCK: string (nullable = true)
     |-- IUCR: string (nullable = true)
     |-- PRIMARY_TYPE: string (nullable = true)
     |-- DESCRIPTION: string (nullable = true)
     |-- LOCATION_DESCRIPTION: string (nullable = true)
     |-- ARREST: string (nullable = true)
     |-- DOMESTIC: string (nullable = true)
     |-- BEAT: string (nullable = true)
     |-- DISTRICT: string (nullable = true)
     |-- WARD: string (nullable = true)
     |-- COMMUNITY_AREA: string (nullable = true)
     |-- FBI_CODE: string (nullable = true)
     |-- X_COORDINATE: string (nullable = true)
     |-- Y_COORDINATE: string (nullable = true)
     |-- YEAR: string (nullable = true)
     |-- UPDATED_ON: string (nullable = true)
     |-- LATITUDE: string (nullable = true)
     |-- LONGITUDE: string (nullable = true)
     |-- LOCATION: string (nullable = true)

    DataFrames are immutable in Spark, so we can’t modify the one we have – we need to create a new one with the correct column titles.

    Create A New Lower-Case Column Name List

    scala> val newNames = Seq("id", "case_number", "incident_date", "block","iucr","primary_type","description","location_description","arrest","domestic","beat","district","ward","community_area","fbi_code","x_coordinate","y_coordinate","year","updated_on","latitude","longitude", "location");

    Response:

    newNames: Seq[String] = List(id, case_number, incident_date, block, iucr, primary_type, description, location_description, arrest, domestic, beat, district, ward, community_area, fbi_code, x_coordinate, y_coordinate, year, updated_on, latitude, longitude, location)

    Create A New DataFrame With Lower-Case Column Names

    Now create a new dataframe based on the old one with lower case column titles:

    scala> val crimes_lc = crimes.toDF(newNames: _*)

    Response:

    crimes_lc: org.apache.spark.sql.DataFrame = [id: string, case_number: string, incident_date: string, block: string, iucr: string, primary_type: string, description: string, location_description: string, arrest: string, domestic: string, beat: string, district: string, ward: string, community_area: string, fbi_code: string, x_coordinate: string, y_coordinate: string, year: string, updated_on: string, latitude: string, longitude: string, location: string]

    And now if we look at the schema of our new dataframe – hey presto! – it’s got lower case column titles.

    scala> crimes_lc.printSchema()
    root
     |-- id: string (nullable = false)
     |-- case_number: string (nullable = true)
     |-- incident_date: string (nullable = true)
     |-- block: string (nullable = true)
     |-- iucr: string (nullable = true)
     |-- primary_type: string (nullable = true)
     |-- description: string (nullable = true)
     |-- location_description: string (nullable = true)
     |-- arrest: string (nullable = true)
     |-- domestic: string (nullable = true)
     |-- beat: string (nullable = true)
     |-- district: string (nullable = true)
     |-- ward: string (nullable = true)
     |-- community_area: string (nullable = true)
     |-- fbi_code: string (nullable = true)
     |-- x_coordinate: string (nullable = true)
     |-- y_coordinate: string (nullable = true)
     |-- year: string (nullable = true)
     |-- updated_on: string (nullable = true)
     |-- latitude: string (nullable = true)
     |-- longitude: string (nullable = true)
     |-- location: string (nullable = true)
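
    As an aside, the same renaming could have been done without typing out all 22 names by hand, by deriving the lower-case list from the DataFrame’s own columns. This one-liner is a sketch equivalent to the two steps above:

    scala> val crimes_lc = crimes.toDF(crimes.columns.map(_.toLowerCase): _*)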

    Now we can proceed to write this data to Cassandra.

    Write The Crimes Data From Oracle To Cassandra

    Save our new dataframe crimes_lc to Cassandra:

    scala> crimes_lc.write.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "crimes", "keyspace" -> "bulk_load")).save()
     
    scala>

    This will take a while to complete – 32 minutes in my case. While it’s in progress you may be able to check and see records appearing in the table, although when the write was in full swing I did get time-out errors when trying to query the table in cqlsh :[

    On my single-node VM running Oracle 12c, DSE/Cassandra and DSE/Spark I was averaging about 3000 Oracle->Spark->Cassandra rows written per second, so the import of 6.1m rows finished close to the anticipated time of 33 minutes.
    YMMV

    Part-way through the load, a count in cqlsh shows the table filling up:

    cqlsh:bulk_load> SELECT COUNT(*) FROM crimes;
     
     COUNT
    --------
     120428
     
    (1 ROWS)
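
    If cqlsh times out on the COUNT while the load is running, the same check can be made from the Spark REPL, where the count runs as a distributed job through the Spark-Cassandra connector (a sketch):

    scala> val check = sqlContext.read.format("org.apache.spark.sql.cassandra").options(Map("table" -> "crimes", "keyspace" -> "bulk_load")).load()
    scala> check.count()

    Once the write has finished this should approach the full 6.1 million rows.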

    You can track progress from your Spark Master UI – this should be at
    http://[your-host-name]:7080/

    At the end of the run you can query the data in the Cassandra CRIMES table by partition key:

    cqlsh:bulk_load> select * from crimes where id='3666345';
    
     id      | arrest | beat | block            | case_number | community_area | description   | district | domestic | fbi_code | incident_date          | iucr | latitude    | location      | location_description | longitude     | primary_type | updated_on             | ward | x_coordinate | y_coordinate | year
    ---------+--------+------+------------------+-------------+----------------+---------------+----------+----------+----------+------------------------+------+-------------+---------------+----------------------+---------------+--------------+------------------------+------+--------------+--------------+------
     3666345 |  false | 0524 | 118XX S LOWE AVE |    HK764692 |             53 | FROM BUILDING |      005 |    false |       06 | 11/18/2004 12:00:00 PM | 0890 | 41.67883435 | "(41.67883435 |            RESIDENCE | -87.638323571 |        THEFT | 04/15/2016 08:55:02 AM |   34 |      1174110 |      1826325 | 2004
    
    

    We can get some information about data size in Cassandra from the nodetool cfstats command:

    $ nodetool cfstats bulk_load
    Keyspace: bulk_load
    	Read Count: 1
    	Read Latency: 0.113 ms.
    	Write Count: 6156888
    	Write Latency: 0.0296818494992925 ms.
    	Pending Flushes: 0
    		Table: crimes
    		SSTable count: 6
    		Space used (live): 876227198
    		Space used (total): 876227198
    		Space used by snapshots (total): 0
    		Off heap memory used (total): 8725758
    		SSTable Compression Ratio: 0.46752276291246503
    		Number of keys (estimate): 6109619
    		Memtable cell count: 19074
    		Memtable data size: 10498992
    

    This gives us the size of the same dataset, stored as ASCII/text throughout, in each of the three formats:
    – CSV (text) 1.4 GB
    – Oracle (as text) 1.7 GB
    – Cassandra (as text) 876 MB

    The SSTable compression ratio of roughly 0.47 reported by cfstats suggests the Cassandra data would occupy around 1.9 GB uncompressed, so much of the saving over Oracle comes from LZ4 on-disk compression.

    In the case of this data, stored with a more realistic real-world Cassandra replication factor of 3, the volume of data in a multi-node Cassandra datacenter would be 876 MB x 3 = 2,628 MB, or roughly 2.6 GB.
    Assuming the customer has fairly fast machines with fast, locally attached SSD storage, each DSE/Cassandra node might be expected to store 1 TB, so an initial cluster size of 3-5 machines would be recommended.

    Of course, it is also true that this is just one test with one table. In a “real-world” situation there may be only a few tables, or a large number of them. This data will need to be re-modelled, not just copied, when it moves from an Oracle relational model to a distributed, replicated NoSQL database model. The re-modelling, or transformation, usually involves an element of data de-normalisation and duplication, and in some cases this may have a significant impact on the volume of storage and the number of DSE/Cassandra nodes required to accommodate the same data in DSE/Cassandra.

    Appendix 1 – Other Stuff


    Another interesting-looking source file is the airline on-time flight performance data set:
    http://www.transtats.bts.gov/DL_SelectFields.asp?Table_ID=236&DB_Short_Name=On-Time
    The downloaded zip expands to a roughly 200 MB CSV:

    -rw-rw-r--   1 oracle oracle  22646800 Sep 14 09:58 On_Time_On_Time_Performance_2016_1.zip
    -rw-r--r--   1 oracle oracle 200290882 Mar 14  2016 On_Time_On_Time_Performance_2016_1.csv
    
