Integrating Apache Spark with Apache Cassandra

In this blog, we will see how to make Apache Spark communicate with Apache Cassandra.

Tools Used:

  • Apache Spark 2.1.1
  • Apache Cassandra 2.2.10


Part 1: Adding some data to Cassandra

Connect to cqlsh and add some sample data as shown below:

cqlsh> CREATE KEYSPACE bigdataclassmumbai WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};
cqlsh> use bigdataclassmumbai ;
cqlsh:bigdataclassmumbai> create TABLE emp
... (eid int,
... ename text,
... esal int,
... PRIMARY KEY(eid));
cqlsh:bigdataclassmumbai> INSERT INTO emp (eid , ename , esal ) VALUES ( 1,'Prashant',1000);
cqlsh:bigdataclassmumbai> INSERT INTO emp (eid , ename , esal ) VALUES ( 2,'Utkarsha',2000);
cqlsh:bigdataclassmumbai> select * from emp;

 eid | ename    | esal
-----+----------+------
   1 | Prashant | 1000
   2 | Utkarsha | 2000

(2 rows)


Part 2: Initializing the Spark shell to interact with Cassandra

Start spark-shell with the following command:

bin/spark-shell --packages com.datastax.spark:spark-cassandra-connector_2.11:2.0.0

The --packages option makes spark-shell download the Spark Cassandra Connector and all the dependencies it needs to talk to Apache Cassandra. This can take a while, depending on your internet speed. Once the Spark shell has started, run the following commands to read the data from Cassandra.
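If you would rather run the same thing from a compiled application instead of spark-shell, the sketch below shows one way to do it. It assumes the connector is on the classpath (for example via the sbt dependency "com.datastax.spark" %% "spark-cassandra-connector" % "2.0.0" alongside spark-sql 2.1.1) and a single Cassandra node on 127.0.0.1. The object name CassandraQuickstart and its helper methods are hypothetical; spark.cassandra.connection.host is the connector setting that tells it which node to contact.

```scala
import com.datastax.spark.connector._
import org.apache.spark.sql.SparkSession

// Hypothetical standalone app. Assumes the Spark Cassandra Connector is on the
// classpath and Cassandra is reachable at the given host.
object CassandraQuickstart {

  // Build a local SparkSession and tell the connector where Cassandra runs.
  def buildSession(host: String = "127.0.0.1"): SparkSession =
    SparkSession.builder()
      .appName("CassandraQuickstart")
      .master("local[*]")
      .config("spark.cassandra.connection.host", host)
      .getOrCreate()

  // Count the rows of bigdataclassmumbai.emp using the connector's RDD API
  // (cassandraTable is added to SparkContext by the import above).
  def countEmployees(spark: SparkSession): Long =
    spark.sparkContext.cassandraTable("bigdataclassmumbai", "emp").count()

  def main(args: Array[String]): Unit = {
    val spark = buildSession()
    try println(s"emp has ${countEmployees(spark)} rows")
    finally spark.stop()
  }
}
```

Passing the host as a parameter keeps the default local setup working while letting you point the same code at a remote node.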

Reading data from Cassandra

scala> import com.datastax.spark.connector._
import com.datastax.spark.connector._

Let's load the data from the Cassandra table bigdataclassmumbai.emp:

scala> val cassConnectRDD = sc.cassandraTable("bigdataclassmumbai","emp")

cassConnectRDD: com.datastax.spark.connector.rdd.CassandraTableScanRDD[com.datastax.spark.connector.CassandraRow] = CassandraTableScanRDD[0] at RDD at CassandraRDD.scala:19

Let's fetch the first record:

scala> cassConnectRDD.first

17/09/10 22:24:12 WARN NettyUtil: Found Netty's native epoll transport, but not running on linux-based operating system. Using NIO instead.

res0: com.datastax.spark.connector.CassandraRow = CassandraRow{eid: 1, ename: Prashant, esal: 1000}
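Rather than fetching whole rows, you can ask the connector to read only some columns and only the matching partition, then pull typed values out of each CassandraRow by column name. The sketch below assumes the spark-shell session started above (where sc already exists); the variable names rowsForEid1 and namesAndSalaries are only illustrative.

```scala
import com.datastax.spark.connector._

// Read only the listed columns, and only the partition with eid = 1.
// eid is the partition key, so the where clause is sent to Cassandra.
val rowsForEid1 = sc.cassandraTable("bigdataclassmumbai", "emp")
  .select("eid", "ename", "esal")
  .where("eid = ?", 1)

// Extract typed values from each CassandraRow by column name.
val namesAndSalaries = rowsForEid1
  .map(row => (row.getString("ename"), row.getInt("esal")))
  .collect()

namesAndSalaries.foreach(println)
```

Filtering on the partition key lets Cassandra do the work instead of shipping the whole table to Spark.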


Loading the data into a DataFrame

Let's load the data into a DataFrame:

scala> val empDF = spark.read.format("org.apache.spark.sql.cassandra").options(Map("keyspace" -> "bigdataclassmumbai", "table" -> "emp")).load

empDF: org.apache.spark.sql.DataFrame = [eid: int, ename: string ... 1 more field]

Let's check the data and the schema:

scala> empDF.show

+---+--------+----+
|eid|   ename|esal|
+---+--------+----+
|  1|Prashant|1000|
|  2|Utkarsha|2000|
+---+--------+----+

scala> empDF.printSchema

root
 |-- eid: integer (nullable = true)
 |-- ename: string (nullable = true)
 |-- esal: integer (nullable = true)
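Because empDF is an ordinary Spark DataFrame, you can also query it with Spark SQL. The sketch below assumes the spark-shell session above, where spark and empDF are already defined; the view name emp, the variable highEarners and the 1500 salary threshold are illustrative choices, not part of the original tutorial.

```scala
// Register the Cassandra-backed DataFrame under a temporary SQL name.
empDF.createOrReplaceTempView("emp")

// Plain Spark SQL over the view; 1500 is just an example threshold.
val highEarners = spark.sql(
  "SELECT ename, esal FROM emp WHERE esal > 1500 ORDER BY eid")

highEarners.show()
```

The temporary view lives only for the current SparkSession; it does not create anything in Cassandra.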

Writing Data to Cassandra

First, check the existing data in cqlsh:

cqlsh:bigdataclassmumbai> select * from emp;

 eid | ename    | esal
-----+----------+------
   1 | Prashant | 1000
   2 | Utkarsha | 2000

(2 rows)

In the Spark shell, run the following:

scala> val recordRDD = sc.parallelize(Seq((3,"Arun",1299),(4,"Akhil",2222)))

recordRDD: org.apache.spark.rdd.RDD[(Int, String, Int)] = ParallelCollectionRDD[9] at parallelize at <console>:27

Let's save the data to Cassandra:

scala> recordRDD.saveToCassandra("bigdataclassmumbai","emp",SomeColumns("eid","ename","esal"))
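As an alternative to tuples plus SomeColumns, the connector can map case-class fields to columns by name. The sketch below writes the same two records; the case class Emp and the variable empRDD are hypothetical names. Since Cassandra writes are upserts on the primary key, running it after the command above leaves the table unchanged.

```scala
import com.datastax.spark.connector._

// Hypothetical case class: field names match the columns of
// bigdataclassmumbai.emp, so no SomeColumns list is needed.
case class Emp(eid: Int, ename: String, esal: Int)

val empRDD = sc.parallelize(Seq(Emp(3, "Arun", 1299), Emp(4, "Akhil", 2222)))

// Rows with eid 3 and 4 are upserted (overwritten if they already exist).
empRDD.saveToCassandra("bigdataclassmumbai", "emp")

// The same case class can be used to read rows back as typed objects.
val arun = sc.cassandraTable[Emp]("bigdataclassmumbai", "emp")
  .where("eid = ?", 3)
  .collect()
```

Name-based mapping is less fragile than positional tuples when the column list grows.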

Verify the result in cqlsh:

cqlsh:bigdataclassmumbai> select * from emp;

 eid | ename    | esal
-----+----------+------
   1 | Prashant | 1000
   2 | Utkarsha | 2000
   4 |    Akhil | 2222
   3 |     Arun | 1299

(4 rows)
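The same write can also be expressed with the DataFrame API. The sketch below assumes the spark-shell session above and reuses the two records from the RDD example, so it only upserts rows that already exist; newEmpDF is an illustrative name. SaveMode.Append is used because, with the default ErrorIfExists mode, the Cassandra data source refuses to write to a table that already contains data.

```scala
import org.apache.spark.sql.SaveMode
import spark.implicits._ // already imported in spark-shell; provides toDF

// Same records as the RDD example, now as a DataFrame with named columns.
val newEmpDF = Seq((3, "Arun", 1299), (4, "Akhil", 2222))
  .toDF("eid", "ename", "esal")

// Append inserts or overwrites rows by primary key.
newEmpDF.write
  .format("org.apache.spark.sql.cassandra")
  .options(Map("keyspace" -> "bigdataclassmumbai", "table" -> "emp"))
  .mode(SaveMode.Append)
  .save()
```

This is handy when the data to be written already comes out of a DataFrame pipeline, such as the one read into empDF earlier.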


I hope you liked this quickstart tutorial. If you would like me to cover installing Apache Cassandra, feel free to say so in the comments!

Prashant Nair

Big Data Consultant | Author | Corporate Trainer | Technical Reviewer. Passionate about new trends and technologies. More geeky. Contact me for training and consulting!
