Create a Doris dimension table that is joined with source streams to generate wide tables.
For details, see "Modifying Host Information" in Data Lake Insight User Guide.
After connecting to Doris as user admin, create a role with administrator permissions, and bind the role to the user.
-- Syntax template for declaring a Doris dimension table (not directly executable):
-- attr_name / attr_type are placeholders for column names and types, and the
-- parenthesized group followed by * denotes zero or more additional columns.
-- NOTE(review): the table name hbaseSource looks carried over from another
-- connector's template; confirm whether a Doris-specific name was intended.
create table hbaseSource (
attr_name attr_type
(',' attr_name attr_type)*
)
with (
'connector' = 'doris',
-- Comma-separated list of Doris FE address:port pairs.
'fenodes' = 'FE_IP:PORT,FE_IP:PORT,FE_IP:PORT',
-- Doris table to look up, in database.table form.
'table.identifier' = 'database.table',
-- Credentials used to access Doris.
'username' = 'dorisUsername',
'password' = 'dorisPassword'
);
Shared configuration
Parameter |
Default Value |
Mandatory |
Description |
|---|---|---|---|
fenodes |
-- |
Y |
IP address and port number of the Doris FE. Use commas (,) to separate them for multiple instances. To obtain the port number, log in to MRS Manager, choose Cluster > Services > Doris > Configurations, and search for http. Search for https instead if HTTPS is enabled. |
table.identifier |
-- |
Y |
Doris table name, for example, db.tbl. |
username |
-- |
Y |
User name for accessing Doris. |
password |
-- |
Y |
Password for accessing Doris. |
lookup.cache.max-rows |
-1L |
N |
Maximum number of rows kept in the lookup cache. When this value is exceeded, the oldest rows are removed. To enable caching, both lookup.cache.max-rows and lookup.cache.ttl must be specified. |
lookup.cache.ttl |
10s |
N |
Cache lifespan. |
lookup.max-retries |
3 |
N |
Maximum number of retry attempts when a database lookup fails. |
This example reads data from a Kafka source table, joins it with a Doris dimension table to generate a wide table, and outputs the result to the Print connector.
-- Doris-side table that backs the dimension lookup in this example.
-- Rows are hash-distributed on user_id across 10 buckets; user_id is the
-- only column declared NOT NULL.
CREATE TABLE IF NOT EXISTS dorisdemo (
    `user_id` VARCHAR(10) NOT NULL,
    `city`    VARCHAR(10),
    `age`     INT,
    `gender`  INT
)
DISTRIBUTED BY HASH(`user_id`) BUCKETS 10;
-- Seed the dorisdemo dimension table with ten sample users.
-- The column list is spelled out so the statement does not depend on the
-- table's physical column order, and the rows are loaded with a single
-- multi-row statement instead of ten separate ones.
INSERT INTO dorisdemo (`user_id`, `city`, `age`, `gender`)
VALUES
    ('user1', 'city1', 20, 1),
    ('user2', 'city2', 21, 0),
    ('user3', 'city3', 22, 1),
    ('user4', 'city4', 23, 0),
    ('user5', 'city5', 24, 1),
    ('user6', 'city6', 25, 0),
    ('user7', 'city7', 26, 1),
    ('user8', 'city8', 27, 0),
    ('user9', 'city9', 28, 1),
    ('user10', 'city10', 29, 0);
-- Flink SQL job: enrich order events read from Kafka with user attributes
-- looked up in a Doris dimension table, and print the joined rows.

-- Source stream of JSON order events. proctime is a computed
-- processing-time column; the lookup join below is evaluated as of it.
CREATE TABLE ordersSource (
    user_id   STRING,
    user_name STRING,
    proctime AS PROCTIME()
) WITH (
    'connector' = 'kafka',
    'topic' = 'kafka-topic',
    'properties.bootstrap.servers' = 'kafkaIp:port,kafkaIp:port,kafkaIp:port',
    'properties.group.id' = 'GroupId',
    'scan.startup.mode' = 'latest-offset',
    'format' = 'json'
);

-- Doris dimension table mapped onto demo.dorisdemo. Lookup caching is
-- enabled by setting both lookup.cache.ttl and lookup.cache.max-rows.
CREATE TABLE dorisDemo (
    `user_id` STRING NOT NULL,
    `city`    STRING,
    `age`     INT,
    `gender`  INT
) WITH (
    'connector' = 'doris',
    'fenodes' = 'IP address of the FE instance:Port number',
    'table.identifier' = 'demo.dorisdemo',
    'username' = 'dorisUsername',
    'password' = 'dorisPassword',
    'lookup.cache.ttl' = '10 m',
    'lookup.cache.max-rows' = '100'
);

-- Sink that prints each joined (wide) row.
CREATE TABLE print (
    user_id   STRING,
    user_name STRING,
    `city`    STRING,
    `age`     INT,
    `gender`  INT
) WITH (
    'connector' = 'print'
);

-- Lookup join: each order is matched against the dimension table as of the
-- order's processing time. LEFT JOIN keeps orders that have no matching user.
-- The last column is dim.gender: the dimension table declares `gender`, and
-- has no `sex` column.
INSERT INTO print
SELECT
    orders.user_id,
    orders.user_name,
    dim.city,
    dim.age,
    dim.gender
FROM ordersSource AS orders
LEFT JOIN dorisDemo FOR SYSTEM_TIME AS OF orders.proctime AS dim
    ON orders.user_id = dim.user_id;
{"user_id": "user1", "user_name": "name1"}
{"user_id": "user2", "user_name": "name2"}
+I[user1, name1, city1, 20, 1]
+I[user2, name2, city2, 21, 0]