Mongo can be accessed only through an enhanced datasource connection.
DDS is compatible with the MongoDB protocol.
An enhanced datasource connection has been created on the DLI management console and bound to a queue.
Hard-coded or plaintext passwords pose significant security risks. To ensure security, encrypt your passwords, store them in configuration files or environment variables, and decrypt them when needed.
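For example, a minimal sketch of reading the password from an environment variable at runtime (the variable name MONGO_PASSWORD is a hypothetical choice for illustration):

// Minimal sketch: fetch the password from an environment variable instead of
// hard-coding it. MONGO_PASSWORD is a hypothetical variable name.
val password = sys.env.getOrElse("MONGO_PASSWORD",
  throw new IllegalStateException("MONGO_PASSWORD is not set"))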
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>2.3.2</version>
</dependency>
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
val sparkSession = SparkSession.builder().appName("datasource-mongo").getOrCreate()
sparkSession.sql(
  """create table test_dds(id string, name string, age int) using mongo options(
    'url' = '192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin',
    'uri' = 'mongodb://username:pwd@host:8635/db',
    'database' = 'test',
    'collection' = 'test',
    'user' = 'rwuser',
    'password' = '######')""")
sparkSession.sql("insert into test_dds values('3', 'Ann',23)")
sparkSession.sql("select * from test_dds").show()
val url = "192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin"
val uri = "mongodb://username:pwd@host:8635/db"
val user = "rwuser"
val database = "test"
val collection = "test"
val password = "######"
val schema = StructType(List(StructField("id", StringType), StructField("name", StringType), StructField("age", IntegerType)))
val rdd = sparkSession.sparkContext.parallelize(Seq(Row("1", "John", 23), Row("2", "Bob", 32)))
val dataFrame = sparkSession.createDataFrame(rdd, schema)
dataFrame.write.format("mongo")
  .option("url", url)
  .option("uri", uri)
  .option("database", database)
  .option("collection", collection)
  .option("user", user)
  .option("password", password)
  .mode(SaveMode.Overwrite)
  .save()
The mode options are Overwrite, Append, ErrorIfExists, and Ignore.
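For example, a minimal sketch of appending instead of overwriting, reusing the dataFrame and connection vals defined above:

// Minimal sketch: SaveMode.Append adds the new rows to the existing
// collection instead of replacing it.
dataFrame.write.format("mongo")
  .option("url", url)
  .option("uri", uri)
  .option("database", database)
  .option("collection", collection)
  .option("user", user)
  .option("password", password)
  .mode(SaveMode.Append)
  .save()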
val jdbcDF = sparkSession.read.format("mongo").schema(schema)
  .option("url", url)
  .option("uri", uri)
  .option("database", database)
  .option("collection", collection)
  .option("user", user)
  .option("password", password)
  .load()
jdbcDF.show()
Operation result
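When submitting a Spark job, add the following configurations so that the DLI Mongo datasource JARs are on both the driver and executor classpaths: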
spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/mongo/*
spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/mongo/*
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-sql_2.11</artifactId>
  <version>2.3.2</version>
</dependency>
import org.apache.spark.sql.SparkSession

object TestMongoSql {
  def main(args: Array[String]): Unit = {
    val sparkSession = SparkSession.builder().getOrCreate()
    sparkSession.sql(
      """create table test_dds(id string, name string, age int) using mongo options(
        'url' = '192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin',
        'uri' = 'mongodb://username:pwd@host:8635/db',
        'database' = 'test',
        'collection' = 'test',
        'user' = 'rwuser',
        'password' = '######')""")
    sparkSession.sql("insert into test_dds values('3', 'Ann',23)")
    sparkSession.sql("select * from test_dds").show()
    sparkSession.close()
  }
}
import org.apache.spark.sql.{Row, SaveMode, SparkSession}
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

object Test_Mongo_SparkSql {
  def main(args: Array[String]): Unit = {
    // Create a SparkSession session.
    val spark = SparkSession.builder().appName("mongodbTest").getOrCreate()

    // Set the connection configuration parameters.
    val url = "192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin"
    val uri = "mongodb://username:pwd@host:8635/db"
    val user = "rwuser"
    val database = "test"
    val collection = "test"
    val password = "######"

    // Set up the schema.
    val schema = StructType(List(StructField("id", StringType), StructField("name", StringType), StructField("age", IntegerType)))

    // Set up the DataFrame.
    val rdd = spark.sparkContext.parallelize(Seq(Row("1", "John", 23), Row("2", "Bob", 32)))
    val dataFrame = spark.createDataFrame(rdd, schema)

    // Write data to Mongo.
    dataFrame.write.format("mongo")
      .option("url", url)
      .option("uri", uri)
      .option("database", database)
      .option("collection", collection)
      .option("user", user)
      .option("password", password)
      .mode(SaveMode.Overwrite)
      .save()

    // Read data from Mongo.
    val jdbcDF = spark.read.format("mongo").schema(schema)
      .option("url", url)
      .option("uri", uri)
      .option("database", database)
      .option("collection", collection)
      .option("user", user)
      .option("password", password)
      .load()
    jdbcDF.show()

    spark.close()
  }
}