The CloudTable HBase and MRS HBase can be connected to DLI as data sources.
A datasource connection has been created on the DLI management console.
Hard-coded or plaintext passwords pose significant security risks. To ensure security, encrypt your passwords, store them in configuration files or environment variables, and decrypt them when needed.
# Import the dependencies used by the HBase datasource samples.
from __future__ import print_function
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, ShortType, LongType, FloatType, DoubleType
from pyspark.sql import SparkSession

# Create (or reuse) the Spark session that every following statement runs on.
sparkSession = SparkSession.builder.appName("datasource-hbase").getOrCreate()
# Create the table "testhbase" backed by the HBase table 'hbtest'.
# The statement is assembled from adjacent string literals, so no line
# breaks or indentation end up inside the option values.
sparkSession.sql(
    "CREATE TABLE testhbase(id STRING, location STRING, city STRING) using hbase OPTIONS ("
    "'ZKHost' = '192.168.0.189:2181',"
    "'TableName' = 'hbtest',"
    "'RowKey' = 'id:5',"
    "'Cols' = 'location:info.location,city:detail.city')"
)
# Same table definition as the plain variant, with the additional
# 'krb5conf', 'keytab' and 'principal' options for Kerberos authentication.
# The statement is assembled from adjacent string literals, so no line
# breaks or indentation end up inside the option values.
sparkSession.sql(
    "CREATE TABLE testhbase(id STRING, location STRING, city STRING) using hbase OPTIONS ("
    "'ZKHost' = '192.168.0.189:2181',"
    "'TableName' = 'hbtest',"
    "'RowKey' = 'id:5',"
    "'Cols' = 'location:info.location,city:detail.city',"
    "'krb5conf' = './krb5.conf',"
    "'keytab'='./user.keytab',"
    "'principal' ='krbtest')"
)
For details about how to obtain the krb5.conf and keytab files, see Completing Configurations for Enabling Kerberos Authentication.
# Insert one row into testhbase.
insert_sql = "insert into testhbase values('95274','abc','Jinan')"
sparkSession.sql(insert_sql)

# Query the table and print the result.
query_result = sparkSession.sql("select * from testhbase")
query_result.show()
# Create the DLI table associated with the CloudTable HBase table.
# Adjacent string literals are used instead of backslash continuations so
# that no whitespace can leak into the ZKHost or Cols option values.
sparkSession.sql(
    "CREATE TABLE test_hbase(id STRING, location STRING, city STRING, booleanf BOOLEAN, "
    "shortf SHORT, intf INT, longf LONG, floatf FLOAT, doublef DOUBLE) using hbase OPTIONS ("
    "'ZKHost' = 'cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181,"
    "cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181,"
    "cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181',"
    "'TableName' = 'table_DupRowkey1',"
    "'RowKey' = 'id:5,location:6,city:7',"
    "'Cols' = 'booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,"
    "longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.doublef')"
)

# Set the schema; field order and types mirror the columns of test_hbase.
schema = StructType([
    StructField("id", StringType()),
    StructField("location", StringType()),
    StructField("city", StringType()),
    StructField("booleanf", BooleanType()),
    StructField("shortf", ShortType()),
    StructField("intf", IntegerType()),
    StructField("longf", LongType()),
    StructField("floatf", FloatType()),
    StructField("doublef", DoubleType()),
])

# Build an RDD holding a single sample row.
dataList = sparkSession.sparkContext.parallelize([("11111", "aaa", "aaa", False, 4, 3, 23, 2.3, 2.34)])

# Create a DataFrame from the RDD and the schema.
dataFrame = sparkSession.createDataFrame(dataList, schema)

# Write the DataFrame into the table created above.
dataFrame.write.insertInto("test_hbase")

# Set cross-source connection parameters.
TableName = "table_DupRowkey1"
RowKey = "id:5,location:6,city:7"
Cols = "booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.doublef"
ZKHost = (
    "cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181,"
    "cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181,"
    "cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181"
)

# Read the data back through the "hbase" data source and show matching rows.
jdbcDF = sparkSession.read.schema(schema)\
    .format("hbase")\
    .option("ZKHost", ZKHost)\
    .option("TableName", TableName)\
    .option("RowKey", RowKey)\
    .option("Cols", Cols)\
    .load()
jdbcDF.filter("id = '12333' or id='11111'").show()
The lengths of the id, location, and city values are limited. When inserting data, you must set the data values based on the required lengths. Otherwise, an encoding format error occurs during query.
spark.driver.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/hbase/*
spark.executor.extraClassPath=/usr/share/extension/dli/spark-jar/datasource/hbase/*
# _*_ coding: utf-8 _*_
from __future__ import print_function
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, ShortType, LongType, FloatType, DoubleType
from pyspark.sql import SparkSession
if __name__ == "__main__":
    # Create a SparkSession session.
    sparkSession = SparkSession.builder.appName("datasource-hbase").getOrCreate()

    try:
        # Create the table "testhbase" backed by the HBase table 'hbtest'.
        # Adjacent string literals keep line breaks and indentation out of
        # the option values.
        sparkSession.sql(
            "CREATE TABLE testhbase(id STRING, location STRING, city STRING) using hbase OPTIONS ("
            "'ZKHost' = '192.168.0.189:2181',"
            "'TableName' = 'hbtest',"
            "'RowKey' = 'id:5',"
            "'Cols' = 'location:info.location,city:detail.city')"
        )

        # Insert one row, then query the table and print the result.
        sparkSession.sql("insert into testhbase values('95274','abc','Jinan')")
        sparkSession.sql("select * from testhbase").show()
    finally:
        # Close the session even if one of the statements above fails.
        sparkSession.stop()
# _*_ coding: utf-8 _*_
from __future__ import print_function
from pyspark import SparkFiles
from pyspark.sql import SparkSession
import shutil
import time
import os
if __name__ == "__main__":
    # Create a SparkSession session.
    sparkSession = SparkSession.builder.appName("Test_HBase_SparkSql_Kerberos").getOrCreate()

    try:
        # NOTE(review): the pauses below are kept from the original sample;
        # presumably they give the job time to distribute the Kerberos files
        # before they are looked up and used - confirm whether they are needed.
        time.sleep(10)

        # Locate the Kerberos files through SparkFiles and copy them into the
        # current working directory, which is where the table options point.
        path_user = os.getcwd()
        krb5_endfile = os.path.join(path_user, "krb5.conf")
        keytab_endfile = os.path.join(path_user, "user.keytab")
        shutil.copy(SparkFiles.get("krb5.conf"), krb5_endfile)
        shutil.copy(SparkFiles.get("user.keytab"), keytab_endfile)
        time.sleep(20)

        # Create the table "testhbase" backed by the HBase table 'hbtest',
        # passing the copied krb5.conf and keytab for Kerberos authentication.
        sparkSession.sql(
            "CREATE TABLE testhbase(id string,booleanf boolean,shortf short,intf int,longf long,floatf float,doublef double) " +
            "using hbase OPTIONS(" +
            "'ZKHost'='10.0.0.146:2181'," +
            "'TableName'='hbtest'," +
            "'RowKey'='id:100'," +
            "'Cols'='booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF2.longf,floatf:CF1.floatf,doublef:CF2.doublef'," +
            "'krb5conf'='" + krb5_endfile + "'," +
            "'keytab'='" + keytab_endfile + "'," +
            "'principal'='krbtest') ")

        # Insert one row whose values match the seven declared columns
        # (id, booleanf, shortf, intf, longf, floatf, doublef). Explicit casts
        # keep each literal at the exact column type.
        sparkSession.sql(
            "insert into testhbase values("
            "'95274', true, CAST(1 AS SHORT), 2, CAST(3 AS LONG), "
            "CAST(4.5 AS FLOAT), CAST(5.5 AS DOUBLE))"
        )
        sparkSession.sql("select * from testhbase").show()
    finally:
        # Close the session even if one of the statements above fails.
        sparkSession.stop()
# _*_ coding: utf-8 _*_
from __future__ import print_function
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType, ShortType, LongType, FloatType, DoubleType
from pyspark.sql import SparkSession
if __name__ == "__main__":
    # Create a SparkSession session.
    sparkSession = SparkSession.builder.appName("datasource-hbase").getOrCreate()

    try:
        # Create a data table for DLI-associated ct.
        # Adjacent string literals are used instead of backslash
        # continuations so that the indentation of this block cannot leak
        # into the ZKHost option value.
        sparkSession.sql(
            "CREATE TABLE test_hbase(id STRING, location STRING, city STRING, booleanf BOOLEAN, "
            "shortf SHORT, intf INT, longf LONG,floatf FLOAT,doublef DOUBLE) using hbase OPTIONS ( "
            "'ZKHost' = 'cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181,"
            "cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181,"
            "cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181',"
            "'TableName' = 'table_DupRowkey1',"
            "'RowKey' = 'id:5,location:6,city:7',"
            "'Cols' = 'booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,"
            "longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.doublef')"
        )

        # Create a DataFrame and initialize the DataFrame data.
        dataList = sparkSession.sparkContext.parallelize([("11111", "aaa", "aaa", False, 4, 3, 23, 2.3, 2.34)])

        # Setting schema; field order and types mirror the columns of test_hbase.
        schema = StructType([
            StructField("id", StringType()),
            StructField("location", StringType()),
            StructField("city", StringType()),
            StructField("booleanf", BooleanType()),
            StructField("shortf", ShortType()),
            StructField("intf", IntegerType()),
            StructField("longf", LongType()),
            StructField("floatf", FloatType()),
            StructField("doublef", DoubleType()),
        ])

        # Create a DataFrame from RDD and schema.
        dataFrame = sparkSession.createDataFrame(dataList, schema)

        # Write data to the cloudtable-hbase.
        dataFrame.write.insertInto("test_hbase")

        # Set cross-source connection parameters.
        TableName = "table_DupRowkey1"
        RowKey = "id:5,location:6,city:7"
        Cols = "booleanf:CF1.booleanf,shortf:CF1.shortf,intf:CF1.intf,longf:CF1.longf,floatf:CF1.floatf,doublef:CF1.doublef"
        # The host list is one string; it is split over adjacent literals
        # because a quoted literal cannot span several source lines.
        ZKHost = (
            "cloudtable-cf82-zk3-pa6HnHpf.cloudtable.com:2181,"
            "cloudtable-cf82-zk2-weBkIrjI.cloudtable.com:2181,"
            "cloudtable-cf82-zk1-WY09px9l.cloudtable.com:2181"
        )

        # Read data on CloudTable-HBase.
        jdbcDF = sparkSession.read.schema(schema)\
            .format("hbase")\
            .option("ZKHost", ZKHost)\
            .option("TableName", TableName)\
            .option("RowKey", RowKey)\
            .option("Cols", Cols)\
            .load()
        jdbcDF.filter("id = '12333' or id='11111'").show()
    finally:
        # Close the session even if one of the statements above fails.
        sparkSession.stop()