Redis Dimension Table

Function

Create a Redis table to connect to source streams for wide table generation.

Prerequisites

Precautions

Syntax

create table dwsSource (
  attr_name attr_type 
  (',' attr_name attr_type)* 
  (',' watermark for rowtime_column_name as watermark-strategy_expression)
  ,PRIMARY KEY (attr_name, ...) NOT ENFORCED
)
with (
  'connector' = 'redis',
  'host' = ''
);

Parameters

Table 1 Parameter description

Parameter

Mandatory

Default Value

Data Types

Description

connector

Yes

None

String

Connector type. Set this parameter to redis.

host

Yes

None

String

Redis connector address

port

No

6379

Integer

Redis connector port

password

No

None

String

Redis authentication password

namespace

No

None

String

Redis key namespace

delimiter

No

:

String

Delimiter between the Redis key and namespace

data-type

No

hash

String

Redis data type. Available values are as follows:

  • hash
  • list
  • set
  • sorted-set
  • string

For details about the constraints, see Constraints on data-type.

schema-syntax

No

fields

String

Redis schema semantics. Available values are as follows:

  • fields: applicable to all data types
  • fields-scores: applicable to sorted set data
  • array: applicable to list, set, and sorted set data
  • array-scores: applicable to sorted set data
  • map: applicable to hash and sorted set data

For details about the constraints, see Constraints on schema-syntax.

deploy-mode

No

standalone

String

Deployment mode of the Redis cluster. The value can be standalone, master-replica, or cluster. The default value is standalone.

retry-count

Yes

5

Integer

Size of each connection request queue. If the number of connection requests in a queue exceeds the queue size, command calling will cause RedisException. Setting requestQueueSize to a small value will cause exceptions to occur earlier during overload or disconnection. A larger value indicates more time required to reach the boundary, but more requests may be queued and more heap space may be used. The default value is 2147483647.

connection-timeout-millis

No

10000

Integer

Maximum timeout for connecting to the Redis cluster

commands-timeout-millis

No

2000

Integer

Maximum time for waiting for a completion response

rebalancing-timeout-millis

No

15000

Integer

Sleep time when the Redis cluster fails

scan-keys-count

No

1000

Integer

Number of data records read in each scan

default-score

No

0

Double

Default score when data-type is sorted-set

deserialize-error-policy

No

fail-job

Enum

How to process a data parsing failure

Available values are as follows:

  • fail-job: Fail the job
  • skip-row: Skip the current data.
  • null-field: Set the current data to null.

skip-null-values

No

true

Boolean

Whether null values will be skipped

lookup.async

No

false

Boolean

Whether asynchronous I/O will be used when this table is used as a dimension table

Example

Read data from a Kafka source table, use a Redis table as the dimension table. Write wide table information generated by the source and dimension tables to a Kafka result table. The procedure is as follows:

  1. Create an enhanced datasource connection in the VPC and subnet where Redis and Kafka locates, and bind the connection to the required Flink elastic resource pool.
  2. Set Redis and Kafka security groups and add inbound rules to allow access from the Flink queue. Test the connectivity using the Redis address. If the connection passes the test, it is bound to the queue.
  3. Run the following commands on the Redis client to send data to Redis:
    HMSET 330102  area_province_name a1 area_province_name b1 area_county_name c1 area_street_name d1 region_name e1
    
    HMSET 330106  area_province_name a1 area_province_name b1 area_county_name c2 area_street_name d2 region_name e1
    
    HMSET 330108  area_province_name a1 area_province_name b1 area_county_name c3 area_street_name d3 region_name e1
    
    HMSET 330110  area_province_name a1 area_province_name b1 area_county_name c4 area_street_name d4 region_name e1
  4. Create a Flink OpenSource SQL job Enter the following job script and submit the job. The job script uses Kafka as the data source and a Redis table as the dimension table. Data is output to a Kafka result table.
    When you create a job, set Flink Version to 1.12 in the Running Parameters tab. Select Save Job Log, and specify the OBS bucket for saving job logs. Set the values of the parameters in bold in the following script as needed.
    CREATE TABLE orders (
      order_id string,
      order_channel string,
      order_time string,
      pay_amount double,
      real_pay double,
      pay_time string,
      user_id string,
      user_name string,
      area_id string,
      proctime as Proctime()
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'kafkaSourceTopic',
      'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
      'properties.group.id' = 'GroupId',
      'scan.startup.mode' = 'latest-offset',
      'format' = 'json'
    );
    
    -- Create an address dimension table
    create table area_info (
        area_id string, 
        area_province_name string,
        area_city_name string,
        area_county_name string, 
        area_street_name string, 
        region_name string, 
        primary key (area_id) not enforced -- Redis key
    ) WITH (
      'connector' = 'redis',
      'host' = 'RedisIP',
      'password' = 'RedisPassword',
      'data-type' = 'hash',
      'deploy-mode' = 'master-replica'
    );
    
    -- Generate a wide table based on the address dimension table containing detailed order information.
    create table order_detail(
        order_id string,
        order_channel string,
        order_time string,
        pay_amount double,
        real_pay double,
        pay_time string,
        user_id string,
        user_name string,
        area_id string,
        area_province_name string,
        area_city_name string,
        area_county_name string,
        area_street_name string,
        region_name string
    ) with (
      'connector' = 'kafka',
      'topic' = 'kafkaSinkTopic',
      'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
      'format' = 'json'
    );
    
    insert into order_detail
        select orders.order_id, orders.order_channel, orders.order_time, orders.pay_amount, orders.real_pay, orders.pay_time, orders.user_id, orders.user_name,
               area.area_id, area.area_province_name, area.area_city_name, area.area_county_name,
               area.area_street_name, area.region_name  from orders
        left join area_info for system_time as of orders.proctime as area on orders.area_id = area.area_id;
  5. Connect to the Kafka cluster and insert the following test data into the source topic in Kafka:
    {"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
    
    {"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"}
    
    {"order_id":"202103251505050001", "order_channel":"qqShop", "order_time":"2021-03-25 15:05:05", "pay_amount":"500.00", "real_pay":"400.00", "pay_time":"2021-03-25 15:10:00", "user_id":"0003", "user_name":"Cindy", "area_id":"330108"}
  6. Connect to the Kafka cluster and read data from the sink topic of Kafka. The result data is as follows:
    {"order_id":"202103241606060001","order_channel":"appShop","order_time":"2021-03-24 16:06:06","pay_amount":200.0,"real_pay":180.0,"pay_time":"2021-03-24 16:10:06","user_id":"0001","user_name":"Alice","area_id":"330106","area_province_name":"a1","area_city_name":"b1","area_county_name":"c2","area_street_name":"d2","region_name":"e1"}
    
    {"order_id":"202103251202020001","order_channel":"miniAppShop","order_time":"2021-03-25 12:02:02","pay_amount":60.0,"real_pay":60.0,"pay_time":"2021-03-25 12:03:00","user_id":"0002","user_name":"Bob","area_id":"330110","area_province_name":"a1","area_city_name":"b1","area_county_name":"c4","area_street_name":"d4","region_name":"e1"}
    
    {"order_id":"202103251505050001","order_channel":"qqShop","order_time":"2021-03-25 15:05:05","pay_amount":500.0,"real_pay":400.0,"pay_time":"2021-03-25 15:10:00","user_id":"0003","user_name":"Cindy","area_id":"330108","area_province_name":"a1","area_city_name":"b1","area_county_name":"c3","area_street_name":"d3","region_name":"e1"}