This section describes how to interconnect FlinkServer with Hudi through Flink SQL jobs.
Table 1 lists the read and write operations supported by Flink on Hudi COW and MOR tables.
Flink SQL | COW table | MOR table
---|---|---
Batch write | Supported | Supported
Batch read | Supported | Supported
Stream write | Supported | Supported
Stream read | Supported | Supported
Currently, Flink SQL can read data from Hudi tables only in snapshot mode and read-optimized mode.
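For example, a read-optimized query of a MOR table can be declared by setting the query type on the Hudi table definition. The following is a minimal sketch: the table name mor_read_optimized is illustrative, the path reuses the stream_mor table from the later examples, and it assumes the standard Hudi Flink option hoodie.datasource.query.type (snapshot is the default); verify the option against the Hudi version shipped with your cluster.

CREATE TABLE mor_read_optimized(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts INT,
  `p` VARCHAR(20)
) PARTITIONED BY (`p`) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hacluster/tmp/hudi/stream_mor',
  'table.type' = 'MERGE_ON_READ',
  -- Assumption: standard Hudi Flink option; 'snapshot' is the default query type.
  'hoodie.datasource.query.type' = 'read_optimized'
);
SELECT * FROM mor_read_optimized;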
Select Enable CheckPoint in Running Parameter and set Time Interval (ms) to 60000.
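If you submit the same statements from a Flink SQL client instead of the FlinkServer UI, the checkpoint interval can be set with the generic Flink option shown below (a minimal sketch; execution.checkpointing.interval is the standard Flink setting, not a FlinkServer-specific parameter).

-- Equivalent to setting Time Interval (ms) to 60000 in the FlinkServer running parameters.
SET 'execution.checkpointing.interval' = '60 s';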
-- Hudi MOR sink table.
CREATE TABLE stream_mor(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts INT,
  `p` VARCHAR(20)
) PARTITIONED BY (`p`) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hacluster/tmp/hudi/stream_mor',
  'table.type' = 'MERGE_ON_READ'
);
-- Kafka source table that reads JSON records from the writehudi topic.
CREATE TABLE kafka(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts INT,
  `p` VARCHAR(20)
) WITH (
  'connector' = 'kafka',
  'topic' = 'writehudi',
  'properties.bootstrap.servers' = 'IP address of the Kafka broker instance:Kafka port number',
  'properties.group.id' = 'testGroup1',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);
-- Continuously write the Kafka data into the Hudi MOR table.
insert into stream_mor select * from kafka;
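To consume the data written by this job as a stream, the same Hudi path can be declared again with streaming read enabled. The sketch below is illustrative: the table name stream_mor_read is hypothetical, and it assumes the standard Hudi Flink options read.streaming.enabled and read.start-commit; in a FlinkServer job you would typically insert the query result into a sink table rather than run a bare SELECT.

-- Hudi MOR source table with streaming read enabled (assumed standard Hudi Flink options).
CREATE TABLE stream_mor_read(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts INT,
  `p` VARCHAR(20)
) PARTITIONED BY (`p`) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hacluster/tmp/hudi/stream_mor',
  'table.type' = 'MERGE_ON_READ',
  'read.streaming.enabled' = 'true',
  'read.start-commit' = 'earliest'
);
SELECT * FROM stream_mor_read;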
-- Hudi COW sink table (COPY_ON_WRITE is the default table type).
CREATE TABLE stream_write_cow(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts INT,
  `p` VARCHAR(20)
) PARTITIONED BY (`p`) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hacluster/tmp/hudi/stream_cow'
);
-- Kafka source table that reads JSON records from the writehudi topic.
CREATE TABLE kafka(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts INT,
  `p` VARCHAR(20)
) WITH (
  'connector' = 'kafka',
  'topic' = 'writehudi',
  'properties.bootstrap.servers' = 'IP address of the Kafka broker instance:Kafka port number',
  'properties.group.id' = 'testGroup1',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);
-- Continuously write the Kafka data into the Hudi COW table.
insert into stream_write_cow select * from kafka;
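Because batch read is also supported (see the table above), the COW table written by this job can later be read as a bounded snapshot with a plain query on the same table definition, for example:

-- Bounded (batch) read of the current snapshot of the COW table.
SELECT * FROM stream_write_cow;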
-- Hudi MOR table at an existing table path.
CREATE TABLE hudi_read_spark_mor(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts INT,
  `p` VARCHAR(20)
) PARTITIONED BY (`p`) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hacluster/tmp/default/tb_hudimor',
  'table.type' = 'MERGE_ON_READ'
);
-- Kafka source table that reads JSON records from the writehudi topic.
CREATE TABLE kafka(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts INT,
  `p` VARCHAR(20)
) WITH (
  'connector' = 'kafka',
  'topic' = 'writehudi',
  'properties.bootstrap.servers' = 'IP address of the Kafka broker instance:Kafka port number',
  'properties.group.id' = 'testGroup1',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);
-- Continuously write the Kafka data into the Hudi MOR table.
insert into hudi_read_spark_mor select * from kafka;
Kafka port number: the port of the Kafka broker instance referenced in properties.bootstrap.servers in the preceding jobs.
Log in to FusionInsight Manager and choose Cluster > Services > Kafka. On the displayed page, click Configurations and then All Configurations, search for allow.everyone.if.no.acl.found, set its value to true, and click Save.
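After the ACL setting takes effect, a few test records can be produced into the writehudi topic directly from Flink SQL by using the same kafka table definition as a sink. The sketch below is illustrative only; the row values are made up and must match the schema declared for the kafka table.

-- Reuses the kafka table from the jobs above as a sink; the values are illustrative.
INSERT INTO kafka VALUES
  ('id1', 'Danny', 23, 1, 'par1'),
  ('id2', 'Stephen', 33, 2, 'par1');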
Ensure that no partition is added before the synchronization; partitions added after the synchronization cannot be read.
-- Hudi MOR table with Hive synchronization in JDBC mode (uses hive_sync.jdbc_url).
CREATE TABLE stream_mor(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts INT,
  `p` VARCHAR(20)
) PARTITIONED BY (`p`) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hacluster/tmp/hudi/stream_mor',
  'table.type' = 'MERGE_ON_READ',
  'hive_sync.enable' = 'true',
  'hive_sync.table' = 'Name of the table to be synchronized to Hive',
  'hive_sync.db' = 'Name of the database to be synchronized to Hive',
  'hive_sync.metastore.uris' = 'Value of hive.metastore.uris in the hive-site.xml file on the Hive client',
  'hive_sync.jdbc_url' = 'Value of CLIENT_HIVE_URI in the component_env file on the Hive client'
);
-- Hudi MOR table with Hive synchronization in HMS (Hive Metastore) mode.
CREATE TABLE stream_mor(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts INT,
  `p` VARCHAR(20)
) PARTITIONED BY (`p`) WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hacluster/tmp/hudi/stream_mor',
  'table.type' = 'MERGE_ON_READ',
  'hive_sync.enable' = 'true',
  'hive_sync.table' = 'Name of the table to be synchronized to Hive',
  'hive_sync.db' = 'Name of the database to be synchronized to Hive',
  'hive_sync.mode' = 'hms',
  'hive_sync.metastore.uris' = 'Value of hive.metastore.uris in the hive-site.xml file on the Hive client',
  'properties.hive.metastore.kerberos.principal' = 'Value of hive.metastore.kerberos.principal in the hive-site.xml file on the Hive client'
);
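After either synchronization mode succeeds, the table metadata should be visible in Hive and can be checked from a Hive client. The statements below are a sketch: replace the database and table names with the values you set for hive_sync.db and hive_sync.table; for a MOR table, Hive sync typically registers read-optimized and real-time views with the _ro and _rt suffixes.

-- Hypothetical names; substitute the values of hive_sync.db and hive_sync.table.
SHOW TABLES IN hive_sync_db;
SELECT * FROM hive_sync_db.hive_sync_table_rt LIMIT 10;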