Here is the high-level plan:
- Start out with a 1.4 GB CSV data file containing 6.1 million crime records from the Chicago Police Department for the period 2001-2016
- Define this file as an external data file in Oracle 12c
- Read the external data file into a physical Oracle database table
- Use the Spark DataFrame capability introduced in Apache Spark 1.3 to load data from tables in the Oracle database via Oracle's JDBC thin driver
- Save the data in the dataframe to Cassandra
- Compare the volumes used by the data in Oracle versus DSE/Cassandra
DSE installation instructions:
- Ubuntu/Debian - https://docs.datastax.com/en/datastax_enterprise/5.0/datastax_enterprise/install/installDEBdse.html
- Red Hat/Fedora/CentOS/Oracle Linux - https://docs.datastax.com/en/datastax_enterprise/5.0/datastax_enterprise/install/installRHELdse.html
To set up your environment, you'll also need the following resources:
- Python 2.7
- Java 8
- For Red Hat, CentOS and Fedora, install EPEL (Extra Packages for Enterprise Linux).
- An Oracle database:
- In my example the database name is orcl
- A running Oracle tns listener. In my example I'm using the default port of 1521.
- You should be able to make a TNS connection to the Oracle database, e.g. "sqlplus user/password@service_name".
- The Oracle thin JDBC driver. You can download the ojdbc JAR file from http://www.oracle.com/technetwork/database/features/jdbc/jdbc-drivers-12c-download-1958347.html. I've used ojdbc7.jar, which is certified for use with both JDK 7 and JDK 8. In my 12c Oracle VM, Firefox downloaded it to /app/oracle/downloads/, so you'll see that path referenced in the instructions below.
We will connect from Spark to the "orcl" database on TNS port 1521 using the Oracle JDBC driver, so all of these components must be working correctly.
Now on to installing DataStax Enterprise and playing with some data!
Installation instructions for DSE are provided at the top of this doc. I'll show the instructions for Red Hat/CentOS/Fedora here. I'm using an Oracle Enterprise Linux VirtualBox instance.
As root, create ```/etc/yum.repos.d/datastax.repo```:
# vi /etc/yum.repos.d/datastax.repo
Paste in these lines:
[datastax]
name = DataStax Repo for DataStax Enterprise
baseurl=https://datastaxrepo_gmail.com:[email protected]/enterprise
enabled=1
gpgcheck=0
Import the repository key:
# rpm --import http://rpm.datastax.com/rpm/repo_key
# yum install dse-full-5.0.1-1
# yum install opscenter --> 6.0.2.1
# yum install datastax-agent --> 6.0.2.1
We want to use Search (Solr) and Analytics (Spark), so we need to delete the default datacentre and restart the cluster (if it's already running) in SearchAnalytics mode.
Stop the service if it's running.
# service dse stop
Stopping DSE daemon : dse                                [ OK ]
Enable Solr and Spark by changing the flag from "0" to "1" in:
# vi /etc/default/dse
e.g.:
# Start the node in DSE Search mode
SOLR_ENABLED=1
# Start the node in Spark mode
SPARK_ENABLED=1
Delete the old (Cassandra-only) datacentre databases if they exist:
# rm -rf /var/lib/cassandra/data/*
# rm -rf /var/lib/cassandra/saved_caches/*
# rm -rf /var/lib/cassandra/commitlog/*
# rm -rf /var/lib/cassandra/hints/*
Remove the old system.log if it exists:
# rm -rf /var/log/cassandra/system.log
Now restart DSE:
$ sudo service dse restart
After a few minutes use nodetool to check that all is up and running (check for "UN" next to the IP address):
$ nodetool status
Datacenter: SearchAnalytics
===========================
Status=Up/Down |/ State=Normal/Leaving/Joining/Moving
--  Address    Load       Owns    Host ID                               Token                Rack
UN  127.0.0.1  346.89 KB  ?       8e6fa3db-9018-47f0-96df-8c78067fddaa  6840808785095143619  rack1

Note: Non-system keyspaces don't have the same replication settings, effective ownership information is meaningless
You should also check that you can log into cqlsh:
$ cqlsh
Connected to Test Cluster at 127.0.0.1:9042.
[cqlsh 5.0.1 | Cassandra 3.0.7.1159 | DSE 5.0.1 | CQL spec 3.4.0 | Native protocol v4]
Use HELP for help.
cqlsh>
Type exit in cqlsh to return to the shell prompt.
Check the Spark master address:
$ dse client-tool spark master-address
spark://127.0.0.1:7077
OK, let's go get some data.
The dataset is structured with the following fields:
- ID
- Case Number
- Date
- Block
- IUCR
- Primary Type
- Description
- Location Description
- Arrest
- Domestic
- Beat
- District
- Ward
- Community Area
- FBI Code
- X Coordinate
- Y Coordinate
- Year
- Updated On
- Latitude
- Longitude
- Location
The records in the data file look like this, with a header record:
ID,Case Number,Date,Block,IUCR,Primary Type,Description,Location Description,Arrest,Domestic,Beat,District,Ward,Community Area,FBI Code,X Coordinate,Y Coordinate,Year,Updated On,Latitude,Longitude,Location
5784095,HN594666,09/17/2007 05:10:00 PM,021XX W WARREN BLVD,0486,BATTERY,DOMESTIC BATTERY SIMPLE,CHA APARTMENT,false,true,1332,012,2,28,08B,1162213,1900335,2007,04/15/2016 08:55:02 AM,41.882182894,-87.679812546,"(41.882182894, -87.679812546)"
5784096,HN594881,09/17/2007 06:35:00 PM,013XX S RACINE AVE,0460,BATTERY,SIMPLE,STREET,false,false,1231,012,2,28,08B,1168579,1894058,2007,04/15/2016 08:55:02 AM,41.864822916,-87.656618405,"(41.864822916, -87.656618405)"
We will have to build a table in Oracle for the first import of the data. When we have the data in Oracle we can assess the volume of space occupied by database tables. We then migrate the data to Cassandra from Oracle using Spark and see how much space is occupied in Cassandra.
Wordcount tells me there are just over 6 million records (about 1.4 GB CSV file):
$ cat Crimes_-_2001_to_present.csv | wc
 6156888  69665305 1449194734
We need an owner for the tables we're going to create - we'll create a user called BULK_LOAD. To do this we need to log into Oracle SQLPlus as a privileged user (SYS or SYSTEM) to get to the SQL prompt.
Log in to SQLPlus:
$ sqlplus / as sysdba
Connected.
Run this command to allow local user IDs to be created:
SQL> alter session set "_ORACLE_SCRIPT"=true;
Create the user BULK_LOAD and set the password to be the same as the user name:
SQL> create user bulk_load identified by bulk_load;
User created.
Grant everything to BULK_LOAD (the old-fashioned way):
SQL> grant connect, resource, dba to bulk_load;
Grant succeeded.
Set default user and temporary tablespaces:
SQL> alter user bulk_load default tablespace "USERS";
User altered.
SQL> alter user bulk_load temporary tablespace "TEMP";
User altered.
Next, create a directory object pointing at the folder that holds the CSV file. This is how Oracle reads and manages flat files, allowing them to be manipulated as virtual tables. Great for loading data!
SQL> create or replace directory xtern_data_dir as '/app/oracle/downloads';
Directory created.
Grant access to our BULK_LOAD user:
SQL> grant read,write on directory xtern_data_dir to bulk_load;
Grant succeeded.
SQL> connect bulk_load/bulk_load;
Connected.
The data is rather inconsistent in terms of format and type. I don't want to spend a lifetime data-cleansing all 6.1 million records before starting this project, so the CSV load into Oracle is in pure text format.
In the first test we will see what happens to data when it is transferred into Oracle and then to DSE/Cassandra, all in text format. This will make an interesting baseline comparison.
The second part of the exercise will be to build a more representative test by storing the data using a wider variety of datatypes, e.g. decimal, binary, date etc.
First of all, we need to create the external table over the directory we defined above. This gives Oracle a template for reading the external file:
drop table xternal_crime_data;
create table xternal_crime_data
(ID varchar2(30),
Case_Number varchar2(30),
Incident_Date varchar2(30),
Block varchar2(60),
IUCR varchar2(30),
Primary_Type varchar2(60),
Description varchar2(120),
Location_Description varchar2(60),
Arrest varchar2(60),
Domestic varchar2(60),
Beat varchar2(30),
District varchar2(30),
Ward varchar2(30),
Community_Area varchar2(30),
FBI_Code varchar2(10),
X_Coordinate varchar2(30),
Y_Coordinate varchar2(30),
Year varchar2(30),
Updated_On varchar2(30),
Latitude varchar2(30),
Longitude varchar2(30),
Location varchar2(60)
)
organization external
( default directory xtern_data_dir
access parameters
( records delimited by newline
fields terminated by ',' optionally enclosed by '(' and ')' MISSING FIELD VALUES ARE NULL)
location ('Crimes_-_2001_to_present.csv')
) REJECT LIMIT UNLIMITED;
Table created.
We have to do this next step:
SQL> alter table xternal_crime_data reject limit unlimited;
To avoid this error:
SQL> select count(*) from xternal_crime_data;
select count(*) from xternal_crime_data
*
ERROR at line 1:
ORA-29913: error in executing ODCIEXTTABLEFETCH callout
ORA-30653: reject limit reached
If we describe the 'table' it matches the format of the physical file it's based on:
SQL> desc xternal_crime_data
Name Null? Type
----------------------------------------- -------- ----------------------------
ID VARCHAR2(30)
CASE_NUMBER VARCHAR2(30)
INCIDENT_DATE VARCHAR2(30)
BLOCK VARCHAR2(60)
IUCR VARCHAR2(30)
PRIMARY_TYPE VARCHAR2(60)
DESCRIPTION VARCHAR2(120)
LOCATION_DESCRIPTION VARCHAR2(60)
ARREST VARCHAR2(60)
DOMESTIC VARCHAR2(60)
BEAT VARCHAR2(30)
DISTRICT VARCHAR2(30)
WARD VARCHAR2(30)
COMMUNITY_AREA VARCHAR2(30)
FBI_CODE VARCHAR2(10)
X_COORDINATE VARCHAR2(30)
Y_COORDINATE VARCHAR2(30)
YEAR VARCHAR2(30)
UPDATED_ON VARCHAR2(30)
LATITUDE VARCHAR2(30)
LONGITUDE VARCHAR2(30)
LOCATION VARCHAR2(60)
So far Oracle hasn't actually looked at the data yet. When we try to read the table the records will be read and validated.
Check how many records there are in the original (source) external 'table' - just over 6 million:
SQL> select count(*) from xternal_crime_data;
COUNT(*)
----------
6156888
You can watch for errors or bad records while the data is being imported by using tail or less on the .log and .bad files - log output goes to the default directory that was specified in the create external table statement. Clear out any old ones first:
$ rm /app/oracle/downloads/*.bad
$ rm /app/oracle/downloads/*.log
Nothing happens until you attempt to read data from the table; once you do, you can track progress by using tail -f or less to view the log and bad files.
Now we're going to create a real table in the Oracle database and copy the data in from the external file via the external table definition. We can create a physical table using the "create table ... as select" SQL syntax, e.g.:
SQL> create table crimes as select * from xternal_crime_data;
We now have the data in the crimes table.
The header record is in the table and we don't want it there. Delete it (where ID is not numeric) with the following statement:
SQL> delete from crimes where length(trim(translate(ID, ' +-.0123456789', ' '))) is not null;
Let's put a primary key and index on that table:
SQL> create index crime_id on crimes(id);
Index created.
SQL> alter table crimes add constraint crimes_pk primary key(id);
Table altered.
We now have an index on our primary key column (which now shows as NOT NULL). In the real world, we would probably make the crime ID a unique index.
SQL> desc crimes
Name Null? Type
--------------------- -------- ----------------------------
ID NOT NULL VARCHAR2(30)
CASE_NUMBER VARCHAR2(30)
INCIDENT_DATE VARCHAR2(30)
BLOCK VARCHAR2(60)
IUCR VARCHAR2(30)
PRIMARY_TYPE VARCHAR2(60)
DESCRIPTION VARCHAR2(120)
LOCATION_DESCRIPTION VARCHAR2(60)
ARREST VARCHAR2(60)
DOMESTIC VARCHAR2(60)
BEAT VARCHAR2(30)
DISTRICT VARCHAR2(30)
WARD VARCHAR2(30)
COMMUNITY_AREA VARCHAR2(30)
FBI_CODE VARCHAR2(10)
X_COORDINATE VARCHAR2(30)
Y_COORDINATE VARCHAR2(30)
YEAR VARCHAR2(30)
UPDATED_ON VARCHAR2(30)
LATITUDE VARCHAR2(30)
LONGITUDE VARCHAR2(30)
LOCATION VARCHAR2(60)
At this point we have loaded the CSV source data (1.4 GB, 6.1 million rows) into a table in Oracle. How big is it? First, which objects does bulk_load own? The system dictionary view DBA_TABLES will tell us:
SQL> connect / as sysdba
SQL> select table_name from dba_tables where owner = 'BULK_LOAD';
TABLE_NAME
--------------------------------------------------------------------------------
CRIMES
XTERNAL_CRIME_DATA
Run this next query to determine total storage for a user including indexes, blobs etc. You will be prompted for the username of the Oracle user that you want to check, in uppercase - in this case it's BULK_LOAD:
COLUMN TABLE_NAME FORMAT A32
COLUMN OBJECT_NAME FORMAT A32
COLUMN OWNER FORMAT A10
SELECT
owner,
table_name,
TRUNC(sum(bytes)/1024/1024) Meg,
ROUND( ratio_to_report( sum(bytes) ) over () * 100) Percent
FROM
(SELECT segment_name table_name, owner, bytes
FROM dba_segments
WHERE segment_type IN ('TABLE', 'TABLE PARTITION', 'TABLE SUBPARTITION')
UNION ALL
SELECT i.table_name, i.owner, s.bytes
FROM dba_indexes i, dba_segments s
WHERE s.segment_name = i.index_name
AND s.owner = i.owner
AND s.segment_type IN ('INDEX', 'INDEX PARTITION', 'INDEX SUBPARTITION')
UNION ALL
SELECT l.table_name, l.owner, s.bytes
FROM dba_lobs l, dba_segments s
WHERE s.segment_name = l.segment_name
AND s.owner = l.owner
AND s.segment_type IN ('LOBSEGMENT', 'LOB PARTITION')
UNION ALL
SELECT l.table_name, l.owner, s.bytes
FROM dba_lobs l, dba_segments s
WHERE s.segment_name = l.index_name
AND s.owner = l.owner
AND s.segment_type = 'LOBINDEX')
WHERE owner in UPPER('&owner')
GROUP BY table_name, owner
HAVING SUM(bytes)/1024/1024 > 10 /* Ignore really small tables */
ORDER BY SUM(bytes) desc
;
OWNER      TABLE_NAME                              MEG    PERCENT
---------- -------------------------------- ---------- ----------
BULK_LOAD  CRIMES                                 1672        100
So a 1.4GB CSV file has become 1.7GB of storage in Oracle.
So all is looking good on the Oracle side. Now time to turn to Cassandra and Spark. We're going to use Spark to read the data from the Oracle table via JDBC using the Oracle JDBC driver.
We have to add the Oracle JDBC jar file to our Spark classpath so that Spark knows how to talk to the Oracle database. For this test we only need the JDBC driver file on our single (Spark Master) node; in a bigger cluster we would need to distribute it to the slave nodes too. Add the classpath for the ojdbc7.jar file for the executors (the path for the driver seems to be required on the command line at run time as well, see below).
# vi /etc/dse/spark/spark-defaults.conf
Add the following lines, pointing to the location of your ojdbc7.jar file:
spark.driver.extraClassPath = /app/oracle/downloads/ojdbc7.jar
spark.executor.extraClassPath = /app/oracle/downloads/ojdbc7.jar
Restart DSE to pick up the change:
$ sudo service dse stop
$ sudo service dse start
Before we start pulling data out of Oracle we need somewhere to put it, so we need to create a table in Cassandra.
The first thing we need to do in Cassandra is create a keyspace to contain the table that will hold the data. I'm using a replication factor of 1 because I have just one node in my development cluster. For most production deployments we recommend a multi-datacenter Active-Active HA setup, across geographical regions where necessary, using NetworkTopologyStrategy with RF=3.
We create the Cassandra table in the CQL shell, so log into cqlsh.
If you didn't change the IP defaults in cassandra.yaml then just type 'cqlsh' - if you changed the IP to be the host IP then you may need to supply the hostname e.g. 'cqlsh [hostname]'.
From the cqlsh prompt, create the keyspace:
CREATE KEYSPACE IF NOT EXISTS bulk_load WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 };
USE bulk_load;
The source data had some inconsistencies that made it simpler to load all the data into Oracle as text columns. To make the comparison accurate we'll store the data in Cassandra in text format too.
We'll create a Cassandra table that matches the Oracle columns, with a partition key based on the crime ID. We're going to turn compression off in this table, again in the interests of making it a valid comparison with Oracle.
Compression is on by default in Cassandra 3.0. However, the major storage engine improvements in Cassandra 3.0 mean that even uncompressed data is often significantly smaller than compressed data in Cassandra 2.x. The compression overhead is rarely an issue, but it obviously helps if you don't need compression at all; consequently the general recommendation is to disable compression in Cassandra 3.x deployments.
In cqlsh create the crimes table:
cqlsh:bulk_load> drop table if exists crimes;
create table crimes (
id text,
case_number text,
incident_date text,
block text,
iucr text,
primary_type text,
description text,
location_description text,
arrest text,
domestic text,
beat text,
district text,
ward text,
community_area text,
fbi_code text,
x_coordinate text,
y_coordinate text,
year text,
updated_on text,
latitude text,
longitude text,
location text,
PRIMARY KEY (id))
WITH compression = { 'sstable_compression' : '' };
You can check it's all there in Cassandra:
cqlsh:bulk_load> desc table crimes;
CREATE TABLE bulk_load.crimes (
id text PRIMARY KEY,
arrest text,
beat text,
block text,
case_number text,
community_area text,
description text,
district text,
domestic text,
fbi_code text,
incident_date text,
iucr text,
latitude text,
location text,
location_description text,
longitude text,
primary_type text,
updated_on text,
ward text,
x_coordinate text,
y_coordinate text,
year text
) WITH bloom_filter_fp_chance = 0.01
AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
AND comment = ''
AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
AND compression = {'enabled': 'false'}
AND crc_check_chance = 1.0
AND dclocal_read_repair_chance = 0.1
AND default_time_to_live = 0
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair_chance = 0.0
AND speculative_retry = '99PERCENTILE';
Note that compression is shown as disabled ({'enabled': 'false'}) for this table - the data will not be compressed.
Now we can go back to Spark to start moving the data to our new Cassandra table...
We'll use the Spark REPL (shell) to move the data from Oracle to Cassandra. We can use Spark DataFrames to read from Oracle via JDBC and then write back out to Cassandra. The beauty of processing with Spark is that you can do so much with the data while it's in flight - perfect for migrating and de-normalising relational schemas. I'm passing the path to the ojdbc7.jar file on the command line (this shouldn't be needed now that the driver path is defined in spark-defaults.conf, but it seems not to work without it).
$ dse spark --driver-class-path /app/oracle/downloads/ojdbc7.jar -deprecation
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.1
      /_/

Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_77)
Type in expressions to have them evaluated.
Type :help for more information.
Initializing SparkContext with MASTER: spark://127.0.0.1:7077
Created spark context..
Spark context available as sc.
Hive context available as sqlContext. Will be initialized on first use.
scala>
Now import some classes we might need:
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}
import java.io._
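Before building the dataframe it's worth a quick sanity check that the Oracle JDBC driver is actually visible on the Spark classpath. This is an optional check of my own (assuming the standard Oracle driver class name); if the jar isn't on the classpath you'll get a ClassNotFoundException here rather than a more cryptic failure later:
// Optional sanity check: confirm the Oracle JDBC driver class can be loaded from the REPL
Class.forName("oracle.jdbc.OracleDriver")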
For Spark versions below 1.4:
scala> val crimes = sqlContext.load("jdbc", Map("url" -> "jdbc:oracle:thin:bulk_load/bulk_load@localhost:1521/orcl", "dbtable" -> "crime_data"))
For Spark 1.4 and later, use the DataFrameReader API:
scala> val crimes = sqlContext.read.format("jdbc").option("url", "jdbc:oracle:thin:bulk_load/bulk_load@localhost:1521/orcl").option("driver", "oracle.jdbc.OracleDriver").option("dbtable", "crimes").load()
crimes: org.apache.spark.sql.DataFrame = [ID: string, CASE_NUMBER: string, INCIDENT_DATE: string, BLOCK: string, IUCR: string, PRIMARY_TYPE: string, DESCRIPTION: string, LOCATION_DESCRIPTION: string, ARREST: string, DOMESTIC: string, BEAT: string, DISTRICT: string, WARD: string, COMMUNITY_AREA: string, FBI_CODE: string, X_COORDINATE: string, Y_COORDINATE: string, YEAR: string, UPDATED_ON: string, LATITUDE: string, LONGITUDE: string, LOCATION: string]
There are some options available that allow you to tune how Spark uses the JDBC driver. The JDBC data source supports partitioning, so you can specify how Spark will parallelise the load from the JDBC source. By default a JDBC load is sequential (1 partition, 1 worker), which is much less efficient when multiple workers are available.
The options describe how to partition the table when reading in parallel from multiple workers:
- partitionColumn
- lowerBound
- upperBound
- numPartitions
These options must all be specified if any of them is specified.
- partitionColumn must be a numeric column from the table in question used to partition the table.
- Notice that lowerBound and upperBound are just used to decide the partition stride, not for filtering the rows in table. So all rows in the table will be partitioned and returned.
- fetchSize is the JDBC fetch size, which determines how many rows to fetch per round trip. This can help performance on JDBC drivers that default to a low fetch size (e.g. Oracle, which defaults to 10 rows).
An example load command using those options might look like this (note: the partitioning column must be a numeric column in the source table):
scala> val departments = sqlContext.read.format("jdbc")
.option("url", "jdbc:oracle:thin:br/hr@localhost:1521/orcl")
.option("driver", "oracle.jdbc.OracleDriver")
.option("dbtable", "departments")
.option("partitionColumn", "DEPARTMENT_ID")
.option("lowerBound", "1")
.option("upperBound", "10000")
.option("numPartitions", "4")
.option("fetchsize", "1000")
.load()
Don’t create too many partitions in parallel on a large cluster, otherwise Spark might crash the external database.
In our case we can't partition because the ID column is a text column, but we can set the fetchsize:
val crimes = sqlContext.read.format("jdbc")
.option("url", "jdbc:oracle:thin:bulk_load/bulk_load@localhost:1521/orcl")
.option("driver", "oracle.jdbc.OracleDriver")
.option("dbtable", "crimes")
.option("fetchsize", "1000")
.load()
You should experiment with these options when you load your data on your infrastructure. You can read more about this here: http://spark.apache.org/docs/latest/sql-programming-guide.html#jdbc-to-other-databases
At this point the JDBC statement has been validated but Spark hasn't yet checked the physical data (e.g. if you provide an invalid partitioning column you won't get an error message until you try to read the data).
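If you want to force Spark to touch the data straight away - for example to validate the connection details and see how the load has been partitioned before kicking off the full migration - you can trigger an action on the dataframe. This is just a sketch using the crimes dataframe defined above; note that count() pulls every row through JDBC, so on 6.1 million rows it will take a while:
// How many partitions did the JDBC read create? (1 unless the partitioning options were set)
crimes.rdd.getNumPartitions
// Force the read end-to-end - this actually runs the query against Oracle
crimes.count()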
Now that we've created a dataframe using the jdbc method shown above, we can use the dataframe method printSchema() to look at the dataframe schema. You'll notice that it looks a lot like a table. That's great because it means that we can use it to manipulate large volumes of tabular data:
scala> crimes.printSchema()
root
|-- ID: string (nullable = false)
|-- CASE_NUMBER: string (nullable = true)
|-- INCIDENT_DATE: string (nullable = true)
|-- BLOCK: string (nullable = true)
|-- IUCR: string (nullable = true)
|-- PRIMARY_TYPE: string (nullable = true)
|-- DESCRIPTION: string (nullable = true)
|-- LOCATION_DESCRIPTION: string (nullable = true)
|-- ARREST: string (nullable = true)
|-- DOMESTIC: string (nullable = true)
|-- BEAT: string (nullable = true)
|-- DISTRICT: string (nullable = true)
|-- WARD: string (nullable = true)
|-- COMMUNITY_AREA: string (nullable = true)
|-- FBI_CODE: string (nullable = true)
|-- X_COORDINATE: string (nullable = true)
|-- Y_COORDINATE: string (nullable = true)
|-- YEAR: string (nullable = true)
|-- UPDATED_ON: string (nullable = true)
|-- LATITUDE: string (nullable = true)
|-- LONGITUDE: string (nullable = true)
|-- LOCATION: string (nullable = true)
We can use the dataframe .show(n) method to display the first n rows of the table:
scala> crimes.show(5)
+-------+-----------+--------------------+--------------------+----+-------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+------------+-------------+--------------+
| ID|CASE_NUMBER| INCIDENT_DATE| BLOCK|IUCR| PRIMARY_TYPE| DESCRIPTION|LOCATION_DESCRIPTION|ARREST|DOMESTIC|BEAT|DISTRICT|WARD|COMMUNITY_AREA|FBI_CODE|X_COORDINATE|Y_COORDINATE|YEAR| UPDATED_ON| LATITUDE| LONGITUDE| LOCATION|
+-------+-----------+--------------------+--------------------+----+-------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+------------+-------------+--------------+
| ID|Case Number| Date| Block|IUCR| Primary Type| Description|Location Description|Arrest|Domestic|Beat|District|Ward|Community Area|FBI Code|X Coordinate|Y Coordinate|Year| Updated On| Latitude| Longitude| Location|
|3666345| HK764692|11/18/2004 12:00:...| 118XX S LOWE AVE|0890| THEFT| FROM BUILDING| RESIDENCE| false| false|0524| 005| 34| 53| 06| 1174110| 1826325|2004|04/15/2016 08:55:...| 41.67883435|-87.638323571| "(41.67883435|
|3666346| HK757211|11/17/2004 08:00:...|014XX N MASSASOIT...|0560| ASSAULT| SIMPLE| STREET| false| false|2531| 025| 29| 25| 08A| 1137744| 1909068|2004|04/15/2016 08:55:...|41.906623511|-87.769453017|"(41.906623511|
|3666347| HK764653|11/21/2004 10:00:...| 130XX S DREXEL AVE|0910|MOTOR VEHICLE THEFT| AUTOMOBILE|CHA PARKING LOT/G...| false| false|0533| 005| 9| 54| 07| 1184304| 1818785|2004|04/15/2016 08:55:...|41.657911807|-87.601244288|"(41.657911807|
|3666348| HK761590|11/19/2004 07:45:...|063XX S HAMILTON AVE|0430| BATTERY|AGGRAVATED: OTHER...| SIDEWALK| false| false|0726| 007| 15| 67| 04B| 1163120| 1862567|2004|04/15/2016 08:55:...|41.778524374|-87.677540732|"(41.778524374|
+-------+-----------+--------------------+--------------------+----+-------------------+--------------------+--------------------+------+--------+----+--------+----+--------------+--------+------------+------------+----+--------------------+------------+-------------+--------------+
only showing top 5 rows
Before we can move the data to Cassandra we have to work around a 'feature' of the spark-cassandra connector. If you try to save your dataframe to Cassandra now it will fail with an error message saying that none of the columns exist, e.g.:
java.util.NoSuchElementException: Columns not found in table bulk_load.crimes: ID, CASE_NUMBER, INCIDENT_DATE
It does this even though the columns do exist in both tables. The problem is that the connector expects the case of the column names in the dataframe to match the case of the column names in the Cassandra table. In Cassandra, column names are always lower case, so the dataframe column names must be lower case too.
So we need to modify our dataframe schema....
DataFrames are immutable in Spark, so we can't modify the one we have - we need to create a new dataframe with the correct column titles.
scala> val newNames = Seq("id", "case_number", "incident_date", "block","iucr","primary_type","description","location_description","arrest","domestic","beat","district","ward","community_area","fbi_code","x_coordinate","y_coordinate","year","updated_on","latitude","longitude", "location");
Response:
newNames: Seq[String] = List(id, case_number, incident_date, block, iucr, primary_type, description, location_description, arrest, domestic, beat, district, ward, community_area, fbi_code, x_coordinate, y_coordinate, year, updated_on, latitude, longitude, location)
scala> val crimes_lc = crimes.toDF(newNames: _*)
Response:
crimes_lc: org.apache.spark.sql.DataFrame = [id: string, case_number: string, incident_date: string, block: string, iucr: string, primary_type: string, description: string, location_description: string, arrest: string, domestic: string, beat: string, district: string, ward: string, community_area: string, fbi_code: string, x_coordinate: string, y_coordinate: string, year: string, updated_on: string, latitude: string, longitude: string, location: string]
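As an aside, if you'd rather not hard-code all 22 names (or if the source table might change), an equivalent approach - just a sketch - is to derive the lower-case names from the dataframe itself:
// Equivalent, without typing out the column list: lower-case whatever columns the source has
val crimes_lc = crimes.toDF(crimes.columns.map(_.toLowerCase): _*)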
And now if we look at the schema of our new dataframe - hey presto! - it's got lower case column titles.
scala> crimes_lc.printSchema()
root
|-- id: string (nullable = false)
|-- case_number: string (nullable = true)
|-- incident_date: string (nullable = true)
|-- block: string (nullable = true)
|-- iucr: string (nullable = true)
|-- primary_type: string (nullable = true)
|-- description: string (nullable = true)
|-- location_description: string (nullable = true)
|-- arrest: string (nullable = true)
|-- domestic: string (nullable = true)
|-- beat: string (nullable = true)
|-- district: string (nullable = true)
|-- ward: string (nullable = true)
|-- community_area: string (nullable = true)
|-- fbi_code: string (nullable = true)
|-- x_coordinate: string (nullable = true)
|-- y_coordinate: string (nullable = true)
|-- year: string (nullable = true)
|-- updated_on: string (nullable = true)
|-- latitude: string (nullable = true)
|-- longitude: string (nullable = true)
|-- location: string (nullable = true)
Now we can proceed to write this data to Cassandra.
Save our new dataframe crimes_lc to Cassandra:
scala> crimes_lc.write.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "crimes", "keyspace" -> "bulk_load")).save()
scala>
This will take a while to complete - 32 minutes in my case. While it's in progress you may be able to check and see records appearing in the table, although when the load was in full swing I did get time-out errors when trying to query the table in cqlsh :[
On my single-node VM running Oracle 12c, DSE/Cassandra and DSE/Spark I was averaging about 3,000 rows per second written from Oracle through Spark to Cassandra, so the import of 6.1 million rows (6.1 million ÷ 3,000 rows/s ≈ 2,030 seconds) finished close to the anticipated time of around 34 minutes. YMMV.
cqlsh:bulk_load> select count(*) from crimes;
count
--------
120428
(1 rows)
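If cqlsh keeps timing out on the count (a full-table count in cqlsh is expensive), an alternative is to count from the Spark shell via the connector instead - a sketch assuming the same keyspace and table names as above:
// Count the rows that have landed in Cassandra so far, via the Spark-Cassandra connector
sqlContext.read
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "crimes", "keyspace" -> "bulk_load"))
  .load()
  .count()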
You can track progress from your Spark Master UI - this should be at http://[your-host-name]:7080/
At the end of the run you can query the data in the Cassandra crimes table by partition key:
cqlsh:bulk_load> select * from crimes where id='3666345';

 id      | arrest | beat | block            | case_number | community_area | description   | district | domestic | fbi_code | incident_date          | iucr | latitude    | location      | location_description | longitude     | primary_type | updated_on             | ward | x_coordinate | y_coordinate | year
---------+--------+------+------------------+-------------+----------------+---------------+----------+----------+----------+------------------------+------+-------------+---------------+----------------------+---------------+--------------+------------------------+------+--------------+--------------+------
 3666345 |  false | 0524 | 118XX S LOWE AVE |    HK764692 |             53 | FROM BUILDING |      005 |    false |       06 | 11/18/2004 12:00:00 PM | 0890 | 41.67883435 | "(41.67883435 |            RESIDENCE | -87.638323571 |        THEFT | 04/15/2016 08:55:02 AM |   34 |      1174110 |      1826325 | 2004
At the end of the run I used nodetool to tell me how much space had been occupied in Cassandra:
$ nodetool tablestats bulk_load
Keyspace: bulk_load
        Read Count: 0
        Read Latency: NaN ms.
        Write Count: 6156888
        Write Latency: 0.027655199509882267 ms.
        Pending Flushes: 0
                Table: crimes
                SSTable count: 6
                Space used (live): 1704215924
                Space used (total): 1704215924
                Space used by snapshots (total): 0
                Off heap memory used (total): 8543995
                SSTable Compression Ratio: 0.0
                Number of keys (estimate): 6111179
                Memtable cell count: 14452
                Memtable data size: 7936758
                Memtable off heap memory used: 0
We now have the sizes of the data in each format:
- CSV (text) 1.4 GB
- Oracle (as text) 1.7 GB
- Cassandra (as text) 1.7 GB - uncompressed
- Cassandra (as text) 876 MB - compressed
In the case of compressed data, stored with a real-world Cassandra replication factor of 3, the volume of data in a multi-node Cassandra datacenter would be 876 MB x 3 ≈ 2.6 GB. Assuming the customer has fairly fast machines with fast, locally attached SSD storage, each DSE/Cassandra node might be expected to store around 1 TB, so an initial cluster size of 3-5 machines would be recommended.
In a use case with 1.7 GB of uncompressed data, the total volume when replicated 3 times would be 5.1 GB, suggesting a minimum cluster size of 5 nodes.
Remember that this is just one test, with just one table. In a "real-world" situation there may potentially be only a few tables, or a large number of tables. This data will almost certainly need to be re-modelled, not just copied, when it moves from an Oracle relational model to a distributed, replicated NoSQL database model. The re-modelling, or transformation, usually involves an element of data de-normalisation and duplication in order to optimise query efficiency, and in some cases this may have a significant impact on the volume of storage and the consequent number of DSE/Cassandra nodes that are required to accommodate the same volume of data in DSE/Cassandra.
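As a trivial illustration of that kind of in-flight re-modelling, the same dataframe could be written to a second, query-specific table - say one partitioned for "crimes by primary type and year" queries. This is purely a sketch: the crimes_by_type_year table is hypothetical and would need to be created in cqlsh first with the same columns and a PRIMARY KEY of ((primary_type, year), id):
// Hypothetical denormalised copy of the data, keyed for a different query pattern
crimes_lc.write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("table" -> "crimes_by_type_year", "keyspace" -> "bulk_load"))
  .save()
Each additional query pattern typically gets its own table like this, which is exactly where the data duplication - and the extra storage to budget for - comes from.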