DLI writes Flink job output to Kafka through the Kafka result table.
Apache Kafka is a fast, scalable, fault-tolerant distributed publish-subscribe messaging system. It delivers high throughput, built-in partitioning, data replication, and fault tolerance, making it suitable for handling massive message streams.
create table kafkaSink(
  attr_name attr_type
  (',' attr_name attr_type)*
  (','PRIMARY KEY (attr_name, ...) NOT ENFORCED)
)
with (
  'connector' = 'kafka',
  'topic' = '',
  'properties.bootstrap.servers' = '',
  'format' = ''
);
| Parameter | Mandatory | Default Value | Data Type | Description |
| --- | --- | --- | --- | --- |
| connector | Yes | None | String | Connector to be used. Set this parameter to kafka. |
| topic | Yes | None | String | Topic name of the Kafka result table. |
| properties.bootstrap.servers | Yes | None | String | Kafka broker addresses, in the format host:port,host:port,host:port. Separate multiple host:port pairs with commas (,). |
| format | Yes | None | String | Format used by the Flink Kafka connector to serialize Kafka messages. Supported formats include CSV, JSON, and Avro. Either this parameter or value.format is required. Refer to Format for more details and format parameters. |
| topic-pattern | No | None | String | Regular expression for matching the Kafka topic name. Only one of topic and topic-pattern can be specified. Examples: 'topic.*', '(topic-c\|topic-d)', '(topic-a\|topic-b\|topic-\\d*)', '(topic-a\|topic-b\|topic-[0-9]*)'. |
| properties.* | No | None | String | Sets and passes arbitrary Kafka configurations. The suffix must match a configuration key defined in the Kafka configuration documentation; Flink removes the properties. prefix and passes the remaining key and value to the Kafka client. Note: Options that Flink itself overrides, such as 'key.deserializer' and 'value.deserializer', cannot be set this way. |
| key.format | No | None | String | Format used to serialize the key part of Kafka messages. Note: If a key format is defined, key.fields must also be defined; otherwise the Kafka records will have an empty key. |
| key.fields | No | [] | List<String> | Semicolon-separated list of table columns that form the message key, for example field1;field2. This parameter must be configured together with key.format. It is empty by default, so no key is defined. |
| key.fields-prefix | No | None | String | Custom prefix for all fields of the key format, to avoid name clashes with fields of the value format. Note: If a prefix is defined, value.fields-include must be set to EXCEPT_KEY. |
| value.format | Yes | None | String | Format used to serialize the value part of Kafka messages. Note: Either this parameter or format is required. Configuring both causes a conflict. |
| value.fields-include | No | ALL | Enum. Possible values: [ALL, EXCEPT_KEY] | Whether the key fields are included in the message value. ALL: all fields of the table schema, including key fields, are written to the value. EXCEPT_KEY: all fields except the key fields are written to the value. |
| sink.partitioner | No | None | String | Mapping from Flink partitions to Kafka partitions. Valid values: default (use the Kafka default partitioner), fixed (each Flink partition writes to at most one Kafka partition), round-robin (records are distributed across Kafka partitions in turn; valid only when no message key is defined), or the fully qualified class name of a custom FlinkKafkaPartitioner subclass. |
| sink.semantic | No | at-least-once | String | Delivery semantic of the Kafka sink. Valid values: at-least-once, exactly-once, and none. |
| sink.parallelism | No | None | Integer | Parallelism of the Kafka sink operator. By default, the parallelism is determined by the framework and matches that of the upstream chained operator. |
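To illustrate how the key-related options in the table interact, the following sketch (the topic name, broker address, and table name are placeholders, not values from this document) writes the user_id column into the message key and, with value.fields-include set to EXCEPT_KEY, keeps it out of the message value:

CREATE TABLE kafkaKeyedSink (
  order_id string,
  user_id string,
  pay_amount double
) WITH (
  'connector' = 'kafka',
  'topic' = 'exampleTopic',                                   -- placeholder topic name
  'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort', -- placeholder broker address
  'key.format' = 'json',                 -- serializes the message key; requires key.fields
  'key.fields' = 'user_id',              -- user_id becomes the message key
  'value.format' = 'json',               -- used instead of format; setting both would conflict
  'value.fields-include' = 'EXCEPT_KEY', -- value carries order_id and pay_amount only
  'sink.semantic' = 'at-least-once'      -- the default delivery semantic, shown explicitly
);

Because the records now carry a key, messages with the same user_id are hashed to the same Kafka partition by the default partitioner, which preserves per-user ordering.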
In this example, data is read from a Kafka topic and written to another using a Kafka result table.
CREATE TABLE kafkaSource (
  order_id string,
  order_channel string,
  order_time string,
  pay_amount double,
  real_pay double,
  pay_time string,
  user_id string,
  user_name string,
  area_id string
) WITH (
  'connector' = 'kafka',
  'topic' = 'KafkaTopic',
  'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
  'properties.group.id' = 'GroupId',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

CREATE TABLE kafkaSink (
  order_id string,
  order_channel string,
  order_time string,
  pay_amount double,
  real_pay double,
  pay_time string,
  user_id string,
  user_name string,
  area_id string
) WITH (
  'connector' = 'kafka',
  'topic' = 'KafkaSinkTopic',
  'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
  'format' = 'json'
);

insert into kafkaSink select * from kafkaSource;
{"order_id":"202103241000000001","order_channel":"webShop","order_time":"2021-03-24 10:00:00","pay_amount":100.0,"real_pay":100.0,"pay_time":"2021-03-24 10:02:03","user_id":"0001","user_name":"Alice","area_id":"330106"} {"order_id":"202103241606060001","order_channel":"appShop","order_time":"2021-03-24 16:06:06","pay_amount":200.0,"real_pay":180.0,"pay_time":"2021-03-24 16:10:06","user_id":"0001","user_name":"Alice","area_id":"330106"}
{"order_id":"202103241000000001","order_channel":"webShop","order_time":"2021-03-24 10:00:00","pay_amount":100.0,"real_pay":100.0,"pay_time":"2021-03-24 10:02:03","user_id":"0001","user_name":"Alice","area_id":"330106"} {"order_id":"202103241606060001","order_channel":"appShop","order_time":"2021-03-24 16:06:06","pay_amount":200.0,"real_pay":180.0,"pay_time":"2021-03-24 16:10:06","user_id":"0001","user_name":"Alice","area_id":"330106"}
Create a DMS Kafka cluster with SASL_SSL enabled, download the SSL certificate, and upload the certificate file client.jks to an OBS bucket.
CREATE TABLE ordersSource (
  order_id string,
  order_channel string,
  order_time timestamp(3),
  pay_amount double,
  real_pay double,
  pay_time string,
  user_id string,
  user_name string,
  area_id string
) WITH (
  'connector' = 'kafka',
  'topic' = 'xx',
  'properties.bootstrap.servers' = 'xx:9093,xx:9093,xx:9093',
  'properties.group.id' = 'GroupId',
  'scan.startup.mode' = 'latest-offset',
  'properties.connector.auth.open' = 'true',
  'properties.ssl.truststore.location' = 'obs://xx/xx.jks',  -- OBS path to which the certificate was uploaded
  'properties.sasl.mechanism' = 'PLAIN',  -- SASL mechanism
  'properties.security.protocol' = 'SASL_SSL',
  'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xx\" password=\"xx\";',  -- Username and password set when the Kafka cluster was created
  'format' = 'json'
);

CREATE TABLE ordersSink (
  order_id string,
  order_channel string,
  order_time timestamp(3),
  pay_amount double,
  real_pay double,
  pay_time string,
  user_id string,
  user_name string,
  area_id string
) WITH (
  'connector' = 'kafka',
  'topic' = 'xx',
  'properties.bootstrap.servers' = 'xx:9093,xx:9093,xx:9093',
  'properties.connector.auth.open' = 'true',
  'properties.ssl.truststore.location' = 'obs://xx/xx.jks',
  'properties.sasl.mechanism' = 'PLAIN',
  'properties.security.protocol' = 'SASL_SSL',
  'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"xx\" password=\"xx\";',
  'format' = 'json'
);

insert into ordersSink select * from ordersSource;
Obtain the truststore.jks file using the authentication credential, and store both the credential and the truststore.jks file in OBS. The following example uses SASL_SSL with Kerberos (GSSAPI) authentication:
CREATE TABLE ordersSource (
  order_id string,
  order_channel string,
  order_time timestamp(3),
  pay_amount double,
  real_pay double,
  pay_time string,
  user_id string,
  user_name string,
  area_id string
) WITH (
  'connector' = 'kafka',
  'topic' = 'xx',
  'properties.bootstrap.servers' = 'xx:21009,xx:21009',
  'properties.group.id' = 'GroupId',
  'scan.startup.mode' = 'latest-offset',
  'properties.sasl.kerberos.service.name' = 'kafka',
  'properties.connector.auth.open' = 'true',
  'properties.connector.kerberos.principal' = 'xx',  -- Username
  'properties.connector.kerberos.krb5' = 'obs://xx/krb5.conf',
  'properties.connector.kerberos.keytab' = 'obs://xx/user.keytab',
  'properties.security.protocol' = 'SASL_SSL',
  'properties.ssl.truststore.location' = 'obs://xx/truststore.jks',
  'properties.ssl.truststore.password' = 'xx',  -- Password set when generating truststore.jks
  'properties.sasl.mechanism' = 'GSSAPI',
  'format' = 'json'
);

CREATE TABLE ordersSink (
  order_id string,
  order_channel string,
  order_time timestamp(3),
  pay_amount double,
  real_pay double,
  pay_time string,
  user_id string,
  user_name string,
  area_id string
) WITH (
  'connector' = 'kafka',
  'topic' = 'xx',
  'properties.bootstrap.servers' = 'xx:21009,xx:21009',
  'properties.sasl.kerberos.service.name' = 'kafka',
  'properties.connector.auth.open' = 'true',
  'properties.connector.kerberos.principal' = 'xx',
  'properties.connector.kerberos.krb5' = 'obs://xx/krb5.conf',
  'properties.connector.kerberos.keytab' = 'obs://xx/user.keytab',
  'properties.ssl.truststore.location' = 'obs://xx/truststore.jks',
  'properties.ssl.truststore.password' = 'xx',
  'properties.security.protocol' = 'SASL_SSL',
  'properties.sasl.mechanism' = 'GSSAPI',
  'format' = 'json'
);

insert into ordersSink select * from ordersSource;
The following example uses SASL_PLAINTEXT with Kerberos (GSSAPI) authentication:

CREATE TABLE ordersSource (
  order_id string,
  order_channel string,
  order_time timestamp(3),
  pay_amount double,
  real_pay double,
  pay_time string,
  user_id string,
  user_name string,
  area_id string
) WITH (
  'connector' = 'kafka',
  'topic' = 'xx',
  'properties.bootstrap.servers' = 'xx:21007,xx:21007',
  'properties.group.id' = 'GroupId',
  'scan.startup.mode' = 'latest-offset',
  'properties.sasl.kerberos.service.name' = 'kafka',
  'properties.connector.auth.open' = 'true',
  'properties.connector.kerberos.principal' = 'xx',
  'properties.connector.kerberos.krb5' = 'obs://xx/krb5.conf',
  'properties.connector.kerberos.keytab' = 'obs://xx/user.keytab',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'GSSAPI',
  'format' = 'json'
);

CREATE TABLE ordersSink (
  order_id string,
  order_channel string,
  order_time timestamp(3),
  pay_amount double,
  real_pay double,
  pay_time string,
  user_id string,
  user_name string,
  area_id string
) WITH (
  'connector' = 'kafka',
  'topic' = 'xx',
  'properties.bootstrap.servers' = 'xx:21007,xx:21007',
  'properties.sasl.kerberos.service.name' = 'kafka',
  'properties.connector.auth.open' = 'true',
  'properties.connector.kerberos.principal' = 'xx',
  'properties.connector.kerberos.krb5' = 'obs://xx/krb5.conf',
  'properties.connector.kerberos.keytab' = 'obs://xx/user.keytab',
  'properties.security.protocol' = 'SASL_PLAINTEXT',
  'properties.sasl.mechanism' = 'GSSAPI',
  'format' = 'json'
);

insert into ordersSink select * from ordersSource;
Obtain the truststore.jks file using the authentication credential, and store both the credential and the truststore.jks file in OBS. The following example uses SSL transport encryption without SASL authentication and writes the result to a Print result table:
CREATE TABLE ordersSource (
  order_id string,
  order_channel string,
  order_time timestamp(3),
  pay_amount double,
  real_pay double,
  pay_time string,
  user_id string,
  user_name string,
  area_id string
) WITH (
  'connector' = 'kafka',
  'topic' = 'xx',
  'properties.bootstrap.servers' = 'xx:9093,xx:9093,xx:9093',
  'properties.group.id' = 'GroupId',
  'scan.startup.mode' = 'latest-offset',
  'properties.connector.auth.open' = 'true',
  'properties.ssl.truststore.location' = 'obs://xx/truststore.jks',
  'properties.ssl.truststore.password' = 'xx',  -- Password set when generating truststore.jks
  'properties.security.protocol' = 'SSL',
  'format' = 'json'
);

CREATE TABLE ordersSink (
  order_id string,
  order_channel string,
  order_time timestamp(3),
  pay_amount double,
  real_pay double,
  pay_time string,
  user_id string,
  user_name string,
  area_id string
) WITH (
  'connector' = 'print'
);

insert into ordersSink select * from ordersSource;