Create a Redis table to connect to source streams for wide table generation.
CREATE TABLE redisSource (
  redisKey string,
  order_id string,
  score1 double,
  order_channel string,
  score2 double,
  order_time string,
  score3 double,
  pay_amount double,
  score4 double,
  real_pay double,
  score5 double,
  pay_time string,
  score6 double,
  user_id string,
  score7 double,
  user_name string,
  score8 double,
  area_id string,
  score9 double,
  primary key (redisKey) not enforced
) WITH (
  'connector' = 'redis',
  'host' = 'RedisIP',
  'password' = 'RedisPassword',
  'data-type' = 'sorted-set',
  'deploy-mode' = 'master-replica',
  'schema-syntax' = 'fields-scores'
);
CREATE TABLE redisSink (
  order_id string,
  arrayField Array<String>,
  arrayScore array<double>,
  primary key (order_id) not enforced
) WITH (
  'connector' = 'redis',
  'host' = 'RedisIP',
  'password' = 'RedisPassword',
  'data-type' = 'sorted-set',
  'default-score' = '3',
  'deploy-mode' = 'master-replica',
  'schema-syntax' = 'array-scores'
);
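The two definitions above can be connected with an INSERT statement. The following is only a sketch and is not part of the original example. It assumes that each score column in redisSource holds the score of the field column declared immediately before it, and it copies only the string fields so that no casts are needed for Array<String>. The choice of fields is illustrative.

```sql
-- Sketch: copy the string fields of each sorted-set entry, with their scores,
-- from the fields-scores source table into the array-scores sink table.
-- Assumption: scoreN is the score of the field declared just before it.
INSERT INTO redisSink
SELECT
  order_id,
  ARRAY[order_channel, order_time, pay_time, user_id, user_name, area_id],
  ARRAY[score2, score3, score6, score7, score8, score9]
FROM redisSource;
```

The two arrays are kept in the same order so that the n-th score lines up with the n-th field.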
create table redisSource (
attr_name attr_type
(',' attr_name attr_type)*
(',' watermark for rowtime_column_name as watermark-strategy_expression)
,PRIMARY KEY (attr_name, ...) NOT ENFORCED
)
with (
'connector' = 'redis',
'host' = ''
);
| Parameter | Mandatory | Default Value | Data Type | Description |
|---|---|---|---|---|
| connector | Yes | None | String | Connector type. Set this parameter to redis. |
| host | Yes | None | String | Redis connector address |
| port | No | 6379 | Integer | Redis connector port |
| password | No | None | String | Redis authentication password |
| namespace | No | None | String | Redis key namespace |
| delimiter | No | : | String | Delimiter between the Redis key and namespace |
| data-type | No | hash | String | Redis data type, for example hash or sorted-set. For details about the constraints, see Constraints on data-type. |
| schema-syntax | No | fields | String | Redis schema semantics, for example fields, fields-scores, or array-scores. For details about the constraints, see Constraints on schema-syntax. |
| deploy-mode | No | standalone | String | Deployment mode of the Redis cluster. The value can be standalone, master-replica, or cluster. |
| retry-count | No | 5 | Integer | Size of each connection request queue. If the number of connection requests in a queue exceeds the queue size, calling a command causes a RedisException. Setting requestQueueSize to a small value causes exceptions to occur earlier during overload or disconnection. A larger value takes longer to reach the limit, but more requests may be queued and more heap space may be used. The default value is 2147483647. |
| connection-timeout-millis | No | 10000 | Integer | Maximum timeout for connecting to the Redis cluster, in milliseconds |
| commands-timeout-millis | No | 2000 | Integer | Maximum time to wait for a completion response, in milliseconds |
| rebalancing-timeout-millis | No | 15000 | Integer | Sleep time when the Redis cluster fails, in milliseconds |
| scan-keys-count | No | 1000 | Integer | Number of data records read in each scan |
| default-score | No | 0 | Double | Default score when data-type is sorted-set |
| deserialize-error-policy | No | fail-job | Enum | Policy for handling a data parsing failure. The default is fail-job. |
| skip-null-values | No | true | Boolean | Whether null values will be skipped |
| lookup.async | No | false | Boolean | Whether asynchronous I/O will be used when this table is used as a dimension table |
| lookup.parallelism | No | None | Integer | Custom parallelism of the lookup join operator. If this parameter is not set, the planner derives the parallelism from the global configuration (if lookup.parallelism is defined there) or from the parallelism of the input operator. |
| lookup.batch.interval | No | 1s | Duration | Maximum delay for which a batch lookup join can buffer input records. |
| lookup.batch.size | No | 100 | Long | Maximum number of input records that can be buffered for a batch lookup join. |
| lookup.batch | No | false | Boolean | Whether to enable batch lookup optimization. If enabled, both lookup.batch.interval and lookup.batch.size must be set. In addition, because of how the underlying batch processing interval mechanism is implemented, table.exec.batch-lookup.enabled must be explicitly enabled in the Flink configuration. |
| ignore-retractions | No | false | Boolean | Whether the connector ignores retraction messages in update insert/withdraw (upsert/retract) stream mode. |
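As an illustration of how the lookup parameters fit together, the following sketch defines a Redis dimension table with batch lookup enabled. The table name area_info_batched and its columns are hypothetical, and the interval and size shown are simply the documented defaults written out explicitly. How table.exec.batch-lookup.enabled is set depends on your environment, so it appears only as a comment.

```sql
-- Sketch: Redis dimension table with batch lookup optimization enabled.
-- Table and column names are hypothetical.
-- Also required, outside this DDL: enable table.exec.batch-lookup.enabled
-- in the Flink configuration of the job.
CREATE TABLE area_info_batched (
  area_id string,
  region_name string,
  primary key (area_id) not enforced  -- Redis key
) WITH (
  'connector' = 'redis',
  'host' = 'RedisIP',
  'password' = 'RedisPassword',
  'data-type' = 'hash',
  'lookup.batch' = 'true',
  'lookup.batch.interval' = '1s',  -- maximum buffering delay
  'lookup.batch.size' = '100'      -- maximum buffered input records
);
```

A larger batch size or interval would be expected to reduce the number of round trips to Redis at the cost of added latency per record.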
Read data from a Kafka source table and use a Redis table as the dimension table. Write the wide table information generated from the source and dimension tables to a Kafka result table. The procedure is as follows:
HMSET 330102 area_province_name a1 area_city_name b1 area_county_name c1 area_street_name d1 region_name e1
HMSET 330106 area_province_name a1 area_city_name b1 area_county_name c2 area_street_name d2 region_name e1
HMSET 330108 area_province_name a1 area_city_name b1 area_county_name c3 area_street_name d3 region_name e1
HMSET 330110 area_province_name a1 area_city_name b1 area_county_name c4 area_street_name d4 region_name e1
CREATE TABLE orders (
order_id string,
order_channel string,
order_time string,
pay_amount double,
real_pay double,
pay_time string,
user_id string,
user_name string,
area_id string,
proctime as Proctime()
) WITH (
'connector' = 'kafka',
'topic' = 'kafkaSourceTopic',
'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
'properties.group.id' = 'GroupId',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
);
-- Create an address dimension table
create table area_info (
area_id string,
area_province_name string,
area_city_name string,
area_county_name string,
area_street_name string,
region_name string,
primary key (area_id) not enforced -- Redis key
) WITH (
'connector' = 'redis',
'host' = 'RedisIP',
'password' = 'RedisPassword',
'data-type' = 'hash',
'deploy-mode' = 'master-replica'
);
-- Create a Kafka result table for the wide table, which combines detailed order information with the address dimension data.
create table order_detail(
order_id string,
order_channel string,
order_time string,
pay_amount double,
real_pay double,
pay_time string,
user_id string,
user_name string,
area_id string,
area_province_name string,
area_city_name string,
area_county_name string,
area_street_name string,
region_name string
) with (
'connector' = 'kafka',
'topic' = 'kafkaSinkTopic',
'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
'format' = 'json'
);
insert into order_detail
select orders.order_id, orders.order_channel, orders.order_time, orders.pay_amount, orders.real_pay, orders.pay_time, orders.user_id, orders.user_name,
area.area_id, area.area_province_name, area.area_city_name, area.area_county_name,
area.area_street_name, area.region_name from orders
left join area_info for system_time as of orders.proctime as area on orders.area_id = area.area_id;
{"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"}
{"order_id":"202103251505050001", "order_channel":"appShop", "order_time":"2021-03-25 15:05:05", "pay_amount":"500.00", "real_pay":"400.00", "pay_time":"2021-03-25 15:10:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"}
{"order_id":"202103241606060001","order_channel":"appShop","order_time":"2021-03-24 16:06:06","pay_amount":200.0,"real_pay":180.0,"pay_time":"2021-03-24 16:10:06","user_id":"0001","user_name":"Alice","area_id":"330106","area_province_name":"a1","area_city_name":"b1","area_county_name":"c2","area_street_name":"d2","region_name":"e1"}
{"order_id":"202103251202020001","order_channel":"miniAppShop","order_time":"2021-03-25 12:02:02","pay_amount":60.0,"real_pay":60.0,"pay_time":"2021-03-25 12:03:00","user_id":"0002","user_name":"Bob","area_id":"330110","area_province_name":"a1","area_city_name":"b1","area_county_name":"c4","area_street_name":"d4","region_name":"e1"}
{"order_id":"202103251505050001","order_channel":"appShop","order_time":"2021-03-25 15:05:05","pay_amount":500.0,"real_pay":400.0,"pay_time":"2021-03-25 15:10:00","user_id":"0003","user_name":"Cindy","area_id":"330108","area_province_name":"a1","area_city_name":"b1","area_county_name":"c3","area_street_name":"d3","region_name":"e1"}
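Every sample order above has an area_id that exists in Redis. Because the query uses a LEFT JOIN, an order whose area_id has no matching Redis key would still be expected to reach the result table, with NULL in the address columns. The following variant is a sketch of one way to make such rows easier to spot. The placeholder value 'unknown' is hypothetical and not part of the original example.

```sql
-- Sketch: same wide-table query, but with a placeholder when the
-- dimension lookup finds no Redis hash for the order's area_id.
-- 'unknown' is a hypothetical placeholder value.
INSERT INTO order_detail
SELECT
  orders.order_id, orders.order_channel, orders.order_time,
  orders.pay_amount, orders.real_pay, orders.pay_time,
  orders.user_id, orders.user_name,
  orders.area_id,  -- taken from orders so it is kept even without a match
  COALESCE(area.area_province_name, 'unknown'),
  COALESCE(area.area_city_name, 'unknown'),
  COALESCE(area.area_county_name, 'unknown'),
  COALESCE(area.area_street_name, 'unknown'),
  COALESCE(area.region_name, 'unknown')
FROM orders
LEFT JOIN area_info FOR SYSTEM_TIME AS OF orders.proctime AS area
  ON orders.area_id = area.area_id;
```

Selecting orders.area_id instead of area.area_id keeps the original key in the output, since the dimension-side column would be NULL for unmatched rows.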