A datasource connection has been created on the DLI management console.
# Imports for the CSS (Elasticsearch) datasource example.
from __future__ import print_function
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, Row
from pyspark.sql import SparkSession
# Create a SparkSession (reuses an existing one if present).
sparkSession = SparkSession.builder.appName("datasource-css").getOrCreate()
# Cross-source connection parameters: the CSS resource path and the
# Elasticsearch node address (host:port).
resource = "/mytest"
nodes = "to-css-1174404953-hDTx3UPK.datasource.com:9200"
resource indicates the name of the resource associated with CSS. You can specify the resource location in /index/type format, where the index corresponds to a database and the type to a table.
# Schema of the records to write: a non-nullable int id and string name.
schema = StructType([StructField("id", IntegerType(), False),
                     StructField("name", StringType(), False)])
# Construct sample data as an RDD of Rows.
rdd = sparkSession.sparkContext.parallelize([Row(1, "John"), Row(2, "Bob")])
# Build a DataFrame from the RDD and the explicit schema.
dataFrame = sparkSession.createDataFrame(rdd, schema)
# Write the DataFrame to CSS; "Overwrite" replaces any existing data
# under the resource (Spark save modes are case-insensitive).
dataFrame.write.format("css") \
    .option("resource", resource) \
    .option("es.nodes", nodes) \
    .mode("Overwrite") \
    .save()
The mode option can be set to one of the following values:
# Read the data back from CSS and print it for verification.
jdbcDF = sparkSession.read.format("css") \
    .option("resource", resource) \
    .option("es.nodes", nodes) \
    .load()
jdbcDF.show()
# Create a DLI table associated with CSS.
# Fixed: the original string literal spanned several physical lines without
# "\" continuations, which is a SyntaxError in Python.
sparkSession.sql(
    "create table css_table(id long, name string) using css options( \
    'es.nodes'='to-css-1174404953-hDTx3UPK.datasource.com:9200',\
    'es.nodes.wan.only'='true',\
    'resource'='/mytest')")
# Insert one row into the DLI table.
sparkSession.sql("insert into css_table values(3,'tom')")
# Query the DLI table and print the result.
jdbcDF = sparkSession.sql("select * from css_table")
jdbcDF.show()
spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/css/*
spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/css/*
# _*_ coding: utf-8 _*_
"""Complete example: write to and read from CSS via the DataFrame API."""
from __future__ import print_function
from pyspark.sql.types import Row, StructType, StructField, IntegerType, StringType
from pyspark.sql import SparkSession

if __name__ == "__main__":
    # Create a SparkSession session.
    sparkSession = SparkSession.builder.appName("datasource-css").getOrCreate()

    # Setting cross-source connection parameters.
    resource = "/mytest"
    nodes = "to-css-1174404953-hDTx3UPK.datasource.com:9200"

    # Setting schema: non-nullable id and name columns.
    schema = StructType([StructField("id", IntegerType(), False),
                         StructField("name", StringType(), False)])

    # Construction data.
    rdd = sparkSession.sparkContext.parallelize([Row(1, "John"), Row(2, "Bob")])

    # Create a DataFrame from RDD and schema.
    dataFrame = sparkSession.createDataFrame(rdd, schema)

    # Write data to the CSS.
    dataFrame.write.format("css") \
        .option("resource", resource) \
        .option("es.nodes", nodes) \
        .mode("Overwrite") \
        .save()

    # Read data back for verification.
    jdbcDF = sparkSession.read.format("css") \
        .option("resource", resource) \
        .option("es.nodes", nodes) \
        .load()
    jdbcDF.show()

    # Close session.
    sparkSession.stop()
# _*_ coding: utf-8 _*_
"""Complete example: access CSS through a DLI-associated SQL table."""
from __future__ import print_function
from pyspark.sql import SparkSession

if __name__ == "__main__":
    # Create a SparkSession session.
    sparkSession = SparkSession.builder.appName("datasource-css").getOrCreate()

    # Create a DLI data table for DLI-associated CSS.
    sparkSession.sql(
        "create table css_table(id long, name string) using css options( \
        'es.nodes'='to-css-1174404953-hDTx3UPK.datasource.com:9200',\
        'es.nodes.wan.only'='true',\
        'resource'='/mytest')")

    # Insert data into the DLI data table.
    sparkSession.sql("insert into css_table values(3,'tom')")

    # Read data from DLI data table.
    jdbcDF = sparkSession.sql("select * from css_table")
    jdbcDF.show()

    # Close session.
    sparkSession.stop()
# Imports for the security-cluster (SSL) CSS datasource example.
from __future__ import print_function
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, Row
from pyspark.sql import SparkSession
Hard-coded or plaintext AK and SK pose significant security risks. To ensure security, encrypt your AK and SK, store them in configuration files or environment variables, and decrypt them when needed.
sparkSession = SparkSession.builder.appName("datasource-css").getOrCreate()
# ak, sk, and endpoint must be loaded from configuration files or environment
# variables and decrypted at use time -- never hard-coded (renamed the original
# misspelled "enpoint" variable; it is defined nowhere else in this example).
sparkSession.conf.set("fs.obs.access.key", ak)
sparkSession.conf.set("fs.obs.secret.key", sk)
sparkSession.conf.set("fs.obs.endpoint", endpoint)
# Fixed config key: the Hadoop-OBS property is "fs.obs.connection.ssl.enabled";
# the original misspelling "fs.obs.connecton.ssl.enabled" is silently ignored.
sparkSession.conf.set("fs.obs.connection.ssl.enabled", "false")
# Cross-source connection parameters (dropped the stray, un-Pythonic
# trailing semicolon from the original).
resource = "/mytest"
nodes = "to-css-1174404953-hDTx3UPK.datasource.com:9200"
resource indicates the name of the resource associated with CSS. You can specify the resource location in /index/type format, where the index corresponds to a database and the type to a table.
# Schema of the records to write: a non-nullable int id and string name.
schema = StructType([StructField("id", IntegerType(), False),
                     StructField("name", StringType(), False)])
# Construct sample data as an RDD of Rows.
rdd = sparkSession.sparkContext.parallelize([Row(1, "John"), Row(2, "Bob")])
# Build a DataFrame from the RDD and the explicit schema.
dataFrame = sparkSession.createDataFrame(rdd, schema)
# Write data to a security CSS cluster over SSL with HTTP basic auth.
# Fixed: the original chain was split across lines with no "\" continuations,
# which is a SyntaxError (compare the correctly-continued read example).
# Keystore/truststore JKS files are fetched from OBS; replace "***" with real
# passwords loaded from secure configuration.
dataFrame.write.format("css") \
    .option("resource", resource) \
    .option("es.nodes", nodes) \
    .option("es.net.ssl", "true") \
    .option("es.net.ssl.keystore.location", "obs://Bucket name/path/transport-keystore.jks") \
    .option("es.net.ssl.keystore.pass", "***") \
    .option("es.net.ssl.truststore.location", "obs://Bucket name/path/truststore.jks") \
    .option("es.net.ssl.truststore.pass", "***") \
    .option("es.net.http.auth.user", "admin") \
    .option("es.net.http.auth.pass", "***") \
    .mode("Overwrite") \
    .save()
The mode option can be set to one of the following values:
# Read data back from the security CSS cluster over SSL.
jdbcDF = sparkSession.read.format("css")\
    .option("resource", resource)\
    .option("es.nodes", nodes)\
    .option("es.net.ssl", "true")\
    .option("es.net.ssl.keystore.location", "obs://Bucket name/path/transport-keystore.jks")\
    .option("es.net.ssl.keystore.pass", "***")\
    .option("es.net.ssl.truststore.location", "obs://Bucket name/path/truststore.jks")\
    .option("es.net.ssl.truststore.pass", "***")\
    .option("es.net.http.auth.user", "admin")\
    .option("es.net.http.auth.pass", "***")\
    .load()
jdbcDF.show()
# Create a DLI table associated with a security CSS cluster (SSL + auth).
sparkSession.sql(
    "create table css_table(id long, name string) using css options(\
    'es.nodes'='to-css-1174404953-hDTx3UPK.datasource.com:9200',\
    'es.nodes.wan.only'='true',\
    'resource'='/mytest',\
    'es.net.ssl'='true',\
    'es.net.ssl.keystore.location'='obs://Bucket name/path/transport-keystore.jks',\
    'es.net.ssl.keystore.pass'='***',\
    'es.net.ssl.truststore.location'='obs://Bucket name/path/truststore.jks',\
    'es.net.ssl.truststore.pass'='***',\
    'es.net.http.auth.user'='admin',\
    'es.net.http.auth.pass'='***')")
# Insert one row into the DLI table.
sparkSession.sql("insert into css_table values(3,'tom')")
# Query the DLI table and print the result.
jdbcDF = sparkSession.sql("select * from css_table")
jdbcDF.show()
Hard-coded or plaintext AK and SK pose significant security risks. To ensure security, encrypt your AK and SK, store them in configuration files or environment variables, and decrypt them when needed.
# _*_ coding: utf-8 _*_
"""Complete example: write to and read from a security (SSL) CSS cluster.

Fixes applied to the original:
- the write chain had no "\\" line continuations (SyntaxError), and the read
  chain was missing the continuation after the truststore.location option;
- the OBS SSL property is "fs.obs.connection.ssl.enabled" (was misspelled
  "fs.obs.connecton.ssl.enabled" and therefore ignored);
- renamed the misspelled "enpoint" variable and dropped a stray semicolon.
ak, sk, and endpoint must come from secure configuration or environment
variables -- never hard-code credentials.
"""
from __future__ import print_function
from pyspark.sql.types import Row, StructType, StructField, IntegerType, StringType
from pyspark.sql import SparkSession

if __name__ == "__main__":
    # Create a SparkSession session.
    sparkSession = SparkSession.builder.appName("datasource-css").getOrCreate()
    sparkSession.conf.set("fs.obs.access.key", ak)
    sparkSession.conf.set("fs.obs.secret.key", sk)
    sparkSession.conf.set("fs.obs.endpoint", endpoint)
    sparkSession.conf.set("fs.obs.connection.ssl.enabled", "false")

    # Setting cross-source connection parameters.
    resource = "/mytest"
    nodes = "to-css-1174404953-hDTx3UPK.datasource.com:9200"

    # Setting schema.
    schema = StructType([StructField("id", IntegerType(), False),
                         StructField("name", StringType(), False)])

    # Construction data.
    rdd = sparkSession.sparkContext.parallelize([Row(1, "John"), Row(2, "Bob")])

    # Create a DataFrame from RDD and schema.
    dataFrame = sparkSession.createDataFrame(rdd, schema)

    # Write data to the CSS.
    dataFrame.write.format("css") \
        .option("resource", resource) \
        .option("es.nodes", nodes) \
        .option("es.net.ssl", "true") \
        .option("es.net.ssl.keystore.location", "obs://Bucket name/path/transport-keystore.jks") \
        .option("es.net.ssl.keystore.pass", "***") \
        .option("es.net.ssl.truststore.location", "obs://Bucket name/path/truststore.jks") \
        .option("es.net.ssl.truststore.pass", "***") \
        .option("es.net.http.auth.user", "admin") \
        .option("es.net.http.auth.pass", "***") \
        .mode("Overwrite") \
        .save()

    # Read data.
    jdbcDF = sparkSession.read.format("css")\
        .option("resource", resource)\
        .option("es.nodes", nodes)\
        .option("es.net.ssl", "true")\
        .option("es.net.ssl.keystore.location", "obs://Bucket name/path/transport-keystore.jks")\
        .option("es.net.ssl.keystore.pass", "***")\
        .option("es.net.ssl.truststore.location", "obs://Bucket name/path/truststore.jks")\
        .option("es.net.ssl.truststore.pass", "***")\
        .option("es.net.http.auth.user", "admin")\
        .option("es.net.http.auth.pass", "***")\
        .load()
    jdbcDF.show()

    # Close session.
    sparkSession.stop()
# _*_ coding: utf-8 _*_
"""Complete example: security (SSL) CSS cluster via a DLI-associated SQL table.

Removed the unused "import os" from the original; credentials and store
passwords ('**') must be loaded from secure configuration, not hard-coded.
"""
from __future__ import print_function
from pyspark.sql import SparkSession

if __name__ == "__main__":
    # Create a SparkSession session.
    sparkSession = SparkSession.builder.appName("datasource-css").getOrCreate()

    # Create a DLI data table for DLI-associated CSS.
    sparkSession.sql("create table css_table(id int, name string) using css options(\
        'es.nodes'='192.168.6.204:9200',\
        'es.nodes.wan.only'='true',\
        'resource'='/mytest',\
        'es.net.ssl'='true',\
        'es.net.ssl.keystore.location' = 'obs://xietest1/lzq/keystore.jks',\
        'es.net.ssl.keystore.pass' = '**',\
        'es.net.ssl.truststore.location'='obs://xietest1/lzq/truststore.jks',\
        'es.net.ssl.truststore.pass'='**',\
        'es.net.http.auth.user'='admin',\
        'es.net.http.auth.pass'='**')")

    # Insert data into the DLI data table.
    sparkSession.sql("insert into css_table values(3,'tom')")

    # Read data from DLI data table.
    jdbcDF = sparkSession.sql("select * from css_table")
    jdbcDF.show()

    # Close session.
    sparkSession.stop()