For partitioned tables that change over time, we can read them as unbounded streams. If each partition contains a complete set of data for a certain version, the partition can be considered as a version of the temporal table, which retains the data of the partition. Flink supports automatically tracking the latest partition (version) of the temporal table in processing-time joins.
The latest partition (version) is defined by the streaming-source.partition-order parameter.
This is the most common use case for using Hive tables as dimension tables in Flink streaming applications.
Using Temporal join to associate the latest partition of a dimension table is only supported in Flink STREAMING mode.
The following example shows a classic business pipeline where the dimension table comes from Hive and is updated once a day through batch processing or Flink jobs. The Kafka stream comes from real-time online business data or logs and needs to be joined with the dimension table to expand the stream.
CREATE TABLE if not exists dimension_hive_table (
product_id STRING,
product_name STRING,
unit_price DECIMAL(10, 4),
pv_count BIGINT,
like_count BIGINT,
comment_count BIGINT,
update_time TIMESTAMP,
update_user STRING
)
STORED AS PARQUET
LOCATION 'obs://demo/spark.db/dimension_hive_table'
PARTITIONED BY (
create_time STRING
);
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_11', 'product_name_11', 1.2345, 100, 50, 20, '2023-11-25 02:10:58', 'update_user_1');
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_12', 'product_name_12', 2.3456, 200, 100, 40, '2023-11-25 02:10:58', 'update_user_2');
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_13', 'product_name_13', 3.4567, 300, 150, 60, '2023-11-25 02:10:58', 'update_user_3');
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_14', 'product_name_14', 4.5678, 400, 200, 80, '2023-11-25 02:10:58', 'update_user_4');
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_15', 'product_name_15', 5.6789, 500, 250, 100, '2023-11-25 02:10:58', 'update_user_5');
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_16', 'product_name_16', 6.7890, 600, 300, 120, '2023-11-25 02:10:58', 'update_user_6');
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_17', 'product_name_17', 7.8901, 700, 350, 140, '2023-11-25 02:10:58', 'update_user_7');
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_18', 'product_name_18', 8.9012, 800, 400, 160, '2023-11-25 02:10:58', 'update_user_8');
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_19', 'product_name_19', 9.0123, 900, 450, 180, '2023-11-25 02:10:58', 'update_user_9');
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_10', 'product_name_10', 10.1234, 1000, 500, 200, '2023-11-25 02:10:58', 'update_user_10');
CREATE CATALOG myhive WITH (
'type' = 'hive' ,
'default-database' = 'demo',
'hive-conf-dir' = '/opt/flink/conf'
);
USE CATALOG myhive;
CREATE TABLE if not exists ordersSource (
product_id STRING,
user_name string,
proctime as Proctime()
) WITH (
'connector' = 'kafka',
'topic' = 'TOPIC',
'properties.bootstrap.servers' = 'KafkaIP:PROT,KafkaIP:PROT,KafkaIP:PROT',
'properties.group.id' = 'GroupId',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
);
create table if not exists print (
product_id STRING,
user_name string,
product_name STRING,
unit_price DECIMAL(10, 4),
pv_count BIGINT,
like_count BIGINT,
comment_count BIGINT,
update_time TIMESTAMP,
update_user STRING,
create_time STRING
) with (
'connector' = 'print'
);
insert into print
select
orders.product_id,
orders.user_name,
dim.product_name,
dim.unit_price,
dim.pv_count,
dim.like_count,
dim.comment_count,
dim.update_time,
dim.update_user,
dim.create_time
from ordersSource orders
left join dimension_hive_table /*+ OPTIONS('streaming-source.enable'='true',
'streaming-source.partition.include' = 'latest', 'streaming-source.monitor-interval' = '10 m') */
for system_time as of orders.proctime as dim on orders.product_id = dim.product_id;
{"product_id": "product_id_11", "user_name": "name11"}
{"product_id": "product_id_12", "user_name": "name12"}
+I[product_id_11, name11, product_name_11, 1.2345, 100, 50, 20, 2023-11-24T18:10:58, update_user_1, create_time_1] +I[product_id_12, name12, product_name_12, 2.3456, 200, 100, 40, 2023-11-24T18:10:58, update_user_2, create_time_1]
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_21', 'product_name_21', 1.2345, 100, 50, 20, '2023-11-25 02:10:58', 'update_user_1');
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_22', 'product_name_22', 2.3456, 200, 100, 40, '2023-11-25 02:10:58', 'update_user_2');
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_23', 'product_name_23', 3.4567, 300, 150, 60, '2023-11-25 02:10:58', 'update_user_3');
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_24', 'product_name_24', 4.5678, 400, 200, 80, '2023-11-25 02:10:58', 'update_user_4');
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_25', 'product_name_25', 5.6789, 500, 250, 100, '2023-11-25 02:10:58', 'update_user_5');
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_26', 'product_name_26', 6.7890, 600, 300, 120, '2023-11-25 02:10:58', 'update_user_6');
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_27', 'product_name_27', 7.8901, 700, 350, 140, '2023-11-25 02:10:58', 'update_user_7');
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_28', 'product_name_28', 8.9012, 800, 400, 160, '2023-11-25 02:10:58', 'update_user_8');
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_29', 'product_name_29', 9.0123, 900, 450, 180, '2023-11-25 02:10:58', 'update_user_9');
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_2') VALUES ('product_id_20', 'product_name_20', 10.1234, 1000, 500, 200, '2023-11-25 02:10:58', 'update_user_10');
{"product_id": "product_id_13", "user_name": "name13"}
+I[product_id_13, name13, null, null, null, null, null, null, null, null]
{"product_id": "product_id_21", "user_name": "name21"}
+I[product_id_21, name21, product_name_21, 1.2345, 100, 50, 20, 2023-11-24T18:10:58, update_user_1, create_time_2]