Create a source stream to obtain data from Redis as input for jobs.
When schema-syntax is fields-scores, each non-primary key field is followed by a field of the double type that holds its score, so the number of non-primary key fields is even. For example:

    CREATE TABLE redisSource (
      redisKey string,
      order_id string, score1 double,
      order_channel string, score2 double,
      order_time string, score3 double,
      pay_amount double, score4 double,
      real_pay double, score5 double,
      pay_time string, score6 double,
      user_id string, score7 double,
      user_name string, score8 double,
      area_id string, score9 double,
      primary key (redisKey) not enforced
    ) WITH (
      'connector' = 'redis',
      'host' = 'RedisIP',
      'password' = 'RedisPassword',
      'data-type' = 'sorted-set',
      'deploy-mode' = 'master-replica',
      'schema-syntax' = 'fields-scores'
    );
When data-type is sorted-set and schema-syntax is map, the table can contain only one non-primary key field besides the primary key. This field must be of the map type: each map key is a member of the Redis sorted set, and each map value, of the double type, is that member's score.
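A minimal sketch of a table that satisfies this constraint, assuming the same placeholder host and password as the other examples in this section. The table name redisSortedSetSource and the column name memberScores are hypothetical.

```sql
-- Hypothetical sorted-set source read with schema-syntax = 'map'.
-- Each map key is a member of the Redis sorted set; each map value is its score.
CREATE TABLE redisSortedSetSource (
  redisKey string,                    -- maps to the Redis key
  memberScores map<string, double>,   -- the only non-primary key field
  primary key (redisKey) not enforced
) WITH (
  'connector' = 'redis',
  'host' = 'RedisIP',
  'password' = 'RedisPassword',
  'data-type' = 'sorted-set',
  'deploy-mode' = 'master-replica',
  'schema-syntax' = 'map'
);
```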
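When data-type is sorted-set and schema-syntax is array-scores, use two non-primary key fields of the array type: the first holds the members of the Redis sorted set, and the second, of the array<double> type, holds their scores. For example:

    CREATE TABLE redisSink (
      order_id string,
      arrayField array<string>,
      arrayScore array<double>,
      primary key (order_id) not enforced
    ) WITH (
      'connector' = 'redis',
      'host' = 'RedisIP',
      'password' = 'RedisPassword',
      'data-type' = 'sorted-set',
      'default-score' = '3',
      'deploy-mode' = 'master-replica',
      'schema-syntax' = 'array-scores'
    );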
The syntax is as follows:

    create table redisSource (
      attr_name attr_type
      (',' attr_name attr_type)*
      (',' watermark for rowtime_column_name as watermark_strategy_expression)
      ,PRIMARY KEY (attr_name, ...) NOT ENFORCED
    )
    with (
      'connector' = 'redis',
      'host' = ''
    );
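As a minimal sketch of this syntax, the hypothetical table below sets only the two mandatory options, so every other option keeps the default value listed in the parameter table (for example, port 6379, data-type hash, schema-syntax fields, and deploy-mode standalone). The table and column names are illustrative, and the sketch assumes that with the default fields schema syntax each column is filled from the hash field of the same name.

```sql
-- Hypothetical minimal Redis source: only 'connector' and 'host' are set,
-- so the remaining options use their documented defaults.
CREATE TABLE redisMinimalSource (
  redisKey string,    -- the primary key maps to the Redis key
  order_id string,    -- filled from the hash field named order_id
  user_name string,   -- filled from the hash field named user_name
  primary key (redisKey) not enforced
) WITH (
  'connector' = 'redis',
  'host' = 'RedisIP'
);
```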
Parameter | Mandatory | Default Value | Data Type | Description
---|---|---|---|---
connector | Yes | None | String | Connector to be used. Set this parameter to redis.
host | Yes | None | String | Address of the Redis instance to connect to.
port | No | 6379 | Integer | Port of the Redis instance.
password | No | None | String | Password for Redis authentication.
namespace | No | None | String | Namespace of the Redis keys.
delimiter | No | : | String | Delimiter between the namespace and the Redis key.
data-type | No | hash | String | Redis data type, for example hash, set, or sorted-set. For details about the constraints, see Constraints on data-type.
schema-syntax | No | fields | String | Redis schema semantics, for example fields, fields-scores, array-scores, or map (for details, see Precautions and FAQ). For details about the constraints, see Constraints on schema-syntax.
deploy-mode | No | standalone | String | Deployment mode of the Redis cluster. The value can be standalone, master-replica, or cluster.
retry-count | No | 5 | Integer | Number of attempts to connect to the Redis cluster.
connection-timeout-millis | No | 10000 | Integer | Maximum time, in milliseconds, for connecting to the Redis cluster.
commands-timeout-millis | No | 2000 | Integer | Maximum time, in milliseconds, to wait for a command to complete.
rebalancing-timeout-millis | No | 15000 | Integer | Sleep time, in milliseconds, when the Redis cluster fails.
scan-keys-count | No | 1000 | Integer | Number of data records read in each scan.
default-score | No | 0 | Double | Default score when data-type is sorted-set.
deserialize-error-policy | No | fail-job | Enum | Policy for handling data that fails to be parsed. The default value is fail-job.
skip-null-values | No | true | Boolean | Whether null values are skipped.
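The following sketch shows where the optional parameters from the table go in the WITH clause. Apart from 'deploy-mode' = 'cluster', every optional value shown is simply the documented default, so it is illustrative only. The table name redisClusterSource is hypothetical, RedisIP and RedisPassword are the same placeholders used elsewhere in this section, and whether a single host entry is sufficient in cluster mode is an assumption.

```sql
-- Hypothetical source for a Redis cluster that spells out optional parameters.
-- All WITH values are strings, including numeric and Boolean options.
CREATE TABLE redisClusterSource (
  redisKey string,
  order_id string,
  pay_amount double,
  primary key (redisKey) not enforced
) WITH (
  'connector' = 'redis',
  'host' = 'RedisIP',
  'port' = '6379',
  'password' = 'RedisPassword',
  'deploy-mode' = 'cluster',
  'data-type' = 'hash',
  'schema-syntax' = 'fields',
  'connection-timeout-millis' = '10000',  -- connection timeout, in ms
  'commands-timeout-millis' = '2000',     -- wait for command completion, in ms
  'scan-keys-count' = '1000',             -- records read per scan
  'deserialize-error-policy' = 'fail-job',
  'skip-null-values' = 'true'
);
```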
In this example, data is read from the DCS Redis data source and written to the Print result table. The procedure is as follows:
Run the following commands on a Redis client to store three orders in hash format under the keys redisSource, redisSource1, and redisSource2:

    HMSET redisSource order_id 202103241000000001 order_channel webShop order_time "2021-03-24 10:00:00" pay_amount 100.00 real_pay 100.00 pay_time "2021-03-24 10:02:03" user_id 0001 user_name Alice area_id 330106
    HMSET redisSource1 order_id 202103241606060001 order_channel appShop order_time "2021-03-24 16:06:06" pay_amount 200.00 real_pay 180.00 pay_time "2021-03-24 16:10:06" user_id 0001 user_name Alice area_id 330106
    HMSET redisSource2 order_id 202103251202020001 order_channel miniAppShop order_time "2021-03-25 12:02:02" pay_amount 60.00 real_pay 60.00 pay_time "2021-03-25 12:03:00" user_id 0002 user_name Bob area_id 330110
Create a Flink SQL job with the following script to read the hash data from Redis and write it to the print result table:

    CREATE TABLE redisSource (
      redisKey string,
      order_id string,
      order_channel string,
      order_time string,
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string,
      primary key (redisKey) not enforced  --Obtains the key value from Redis.
    ) WITH (
      'connector' = 'redis',
      'host' = 'RedisIP',
      'password' = 'RedisPassword',
      'data-type' = 'hash',
      'deploy-mode' = 'master-replica'
    );

    CREATE TABLE printSink (
      redisKey string,
      order_id string,
      order_channel string,
      order_time string,
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'print'
    );

    insert into printSink select * from redisSource;
The result is as follows:

    +I(redisSource1,202103241606060001,appShop,2021-03-24 16:06:06,200.0,180.0,2021-03-24 16:10:06,0001,Alice,330106)
    +I(redisSource,202103241000000001,webShop,2021-03-24 10:00:00,100.0,100.0,2021-03-24 10:02:03,0001,Alice,330106)
    +I(redisSource2,202103251202020001,miniAppShop,2021-03-25 12:02:02,60.0,60.0,2021-03-25 12:03:00,0002,Bob,330110)
Q: What should I do if a Flink job fails and the job log contains the following error?

    Caused by: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: RealLine:36;Usage of 'set' data-type and 'fields' schema syntax in source Redis connector with multiple non-key column types. As 'set' in Redis is not sorted, it's not possible to map 'set's values to table schema with different types.
A: This error is reported because data-type is set to set while the non-primary key fields defined in the Flink table have different data types. Redis sets are unordered, so when data-type is set, all non-primary key fields defined in Flink must be of the same type.
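A hedged sketch of a set source that satisfies this constraint: every non-primary key column is declared as string, and schema-syntax is left at its default, fields. The table and column names are hypothetical. Because the error message notes that Redis sets are unordered, the sketch does not assume that any particular member lands in any particular column.

```sql
-- Hypothetical set source: data-type is set, so all non-primary key
-- columns share one type (string here) to avoid the error above.
CREATE TABLE redisSetSource (
  redisKey string,
  member1 string,
  member2 string,
  member3 string,
  primary key (redisKey) not enforced
) WITH (
  'connector' = 'redis',
  'host' = 'RedisIP',
  'password' = 'RedisPassword',
  'data-type' = 'set',
  'deploy-mode' = 'master-replica'
);
```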
Q: When data-type is hash, what is the difference between setting schema-syntax to fields and setting it to map?

A: When schema-syntax is fields, the value of each hash field stored under a Redis key is assigned to the Flink column with the same name. When schema-syntax is map, all fields and values of the hash stored under a Redis key are put into a single map, which becomes the value of the corresponding Flink column.
Example with schema-syntax set to fields. Store the following hash in Redis:

    HMSET redisSource order_id 202103241000000001 order_channel webShop order_time "2021-03-24 10:00:00" pay_amount 100.00 real_pay 100.00 pay_time "2021-03-24 10:02:03" user_id 0001 user_name Alice area_id 330106
Run the following job script. It does not set schema-syntax, so the default value fields applies:

    CREATE TABLE redisSource (
      redisKey string,
      order_id string,
      order_channel string,
      order_time string,
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string,
      primary key (redisKey) not enforced
    ) WITH (
      'connector' = 'redis',
      'host' = 'RedisIP',
      'password' = 'RedisPassword',
      'data-type' = 'hash',
      'deploy-mode' = 'master-replica'
    );

    CREATE TABLE printSink (
      redisKey string,
      order_id string,
      order_channel string,
      order_time string,
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string
    ) WITH (
      'connector' = 'print'
    );

    insert into printSink select * from redisSource;
Each hash field is mapped to the Flink column of the same name:

    +I(redisSource,202103241000000001,webShop,2021-03-24 10:00:00,100.0,100.0,2021-03-24 10:02:03,0001,Alice,330106)
Example with schema-syntax set to map. Store the same hash in Redis:

    HMSET redisSource order_id 202103241000000001 order_channel webShop order_time "2021-03-24 10:00:00" pay_amount 100.00 real_pay 100.00 pay_time "2021-03-24 10:02:03" user_id 0001 user_name Alice area_id 330106
Run the following job script, in which the only non-primary key column, order_result, is of the map<string, string> type:

    CREATE TABLE redisSource (
      redisKey string,
      order_result map<string, string>,
      primary key (redisKey) not enforced
    ) WITH (
      'connector' = 'redis',
      'host' = 'RedisIP',
      'password' = 'RedisPassword',
      'data-type' = 'hash',
      'deploy-mode' = 'master-replica',
      'schema-syntax' = 'map'
    );

    CREATE TABLE printSink (
      redisKey string,
      order_result map<string, string>
    ) WITH (
      'connector' = 'print'
    );

    insert into printSink select * from redisSource;
All fields and values of the hash are collected into one map:

    +I(redisSource,{user_id=0001, user_name=Alice, pay_amount=100.00, real_pay=100.00, order_time=2021-03-24 10:00:00, area_id=330106, order_id=202103241000000001, order_channel=webShop, pay_time=2021-03-24 10:02:03})
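With schema-syntax set to map, every hash value arrives as a string inside one map column, so a downstream query usually has to pick individual values out of the map. The sketch below uses Flink SQL map value access (map_column['key']) and CAST, and assumes the redisSource table defined in the map example. The sink name orderPrintSink and its columns are hypothetical.

```sql
-- Hypothetical sink that receives selected fields as typed columns.
CREATE TABLE orderPrintSink (
  redisKey string,
  order_id string,
  user_name string,
  pay_amount double
) WITH (
  'connector' = 'print'
);

-- Read individual hash fields out of the map column; pay_amount is stored
-- as a string in the map, so it is cast to double.
INSERT INTO orderPrintSink
SELECT
  redisKey,
  order_result['order_id'],
  order_result['user_name'],
  CAST(order_result['pay_amount'] AS double)
FROM redisSource;
```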