Using Pig to Bulk Load Data into Cassandra CQL3 Tables

Apache Cassandra is a distributed key-value store maintained by the Apache Software Foundation: http://cassandra.apache.org/
Apache Pig is a MapReduce scripting platform, also maintained by the Apache Software Foundation. Pig compiles scripts written in Pig Latin into Hadoop jobs: http://pig.apache.org/
As of Cassandra 1.2.8, you can use Pig to read from (LOAD) and write to (STORE) Cassandra CQL3 tables. Pig could already work with Cassandra's native storage structure, but that was limiting if you used CQL3 anywhere in your environment.

Note that the UPDATE CQL text used in the STORE statement changed between Cassandra 1.2.8 and 1.2.9/2.0.0; both forms are shown in the script below.

 

Setup
Install Cassandra, Hadoop and Pig.
Add the following to your .profile:

export PIG_HOME=<PATH TO PIG INSTALL>
export PIG_INITIAL_ADDRESS=localhost
export PIG_RPC_PORT=9160

# The partitioner must match the partitioner configured in cassandra.yaml
export PIG_PARTITIONER=org.apache.cassandra.dht.Murmur3Partitioner
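A mismatched partitioner is an easy mistake to make, so here is a small sketch that compares PIG_PARTITIONER with the partitioner setting in cassandra.yaml. The function names are hypothetical, and to stay within the standard library it scans for a line of the form "partitioner: <class name>" instead of parsing YAML, which assumes the setting is written that way.

```python
import os
import re


def yaml_partitioner(yaml_text):
    """Return the value of the top-level 'partitioner:' setting, or None.

    A plain line scan rather than a YAML parser, so it assumes the setting
    is written as 'partitioner: <class name>' at the start of a line.
    Commented-out lines ('# partitioner: ...') do not match.
    """
    match = re.search(r"^partitioner:[ \t]*(\S+)", yaml_text, re.MULTILINE)
    return match.group(1) if match else None


def check_partitioner(yaml_text, env=None):
    """True if PIG_PARTITIONER equals the partitioner found in cassandra.yaml."""
    env = os.environ if env is None else env
    expected = yaml_partitioner(yaml_text)
    return expected is not None and env.get("PIG_PARTITIONER") == expected
```

For example, read <CASSANDRA_HOME>/conf/cassandra.yaml into a string and pass it to check_partitioner; with no env argument it checks the current process environment.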

 

Create Test Schema in Cassandra

With Cassandra running:

  1. Launch cqlsh.
  2. Create a schema (keyspace).
  3. Create a table in that schema.

$ <CASSANDRA_HOME>/bin/cqlsh -3
CREATE SCHEMA myschema
WITH replication = {
  'class' : 'SimpleStrategy',
  'replication_factor' : 1
};

USE myschema;
CREATE TABLE example ( row_id text PRIMARY KEY, value1 text, value2 int );
exit;
Then, in <CASSANDRA_HOME>, go to examples/pig (this directory ships with the Cassandra source distribution, not the binary tarball):
$ cd <CASSANDRA_HOME>/examples/pig
$ # You may need to set the execute bit on bin/pig_cassandra
$ chmod +x bin/pig_cassandra

Create a sample data file:

################ example.csv ######################
da1fdd20-0028-11e3-b778-0800200c9a66,popree,8154968
da1fdd21-0028-11e3-b778-0800200c9a66,adee,2049834
da1fdd22-0028-11e3-b778-0800200c9a66,aaree,248734
da1fdd23-0028-11e3-b778-0800200c9a66,fme,098687
da1fdd24-0028-11e3-b778-0800200c9a66,fxee,2099876
da1fdd25-0028-11e3-b778-0800200c9a66,ree,239209
da1fdd26-0028-11e3-b778-0800200c9a66,qwee,934190
da1fdd27-0028-11e3-b778-0800200c9a66,frue,1905
da1fdd28-0028-11e3-b778-0800200c9a66,exee,8767
da1fdd29-0028-11e3-b778-0800200c9a66,nmee,1235
da1fdd2a-0028-11e3-b778-0800200c9a66,brrjee,26657654
da1fdd2b-0028-11e3-b778-0800200c9a66,rree,9025987
da1fdd2c-0028-11e3-b778-0800200c9a66,orrxee,98675987
da1fdd2d-0028-11e3-b778-0800200c9a66,oree,88705
da1fdd2e-0028-11e3-b778-0800200c9a66,grrmee,96557886
################ example.csv ######################
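If you want more test rows than the fifteen above, a quick generator helps. This is a hypothetical helper, not part of the Cassandra examples: it writes rows shaped like example.csv (a version-1 UUID, as in the sample data, a short lowercase word and an int), matching the text, text, int columns of the example table.

```python
import csv
import random
import string
import uuid


def make_rows(count, seed=None):
    """Build (row_id, value1, value2) rows shaped like example.csv.

    The seed only controls value1/value2; uuid1() always produces a fresh
    time-based UUID for row_id.
    """
    rng = random.Random(seed)
    rows = []
    for _ in range(count):
        word = "".join(rng.choice(string.ascii_lowercase)
                       for _ in range(rng.randint(3, 6)))
        rows.append((str(uuid.uuid1()), word, rng.randint(0, 99_999_999)))
    return rows


def write_csv(path, rows):
    """Write rows with no header line, since PigStorage(',') would load it as data."""
    with open(path, "w", newline="") as f:
        # Plain '\n' line endings instead of the csv module's default '\r\n'
        csv.writer(f, lineterminator="\n").writerows(rows)
```

Calling write_csv("example.csv", make_rows(15)) in examples/pig produces a file the script below can load unchanged.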

Create a new Pig script, cassandra_write.pig:

################ cassandra_write.pig ####################
-- Load the CSV; field names and types line up with the CQL table
data = LOAD 'example.csv' USING PigStorage(',')
       AS (row_id: chararray, value1: chararray, value2: int);

-- Reshape each row into the structure CqlStorage expects:
-- ((key_name, key_value), ...), (bind_value_1, ..., bind_value_n)
data_to_insert = FOREACH data GENERATE
    TOTUPLE(
        TOTUPLE('row_id', row_id)
    ),
    TOTUPLE(value1, value2);

-- DUMP data_to_insert;
-- Example output:
-- (((row_id,eedaf059-8bac-42c7-af92-4bf6f4bb7945)),(free,81549682))
-- (((row_id,b6660321-9c17-41d8-b795-5abc82472df1)),(free,2049834))

-- Cassandra 1.2.8 syntax (@ stands for =, # stands for ?):
-- STORE data_to_insert INTO
--     'cql://myschema/example?output_query=update example set value1 @ #,value2 @ #' USING CqlStorage();

-- Cassandra 1.2.9 / 2.0 syntax (URL-encoded = and ?):
STORE data_to_insert INTO 'cql://myschema/example?output_query=update example set value1 %3D%3F,value2 %3D%3F' USING CqlStorage();

################ cassandra_write.pig ####################

NOTE: When you upgrade to Cassandra 1.2.9 or 2.0, the UPDATE statement in output_query must be URL-encoded (= becomes %3D and ? becomes %3F) instead of using the 1.2.8 @ and # substitutes:

value1 =? → value1 %3D%3F
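Encoding the query by hand is error-prone once the statement grows, so here is a minimal sketch that builds the cql:// location with Python's urllib.parse.quote. The helper names are hypothetical. Spaces and commas are passed as safe characters so the result matches the hand-written URL in the script; for other statements, quote also encodes punctuation such as quotes and parentheses, and I have not verified how CqlStorage decodes those.

```python
from urllib.parse import quote


def encode_output_query(cql):
    """Percent-encode an output_query for a 1.2.9 / 2.0 CqlStorage URL.

    Letters, digits, '_.-~', spaces and commas stay as they are; everything
    else is percent-encoded. For the UPDATE above that means only '=' and '?'.
    """
    return quote(cql, safe=" ,")


def cql_url(keyspace, table, cql):
    """Build a cql:// location string with an encoded output_query."""
    return "cql://%s/%s?output_query=%s" % (keyspace, table, encode_output_query(cql))


print(cql_url("myschema", "example", "update example set value1 =?,value2 =?"))
```

This prints the same location string used in the STORE statement of cassandra_write.pig.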
If you are familiar with Pig Latin, most of this script should make sense. The important part is how the data_to_insert relation is built.
CqlStorage expects each tuple to have this structure:
(((name,value),(name,value)),(value,value,…,value))
In plain English this means:
(((PrimaryKeyOne,value),(PrimaryKeyTwo,value)),
(BindValueOne,BindValueTwo,…,BindValueN))
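To make the nesting concrete, here is a Python model of what the FOREACH ... GENERATE step produces for one CSV line, with Python tuples standing in for Pig tuples. The function name is hypothetical and this only mirrors the shape of the data; it is not how CqlStorage is implemented.

```python
def to_cql_record(line):
    """Model one data_to_insert tuple: ((key pairs...), (bind values...))."""
    row_id, value1, value2 = line.rstrip("\n").split(",")
    keys = (("row_id", row_id),)   # TOTUPLE(TOTUPLE('row_id', row_id))
    binds = (value1, int(value2))  # TOTUPLE(value1, value2); value2 is an int
    return (keys, binds)


print(to_cql_record("da1fdd20-0028-11e3-b778-0800200c9a66,popree,8154968"))
```

The printed value, ((('row_id', 'da1fdd20-0028-11e3-b778-0800200c9a66'),), ('popree', 8154968)), has the same nesting as the DUMP output shown in the script.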

If your table has additional primary key columns, add them after row_id, for example:

TOTUPLE(
    TOTUPLE('row_id', row_id),
    TOTUPLE('second_row_id', second_row_id)
),

These key name/value pairs are converted into the WHERE clause, which is automatically appended to your output_query.
The last tuple holds your bind values. They replace the # placeholders in your output_query statement.

"update example set value1 @ #,value2 @ #"

@ becomes =
# becomes ?

so this now reads:

"update example set value1 = ?,value2 = ?"

and the bind values are substituted in order.
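If you have 1.2.8-style queries to migrate, the substitution rule above is mechanical. This hypothetical sketch applies it and also checks that the number of ? markers matches the number of bind values, since they are filled in order. It assumes the statement contains no literal @, # or ? characters of its own.

```python
def from_128_syntax(query):
    """Rewrite 1.2.8 placeholders: '@' stands for '=' and '#' for '?'."""
    return query.replace("@", "=").replace("#", "?")


def check_bind_count(query, binds):
    """Bind values fill the '?' markers in order, so the counts must agree."""
    count = query.count("?")
    if count != len(binds):
        raise ValueError("query has %d placeholders but %d bind values"
                         % (count, len(binds)))


q = from_128_syntax("update example set value1 @ #,value2 @ #")
check_bind_count(q, ("free", 81549682))
print(q)
```

The printed statement is the same "update example set value1 = ?,value2 = ?" shown above.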
Putting it all together, the final CQL statement for the first example row would be:

update example
set
  value1 = 'free',
  value2 = 81549682
WHERE
  row_id = 'eedaf059-8bac-42c7-af92-4bf6f4bb7945';
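The assembly can be sketched in a few lines of Python. This is a display-only illustration with a hypothetical function name: it appends a WHERE clause built from the key pairs (joined with AND, which covers the composite-key case) and fills the ? markers in order. CqlStorage sends the values as bound parameters rather than splicing literals into the text, so treat this as a model of the resulting statement, not of the writer's code path.

```python
def cql_literal(value):
    """Render a Python value as a CQL literal, for display only."""
    if isinstance(value, str):
        # Text is single-quoted; an embedded quote is escaped by doubling it
        return "'" + value.replace("'", "''") + "'"
    return str(value)  # ints such as value2 are written without quotes


def render_statement(output_query, record):
    """Show the statement implied by output_query and one CqlStorage record."""
    keys, binds = record
    parts = output_query.split("?")
    if len(parts) - 1 != len(binds):
        raise ValueError("placeholder count does not match the bind values")
    text = parts[0]
    for value, rest in zip(binds, parts[1:]):
        text += cql_literal(value) + rest
    where = " AND ".join("%s = %s" % (name, cql_literal(val)) for name, val in keys)
    return "%s WHERE %s;" % (text, where)


record = ((("row_id", "eedaf059-8bac-42c7-af92-4bf6f4bb7945"),), ("free", 81549682))
print(render_statement("update example set value1 = ?,value2 = ?", record))
```

With the first example row, this prints the statement above on a single line.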

Run the script in Pig local mode:
$ bin/pig_cassandra -x local cassandra_write.pig
To verify that the script worked, start cqlsh again:

$ <CASSANDRA_HOME>/bin/cqlsh -3 -k myschema

SELECT * FROM example;
 row_id                               | value1 | value2
--------------------------------------+--------+----------
 971a1b95-05ba-45a3-944f-011c5945e917 | fyee   | 934190
 4cfcf8f5-bf6a-405c-b5a2-0a46b46bce58 | free   | 88705
 9c41e975-e29f-41d0-a093-ed4068e11854 | free   | 81549682
 743930c4-8bbf-483c-8628-317583321202 | frrjee | 26657654
 778e8303-58fc-47cf-81f7-03027ddece17 | frrmee | 96557886
 64bb214c-1e29-4ccd-be97-638e9fa78f4e | ftee   | 2099876
 97430a02-2a61-40dc-853a-8603ef5ff1b8 | fee    | 98687
 1a526310-5e3a-4417-9e11-64f86aab7bce | free   | 9025987
 3ff4d9e0-5e40-4f68-bc11-e0ba1a0b52e0 | free   | 239209
 22109870-78a2-49ca-8bda-9c1845042e12 | free   | 248734
 c2902b7f-b454-455a-90cf-5f1b33e700c2 | free   | 1235
 52f6605a-1fc1-4f73-ad36-70d10a3ad12f | free   | 2049834
 d494adb1-ac53-40a5-a59f-944669d6b29a | fxee   | 8767
 f31281b3-7e3f-4f1a-b238-ae0109822aeb | frrxee | 98675987
 68950590-9ec1-4cfb-80b8-2a2c6e2496b0 | free   | 1905

  • Jimmy Schappet

    Updating page to use Cassandra 2.0

  • Joyabrata Das

    Hi Jimmy,

    I'm very new to Cassandra and Hadoop.

    I've installed apache-cassandra-1.2.8-bin.tar.gz on a 4-node Ubuntu cluster that is also running hadoop-0.20.2-cdh3u4.tar.gz.

    However, I can't find the examples/pig directory under CASSANDRA_HOME; please see the output below.

    ~/apache-cassandra-1.2.8$ ls -lrt
    total 256
    drwxr-x--- 4 apacas apache 4096 Jul 28 2013 tools
    -rwxr-x--- 1 apacas apache 3569 Jul 28 2013 README.txt
    -rwxr-x--- 1 apacas apache 1820 Jul 28 2013 NOTICE.txt
    -rwxr-x--- 1 apacas apache 48724 Jul 28 2013 NEWS.txt
    -rwxr-x--- 1 apacas apache 11609 Jul 28 2013 LICENSE.txt
    -rwxr-x--- 1 apacas apache 160590 Jul 28 2013 CHANGES.txt
    drwxr-x--- 2 apacas apache 4096 Mar 4 14:28 interface
    drwxr-x--- 4 apacas apache 4096 Mar 4 14:28 javadoc
    drwxr-x--- 3 apacas apache 4096 Mar 4 14:28 pylib
    drwxr-x--- 3 apacas apache 4096 Mar 4 14:28 lib
    drwxr-x--- 2 apacas apache 4096 Apr 7 14:50 bin
    drwxr-x--- 2 apacas apache 4096 May 7 07:35 conf

    ~/apache-cassandra-1.2.8$ find . -name *example*

    ~/apache-cassandra-1.2.8$ find . -name *pig*
    ./javadoc/org/apache/cassandra/hadoop/pig

    ~/apache-cassandra-1.2.8$ ls -lrt ./javadoc/org/apache/cassandra/hadoop/pig
    total 184
    -rwxr-x--- 1 apacas apache 5774 Jul 28 2013 package-use.html
    -rwxr-x--- 1 apacas apache 5764 Jul 28 2013 package-tree.html
    -rwxr-x--- 1 apacas apache 6511 Jul 28 2013 package-summary.html
    -rwxr-x--- 1 apacas apache 1335 Jul 28 2013 package-frame.html
    -rwxr-x--- 1 apacas apache 30350 Jul 28 2013 CqlStorage.html
    -rwxr-x--- 1 apacas apache 35376 Jul 28 2013 CassandraStorage.html
    -rwxr-x--- 1 apacas apache 14846 Jul 28 2013 AbstractCassandraStorage.MarshallerType.html
    -rwxr-x--- 1 apacas apache 66404 Jul 28 2013 AbstractCassandraStorage.html
    drwxr-x--- 2 apacas apache 4096 Mar 4 14:28 class-use

    1] Is it only available in the Apache Cassandra source distribution?
    2] Is it also possible to integrate Hive with apache-cassandra-1.2.8 without building from source?

    Thanks,
    Joy

    • jschappet

      Yes, you need to download the source and build from it to get the Pig and Cassandra integration.

      As for Hive, I'm not sure.