Using Pig to Bulk Load Data into Cassandra CQL3 Tables

Cassandra is a distributed key-value store maintained by the Apache Software Foundation.  http://cassandra.apache.org/
Pig is a MapReduce scripting platform maintained by the Apache Software Foundation.  Pig takes scripts written in Pig Latin and converts them to Hadoop jobs. http://pig.apache.org/
As of Cassandra 1.2.8, you can read (LOAD) and write (STORE) Cassandra CQL3 tables from Pig.  Before that, Pig could only work with the native Cassandra structure, which was limiting if you wanted to use CQL3 anywhere in your environment.

Note: the syntax of the UPDATE CQL text changes when you move from Cassandra 1.2.8 to 1.2.9 or 2.0.0.

Setup
Install Cassandra, Hadoop and Pig.
Add the following to your .profile file:

export PIG_HOME=<PATH TO PIG INSTALL>
export PIG_INITIAL_ADDRESS=localhost
export PIG_RPC_PORT=9160

# the partitioner must match your cassandra partitioner
export PIG_PARTITIONER=org.apache.cassandra.dht.Murmur3Partitioner
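
A quick way to confirm that the exports above are visible to a new shell is a small check like the one below. This is only a sketch in Python: the variable names are the ones exported above, while the helper name missing_pig_settings is made up for illustration.

```python
import os

# Variable names taken from the .profile exports above.
REQUIRED_VARS = (
    "PIG_HOME",
    "PIG_INITIAL_ADDRESS",
    "PIG_RPC_PORT",
    "PIG_PARTITIONER",
)


def missing_pig_settings(env=None):
    """Return the required variable names that are unset or empty in env."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]


if __name__ == "__main__":
    missing = missing_pig_settings()
    if missing:
        print("Missing settings: " + ", ".join(missing))
    else:
        print("All Pig/Cassandra settings are present.")
```

Run it from a freshly opened shell so that the .profile changes have been picked up.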

Create Test Schema in Cassandra

With Cassandra running:

  1. Launch cqlsh
  2. Create a schema
  3. Create a table in that schema

$ <CASSANDRA_HOME>/bin/cqlsh -3

CREATE SCHEMA myschema
WITH replication = {
'class' : 'SimpleStrategy',
'replication_factor' : 1
};

use myschema;
CREATE TABLE example ( row_id text PRIMARY KEY, value1 text, value2 int );
exit;

In <CASSANDRA_HOME>, go to examples/pig:
$ cd <CASSANDRA_HOME>/examples/pig
$ # You may need to set the execute bit on bin/pig_cassandra
$ chmod +x bin/pig_cassandra

Create a sample data file:

################ example.csv ######################
da1fdd20-0028-11e3-b778-0800200c9a66,popree,8154968
da1fdd21-0028-11e3-b778-0800200c9a66,adee,2049834
da1fdd22-0028-11e3-b778-0800200c9a66,aaree,248734
da1fdd23-0028-11e3-b778-0800200c9a66,fme,098687
da1fdd24-0028-11e3-b778-0800200c9a66,fxee,2099876
da1fdd25-0028-11e3-b778-0800200c9a66,ree,239209
da1fdd26-0028-11e3-b778-0800200c9a66,qwee,934190
da1fdd27-0028-11e3-b778-0800200c9a66,frue,1905
da1fdd28-0028-11e3-b778-0800200c9a66,exee,8767
da1fdd29-0028-11e3-b778-0800200c9a66,nmee,1235
da1fdd2a-0028-11e3-b778-0800200c9a66,brrjee,26657654
da1fdd2b-0028-11e3-b778-0800200c9a66,rree,9025987
da1fdd2c-0028-11e3-b778-0800200c9a66,orrxee,98675987
da1fdd2d-0028-11e3-b778-0800200c9a66,oree,88705
da1fdd2e-0028-11e3-b778-0800200c9a66,grrmee,96557886
################ example.csv ######################

Create a new Pig Script: cassandra_write.pig

################ cassandra_write.pig ####################
data = LOAD 'example.csv' USING
    PigStorage(',') AS
    (row_id: chararray, value1: chararray, value2: int);

data_to_insert =
    FOREACH data GENERATE
        TOTUPLE(
            TOTUPLE('row_id', row_id)
        ),
        TOTUPLE(value1, value2);

-- dump data_to_insert;
-- Example output:
-- (((row_id,eedaf059-8bac-42c7-af92-4bf6f4bb7945)),(free,81549682))
-- (((row_id,b6660321-9c17-41d8-b795-5abc82472df1)),(free,2049834))

-- Cassandra 1.2.8 form of the UPDATE command (use it in place of the STORE below when running 1.2.8):
-- STORE data_to_insert INTO
--     'cql://myschema/example?output_query=update example set value1 @ #,value2 @ #' USING CqlStorage();

-- Corrected for 2.0
STORE data_to_insert INTO 'cql://myschema/example?output_query=update example set value1 %3D%3F,value2 %3D%3F' USING CqlStorage();

################ cassandra_write.pig ####################

NOTE: When you upgrade to Cassandra 2.0, you must use URL-encoded characters in the UPDATE statement:

value = ?  ->  value%3D%3F

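
If you would rather not encode the statement by hand, the encoding can be produced programmatically. The following is a minimal sketch in Python, not part of the Pig or Cassandra tooling: encode_output_query is a made-up helper name, and it assumes that only = and ? need encoding while spaces and commas stay as they are, which matches the STORE line in the script above.

```python
from urllib.parse import quote, unquote


def encode_output_query(cql):
    """Percent-encode a CQL fragment for use in the output_query parameter.

    Spaces and commas are listed as safe so they are left untouched;
    '=' becomes %3D and '?' becomes %3F.
    """
    return quote(cql, safe=" ,")


plain = "update example set value1 =?,value2 =?"
encoded = encode_output_query(plain)
print(encoded)                    # update example set value1 %3D%3F,value2 %3D%3F
print(unquote(encoded) == plain)  # True: decoding restores the original text
```
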
If you are familiar with Pig Latin, most of this code should make sense.  The important part is how the data_to_insert relation is built.
The schema CqlStorage expects is:
(((name,value),(name,value)),(value,value…value))
In English this means:
(((PrimaryKeyOne,value),(PrimaryKeyTwo,value)),
(BindValueOne,BindValueTwo,…,BindValueN))
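
To make the nesting concrete, here is a small Python sketch that builds the same shape from one line of example.csv. It only illustrates the structure and is not how Pig represents tuples internally; the function name to_cql_record is invented for this example.

```python
def to_cql_record(csv_line):
    """Turn 'row_id,value1,value2' into (key pairs, bind values)."""
    row_id, value1, value2 = csv_line.strip().split(",")
    # One (name, value) pair per primary key column, wrapped in an outer tuple.
    key_pairs = (("row_id", row_id),)
    # Bind values, in the same order as the placeholders in output_query.
    bind_values = (value1, int(value2))
    return (key_pairs, bind_values)


record = to_cql_record("da1fdd20-0028-11e3-b778-0800200c9a66,popree,8154968")
key_pairs, bind_values = record
print(key_pairs[0])   # the single primary key pair
print(bind_values)    # the values bound to value1 and value2
```

The outer tuple always has two elements: the tuple of primary key pairs first, then the tuple of bind values.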

If you need additional Primary Key values, you can add them after row_id, for example:

TOTUPLE(
    TOTUPLE('row_id', row_id),
    TOTUPLE('second_row_id', second_row_id)
),

These are converted into the WHERE clause, which is automatically appended to your output_query.
The last set of values is your bind values.  In the 1.2.8 syntax they replace the #'s in your output_query statement:

"update example set value1 @ #,value2 @ #"

Each @ becomes = and each # becomes ?, so this now reads:

"update example set value1 = ?,value2 = ?"

And the bind values are substituted in order.
Putting it all together, the final CQL statement would be:

update example
set
value1 = 'free',
value2 = 81549682
WHERE
row_id = 'eedaf059-8bac-42c7-af92-4bf6f4bb7945';
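
The way the pieces combine can be sketched in a few lines of Python. This is an illustration of the substitution described above, under the assumption that the key pairs become an AND-joined WHERE clause; it is not CqlStorage's actual implementation, and the function names are invented for the example. Integers are rendered without quotes because value2 is an int column.

```python
def translate_placeholders(output_query):
    """Apply the 1.2.8 substitutions: '@' becomes '=' and '#' becomes '?'."""
    return output_query.replace("@", "=").replace("#", "?")


def cql_literal(value):
    """Render a Python value as a CQL literal (strings quoted, ints bare)."""
    if isinstance(value, str):
        return "'" + value.replace("'", "''") + "'"
    return str(value)


def render_update(output_query, key_pairs, bind_values):
    """Append the WHERE clause and fill in every '?' in order."""
    statement = translate_placeholders(output_query)
    where = " AND ".join(name + " = ?" for name, _ in key_pairs)
    statement = statement + " WHERE " + where
    params = list(bind_values) + [value for _, value in key_pairs]
    parts = statement.split("?")
    if len(parts) - 1 != len(params):
        raise ValueError("placeholder count does not match value count")
    rendered = parts[0]
    for value, tail in zip(params, parts[1:]):
        rendered += cql_literal(value) + tail
    return rendered


print(render_update(
    "update example set value1 @ #,value2 @ #",
    (("row_id", "eedaf059-8bac-42c7-af92-4bf6f4bb7945"),),
    ("free", 81549682),
))
```

Passing a second key pair adds a second condition to the WHERE clause, joined with AND.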

You can then run this by executing:
$ bin/pig_cassandra -x local cassandra_write.pig

To verify that your script worked, start cqlsh again.

$ <CASSANDRA_HOME>/bin/cqlsh -3 -k myschema

SELECT * FROM example;

 row_id                               | value1 | value2
--------------------------------------+--------+----------
 971a1b95-05ba-45a3-944f-011c5945e917 | fyee   | 934190
 4cfcf8f5-bf6a-405c-b5a2-0a46b46bce58 | free   | 88705
 9c41e975-e29f-41d0-a093-ed4068e11854 | free   | 81549682
 743930c4-8bbf-483c-8628-317583321202 | frrjee | 26657654
 778e8303-58fc-47cf-81f7-03027ddece17 | frrmee | 96557886
 64bb214c-1e29-4ccd-be97-638e9fa78f4e | ftee   | 2099876
 97430a02-2a61-40dc-853a-8603ef5ff1b8 | fee    | 98687
 1a526310-5e3a-4417-9e11-64f86aab7bce | free   | 9025987
 3ff4d9e0-5e40-4f68-bc11-e0ba1a0b52e0 | free   | 239209
 22109870-78a2-49ca-8bda-9c1845042e12 | free   | 248734
 c2902b7f-b454-455a-90cf-5f1b33e700c2 | free   | 1235
 52f6605a-1fc1-4f73-ad36-70d10a3ad12f | free   | 2049834
 d494adb1-ac53-40a5-a59f-944669d6b29a | fxee   | 8767
 f31281b3-7e3f-4f1a-b238-ae0109822aeb | frrxee | 98675987
 68950590-9ec1-4cfb-80b8-2a2c6e2496b0 | free   | 1905