MongoDB can be accessed only through an enhanced datasource connection.
DDS is compatible with the MongoDB protocol, so the same approach also applies to DDS instances.
Prerequisite: an enhanced datasource connection has been created on the DLI management console and bound to a queue in packages.
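Hard-coded or plaintext passwords pose significant security risks. To ensure security, encrypt your passwords, store them in configuration files or environment variables, and decrypt them only when needed. The `######` values in the following examples are placeholders; replace them with a password obtained this way rather than writing the real password into the code.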
from __future__ import print_function
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql import SparkSession

# Create a SparkSession.
sparkSession = SparkSession.builder.appName("datasource-mongo").getOrCreate()

# Connection parameters. These are placeholders: replace them with your own
# values, and do not hard-code a real password here.
url = "192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin"
uri = "mongodb://username:pwd@host:8635/db"
user = "rwuser"
database = "test"
collection = "test"
password = "######"

# Build a DataFrame from an RDD of tuples and an explicit schema.
dataList = sparkSession.sparkContext.parallelize([(1, "Katie", 19), (2, "Tom", 20)])
schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("name", StringType(), False),
    StructField("age", IntegerType(), False),
])
dataFrame = sparkSession.createDataFrame(dataList, schema)

# Write the DataFrame to the MongoDB collection, overwriting existing data.
# The chain is parenthesized so it can span lines without backslashes.
(
    dataFrame.write.format("mongo")
    .option("url", url)
    .option("uri", uri)
    .option("user", user)
    .option("password", password)
    .option("database", database)
    .option("collection", collection)
    .mode("Overwrite")
    .save()
)

# Read the collection back into a DataFrame and print it.
jdbcDF = (
    sparkSession.read.format("mongo")
    .option("url", url)
    .option("uri", uri)
    .option("user", user)
    .option("password", password)
    .option("database", database)
    .option("collection", collection)
    .load()
)
jdbcDF.show()
# Create a DLI table associated with the MongoDB collection. Adjacent string
# literals are concatenated by Python, so the statement can span several lines.
sparkSession.sql(
    "create table test_dds(id string, name string, age int) using mongo options("
    "'url' = '192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin', "
    "'uri' = 'mongodb://username:pwd@host:8635/db', "
    "'database' = 'test', "
    "'collection' = 'test', "
    "'user' = 'rwuser', "
    "'password' = '######')"  # placeholder: do not hard-code a real password
)

# Insert a row into the DLI table.
sparkSession.sql("insert into test_dds values('3', 'Ann',23)")

# Query the DLI table and print the result.
sparkSession.sql("select * from test_dds").show()
spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/mongo/*
spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/mongo/*
from __future__ import print_function
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql import SparkSession
if __name__ == "__main__":
    # Create a SparkSession.
    sparkSession = SparkSession.builder.appName("datasource-mongo").getOrCreate()
    try:
        # Create an RDD holding the sample rows. The ids are ints because the
        # "id" field below is declared as IntegerType; string ids such as "1"
        # would be rejected by createDataFrame's default schema verification.
        dataList = sparkSession.sparkContext.parallelize([(1, "Katie", 19), (2, "Tom", 20)])
        # Define the schema of the DataFrame.
        schema = StructType([
            StructField("id", IntegerType(), False),
            StructField("name", StringType(), False),
            StructField("age", IntegerType(), False),
        ])
        # Create a DataFrame from the RDD and the schema.
        dataFrame = sparkSession.createDataFrame(dataList, schema)

        # Connection parameters. These are placeholders: replace them with your
        # own values, and do not hard-code a real password here.
        url = "192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin"
        uri = "mongodb://username:pwd@host:8635/db"
        user = "rwuser"
        database = "test"
        collection = "test"
        password = "######"

        # Write the DataFrame to the MongoDB collection, overwriting existing
        # data. The chain is parenthesized so it can span several lines
        # without backslash continuations.
        (
            dataFrame.write.format("mongo")
            .option("url", url)
            .option("uri", uri)
            .option("user", user)
            .option("password", password)
            .option("database", database)
            .option("collection", collection)
            .mode("Overwrite")
            .save()
        )

        # Read the collection back into a DataFrame and print it.
        jdbcDF = (
            sparkSession.read.format("mongo")
            .option("url", url)
            .option("uri", uri)
            .option("user", user)
            .option("password", password)
            .option("database", database)
            .option("collection", collection)
            .load()
        )
        jdbcDF.show()
    finally:
        # Stop the session even if one of the steps above raised.
        sparkSession.stop()
from __future__ import print_function
from pyspark.sql import SparkSession
if __name__ == "__main__":
    # Create a SparkSession.
    sparkSession = SparkSession.builder.appName("datasource-mongo").getOrCreate()
    try:
        # Create a DLI table associated with the MongoDB collection. Adjacent
        # string literals are concatenated by Python, so the statement can span
        # several lines without leaking source indentation into the SQL text.
        sparkSession.sql(
            "create table test_dds(id string, name string, age int) using mongo options("
            "'url' = '192.168.4.62:8635,192.168.5.134:8635/test?authSource=admin', "
            "'uri' = 'mongodb://username:pwd@host:8635/db', "
            "'database' = 'test', "
            "'collection' = 'test', "
            "'user' = 'rwuser', "
            "'password' = '######')"  # placeholder: do not hard-code a real password
        )
        # Insert data into the DLI table.
        sparkSession.sql("insert into test_dds values('3', 'Ann',23)")
        # Read data from the DLI table and print it.
        sparkSession.sql("select * from test_dds").show()
    finally:
        # Stop the session even if one of the steps above raised.
        sparkSession.stop()