This section describes how to create a Kafka table to automatically synchronize Kafka data to the ClickHouse cluster.
Currently, ClickHouse cannot interconnect with Kafka clusters with security mode enabled.
CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster] ( name1 [type1] [DEFAULT|MATERIALIZED|ALIAS expr1], name2 [type2] [DEFAULT|MATERIALIZED|ALIAS expr2], ... ) ENGINE = Kafka() SETTINGS kafka_broker_list = 'host1:port1,host2:port2', kafka_topic_list = 'topic1,topic2,...', kafka_group_name = 'group_name', kafka_format = 'data_format'; [kafka_row_delimiter = 'delimiter_symbol',] [kafka_schema = '',] [kafka_num_consumers = N]
Parameter |
Mandatory |
Description |
---|---|---|
kafka_broker_list |
Yes |
A list of Kafka broker instances, separated by comma (,). For example, IP address 1 of the Kafka broker instance:9092,IP address 2 of the Kafka broker instance:9092,IP address 3 of the Kafka broker instance:9092. NOTE:
If the Kerberos authentication is enabled, parameter allow.everyone.if.no.acl.found must be set to true if port 21005 is used. Otherwise, an error will be reported. To obtain the IP address of the Kafka broker instance, perform the following steps: Log in to FusionInsight Manager and choose Cluster > Name of the desired cluster > Services > Kafka. Click Instances to query the IP addresses of the Kafka instances. |
kafka_topic_list |
Yes |
A list of Kafka topics. |
kafka_group_name |
Yes |
A group of Kafka consumers, which can be customized. |
kafka_format |
Yes |
Kafka message format, for example, JSONEachRow, CSV, and XML. |
kafka_row_delimiter |
No |
Delimiter character, which ends a message. |
kafka_schema |
No |
Parameter that must be used if the format requires a schema definition. |
kafka_num_consumers |
No |
Number of consumers in per table. The default value is 1. If the throughput of a consumer is insufficient, more consumers are required. The total number of consumers cannot exceed the number of partitions in a topic because only one consumer can be allocated to each partition. |
kafka-topics.sh --topic kafkacktest2 --create --zookeeper IP address of the Zookeeper role instance:Port used by ZooKeeper to listen to client/kafka --partitions 2 --replication-factor 1
Log in to FusionInsight Manager. For details, see Accessing FusionInsight Manager. Choose Cluster > Name of the desired cluster > Services > ZooKeeper > Instance. View the IP addresses of the ZooKeeper role instances.
kinit Component service user
Example: kinit clickhouseuser
clickhouse client --host IP address of the ClickHouse instance --user Login username --password --port ClickHouse port number --database Database name --multiline
Enter the user password.
create table kafka_src_tbl3 on cluster default_cluster (id UInt32, age UInt32, msg String) ENGINE=Kafka() SETTINGS kafka_broker_list='IP address 1 of the Kafka broker instance:9092,IP address 2 of the Kafka broker instance:9092,IP address 3 of the Kafka broker instance:9092', kafka_topic_list='kafkacktest2', kafka_group_name='cg12', kafka_format='JSONEachRow';
create table kafka_dest_tbl3 on cluster default_cluster ( id UInt32, age UInt32, msg String ) engine = ReplicatedMergeTree('/clickhouse/tables/{shard}/default/kafka_dest_tbl3', '{replica}') partition by age order by id;
create materialized view consumer3 on cluster default_cluster to kafka_dest_tbl3 as select * from kafka_src_tbl3;
>{"id":31, "age":30, "msg":"31 years old"} >{"id":32, "age":30, "msg":"31 years old"} >{"id":33, "age":30, "msg":"31 years old"} >{"id":35, "age":30, "msg":"31 years old"}
select * from kafka_dest_tbl3;