forked from docs/doc-exports
Reviewed-by: Pruthi, Vineet <vineet.pruthi@t-systems.com> Co-authored-by: Su, Xiaomeng <suxiaomeng1@huawei.com> Co-committed-by: Su, Xiaomeng <suxiaomeng1@huawei.com>
126 lines
9.9 KiB
HTML
126 lines
9.9 KiB
HTML
<a name="dli_08_15053"></a><a name="dli_08_15053"></a>
|
|
|
|
<h1 class="topictitle1">Using Temporal Join to Associate the Latest Version of a Dimension Table</h1>
|
|
<div id="body0000001762912141"><div class="section" id="dli_08_15053__section17209510185415"><h4 class="sectiontitle">Function</h4><p id="dli_08_15053__p3784144716406">For Hive tables, we can read them as bounded streams. In this case, the Hive table can only track its latest version when queried. The latest version of the table retains all the data of the Hive table.</p>
|
|
</div>
|
|
<div class="section" id="dli_08_15053__section2093155184317"><h4 class="sectiontitle">Caveats</h4><ul id="dli_08_15053__ul724024225219"><li id="dli_08_15053__li172401542165210">Each joining subtask needs to keep its own cache of the Hive table. Make sure the Hive table can fit into the memory of a TM task slot.</li><li id="dli_08_15053__li1524004218522">It is encouraged to set a relatively large value both for <strong id="dli_08_15053__b43121032155415">streaming-source.monitor-interval</strong> (latest partition as temporal table) or <strong id="dli_08_15053__b137101941205412">lookup.join.cache.ttl</strong> (all partitions as temporal table). Otherwise, jobs are prone to performance issues as the table needs to be updated and reloaded too frequently.</li><li id="dli_08_15053__li32401942185217">Currently we simply load the whole Hive table whenever the cache needs refreshing. There's no way to differentiate new data from the old.</li></ul>
|
|
</div>
|
|
<div class="section" id="dli_08_15053__section1226317311930"><h4 class="sectiontitle">Parameter Description</h4><p id="dli_08_15053__p1285111118506">When performing the temporal join the latest Hive table, the Hive table will be cached in Slot memory and each record from the stream is joined against the table by key to decide whether a match is found. Using the latest Hive table as a temporal table does not require any additional configuration. Optionally, you can configure the TTL of the Hive table cache with the following property. After the cache expires, the Hive table will be scanned again to load the latest data.</p>
|
|
|
|
<div class="tablenoborder"><table cellpadding="4" cellspacing="0" summary="" id="dli_08_15053__table8922853195015" frame="border" border="1" rules="all"><thead align="left"><tr id="dli_08_15053__row393611536501"><th align="left" class="cellrowborder" valign="top" width="15.840000000000002%" id="mcps1.3.3.3.1.5.1.1"><p id="dli_08_15053__p14936105355019">Parameter</p>
|
|
</th>
|
|
<th align="left" class="cellrowborder" valign="top" width="8.63%" id="mcps1.3.3.3.1.5.1.2"><p id="dli_08_15053__p39361534502">Default Value</p>
|
|
</th>
|
|
<th align="left" class="cellrowborder" valign="top" width="9.01%" id="mcps1.3.3.3.1.5.1.3"><p id="dli_08_15053__p199361953105017">Data Type</p>
|
|
</th>
|
|
<th align="left" class="cellrowborder" valign="top" width="66.52%" id="mcps1.3.3.3.1.5.1.4"><p id="dli_08_15053__p1936165385018">Description</p>
|
|
</th>
|
|
</tr>
|
|
</thead>
|
|
<tbody><tr id="dli_08_15053__row2936105317501"><td class="cellrowborder" valign="top" width="15.840000000000002%" headers="mcps1.3.3.3.1.5.1.1 "><p id="dli_08_15053__p1793665316509">lookup.join.cache.ttl</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="8.63%" headers="mcps1.3.3.3.1.5.1.2 "><p id="dli_08_15053__p293715385016">60 min</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="9.01%" headers="mcps1.3.3.3.1.5.1.3 "><p id="dli_08_15053__p14937205345015">Duration</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="66.52%" headers="mcps1.3.3.3.1.5.1.4 "><p id="dli_08_15053__p116871421161817">The cache TTL (e.g. 10 min) for the build table in lookup join. By default the TTL is 60 minutes.</p>
|
|
<p id="dli_08_15053__p584721815111">The option only works when looking up bounded hive table source, if you are using streaming hive source as temporal table, use <strong id="dli_08_15053__b446812715171">streaming-source.monitor-interval</strong> to configure the interval of data update.</p>
|
|
</td>
|
|
</tr>
|
|
</tbody>
|
|
</table>
|
|
</div>
|
|
</div>
|
|
<div class="section" id="dli_08_15053__section9498161303"><h4 class="sectiontitle">Example</h4><p id="dli_08_15053__p5801172117424">The example shows a classic business pipeline where the dimension table comes from Hive and is updated once a day through batch processing or Flink jobs. The Kafka stream comes from real-time online business data or logs and needs to be joined with the dimension table to expand the stream.</p>
|
|
<ol id="dli_08_15053__ol2249164792417"><li id="dli_08_15053__li142491747142411">Create a Hive OBS external table using Spark SQL and insert data.<pre class="screen" id="dli_08_15053__screen7111832173410">CREATE TABLE if not exists dimension_hive_table (
|
|
product_id STRING,
|
|
product_name STRING,
|
|
unit_price DECIMAL(10, 4),
|
|
pv_count BIGINT,
|
|
like_count BIGINT,
|
|
comment_count BIGINT,
|
|
update_time TIMESTAMP,
|
|
update_user STRING
|
|
)
|
|
STORED AS PARQUET
|
|
LOCATION 'obs://demo/spark.db/dimension_hive_table'
|
|
PARTITIONED BY (
|
|
create_time STRING
|
|
);</pre>
|
|
<div class="p" id="dli_08_15053__p43071726181517"><pre class="screen" id="dli_08_15053__screen8596135117201">INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_11', 'product_name_11', 1.2345, 100, 50, 20, '2023-11-25 02:10:58', 'update_user_1');
|
|
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_12', 'product_name_12', 2.3456, 200, 100, 40, '2023-11-25 02:10:58', 'update_user_2');
|
|
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_13', 'product_name_13', 3.4567, 300, 150, 60, '2023-11-25 02:10:58', 'update_user_3');
|
|
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_14', 'product_name_14', 4.5678, 400, 200, 80, '2023-11-25 02:10:58', 'update_user_4');
|
|
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_15', 'product_name_15', 5.6789, 500, 250, 100, '2023-11-25 02:10:58', 'update_user_5');
|
|
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_16', 'product_name_16', 6.7890, 600, 300, 120, '2023-11-25 02:10:58', 'update_user_6');
|
|
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_17', 'product_name_17', 7.8901, 700, 350, 140, '2023-11-25 02:10:58', 'update_user_7');
|
|
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_18', 'product_name_18', 8.9012, 800, 400, 160, '2023-11-25 02:10:58', 'update_user_8');
|
|
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_19', 'product_name_19', 9.0123, 900, 450, 180, '2023-11-25 02:10:58', 'update_user_9');
|
|
INSERT INTO dimension_hive_table PARTITION (create_time='create_time_1') VALUES ('product_id_10', 'product_name_10', 10.1234, 1000, 500, 200, '2023-11-25 02:10:58', 'update_user_10');</pre>
|
|
</div>
|
|
</li></ol><ol start="2" id="dli_08_15053__ol487455518246"><li id="dli_08_15053__li208742552247">Create a Flink OpenSource SQL job. Enter the following job script and submit the job. This job simulates reading data from Kafka, performs a join with a Hive dimension table to denormalize the data, and outputs it to Print.<div class="p" id="dli_08_15053__p1896991015915"><a name="dli_08_15053__li208742552247"></a><a name="li208742552247"></a>Change the values of the parameters in bold as needed in the following script.<pre class="screen" id="dli_08_15053__screen7459733164314">CREATE CATALOG myhive WITH (
|
|
'type' = 'hive' ,
|
|
'default-database' = 'demo',
|
|
'hive-conf-dir' = '/opt/flink/conf'
|
|
);
|
|
|
|
USE CATALOG myhive;
|
|
|
|
CREATE TABLE if not exists ordersSource (
|
|
product_id STRING,
|
|
user_name string,
|
|
proctime as Proctime()
|
|
) WITH (
|
|
'connector' = '<em id="dli_08_15053__i450911302499"><strong id="dli_08_15053__b6488130154911">kafka</strong></em>',
|
|
'topic' = '<strong id="dli_08_15053__b5683193316497"><em id="dli_08_15053__i144861733124914">TOPIC</em></strong>',
|
|
'properties.bootstrap.servers' = '<strong id="dli_08_15053__b18715438114914"><em id="dli_08_15053__i1248416389493">KafkaIP:PROT,KafkaIP:PROT,KafkaIP:PROT</em></strong>',
|
|
'properties.group.id' = '<strong id="dli_08_15053__b3398114110497"><em id="dli_08_15053__i1717919415491">GroupId</em></strong>',
|
|
'scan.startup.mode' = '<strong id="dli_08_15053__b17521184411497"><em id="dli_08_15053__i3286444194917">latest-offset</em></strong>',
|
|
'format' = '<strong id="dli_08_15053__b1347134716497"><em id="dli_08_15053__i148131146164917">json</em></strong>'
|
|
);
|
|
|
|
create table if not exists print (
|
|
product_id STRING,
|
|
user_name string,
|
|
product_name STRING,
|
|
unit_price DECIMAL(10, 4),
|
|
pv_count BIGINT,
|
|
like_count BIGINT,
|
|
comment_count BIGINT,
|
|
update_time TIMESTAMP,
|
|
update_user STRING,
|
|
create_time STRING
|
|
) with (
|
|
'connector' = 'print'
|
|
);
|
|
|
|
insert into print
|
|
select
|
|
orders.product_id,
|
|
orders.user_name,
|
|
dim.product_name,
|
|
dim.unit_price,
|
|
dim.pv_count,
|
|
dim.like_count,
|
|
dim.comment_count,
|
|
dim.update_time,
|
|
dim.update_user,
|
|
dim.create_time
|
|
from ordersSource orders
|
|
left join dimension_hive_table /*+ OPTIONS('lookup.join.cache.ttl'='60 m') */
|
|
for system_time as of orders.proctime as dim on orders.product_id = dim.product_id;</pre>
|
|
</div>
|
|
</li><li id="dli_08_15053__li857117179254">Connect to the Kafka cluster and insert the following test data into the source topic in Kafka:<pre class="screen" id="dli_08_15053__screen871542733219">{"product_id": "product_id_11", "user_name": "name11"}
|
|
{"product_id": "product_id_12", "user_name": "name12"}</pre>
|
|
</li><li id="dli_08_15053__li189961521122510">View the data in the Print result table.<pre class="screen" id="dli_08_15053__screen321591810294">+I[product_id_11, name11, product_name_11, 1.2345, 100, 50, 20, 2023-11-24T18:10:58, update_user_1, create_time_1]
|
|
+I[product_id_12, name12, product_name_12, 2.3456, 200, 100, 40, 2023-11-24T18:10:58, update_user_2, create_time_1]</pre>
|
|
</li></ol>
|
|
</div>
|
|
</div>
|
|
<div>
|
|
<div class="familylinks">
|
|
<div class="parentlink"><strong>Parent topic:</strong> <a href="dli_08_15046.html">Hive</a></div>
|
|
</div>
|
|
</div>
|
|
|