The Kafka connector reads data from and writes data to Kafka topics.
Apache Kafka is a fast, scalable, and fault-tolerant distributed publish-subscribe messaging system. It provides high throughput, built-in partitioning, and data replication, which makes it suitable for processing large volumes of messages.
| Type | Description |
|---|---|
| Supported Table Types | Source table and result table |
| Supported Data Formats | |

    create table kafkaSource(
      attr_name attr_type
      (',' attr_name attr_type)*
      (','PRIMARY KEY (attr_name, ...) NOT ENFORCED)
      (',' WATERMARK FOR rowtime_column_name AS watermark-strategy_expression)
    )
    with (
      'connector' = 'kafka',
      'topic' = '',
      'properties.bootstrap.servers' = '',
      'properties.group.id' = '',
      'scan.startup.mode' = '',
      'format' = ''
    );

Parameters for source tables:

| Parameter | Mandatory | Default Value | Data Type | Description |
|---|---|---|---|---|
| connector | Yes | None | String | Connector to use. Set this parameter to kafka. |
| topic | No | None | String | Name of the topic to read from when the table is used as a source. Multiple topics can be specified for a source by separating them with semicolons, for example, topic-1;topic-2. Only one of topic-pattern and topic can be specified for a source. When the table is used as a sink, this is the name of the topic to write to. Topic lists are not supported for sinks. |
| topic-pattern | No | None | String | Regular expression for the names of the topics to read from. All topics whose names match the expression are subscribed to by the consumer when the job starts running. Only one of topic-pattern and topic can be specified for a source. For more information, see Topic and Partition Discovery. |
| properties.bootstrap.servers | Yes | None | String | Comma-separated list of Kafka brokers. |
| properties.group.id | Optional for source, not applicable for sink | None | String | ID of the consumer group for the Kafka source. If no group ID is specified, the automatically generated ID KafkaSource-{tableIdentifier} is used. |
| properties.* | No | None | String | Sets and passes arbitrary Kafka configurations. |
| format | Yes | None | String | Format used to deserialize and serialize the value part of Kafka messages. Either this parameter or the value.format parameter is required. |
| key.format | No | None | String | Format used to deserialize and serialize the key part of Kafka messages. |
| key.fields | No | [] | List<String> | Explicit list of physical columns from the table schema that configure the data type for the key format. By default, this list is empty and the key is undefined. The list uses the form field1;field2. |
| key.fields-prefix | No | None | String | Custom prefix for all fields of the key format, used to avoid name clashes with fields of the value format. By default, the prefix is empty. If a custom prefix is defined, both the table schema and key.fields use the prefixed names. When the data type of the key format is constructed, the prefix is removed and the non-prefixed names are used within the key format. This parameter requires value.fields-include to be set to EXCEPT_KEY. |
| value.format | No | None | String | Format used to deserialize and serialize the value part of Kafka messages. |
| value.fields-include | No | ALL | Enum. Possible values: [ALL, EXCEPT_KEY] | Strategy for handling key columns in the data type of the value format. By default, ALL physical columns of the table schema are included in the value format, which means that key columns appear in the data type of both the key and the value format. |
| scan.startup.mode | No | group-offsets | String | Startup mode for the Kafka consumer. Valid values are earliest-offset, latest-offset, group-offsets, timestamp, and specific-offsets. |
| scan.startup.specific-offsets | No | None | String | Offsets for each partition when the startup mode is specific-offsets, for example, partition:0,offset:42;partition:1,offset:300. |
| scan.startup.timestamp-millis | No | None | Long | Epoch timestamp (in milliseconds) to start from when the startup mode is timestamp. |
| scan.topic-partition-discovery.interval | No | None | Duration | Interval at which the consumer periodically discovers dynamically created Kafka topics and partitions. |
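
As a hedged illustration of the startup parameters above, the following sketch starts a source from explicit per-partition offsets. The table name, columns, topic, broker addresses, and group ID are placeholders chosen for this example rather than values from a real deployment.

```sql
-- Sketch only: all names and addresses below are placeholders.
CREATE TABLE kafkaSourceFromOffsets (
  `user_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'SourceKafkaTopic',
  'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
  'properties.group.id' = 'GroupId',
  -- Start partition 0 at offset 42 and partition 1 at offset 300.
  'scan.startup.mode' = 'specific-offsets',
  'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300',
  'format' = 'json'
);

-- Alternative (assumed timestamp value): start from an epoch time in milliseconds.
-- Replace the two scan.startup.* lines above with:
--   'scan.startup.mode' = 'timestamp',
--   'scan.startup.timestamp-millis' = '1616580000000'
```

Only one startup mode applies per table, so the offset-based and timestamp-based settings are alternatives, not a combination.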
Parameters for sink (result) tables:

| Parameter | Mandatory | Default Value | Data Type | Description |
|---|---|---|---|---|
| connector | Yes | None | String | Connector to use. Set this parameter to kafka. |
| topic | No | None | String | Name of the topic to read from when the table is used as a source. Multiple topics can be specified for a source by separating them with semicolons, for example, topic-1;topic-2. Only one of topic-pattern and topic can be specified for a source. When the table is used as a sink, this is the name of the topic to write to. Topic lists are not supported for sinks. |
| properties.bootstrap.servers | Yes | None | String | Comma-separated list of Kafka brokers. |
| properties.* | No | None | String | Sets and passes arbitrary Kafka configurations. |
| format | Yes | None | String | Format used to deserialize and serialize the value part of Kafka messages. Either this parameter or the value.format parameter is required. |
| key.format | No | None | String | Format used to deserialize and serialize the key part of Kafka messages. |
| key.fields | No | [] | List<String> | Explicit list of physical columns from the table schema that configure the data type for the key format. By default, this list is empty and the key is undefined. The list uses the form field1;field2. |
| key.fields-prefix | No | None | String | Custom prefix for all fields of the key format, used to avoid name clashes with fields of the value format. By default, the prefix is empty. If a custom prefix is defined, both the table schema and key.fields use the prefixed names. When the data type of the key format is constructed, the prefix is removed and the non-prefixed names are used within the key format. This parameter requires value.fields-include to be set to EXCEPT_KEY. |
| value.format | No | None | String | Format used to deserialize and serialize the value part of Kafka messages. |
| value.fields-include | No | ALL | Enum. Possible values: [ALL, EXCEPT_KEY] | Strategy for handling key columns in the data type of the value format. By default, ALL physical columns of the table schema are included in the value format, which means that key columns appear in the data type of both the key and the value format. |
| sink.partitioner | No | 'default' | String | Output partitioning from Flink's partitions into Kafka's partitions. Valid values are default (use the Kafka default partitioner), fixed (each Flink partition ends up in at most one Kafka partition), round-robin (a Flink partition is distributed to Kafka partitions round-robin; this works only when record keys are not specified), and the class name of a custom FlinkKafkaPartitioner subclass. |
| sink.semantic | No | at-least-once | String | Delivery semantic for the Kafka sink. Valid values are at-least-once, exactly-once, and none. |
| sink.parallelism | No | None | Integer | Parallelism of the Kafka sink operator. By default, the framework determines the parallelism and uses the same value as the upstream chained operator. |
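
The sink-specific parameters can be combined as in the following minimal sketch. The table name, columns, topic, and broker addresses are placeholders, and fixed is used here as one of the partitioner values defined by the Flink Kafka connector; whether these settings suit a given workload is an assumption to verify.

```sql
-- Sketch only: all names and addresses below are placeholders.
CREATE TABLE kafkaSinkFixed (
  `user_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'SinkKafkaTopic',
  'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
  'format' = 'json',
  -- Each Flink partition writes to at most one Kafka partition.
  'sink.partitioner' = 'fixed',
  -- Default delivery semantic, stated explicitly for clarity.
  'sink.semantic' = 'at-least-once',
  -- Override the framework-determined parallelism of the sink operator.
  'sink.parallelism' = '2'
);
```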
You can define metadata in the source table to obtain the metadata of Kafka messages.
For example, if multiple topics are defined in the WITH parameter and metadata is defined in the Kafka source table, the data read by Flink is labeled with the topic from which the data is read.
| Key | Data Type | R/W | Description |
|---|---|---|---|
| topic | STRING NOT NULL | R | Topic name of the Kafka record. |
| partition | INT NOT NULL | R | Partition ID of the Kafka record. |
| headers | MAP<STRING, BYTES> NOT NULL | R/W | Headers of the Kafka record as a map of raw bytes. |
| leader-epoch | INT NULL | R | Leader epoch of the Kafka record if available. |
| offset | BIGINT NOT NULL | R | Offset of the Kafka record in the partition. |
| timestamp | TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL | R/W | Timestamp of the Kafka record. |
| timestamp-type | STRING NOT NULL | R | Timestamp type of the Kafka record. |
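
To make the multi-topic case concrete, the sketch below reads from two topics and exposes the originating topic of each record as a read-only metadata column. The table names, the column name source_topic, the topic names, and the connection values are hypothetical placeholders.

```sql
-- Sketch only: names, topics, and addresses are placeholders.
CREATE TABLE multiTopicSource (
  -- Read-only metadata, mapped from the 'topic' key to a custom column name.
  `source_topic` STRING METADATA FROM 'topic' VIRTUAL,
  `partition` INT METADATA VIRTUAL,
  `offset` BIGINT METADATA VIRTUAL,
  `user_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'topic-1;topic-2',
  'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
  'properties.group.id' = 'GroupId',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

-- The sink declares plain columns, because the metadata is only read, not written.
CREATE TABLE topicLabelSink (
  `source_topic` STRING,
  `user_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'print'
);

INSERT INTO topicLabelSink
SELECT `source_topic`, `user_id`, `behavior` FROM multiTopicSource;
```

Each printed row then carries the name of the topic it was read from, which is what distinguishes records when several topics feed one table.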
Both the key and value part of a Kafka record can be serialized to and deserialized from raw bytes using one of the given formats.
Since a key is optional in Kafka records, the following statement reads and writes records with a configured value format but without a key format. The format parameter is a synonym for value.format. All format options are prefixed with the format identifier.

    CREATE TABLE KafkaTable (
      `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
      `user_id` BIGINT,
      `item_id` BIGINT,
      `behavior` STRING
    ) WITH (
      'connector' = 'kafka',
      ...
      'format' = 'json',
      'json.ignore-parse-errors' = 'true'
    )

The value format will be configured with the following data type:
ROW<`user_id` BIGINT, `item_id` BIGINT, `behavior` STRING>
The following example shows how to specify and configure key and value formats. The format options are prefixed with either the key or value plus format identifier.

    CREATE TABLE KafkaTable (
      `ts` TIMESTAMP(3) METADATA FROM 'timestamp',
      `user_id` BIGINT,
      `item_id` BIGINT,
      `behavior` STRING
    ) WITH (
      'connector' = 'kafka',
      ...
      'key.format' = 'json',
      'key.json.ignore-parse-errors' = 'true',
      'key.fields' = 'user_id;item_id',
      'value.format' = 'json',
      'value.json.fail-on-missing-field' = 'false',
      'value.fields-include' = 'ALL'
    )

The key format includes the fields listed in key.fields (using ; as the delimiter) in the same order. Thus, it will be configured with the following data type:
ROW<`user_id` BIGINT, `item_id` BIGINT>
Since the value format is configured with 'value.fields-include' = 'ALL', key fields will also end up in the value format's data type:
ROW<`user_id` BIGINT, `item_id` BIGINT, `behavior` STRING>
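
For contrast, the following sketch shows the same table with the key columns excluded from the value format. It assumes the same columns as the example above; the table name is a placeholder and the connection options are elided in the same way as in the original statements.

```sql
-- Sketch only: same columns as above, with key columns excluded from the value.
CREATE TABLE KafkaTableExceptKey (
  `user_id` BIGINT,
  `item_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  ...
  'key.format' = 'json',
  'key.fields' = 'user_id;item_id',
  'value.format' = 'json',
  'value.fields-include' = 'EXCEPT_KEY'
)
-- Key format data type:   ROW<`user_id` BIGINT, `item_id` BIGINT>
-- Value format data type: ROW<`behavior` STRING>
```

With EXCEPT_KEY, user_id and item_id travel only in the record key, so the value payload carries just the behavior field.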
If both the key and value formats contain fields of the same name, the connector cannot split the table's columns into key and value fields based on schema information. The key.fields-prefix parameter lets you give key columns a unique name in the table schema while keeping the original names when configuring the key format.
The following example shows a key and value format that both contain a version field:

    CREATE TABLE KafkaTable (
      `k_version` INT,
      `k_user_id` BIGINT,
      `k_item_id` BIGINT,
      `version` INT,
      `behavior` STRING
    ) WITH (
      'connector' = 'kafka',
      ...
      'key.format' = 'json',
      'key.fields-prefix' = 'k_',
      'key.fields' = 'k_version;k_user_id;k_item_id',
      'value.format' = 'json',
      'value.fields-include' = 'EXCEPT_KEY'
    )

The value format must be configured in EXCEPT_KEY mode. The formats will be configured with the following data types:
Key format: ROW<`version` INT, `user_id` BIGINT, `item_id` BIGINT>
Value format: ROW<`version` INT, `behavior` STRING>
The topic and topic-pattern parameters specify the topics or topic pattern that a source consumes. The topic parameter accepts a semicolon-separated list of topics, such as topic-1;topic-2. The topic-pattern parameter uses a regular expression to discover matching topics. For example, if topic-pattern is test-topic-[0-9], all topics whose names match the expression (starting with test-topic- and ending with a single digit) are subscribed to by the consumer when the job starts running.
To let the consumer discover topics that are created after the job has started, set scan.topic-partition-discovery.interval to a non-negative value. The consumer then also discovers partitions of new topics whose names match the specified pattern.
Note that topic lists and topic patterns work only in sources. In sinks, Flink currently supports only a single topic.
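
A pattern-based subscription with periodic discovery might look like the sketch below. The table name, columns, connection values, and the 60-second interval are assumptions made for illustration; the pattern is the one used in the explanation above.

```sql
-- Sketch only: names, addresses, and the interval value are placeholders.
CREATE TABLE patternSource (
  -- Read-only metadata column showing which matched topic a record came from.
  `topic` STRING METADATA VIRTUAL,
  `user_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  -- Matches test-topic-0 through test-topic-9.
  'topic-pattern' = 'test-topic-[0-9]',
  'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
  'properties.group.id' = 'GroupId',
  'scan.startup.mode' = 'latest-offset',
  -- Check for newly created matching topics and partitions every 60 seconds.
  'scan.topic-partition-discovery.interval' = '60s',
  'format' = 'json'
);
```

Because topic and topic-pattern are mutually exclusive for sources, the table sets only topic-pattern.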

    CREATE TABLE kafkaSource(
      `topic` String metadata virtual,
      `partition` int metadata virtual,
      `headers` MAP<STRING, BYTES> metadata virtual,
      `leader-epoch` INT metadata virtual,
      `offset` bigint metadata virtual,
      `timestamp-type` string metadata virtual,
      `event_time` TIMESTAMP(3) metadata FROM 'timestamp',
      `message` string
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'SourceKafkaTopic',
      'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
      'properties.group.id' = 'GroupId',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'csv',
      'csv.field-delimiter' = '\u0001',
      'csv.quote-character' = ''''
    );

    CREATE TABLE kafkaSink (
      `topic` String,
      `partition` int,
      `headers` MAP<STRING, BYTES>,
      `leader-epoch` INT,
      `offset` bigint,
      `timestampType` string,
      `event_time` TIMESTAMP(3),
      `message` string -- Indicates that data written by users is read from Kafka.
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'SinkKafkaTopic',
      'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
      'format' = 'json'
    );

    insert into kafkaSink select * from kafkaSource;

Sample messages written to the source topic:
{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"}
Corresponding records written to the sink topic:
{"topic":"kafkaSource","partition":1,"headers":{},"leader-epoch":0,"offset":4,"timestampType":"LogAppendTime","event_time":"2023-11-16 11:16:30.369","message":"{\"order_id\":\"202103251202020001\", \"order_channel\":\"miniAppShop\", \"order_time\":\"2021-03-25 12:02:02\", \"pay_amount\":\"60.00\", \"real_pay\":\"60.00\", \"pay_time\":\"2021-03-25 12:03:00\", \"user_id\":\"0002\", \"user_name\":\"Bob\", \"area_id\":\"330110\"}"}
{"topic":"kafkaSource","partition":0,"headers":{},"leader-epoch":0,"offset":6,"timestampType":"LogAppendTime","event_time":"2023-11-16 11:16:30.367","message":"{\"order_id\":\"202103241000000001\",\"order_channel\":\"webShop\",\"order_time\":\"2021-03-24 10:00:00\",\"pay_amount\":100.0,\"real_pay\":100.0,\"pay_time\":\"2021-03-24 10:02:03\",\"user_id\":\"0001\",\"user_name\":\"Alice\",\"area_id\":\"330106\"}"}
{"topic":"kafkaSource","partition":2,"headers":{},"leader-epoch":0,"offset":5,"timestampType":"LogAppendTime","event_time":"2023-11-16 11:16:30.368","message":"{\"order_id\":\"202103241606060001\",\"order_channel\":\"appShop\",\"order_time\":\"2021-03-24 16:06:06\",\"pay_amount\":200.0,\"real_pay\":180.0,\"pay_time\":\"2021-03-24 16:10:06\",\"user_id\":\"0001\",\"user_name\":\"Alice\",\"area_id\":\"330106\"}"}

    CREATE TABLE kafkaSource(
      order_id string,
      order_channel string,
      order_time timestamp(3),
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'KafkaSourceTopic',
      'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
      'properties.group.id' = 'GroupId',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json'
    );

    CREATE TABLE kafkaSink (
      order_id string,
      order_channel string,
      order_time timestamp(3),
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'KafkaSinkTopic',
      'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
      'format' = 'json'
    );

    insert into kafkaSink select * from kafkaSource;

Sample messages written to the source topic:
{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"}
Corresponding records written to the sink topic:
{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"}
Create a Kafka cluster for DMS, enable SASL_SSL, download the SSL certificate, and upload the downloaded certificate client.jks to an OBS bucket.
The properties.sasl.jaas.config field contains account passwords encrypted using DEW.

    CREATE TABLE ordersSource (
      order_id string,
      order_channel string,
      order_time timestamp(3),
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'KafkaTopic',
      'properties.bootstrap.servers' = 'KafkaAddress1:9093,KafkaAddress2:9093',
      'properties.group.id' = 'GroupId',
      'scan.startup.mode' = 'latest-offset',
      'properties.connector.auth.open' = 'true',
      'properties.ssl.truststore.location' = 'obs://xx/client.jks', -- Location where the user uploads the certificate to
      'properties.sasl.mechanism' = 'PLAIN',
      'properties.security.protocol' = 'SASL_SSL',
      'properties.sasl.jaas.config' = 'xx', -- Key in DEW secret management, whose value is like org.apache.kafka.common.security.plain.PlainLoginModule required username=xx password=xx;
      'format' = 'json',
      'dew.endpoint' = 'kms.xx.com', -- Endpoint information for the DEW service being used
      'dew.csms.secretName' = 'xx', -- Name of the DEW shared secret
      'dew.csms.decrypt.fields' = 'properties.sasl.jaas.config', -- The properties.sasl.jaas.config field value must be decrypted and replaced using DEW secret management.
      'dew.csms.version' = 'v1'
    );

    CREATE TABLE ordersSink (
      order_id string,
      order_channel string,
      order_time timestamp(3),
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'print'
    );

    insert into ordersSink select * from ordersSource;

Obtain the truststore.jks file using the authentication credential and store the credential and truststore.jks file in OBS.

    CREATE TABLE ordersSource (
      order_id string,
      order_channel string,
      order_time timestamp(3),
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'kafkaTopic',
      'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
      'properties.group.id' = 'GroupId',
      'scan.startup.mode' = 'latest-offset',
      'properties.sasl.kerberos.service.name' = 'kafka', -- Value configured in the MRS cluster
      'properties.connector.auth.open' = 'true',
      'properties.connector.kerberos.principal' = 'xx', -- Username
      'properties.connector.kerberos.krb5' = 'obs://xx/krb5.conf',
      'properties.connector.kerberos.keytab' = 'obs://xx/user.keytab',
      'properties.security.protocol' = 'SASL_SSL',
      'properties.ssl.truststore.location' = 'obs://xx/truststore.jks',
      'properties.ssl.truststore.password' = 'xx', -- Key in the DEW secret
      'properties.sasl.mechanism' = 'GSSAPI',
      'format' = 'json',
      'dew.endpoint' = 'kms.xx.xx.com', -- Endpoint information for the DEW service being used
      'dew.csms.secretName' = 'xx', -- Name of the DEW shared secret
      'dew.csms.decrypt.fields' = 'properties.ssl.truststore.password', -- The properties.ssl.truststore.password field value must be decrypted and replaced using DEW secret management.
      'dew.csms.version' = 'v1'
    );

    CREATE TABLE ordersSink (
      order_id string,
      order_channel string,
      order_time timestamp(3),
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'print'
    );

    insert into ordersSink select * from ordersSource;


    CREATE TABLE ordersSource (
      order_id string,
      order_channel string,
      order_time timestamp(3),
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'KafkaTopic',
      'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
      'properties.group.id' = 'GroupId',
      'scan.startup.mode' = 'latest-offset',
      'properties.sasl.kerberos.service.name' = 'kafka', -- Configured in the MRS cluster
      'properties.connector.auth.open' = 'true',
      'properties.connector.kerberos.principal' = 'xx',
      'properties.connector.kerberos.krb5' = 'obs://xx/krb5.conf',
      'properties.connector.kerberos.keytab' = 'obs://xx/user.keytab',
      'properties.security.protocol' = 'SASL_PLAINTEXT',
      'properties.sasl.mechanism' = 'GSSAPI',
      'format' = 'json'
    );

    CREATE TABLE ordersSink (
      order_id string,
      order_channel string,
      order_time timestamp(3),
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'print'
    );

    insert into ordersSink select * from ordersSource;

Obtain the truststore.jks file using the authentication credential and store the credential and truststore.jks file in OBS.

    CREATE TABLE ordersSource (
      order_id string,
      order_channel string,
      order_time timestamp(3),
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'kafkaTopic',
      'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
      'properties.group.id' = 'GroupId',
      'scan.startup.mode' = 'latest-offset',
      'properties.connector.auth.open' = 'true',
      'properties.ssl.truststore.location' = 'obs://xx/truststore.jks',
      'properties.ssl.truststore.password' = 'xx', -- Key for DEW secret management, whose value is the password set when generating truststore.jks
      'properties.security.protocol' = 'SSL',
      'format' = 'json',
      'dew.endpoint' = 'kms.xx.com', -- Endpoint information for the DEW service being used
      'dew.csms.secretName' = 'xx', -- Name of the DEW shared secret
      'dew.csms.decrypt.fields' = 'properties.ssl.truststore.password', -- The properties.ssl.truststore.password field value must be decrypted and replaced using DEW secret management.
      'dew.csms.version' = 'v1'
    );

    CREATE TABLE ordersSink (
      order_id string,
      order_channel string,
      order_time timestamp(3),
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'print'
    );

    insert into ordersSink select * from ordersSource;

Q: What should I do if a job fails and the log contains the following error?
org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
A: The datasource connection is not bound, the binding fails, or the security group of the Kafka cluster is not configured to allow access from the network segment of the DLI queue. Reconfigure the datasource connection or configure the security group of the Kafka cluster to allow access from the DLI queue.
Q: What should I do if a job fails and the log contains the following error?
Caused by: java.lang.RuntimeException: RealLine:45;Table 'default_catalog.default_database.printSink' declares persistable metadata columns, but the underlying DynamicTableSink doesn't implement the SupportsWritingMetadata interface. If the column should not be persisted, it can be declared with the VIRTUAL keyword.
A: The sink table declares metadata columns, but the Print connector does not support writing metadata. Remove the metadata declarations from the sink table.
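
One way to avoid this error is sketched below: the Kafka source keeps its metadata columns, while the Print sink declares the same fields as ordinary columns. The column set, topic, and connection values are placeholders chosen for this example; printSink is the sink table name taken from the error message.

```sql
-- Sketch only: topic, addresses, and columns are placeholders.
CREATE TABLE kafkaSource (
  `topic` STRING METADATA VIRTUAL,
  `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',
  `message` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'SourceKafkaTopic',
  'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
  'properties.group.id' = 'GroupId',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

-- No METADATA keyword here: the Print connector receives plain columns.
CREATE TABLE printSink (
  `topic` STRING,
  `event_time` TIMESTAMP(3),
  `message` STRING
) WITH (
  'connector' = 'print'
);

INSERT INTO printSink
SELECT `topic`, `event_time`, `message` FROM kafkaSource;
```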