Integrating Apache Spark with MongoDB

In this tutorial, we will learn how to integrate Apache Spark with MongoDB. We will use spark-shell to interact with MongoDB and perform both reads from and writes to a MongoDB collection.

Tools Used:

  • Apache Spark 2.1.0
  • MongoDB 3.4.2

Pre-requisites:

I am assuming you have already downloaded and extracted Apache Spark and MongoDB. On the MongoDB side, I am running the mongod daemon on the default port (27017), and I have created a db named prashant with a collection named cars that contains one document. The following commands were used to create the db and the document:

use prashant
db.cars.insert({
    "name" : "Hyundai",
    "make" : "Santro",
    "year" : "2010"
})
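
If you want to confirm the document was inserted, you can query the collection from the mongo shell; the same command is used again later in this tutorial to verify the writes made from Spark.

> db.cars.find().pretty()

This should return the single Hyundai Santro document.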

In case you want me to write a detailed blog on installing and configuring MongoDB, please drop a comment !!! 🙂

Steps to follow:

Step 1: Launch the Spark shell. Assuming your terminal is pointing to the Spark home folder, run the following command:

bin/spark-shell --conf "spark.mongodb.input.uri=mongodb://127.0.0.1:27017/prashant.cars?readPreference=primaryPreferred" --conf "spark.mongodb.output.uri=mongodb://127.0.0.1:27017/prashant.cars" --packages org.mongodb.spark:mongo-spark-connector_2.11:2.1.0

The above command will download the MongoDB Spark connector from the Maven repository, so please ensure your internet connection is active while starting the spark-shell. It also sets the MongoDB input and output URIs to prashant.cars, where prashant is the db name and cars is the collection name. Once the download is done, you will get the Scala shell prompt.
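
If you want to double-check that the shell picked up these settings, you can read them back from the SparkContext configuration. This is just a quick sanity check and not required for the tutorial.

scala> sc.getConf.get("spark.mongodb.input.uri")
scala> sc.getConf.get("spark.mongodb.output.uri")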

Step 2: Import the Mongo Spark package:

scala> import com.mongodb.spark._

Step 3: Read the data from MongoDB. Based on the conf parameters set while launching spark-shell, it will read the data from the cars collection stored in the prashant db.

scala> val readFromMongo = MongoSpark.load(spark)

The above command will create a DataFrame. To see the contents of the DataFrame, run:

scala> readFromMongo.show

+--------------------+------+-------+----+
|                 _id|  make|   name|year|
+--------------------+------+-------+----+
|[582de7df3e4060bf...|Santro|Hyundai|2010|
+--------------------+------+-------+----+
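
Since readFromMongo is a regular Spark DataFrame, all the usual DataFrame operations work on it. The snippet below is only an illustration and is not needed for the rest of the tutorial:

scala> readFromMongo.printSchema
scala> readFromMongo.select("name", "make").show
scala> readFromMongo.createOrReplaceTempView("cars")
scala> spark.sql("SELECT name, year FROM cars WHERE make = 'Santro'").show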

 

Step 4: Write documents from Spark to MongoDB, into the cars collection present in the prashant db.

Let's first create an RDD of JSON strings using the command below:

scala> val carsCollection = sc.parallelize(
| """{"name":"Maruti","make":"Alto","year":"2010"}""" ::
| """{"name":"Hyundai","make":"Elantra","year":"2013"}""" ::
| Nil
| )

This RDD holds two JSON documents; technically, each element is just a string at this point. The expected contents are shown below:

scala> carsCollection.collect.foreach(println)

{"name":"Maruti","make":"Alto","year":"2010"}
{"name":"Hyundai","make":"Elantra","year":"2013"}

Now, to convert the JSON strings into BSON documents, we will use the org.bson package that comes with the Mongo Spark connector:

scala> import org.bson.Document

import org.bson.Document

Now let's parse the JSON strings and convert them into Document objects:

scala> val bsonDocs = carsCollection.map(data => Document.parse(data))

bsonDocs: org.apache.spark.rdd.RDD[org.bson.Document] = MapPartitionsRDD[27] at map at <console>:30

Let's verify the document contents:

scala> bsonDocs.collect.foreach(println)

Document{{name=Maruti, make=Alto, year=2010}}
Document{{name=Hyundai, make=Elantra, year=2013}}

Before we insert the documents into MongoDB using Spark, let's first check the contents of the existing collection in the mongo client shell.

> db.cars.find().pretty()

{
    "_id" : ObjectId("582de7df3e4060bf04155997"),
    "name" : "Hyundai",
    "make" : "Santro",
    "year" : "2010"
}

Now, to write the documents from Spark to MongoDB, run the following command in the spark-shell:

scala> MongoSpark.save(bsonDocs)

The above command will write the documents to MongoDB. To check whether the write was successful, run the following command in the mongo client shell:

> db.cars.find().pretty()

{
    "_id" : ObjectId("582de7df3e4060bf04155997"),
    "name" : "Hyundai",
    "make" : "Santro",
    "year" : "2010"
}

{
    "_id" : ObjectId("59af0adcf2464e44c7a0a3ca"),
    "name" : "Maruti",
    "make" : "Alto",
    "year" : "2010"
}

{
    "_id" : ObjectId("59af0adcf2464e44c7a0a3cb"),
    "name" : "Hyundai",
    "make" : "Elantra",
    "year" : "2013"
}

If you followed the read step above, you can also verify the write from spark-shell by re-running the command below:

scala> readFromMongo.show

+--------------------+-------+-------+----+
|                 _id|   make|   name|year|
+--------------------+-------+-------+----+
|[582de7df3e4060bf...| Santro|Hyundai|2010|
|[59af0adcf2464e44...|   Alto| Maruti|2010|
|[59af0adcf2464e44...|Elantra|Hyundai|2013|
+--------------------+-------+-------+----+
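
As a side note, you are not limited to the output URI set at shell startup. The connector also provides an explicit WriteConfig, which is handy if you want to write to a different collection than the one configured when launching the shell. Here is a minimal sketch, assuming a hypothetical trucks collection in the same prashant db:

scala> import com.mongodb.spark.config.WriteConfig
scala> val writeConfig = WriteConfig(Map("uri" -> "mongodb://127.0.0.1:27017/prashant.trucks"))
scala> MongoSpark.save(bsonDocs, writeConfig)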

 

Now you know how to work with Apache Spark and MongoDB together. Hope you liked this tutorial.

 

Prashant Nair

Bigdata Consultant | Author | Corporate Trainer | Technical Reviewer. Passionate about new trends and technologies. More Geeky. Contact me for training and consulting!!!

5 thoughts on "Integrating Apache Spark with MongoDB"

  1. Dear Prashant,

    Can you please help me with the below issue? While connecting Spark with MongoDB, I am getting the below exception.

    Exception:
    17/09/19 13:07:38 ERROR Schema: Failed initialising database.
    Failed to start database 'metastore_db' with class loader org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1@161fee, see the next exception for details.
    org.datanucleus.exceptions.NucleusDataStoreException: Failed to start database 'metastore_db' with class loader org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1@161fee, see the next exception for details.
    at org.datanucleus.store.rdbms.ConnectionFactoryImpl$ManagedConnectionImpl.getConnection(ConnectionFactoryImpl.java:516)
    at org.datanucleus.store.rdbms.RDBMSStoreManager.(RDBMSStoreManager.java:298)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.datanucleus.plugin.NonManagedPluginRegistry.createExecutableExtension(NonManagedPluginRegistry.java:631)
    at org.datanucleus.plugin.PluginManager.createExecutableExtension(PluginManager.java:301)
    at org.datanucleus.NucleusContext.createStoreManagerForProperties(NucleusContext.java:1187)
    at org.datanucleus.NucleusContext.initialise(NucleusContext.java:356)
    at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.freezeConfiguration(JDOPersistenceManagerFactory.java:775)
    at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.createPersistenceManagerFactory(JDOPersistenceManagerFactory.java:333)
    at org.datanucleus.api.jdo.JDOPersistenceManagerFactory.getPersistenceManagerFactory(JDOPersistenceManagerFactory.java:202)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at javax.jdo.JDOHelper$16.run(JDOHelper.java:1965)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.jdo.JDOHelper.invoke(JDOHelper.java:1960)
    at javax.jdo.JDOHelper.invokeGetPersistenceManagerFactoryOnImplementation(JDOHelper.java:1166)
    at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:808)
    at javax.jdo.JDOHelper.getPersistenceManagerFactory(JDOHelper.java:701)
    at org.apache.hadoop.hive.metastore.ObjectStore.getPMF(ObjectStore.java:365)
    at org.apache.hadoop.hive.metastore.ObjectStore.getPersistenceManager(ObjectStore.java:394)
    at org.apache.hadoop.hive.metastore.ObjectStore.initialize(ObjectStore.java:291)
    at org.apache.hadoop.hive.metastore.ObjectStore.setConf(ObjectStore.java:258)
    at org.apache.hadoop.util.ReflectionUtils.setConf(ReflectionUtils.java:73)
    at org.apache.hadoop.util.ReflectionUtils.newInstance(ReflectionUtils.java:133)
    at org.apache.hadoop.hive.metastore.RawStoreProxy.(RawStoreProxy.java:57)
    at org.apache.hadoop.hive.metastore.RawStoreProxy.getProxy(RawStoreProxy.java:66)
    at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.newRawStore(HiveMetaStore.java:593)
    at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.getMS(HiveMetaStore.java:571)
    at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.createDefaultDB(HiveMetaStore.java:620)
    at org.apache.hadoop.hive.metastore.HiveMetaStore$HMSHandler.init(HiveMetaStore.java:461)
    at org.apache.hadoop.hive.metastore.RetryingHMSHandler.(RetryingHMSHandler.java:66)
    at org.apache.hadoop.hive.metastore.RetryingHMSHandler.getProxy(RetryingHMSHandler.java:72)
    at org.apache.hadoop.hive.metastore.HiveMetaStore.newRetryingHMSHandler(HiveMetaStore.java:5762)
    at org.apache.hadoop.hive.metastore.HiveMetaStoreClient.(HiveMetaStoreClient.java:199)
    at org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient.(SessionHiveMetaStoreClient.java:74)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.hadoop.hive.metastore.MetaStoreUtils.newInstance(MetaStoreUtils.java:1521)
    at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.(RetryingMetaStoreClient.java:86)
    at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:132)
    at org.apache.hadoop.hive.metastore.RetryingMetaStoreClient.getProxy(RetryingMetaStoreClient.java:104)
    at org.apache.hadoop.hive.ql.metadata.Hive.createMetaStoreClient(Hive.java:3005)
    at org.apache.hadoop.hive.ql.metadata.Hive.getMSC(Hive.java:3024)
    at org.apache.hadoop.hive.ql.metadata.Hive.getAllDatabases(Hive.java:1234)
    at org.apache.hadoop.hive.ql.metadata.Hive.reloadFunctions(Hive.java:174)
    at org.apache.hadoop.hive.ql.metadata.Hive.(Hive.java:166)
    at org.apache.hadoop.hive.ql.session.SessionState.start(SessionState.java:503)
    at org.apache.spark.sql.hive.client.ClientWrapper.(ClientWrapper.scala:204)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    at org.apache.spark.sql.hive.client.IsolatedClientLoader.createClient(IsolatedClientLoader.scala:249)
    at org.apache.spark.sql.hive.HiveContext.metadataHive$lzycompute(HiveContext.scala:327)
    at org.apache.spark.sql.hive.HiveContext.metadataHive(HiveContext.scala:237)
    at org.apache.spark.sql.hive.HiveContext.setConf(HiveContext.scala:441)
    at org.apache.spark.sql.hive.HiveContext.defaultOverrides(HiveContext.scala:226)
    at org.apache.spark.sql.hive.HiveContext.(HiveContext.scala:229)
    at org.apache.spark.sql.hive.HiveContext.(HiveContext.scala:101)
    at $line15.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:27)
    at $line15.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:32)
    at $line15.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.(:34)
    at $line15.$read$$iwC$$iwC$$iwC$$iwC$$iwC.(:36)
    at $line15.$read$$iwC$$iwC$$iwC$$iwC.(:38)
    at $line15.$read$$iwC$$iwC$$iwC.(:40)
    at $line15.$read$$iwC$$iwC.(:42)
    at $line15.$read$$iwC.(:44)
    at $line15.$read.(:46)
    at $line15.$read$.(:50)
    at $line15.$read$.()
    at $line15.$eval$.(:7)
    at $line15.$eval$.()
    at $line15.$eval.$print()
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065)
    at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1346)
    at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871)
    at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819)
    at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857)
    at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902)
    at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814)
    at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657)
    at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
    at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945)
    at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
    at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945)
    at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059)
    at org.apache.spark.repl.Main$.main(Main.scala:31)
    at org.apache.spark.repl.Main.main(Main.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    Caused by: java.sql.SQLException: Failed to start database 'metastore_db' with class loader org.apache.spark.sql.hive.client.IsolatedClientLoader$$anon$1@161fee, see the next exception for details.
    at org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown Source)
    at org.apache.derby.impl.jdbc.SQLExceptionFactory.getSQLException(Unknown Source)
    at org.apache.derby.impl.jdbc.Util.seeNextException(Unknown Source)
    at org.apache.derby.impl.jdbc.EmbedConnection.bootDatabase(Unknown Source)
    at org.apache.derby.impl.jdbc.EmbedConnection.(Unknown Source)
    at org.apache.derby.jdbc.InternalDriver$1.run(Unknown Source)
    at org.apache.derby.jdbc.InternalDriver$1.run(Unknown Source)
    at java.security.AccessController.doPrivileged(Native Method)
    at org.apache.derby.jdbc.InternalDriver.getNewEmbedConnection(Unknown Source)
    at org.apache.derby.jdbc.InternalDriver.connect(Unknown Source)
    at org.apache.derby.jdbc.InternalDriver.connect(Unknown Source)
    at org.apache.derby.jdbc.AutoloadedDriver.connect(Unknown Source)
    at java.sql.DriverManager.getConnection(DriverManager.java:664)
    at java.sql.DriverManager.getConnection(DriverManager.java:208)
    at org.apache.commons.dbcp.DriverManagerConnectionFactory.createConnection(DriverManagerConnectionFactory.java:78)
    at org.apache.commons.dbcp.PoolableConnectionFactory.makeObject(PoolableConnectionFactory.java:582)
    at org.apache.commons.pool.impl.GenericObjectPool.borrowObject(GenericObjectPool.java:1148)
    at org.apache.commons.dbcp.PoolingDataSource.getConnection(PoolingDataSource.java:106)
    at org.datanucleus.store.rdbms.ConnectionFactoryImpl$ManagedConnectionImpl.getConnection(ConnectionFactoryImpl.java:501)
    … 111 more

    1. I guess you have Apache Hive installed on your system. If that is the case, ensure you copy hive-site.xml into the conf folder of Apache Spark. While starting the Spark shell, ensure you add the MySQL JDBC jar file (assuming MySQL is Hive's metastore). The command below is an example of using --jars to link the MySQL jar file:

      bin/spark-shell --jars mysql-connector-jar-file --conf "spark.mongodb.input.uri=mongodb://127.0.0.1:27017/prashant.cars?readPreference=primaryPreferred" --conf "spark.mongodb.output.uri=mongodb://127.0.0.1:27017/prashant.cars" --packages org.mongodb.spark:mongo-spark-connector_2.11:2.1.0

      Let me know if it works or not !

  2. Hi Prashant,

    Kindly tell how to do it in Scala IDE. I have set up Scala IDE as a Maven project as you explained earlier.

    Regards,
    Deepak

    1. I have written the following code:

      package com.deepak.spark.SparkLearning

      import org.apache.spark.sql.SparkSession
      import com.mongodb.spark._
      import org.apache.spark.sql._
      import org.apache.spark._
      import com.mongodb.spark.config.{ReadConfig, WriteConfig}
      import org.apache.log4j.{Level, Logger}

      object TestMongo {
        def main(args: Array[String]): Unit = {
          Logger.getLogger("org").setLevel(Level.ERROR)
          Logger.getLogger("akka").setLevel(Level.ERROR)
          val ss = SparkSession.builder().appName("firstprogram").master("local").getOrCreate()
          val sc = ss.sparkContext
          val sqlContext = new SQLContext(sc)

          val readConfig = ReadConfig(Map("uri" -> "mongodb://127.0.0.1:27017/deepak.cars?readPreference=primaryPreferred"))
          val writeConfig = WriteConfig(Map("uri" -> "mongodb://127.0.0.1:27017/deepak.cars"))

          val userId = 0

          // Load the movie rating data
          val cars = MongoSpark.load(sc, readConfig)
          cars.collect
        }
      }

      But nothing is showing in the output.
      Output:
      Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
      17/10/13 15:55:16 INFO MongoClientCache: Creating MongoClient: [127.0.0.1:27017]
      17/10/13 15:55:17 INFO MongoClientCache: Closing MongoClient: [127.0.0.1:27017]
