Reviewed-by: Pruthi, Vineet <vineet.pruthi@t-systems.com> Reviewed-by: Hasko, Vladimir <vladimir.hasko@t-systems.com> Co-authored-by: Su, Xiaomeng <suxiaomeng1@huawei.com> Co-committed-by: Su, Xiaomeng <suxiaomeng1@huawei.com>
<a name="dli_08_15020"></a><a name="dli_08_15020"></a>
|
|
|
|
<h1 class="topictitle1">Debezium</h1>
|
|
<div id="body0000001310215797"><div class="section" id="dli_08_15020__section9536315265"><h4 class="sectiontitle">Function</h4><p id="dli_08_15020__p4754154145319">Debezium is a change data capture (CDC) tool that can stream changes from other databases into Kafka in real time. Debezium provides a unified format schema for changelogs and supports serializing messages using JSON and Apache Avro.</p>
|
|
<p id="dli_08_15020__p12357153614535">Flink can interpret Debezium JSON and Avro messages as INSERT/UPDATE/DELETE messages in the Flink SQL system. This is useful in many scenarios, such as:</p>
|
|
<ul id="dli_08_15020__ul203571936155312"><li id="dli_08_15020__li1335753610532">Synchronizing incremental data from databases to other systems</li><li id="dli_08_15020__li13357173620536">Auditing logs</li><li id="dli_08_15020__li1035753665315">Building real-time materialized views on databases</li><li id="dli_08_15020__li63571836145314">Temporal joins against the changing history of a database table</li></ul>
|
|
<p id="dli_08_15020__p1835743614531">Flink can also encode the INSERT/UPDATE/DELETE messages in Flink SQL as Debezium JSON or Avro messages and emit them to external systems such as Kafka. However, Flink currently cannot combine UPDATE_BEFORE and UPDATE_AFTER into a single UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and UPDATE_AFTER as Debezium DELETE and INSERT messages, respectively.</p>
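<p>The following sketch illustrates both directions: reading Debezium JSON change events from Kafka and writing an aggregated changelog back to Kafka as Debezium JSON. The table names, topic names, column layout, and broker address (kafka-host:9092) are hypothetical placeholders, and the Kafka connector options may need adjusting for your environment.</p>
<pre class="screen">-- Hypothetical source: Debezium JSON change events for an orders table
CREATE TABLE orders_cdc (
  order_id BIGINT,
  user_id  BIGINT,
  amount   DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'dbserver1.inventory.orders',
  'properties.bootstrap.servers' = 'kafka-host:9092',
  'properties.group.id' = 'demo-group',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'debezium-json'
);

-- Hypothetical sink: the aggregated changelog is encoded as Debezium JSON again
CREATE TABLE user_totals (
  user_id      BIGINT,
  total_amount DECIMAL(38, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_totals',
  'properties.bootstrap.servers' = 'kafka-host:9092',
  'format' = 'debezium-json'
);

-- INSERT/UPDATE/DELETE events from the source update the running totals;
-- each UPDATE_BEFORE/UPDATE_AFTER pair is written as a DELETE and an INSERT message
INSERT INTO user_totals
SELECT user_id, SUM(amount) AS total_amount
FROM orders_cdc
GROUP BY user_id;</pre>
<p>Because each total changes as new order events arrive, every updated total reaches the sink topic as a Debezium DELETE message for the old value followed by an INSERT message for the new value.</p>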
|
|
<p id="dli_08_15020__p689892485816">For details, see <a href="https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/formats/debezium/" target="_blank" rel="noopener noreferrer">Debezium Format</a>.</p>
|
|
</div>
|
|
<div class="section" id="dli_08_15020__section193656483268"><h4 class="sectiontitle">Supported Connectors</h4><ul id="dli_08_15020__ul1118274610361"><li id="dli_08_15020__li13182154663618">Kafka</li><li id="dli_08_15020__li61361046205416">FileSystem</li></ul>
|
|
</div>
|
|
<div class="section" id="dli_08_15020__section194507465911"><h4 class="sectiontitle">Caveats</h4><ul id="dli_08_15020__ul11710191518590"><li id="dli_08_15020__li16710315185919">Duplicate change events<p id="dli_08_15020__p139711456236"><a name="dli_08_15020__li16710315185919"></a><a name="li16710315185919"></a>Under normal operating conditions, Debezium delivers every change event exactly once, and Flink works well when consuming the events Debezium produces. However, if a failover occurs, Debezium falls back to at-least-once delivery. In such abnormal situations, Debezium may deliver duplicate change events to Kafka, and Flink will receive the duplicates. This may cause Flink queries to return wrong results or throw unexpected exceptions.</p>
|
|
<p id="dli_08_15020__p198978402414">Solution: Set <a href="https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/config/#table-exec-source-cdc-events-duplicate" target="_blank" rel="noopener noreferrer">table.exec.source.cdc-events-duplicate</a> to <strong id="dli_08_15020__b34111736152019">true</strong> and define a primary key on the source table.</p>
|
|
<p id="dli_08_15020__p68326159241">Flink then generates an additional stateful operator that uses the primary key to deduplicate the change events and produce a normalized changelog stream.</p>
|
|
<p id="dli_08_15020__p1721316372595">For more information, see <a href="https://debezium.io/documentation/faq/#what_happens_when_an_application_stops_or_crashes" target="_blank" rel="noopener noreferrer">Debezium documentation</a>.</p>
|
|
</li><li id="dli_08_15020__li690223115919">Consuming data produced by the Debezium PostgreSQL connector<p id="dli_08_15020__p88299017266"><a name="dli_08_15020__li690223115919"></a><a name="li690223115919"></a>If you use the Debezium connector for PostgreSQL to capture changes to Kafka, make sure that the <a href="https://www.postgresql.org/docs/current/sql-altertable.html#SQL-CREATETABLE-REPLICA-IDENTITY" target="_blank" rel="noopener noreferrer">REPLICA IDENTITY</a> of the monitored PostgreSQL table is set to <strong id="dli_08_15020__b122911917121614">FULL</strong>. The default value is <strong id="dli_08_15020__b10291181701611">DEFAULT</strong>. Otherwise, Flink SQL currently fails to interpret the Debezium data.</p>
|
|
<p id="dli_08_15020__p210931452618">With FULL, UPDATE and DELETE events contain the previous values of all columns of the table.</p>
|
|
<p id="dli_08_15020__p264817271264">With other settings, the <strong id="dli_08_15020__b05451857172217">before</strong> field of UPDATE and DELETE events contains only the primary key columns, or is null if the table has no primary key.</p>
|
|
<p id="dli_08_15020__p19549163012616">You can change the <strong id="dli_08_15020__b2708019235">REPLICA IDENTITY</strong> by running <strong id="dli_08_15020__b7718042317">ALTER TABLE &lt;your-table-name&gt; REPLICA IDENTITY FULL</strong>.</p>
|
|
</li></ul>
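<p>For the duplicate change events caveat, a Flink SQL source might combine the two required settings as follows. This is a sketch that assumes the SQL client SET syntax, a hypothetical Kafka topic products_binlog, and a hypothetical broker kafka-host:9092; depending on how the job is submitted, the option may have to be supplied in the job's runtime configuration instead of through SET.</p>
<pre class="screen">-- Ask Flink to deduplicate change events that Debezium may re-deliver after a failover
SET 'table.exec.source.cdc-events-duplicate' = 'true';

CREATE TABLE products_cdc (
  id          BIGINT,
  name        STRING,
  description STRING,
  weight      DECIMAL(10, 3),
  -- Flink deduplicates on this key, so the source must declare one
  PRIMARY KEY (id) NOT ENFORCED
) WITH (
  'connector' = 'kafka',
  'topic' = 'products_binlog',
  'properties.bootstrap.servers' = 'kafka-host:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'debezium-json'
);</pre>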
|
|
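<p>For the PostgreSQL caveat, the change is made in PostgreSQL itself, not in Flink. The sketch below uses a hypothetical table public.orders and then checks the relreplident column of the pg_class system catalog, where f indicates FULL.</p>
<pre class="screen">-- Run in PostgreSQL; public.orders is a hypothetical table name
ALTER TABLE public.orders REPLICA IDENTITY FULL;

-- Verify the setting: relreplident is 'f' for FULL
-- ('d' = DEFAULT, 'n' = NOTHING, 'i' = INDEX)
SELECT relname, relreplident
FROM pg_class
WHERE oid = 'public.orders'::regclass;</pre>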
</div>
|
|
<div class="section" id="dli_08_15020__section139091342182616"><h4 class="sectiontitle">Parameter Description</h4><p id="dli_08_15020__p83001752131416">Flink provides the <strong id="dli_08_15020__b1350103745415">debezium-avro-confluent</strong> and <strong id="dli_08_15020__b119520415541">debezium-json</strong> formats to interpret Avro or JSON messages produced by Debezium. Use the <strong id="dli_08_15020__b035311497546">debezium-avro-confluent</strong> format to interpret Debezium Avro messages and the <strong id="dli_08_15020__b129804547545">debezium-json</strong> format to interpret Debezium JSON messages.</p>
|
|
|
|
<div class="tablenoborder"><table cellpadding="4" cellspacing="0" summary="" id="dli_08_15020__table1035742171510" frame="border" border="1" rules="all"><caption><b>Table 1 </b>Debezium Avro parameters</caption><thead align="left"><tr id="dli_08_15020__row836164215156"><th align="left" class="cellrowborder" valign="top" width="20%" id="mcps1.3.4.3.2.6.1.1"><p id="dli_08_15020__p11366429154">Parameter</p>
|
|
</th>
|
|
<th align="left" class="cellrowborder" valign="top" width="10.68%" id="mcps1.3.4.3.2.6.1.2"><p id="dli_08_15020__p3365427156">Mandatory</p>
|
|
</th>
|
|
<th align="left" class="cellrowborder" valign="top" width="8.83%" id="mcps1.3.4.3.2.6.1.3"><p id="dli_08_15020__p63654215155">Default Value</p>
|
|
</th>
|
|
<th align="left" class="cellrowborder" valign="top" width="8.81%" id="mcps1.3.4.3.2.6.1.4"><p id="dli_08_15020__p1536174212155">Data Type</p>
|
|
</th>
|
|
<th align="left" class="cellrowborder" valign="top" width="51.68000000000001%" id="mcps1.3.4.3.2.6.1.5"><p id="dli_08_15020__p193674241515">Description</p>
|
|
</th>
|
|
</tr>
|
|
</thead>
|
|
<tbody><tr id="dli_08_15020__row3363421157"><td class="cellrowborder" valign="top" width="20%" headers="mcps1.3.4.3.2.6.1.1 "><p id="dli_08_15020__p15761135519166">format</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="10.68%" headers="mcps1.3.4.3.2.6.1.2 "><p id="dli_08_15020__p6361842181519">Yes</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="8.83%" headers="mcps1.3.4.3.2.6.1.3 "><p id="dli_08_15020__p43674214154">None</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="8.81%" headers="mcps1.3.4.3.2.6.1.4 "><p id="dli_08_15020__p2036104231511">String</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="51.68000000000001%" headers="mcps1.3.4.3.2.6.1.5 "><p id="dli_08_15020__p2360426155">Format to be used. Set this parameter to <strong id="dli_08_15020__b4991122181611">'debezium-avro-confluent'</strong>.</p>
|
|
</td>
|
|
</tr>
|
|
<tr id="dli_08_15020__row103616421152"><td class="cellrowborder" valign="top" width="20%" headers="mcps1.3.4.3.2.6.1.1 "><p id="dli_08_15020__p51001851171713">debezium-avro-confluent.basic-auth.credentials-source</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="10.68%" headers="mcps1.3.4.3.2.6.1.2 "><p id="dli_08_15020__p636124214156">No</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="8.83%" headers="mcps1.3.4.3.2.6.1.3 "><p id="dli_08_15020__p136142141520">None</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="8.81%" headers="mcps1.3.4.3.2.6.1.4 "><p id="dli_08_15020__p1236642161514">String</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="51.68000000000001%" headers="mcps1.3.4.3.2.6.1.5 "><p id="dli_08_15020__p19361442151517">Basic auth credentials source for Schema Registry</p>
|
|
</td>
|
|
</tr>
|
|
<tr id="dli_08_15020__row16366424156"><td class="cellrowborder" valign="top" width="20%" headers="mcps1.3.4.3.2.6.1.1 "><p id="dli_08_15020__p4116134715237">debezium-avro-confluent.basic-auth.user-info</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="10.68%" headers="mcps1.3.4.3.2.6.1.2 "><p id="dli_08_15020__p93617427154">No</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="8.83%" headers="mcps1.3.4.3.2.6.1.3 "><p id="dli_08_15020__p1836134219154">None</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="8.81%" headers="mcps1.3.4.3.2.6.1.4 "><p id="dli_08_15020__p93624211512">String</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="51.68000000000001%" headers="mcps1.3.4.3.2.6.1.5 "><p id="dli_08_15020__p63634211157">Basic auth user info for Schema Registry</p>
|
|
</td>
|
|
</tr>
|
|
<tr id="dli_08_15020__row2036114281513"><td class="cellrowborder" valign="top" width="20%" headers="mcps1.3.4.3.2.6.1.1 "><p id="dli_08_15020__p6389332132418">debezium-avro-confluent.bearer-auth.credentials-source</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="10.68%" headers="mcps1.3.4.3.2.6.1.2 "><p id="dli_08_15020__p936134216150">No</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="8.83%" headers="mcps1.3.4.3.2.6.1.3 "><p id="dli_08_15020__p936942171511">None</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="8.81%" headers="mcps1.3.4.3.2.6.1.4 "><p id="dli_08_15020__p636194219155">String</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="51.68000000000001%" headers="mcps1.3.4.3.2.6.1.5 "><p id="dli_08_15020__p19368420157">Bearer auth credentials source for Schema Registry</p>
|
|
</td>
|
|
</tr>
|
|
<tr id="dli_08_15020__row33613422159"><td class="cellrowborder" valign="top" width="20%" headers="mcps1.3.4.3.2.6.1.1 "><p id="dli_08_15020__p4149154182610">debezium-avro-confluent.bearer-auth.token</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="10.68%" headers="mcps1.3.4.3.2.6.1.2 "><p id="dli_08_15020__p10360422158">No</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="8.83%" headers="mcps1.3.4.3.2.6.1.3 "><p id="dli_08_15020__p736194217158">None</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="8.81%" headers="mcps1.3.4.3.2.6.1.4 "><p id="dli_08_15020__p636542181513">String</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="51.68000000000001%" headers="mcps1.3.4.3.2.6.1.5 "><p id="dli_08_15020__p1936174210159">Bearer auth token for Schema Registry</p>
|
|
</td>
|
|
</tr>
|
|
<tr id="dli_08_15020__row1536194217158"><td class="cellrowborder" valign="top" width="20%" headers="mcps1.3.4.3.2.6.1.1 "><p id="dli_08_15020__p1244413456278">debezium-avro-confluent.properties</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="10.68%" headers="mcps1.3.4.3.2.6.1.2 "><p id="dli_08_15020__p1036134231510">No</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="8.83%" headers="mcps1.3.4.3.2.6.1.3 "><p id="dli_08_15020__p036154217152">None</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="8.81%" headers="mcps1.3.4.3.2.6.1.4 "><p id="dli_08_15020__p536742151512">Map</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="51.68000000000001%" headers="mcps1.3.4.3.2.6.1.5 "><p id="dli_08_15020__p8365427158">Properties map that is forwarded to the underlying Schema Registry. This is useful for options that are not officially exposed via Flink config options. However, note that Flink options have higher precedence.</p>
|
|
</td>
|
|
</tr>
|
|
<tr id="dli_08_15020__row1736144212156"><td class="cellrowborder" valign="top" width="20%" headers="mcps1.3.4.3.2.6.1.1 "><p id="dli_08_15020__p56085019297">debezium-avro-confluent.ssl.keystore.location</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="10.68%" headers="mcps1.3.4.3.2.6.1.2 "><p id="dli_08_15020__p1736124214154">No</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="8.83%" headers="mcps1.3.4.3.2.6.1.3 "><p id="dli_08_15020__p8361042191512">None</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="8.81%" headers="mcps1.3.4.3.2.6.1.4 "><p id="dli_08_15020__p1236642131517">String</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="51.68000000000001%" headers="mcps1.3.4.3.2.6.1.5 "><p id="dli_08_15020__p12361542131520">Location of the SSL keystore file</p>
|
|
</td>
|
|
</tr>
|
|
<tr id="dli_08_15020__row1036104217154"><td class="cellrowborder" valign="top" width="20%" headers="mcps1.3.4.3.2.6.1.1 "><p id="dli_08_15020__p16507191343214">debezium-avro-confluent.ssl.keystore.password</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="10.68%" headers="mcps1.3.4.3.2.6.1.2 "><p id="dli_08_15020__p63617428159">No</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="8.83%" headers="mcps1.3.4.3.2.6.1.3 "><p id="dli_08_15020__p23624220154">None</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="8.81%" headers="mcps1.3.4.3.2.6.1.4 "><p id="dli_08_15020__p113617429153">String</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="51.68000000000001%" headers="mcps1.3.4.3.2.6.1.5 "><p id="dli_08_15020__p16361742101518">Password for SSL keystore</p>
|
|
</td>
|
|
</tr>
|
|
<tr id="dli_08_15020__row1536204220158"><td class="cellrowborder" valign="top" width="20%" headers="mcps1.3.4.3.2.6.1.1 "><p id="dli_08_15020__p5366940113210">debezium-avro-confluent.ssl.truststore.location</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="10.68%" headers="mcps1.3.4.3.2.6.1.2 "><p id="dli_08_15020__p143684291512">No</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="8.83%" headers="mcps1.3.4.3.2.6.1.3 "><p id="dli_08_15020__p1736134241514">None</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="8.81%" headers="mcps1.3.4.3.2.6.1.4 "><p id="dli_08_15020__p123694231515">String</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="51.68000000000001%" headers="mcps1.3.4.3.2.6.1.5 "><p id="dli_08_15020__p236104210155">Location of the SSL truststore file</p>
|
|
</td>
|
|
</tr>
|
|
<tr id="dli_08_15020__row936742191511"><td class="cellrowborder" valign="top" width="20%" headers="mcps1.3.4.3.2.6.1.1 "><p id="dli_08_15020__p1255731003316">debezium-avro-confluent.ssl.truststore.password</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="10.68%" headers="mcps1.3.4.3.2.6.1.2 "><p id="dli_08_15020__p436542171513">No</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="8.83%" headers="mcps1.3.4.3.2.6.1.3 "><p id="dli_08_15020__p1537442141519">None</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="8.81%" headers="mcps1.3.4.3.2.6.1.4 "><p id="dli_08_15020__p12371642181513">String</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="51.68000000000001%" headers="mcps1.3.4.3.2.6.1.5 "><p id="dli_08_15020__p437144217155">Password for SSL truststore</p>
|
|
</td>
|
|
</tr>
|
|
<tr id="dli_08_15020__row153712429152"><td class="cellrowborder" valign="top" width="20%" headers="mcps1.3.4.3.2.6.1.1 "><p id="dli_08_15020__p10211165183413">debezium-avro-confluent.subject</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="10.68%" headers="mcps1.3.4.3.2.6.1.2 "><p id="dli_08_15020__p143764241513">No</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="8.83%" headers="mcps1.3.4.3.2.6.1.3 "><p id="dli_08_15020__p1480010142347">None</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="8.81%" headers="mcps1.3.4.3.2.6.1.4 "><p id="dli_08_15020__p1837164211511">String</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="51.68000000000001%" headers="mcps1.3.4.3.2.6.1.5 "><p id="dli_08_15020__p13704291515">The Confluent Schema Registry subject under which to register the schema used by this format during serialization. By default, the 'kafka' and 'upsert-kafka' connectors use '&lt;topic_name&gt;-value' or '&lt;topic_name&gt;-key' as the subject name when this format is used as the value or key format, respectively. For other connectors (for example, 'filesystem'), this option is required when the format is used in a sink.</p>
|
|
</td>
|
|
</tr>
|
|
<tr id="dli_08_15020__row1937154241516"><td class="cellrowborder" valign="top" width="20%" headers="mcps1.3.4.3.2.6.1.1 "><p id="dli_08_15020__p12504259406">debezium-avro-confluent.url</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="10.68%" headers="mcps1.3.4.3.2.6.1.2 "><p id="dli_08_15020__p17371442191515">Yes</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="8.83%" headers="mcps1.3.4.3.2.6.1.3 "><p id="dli_08_15020__p163714219158">None</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="8.81%" headers="mcps1.3.4.3.2.6.1.4 "><p id="dli_08_15020__p337114211518">String</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="51.68000000000001%" headers="mcps1.3.4.3.2.6.1.5 "><p id="dli_08_15020__p1137124271517">The URL of the Confluent Schema Registry to fetch/register schemas.</p>
|
|
</td>
|
|
</tr>
|
|
</tbody>
|
|
</table>
|
|
</div>
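<p>A minimal Kafka source that reads Debezium Avro messages might look as follows. The table, topic, broker address, Schema Registry URL, and credentials are hypothetical placeholders. The two basic-auth options are needed only if your Schema Registry requires authentication; USER_INFO is the Confluent client setting that takes the credentials from the user-info option.</p>
<pre class="screen">CREATE TABLE products_avro (
  id          BIGINT,
  name        STRING,
  description STRING,
  weight      DECIMAL(10, 3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'products_binlog',
  'properties.bootstrap.servers' = 'kafka-host:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'debezium-avro-confluent',
  -- Schema Registry from which the Avro writer schemas are fetched
  'debezium-avro-confluent.url' = 'http://schema-registry-host:8081',
  -- Only if the registry requires basic authentication (placeholder credentials)
  'debezium-avro-confluent.basic-auth.credentials-source' = 'USER_INFO',
  'debezium-avro-confluent.basic-auth.user-info' = 'sr-user:sr-password'
);</pre>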
|
|
|
|
<div class="tablenoborder"><table cellpadding="4" cellspacing="0" summary="" id="dli_08_15020__table1035231193019" frame="border" border="1" rules="all"><caption><b>Table 2 </b>Debezium JSON parameters</caption><thead align="left"><tr id="dli_08_15020__row53583111302"><th align="left" class="cellrowborder" valign="top" width="19.36%" id="mcps1.3.4.4.2.6.1.1"><p id="dli_08_15020__p10359314305">Parameter</p>
|
|
</th>
|
|
<th align="left" class="cellrowborder" valign="top" width="10.33%" id="mcps1.3.4.4.2.6.1.2"><p id="dli_08_15020__p23573111308">Mandatory</p>
|
|
</th>
|
|
<th align="left" class="cellrowborder" valign="top" width="10.530000000000001%" id="mcps1.3.4.4.2.6.1.3"><p id="dli_08_15020__p193513119305">Default Value</p>
|
|
</th>
|
|
<th align="left" class="cellrowborder" valign="top" width="10%" id="mcps1.3.4.4.2.6.1.4"><p id="dli_08_15020__p1535123143012">Data Type</p>
|
|
</th>
|
|
<th align="left" class="cellrowborder" valign="top" width="49.78%" id="mcps1.3.4.4.2.6.1.5"><p id="dli_08_15020__p153563115305">Description</p>
|
|
</th>
|
|
</tr>
|
|
</thead>
|
|
<tbody><tr id="dli_08_15020__row637153110300"><td class="cellrowborder" valign="top" width="19.36%" headers="mcps1.3.4.4.2.6.1.1 "><p id="dli_08_15020__p47931610316">format</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="10.33%" headers="mcps1.3.4.4.2.6.1.2 "><p id="dli_08_15020__p47931653111">Yes</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="10.530000000000001%" headers="mcps1.3.4.4.2.6.1.3 "><p id="dli_08_15020__p1279131610311">None</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="10%" headers="mcps1.3.4.4.2.6.1.4 "><p id="dli_08_15020__p167915168318">String</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="49.78%" headers="mcps1.3.4.4.2.6.1.5 "><p id="dli_08_15020__p7791916153116">Format to be used. Set this parameter to <strong id="dli_08_15020__b6442031114">'debezium-json'</strong>.</p>
|
|
</td>
|
|
</tr>
|
|
<tr id="dli_08_15020__row193716316300"><td class="cellrowborder" valign="top" width="19.36%" headers="mcps1.3.4.4.2.6.1.1 "><p id="dli_08_15020__p197917163312">debezium-json.schema-include</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="10.33%" headers="mcps1.3.4.4.2.6.1.2 "><p id="dli_08_15020__p18019165318">No</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="10.530000000000001%" headers="mcps1.3.4.4.2.6.1.3 "><p id="dli_08_15020__p12801716163112">false</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="10%" headers="mcps1.3.4.4.2.6.1.4 "><p id="dli_08_15020__p148018167310">Boolean</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="49.78%" headers="mcps1.3.4.4.2.6.1.5 "><p id="dli_08_15020__p118061610319">Whether the Debezium JSON messages include the schema. When setting up Debezium Kafka Connect, you can enable the Kafka configuration <strong id="dli_08_15020__b11957134101814">value.converter.schemas.enable</strong> to include the schema in each message. In that case, set this option to true.</p>
|
|
</td>
|
|
</tr>
|
|
<tr id="dli_08_15020__row737113193015"><td class="cellrowborder" valign="top" width="19.36%" headers="mcps1.3.4.4.2.6.1.1 "><p id="dli_08_15020__p1380816133113">debezium-json.ignore-parse-errors</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="10.33%" headers="mcps1.3.4.4.2.6.1.2 "><p id="dli_08_15020__p180171653110">No</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="10.530000000000001%" headers="mcps1.3.4.4.2.6.1.3 "><p id="dli_08_15020__p18807164314">false</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="10%" headers="mcps1.3.4.4.2.6.1.4 "><p id="dli_08_15020__p380191623118">Boolean</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="49.78%" headers="mcps1.3.4.4.2.6.1.5 "><p id="dli_08_15020__p178016166318">Whether to skip fields and rows with parse errors instead of failing. The default value <strong id="dli_08_15020__b92001959201817">false</strong> means that an error is thrown. If this option is set to true, fields that cannot be parsed are set to null.</p>
|
|
</td>
|
|
</tr>
|
|
<tr id="dli_08_15020__row137193119302"><td class="cellrowborder" valign="top" width="19.36%" headers="mcps1.3.4.4.2.6.1.1 "><p id="dli_08_15020__p118021663115">debezium-json.timestamp-format.standard</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="10.33%" headers="mcps1.3.4.4.2.6.1.2 "><p id="dli_08_15020__p780416113118">No</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="10.530000000000001%" headers="mcps1.3.4.4.2.6.1.3 "><p id="dli_08_15020__p145917357318">'SQL'</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="10%" headers="mcps1.3.4.4.2.6.1.4 "><p id="dli_08_15020__p188031693112">String</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="49.78%" headers="mcps1.3.4.4.2.6.1.5 "><p id="dli_08_15020__p4804161319">Input and output timestamp formats. Currently supported values are <strong id="dli_08_15020__b8898618154512">SQL</strong> and <strong id="dli_08_15020__b28152184516">ISO-8601</strong>.</p>
|
|
<ul id="dli_08_15020__ul180111610318"><li id="dli_08_15020__li1980316183115"><strong id="dli_08_15020__b13659216465">SQL</strong> parses input timestamps in the "yyyy-MM-dd HH:mm:ss.s{precision}" format, for example <strong id="dli_08_15020__b458389174615">2020-12-30 12:13:14.123</strong>, and outputs timestamps in the same format.</li><li id="dli_08_15020__li148031613310"><strong id="dli_08_15020__b13797191264610">ISO-8601</strong> parses input timestamps in the "yyyy-MM-ddTHH:mm:ss.s{precision}" format, for example <strong id="dli_08_15020__b1441514433466">2020-12-30T12:13:14.123</strong>, and outputs timestamps in the same format.</li></ul>
|
|
</td>
|
|
</tr>
|
|
<tr id="dli_08_15020__row7371331123018"><td class="cellrowborder" valign="top" width="19.36%" headers="mcps1.3.4.4.2.6.1.1 "><p id="dli_08_15020__p68021611319">debezium-json.map-null-key.mode</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="10.33%" headers="mcps1.3.4.4.2.6.1.2 "><p id="dli_08_15020__p188012162312">No</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="10.530000000000001%" headers="mcps1.3.4.4.2.6.1.3 "><p id="dli_08_15020__p854182293112">'FAIL'</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="10%" headers="mcps1.3.4.4.2.6.1.4 "><p id="dli_08_15020__p180316193117">String</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="49.78%" headers="mcps1.3.4.4.2.6.1.5 "><p id="dli_08_15020__p1380816153118">Handling mode when serializing null keys for map data. Available values are as follows:</p>
|
|
<ul id="dli_08_15020__ul18801161314"><li id="dli_08_15020__li118051612311"><strong id="dli_08_15020__b19997124812463">FAIL</strong> throws an exception when a map contains a null key.</li><li id="dli_08_15020__li68013168312"><strong id="dli_08_15020__b5444550184613">DROP</strong> drops map entries with null keys.</li><li id="dli_08_15020__li14800166315"><strong id="dli_08_15020__b12100529461">LITERAL</strong> replaces null keys in map data with a string literal, which is defined by the <strong id="dli_08_15020__b6554165717465">debezium-json.map-null-key.literal</strong> option.</li></ul>
|
|
</td>
|
|
</tr>
|
|
<tr id="dli_08_15020__row19372031113018"><td class="cellrowborder" valign="top" width="19.36%" headers="mcps1.3.4.4.2.6.1.1 "><p id="dli_08_15020__p280616133115">debezium-json.map-null-key.literal</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="10.33%" headers="mcps1.3.4.4.2.6.1.2 "><p id="dli_08_15020__p08091619310">No</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="10.530000000000001%" headers="mcps1.3.4.4.2.6.1.3 "><p id="dli_08_15020__p15801416123111">'null'</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="10%" headers="mcps1.3.4.4.2.6.1.4 "><p id="dli_08_15020__p1180316153115">String</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="49.78%" headers="mcps1.3.4.4.2.6.1.5 "><p id="dli_08_15020__p128010161317">String literal to replace null key when <strong id="dli_08_15020__b17367819112119">debezium-json.map-null-key.mode</strong> is <strong id="dli_08_15020__b537418198219">LITERAL</strong>.</p>
|
|
</td>
|
|
</tr>
|
|
<tr id="dli_08_15020__row6612165354412"><td class="cellrowborder" valign="top" width="19.36%" headers="mcps1.3.4.4.2.6.1.1 "><p id="dli_08_15020__p4316173045316">debezium-json.encode.decimal-as-plain-number</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="10.33%" headers="mcps1.3.4.4.2.6.1.2 "><p id="dli_08_15020__p2612115394414">No</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="10.530000000000001%" headers="mcps1.3.4.4.2.6.1.3 "><p id="dli_08_15020__p1061235319447">false</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="10%" headers="mcps1.3.4.4.2.6.1.4 "><p id="dli_08_15020__p2612165313441">Boolean</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="49.78%" headers="mcps1.3.4.4.2.6.1.5 "><p id="dli_08_15020__p361255394415">Whether to encode all decimals as plain numbers instead of possibly using scientific notation. For example, <strong id="dli_08_15020__b52398427245619">0.000000027</strong> is encoded as <strong id="dli_08_15020__b35298424245619">2.7E-8</strong> by default, and is written as <strong id="dli_08_15020__b1973743118438">0.000000027</strong> if this parameter is set to <strong id="dli_08_15020__b1074183119434">true</strong>.</p>
|
|
</td>
|
|
</tr>
|
|
</tbody>
|
|
</table>
|
|
</div>
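<p>A Debezium JSON source that expects the schema to be embedded in every message and tolerates malformed records could be declared roughly as follows. The table, topic, and broker names are hypothetical. Set debezium-json.schema-include to true only if Kafka Connect was configured with value.converter.schemas.enable; enabling debezium-json.ignore-parse-errors keeps the job running at the cost of silently producing null fields or skipped rows.</p>
<pre class="screen">CREATE TABLE products_json (
  id          BIGINT,
  name        STRING,
  description STRING,
  weight      DECIMAL(10, 3)
) WITH (
  'connector' = 'kafka',
  'topic' = 'products_binlog',
  'properties.bootstrap.servers' = 'kafka-host:9092',
  'properties.group.id' = 'testGroup',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'debezium-json',
  -- Messages carry a schema/payload envelope (value.converter.schemas.enable=true)
  'debezium-json.schema-include' = 'true',
  -- Skip unparsable fields and rows instead of failing the job
  'debezium-json.ignore-parse-errors' = 'true'
);</pre>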
|
|
</div>
|
|
<div class="section" id="dli_08_15020__section7531131010109"><h4 class="sectiontitle">Metadata</h4><p id="dli_08_15020__p102821933121112">The following format metadata can be exposed as read-only (VIRTUAL) columns in DDL.</p>
|
|
<p id="dli_08_15020__p1579431711017">Format metadata fields are only available if the corresponding connector forwards format metadata. Currently, only the Kafka connector is able to expose metadata fields for its value format.</p>
|
|
|
|
<div class="tablenoborder"><table cellpadding="4" cellspacing="0" summary="" id="dli_08_15020__table1268012467122" frame="border" border="1" rules="all"><caption><b>Table 3 </b>Metadata</caption><thead align="left"><tr id="dli_08_15020__row06808469125"><th align="left" class="cellrowborder" valign="top" width="33.33333333333333%" id="mcps1.3.5.4.2.4.1.1"><p id="dli_08_15020__p4680746191214">Key</p>
|
|
</th>
|
|
<th align="left" class="cellrowborder" valign="top" width="33.33333333333333%" id="mcps1.3.5.4.2.4.1.2"><p id="dli_08_15020__p166809469126">Data Type</p>
|
|
</th>
|
|
<th align="left" class="cellrowborder" valign="top" width="33.33333333333333%" id="mcps1.3.5.4.2.4.1.3"><p id="dli_08_15020__p17680174631210">Description</p>
|
|
</th>
|
|
</tr>
|
|
</thead>
|
|
<tbody><tr id="dli_08_15020__row17680194671214"><td class="cellrowborder" valign="top" width="33.33333333333333%" headers="mcps1.3.5.4.2.4.1.1 "><p id="dli_08_15020__p46801146181216">schema</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="33.33333333333333%" headers="mcps1.3.5.4.2.4.1.2 "><p id="dli_08_15020__p1768004616124">STRING NULL</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="33.33333333333333%" headers="mcps1.3.5.4.2.4.1.3 "><p id="dli_08_15020__p56805467126">JSON string describing the schema of the payload. <strong id="dli_08_15020__b159161041191614">Null</strong> if the schema is not included in the Debezium record.</p>
|
|
</td>
|
|
</tr>
|
|
<tr id="dli_08_15020__row4680204613124"><td class="cellrowborder" valign="top" width="33.33333333333333%" headers="mcps1.3.5.4.2.4.1.1 "><p id="dli_08_15020__p12681104661216">ingestion-timestamp</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="33.33333333333333%" headers="mcps1.3.5.4.2.4.1.2 "><p id="dli_08_15020__p96811646171214">TIMESTAMP_LTZ(3) NULL</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="33.33333333333333%" headers="mcps1.3.5.4.2.4.1.3 "><p id="dli_08_15020__p176811146151216">The timestamp at which the connector processed the event. Corresponds to the <strong id="dli_08_15020__b3748184591618">ts_ms</strong> field in the Debezium record.</p>
|
|
</td>
|
|
</tr>
|
|
<tr id="dli_08_15020__row1368115467123"><td class="cellrowborder" valign="top" width="33.33333333333333%" headers="mcps1.3.5.4.2.4.1.1 "><p id="dli_08_15020__p18681246111220">source.timestamp</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="33.33333333333333%" headers="mcps1.3.5.4.2.4.1.2 "><p id="dli_08_15020__p2681134620123">TIMESTAMP_LTZ(3) NULL</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="33.33333333333333%" headers="mcps1.3.5.4.2.4.1.3 "><p id="dli_08_15020__p14681124671219">The timestamp at which the source system created the event. Corresponds to the <strong id="dli_08_15020__b11844154213578">source.ts_ms</strong> field in the Debezium record.</p>
|
|
</td>
|
|
</tr>
|
|
<tr id="dli_08_15020__row66819460123"><td class="cellrowborder" valign="top" width="33.33333333333333%" headers="mcps1.3.5.4.2.4.1.1 "><p id="dli_08_15020__p4681114611128">source.database</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="33.33333333333333%" headers="mcps1.3.5.4.2.4.1.2 "><p id="dli_08_15020__p1868144613125">STRING NULL</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="33.33333333333333%" headers="mcps1.3.5.4.2.4.1.3 "><p id="dli_08_15020__p13681154671219">The originating database. Corresponds to the <strong id="dli_08_15020__b77566895554828">source.db</strong> field in the Debezium record if available.</p>
|
|
</td>
|
|
</tr>
|
|
<tr id="dli_08_15020__row6681184612124"><td class="cellrowborder" valign="top" width="33.33333333333333%" headers="mcps1.3.5.4.2.4.1.1 "><p id="dli_08_15020__p368118468121">source.schema</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="33.33333333333333%" headers="mcps1.3.5.4.2.4.1.2 "><p id="dli_08_15020__p668114631210">STRING NULL</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="33.33333333333333%" headers="mcps1.3.5.4.2.4.1.3 "><p id="dli_08_15020__p1668117466121">The originating database schema. Corresponds to the <strong id="dli_08_15020__b138846338554828">source.schema</strong> field in the Debezium record if available.</p>
|
|
</td>
|
|
</tr>
|
|
<tr id="dli_08_15020__row768144611124"><td class="cellrowborder" valign="top" width="33.33333333333333%" headers="mcps1.3.5.4.2.4.1.1 "><p id="dli_08_15020__p16818462124">source.table</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="33.33333333333333%" headers="mcps1.3.5.4.2.4.1.2 "><p id="dli_08_15020__p14681124618122">STRING NULL</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="33.33333333333333%" headers="mcps1.3.5.4.2.4.1.3 "><p id="dli_08_15020__p116811446181214">The originating database table. Corresponds to the <strong id="dli_08_15020__b113675007154828">source.table</strong> or <strong id="dli_08_15020__b627472616179">source.collection</strong> field in the Debezium record if available.</p>
|
|
</td>
|
|
</tr>
|
|
<tr id="dli_08_15020__row1168184611215"><td class="cellrowborder" valign="top" width="33.33333333333333%" headers="mcps1.3.5.4.2.4.1.1 "><p id="dli_08_15020__p156811946171211">source.properties</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="33.33333333333333%" headers="mcps1.3.5.4.2.4.1.2 "><p id="dli_08_15020__p1668134616125">MAP&lt;STRING, STRING&gt; NULL</p>
|
|
</td>
|
|
<td class="cellrowborder" valign="top" width="33.33333333333333%" headers="mcps1.3.5.4.2.4.1.3 "><p id="dli_08_15020__p36816463123">Map of various source properties. Corresponds to the <strong id="dli_08_15020__b72192311815">source</strong> field in the Debezium record.</p>
|
|
</td>
|
|
</tr>
|
|
</tbody>
|
|
</table>
|
|
</div>
|
|
<p id="dli_08_15020__p2080664024817">The following example shows how to access Debezium metadata fields in Kafka:</p>
|
|
<pre class="screen" id="dli_08_15020__screen892105004810">CREATE TABLE KafkaTable (
|
|
origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
|
|
event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
|
|
origin_database STRING METADATA FROM 'value.source.database' VIRTUAL,
|
|
origin_schema STRING METADATA FROM 'value.source.schema' VIRTUAL,
|
|
origin_table STRING METADATA FROM 'value.source.table' VIRTUAL,
|
|
origin_properties MAP&lt;STRING, STRING&gt; METADATA FROM 'value.source.properties' VIRTUAL,
|
|
user_id BIGINT,
|
|
item_id BIGINT,
|
|
behavior STRING
|
|
) WITH (
|
|
'connector' = 'kafka',
|
|
'topic' = '<em id="dli_08_15020__i771910114912"><strong id="dli_08_15020__b86019084919">kafkaTopic</strong></em>',
|
|
'properties.bootstrap.servers' = '<em id="dli_08_15020__i366724417446"><strong id="dli_08_15020__b1466716449449">KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort</strong></em>',
|
|
'properties.group.id' = '<em id="dli_08_15020__i59661819493"><strong id="dli_08_15020__b68961773496">GroupId</strong></em>',
|
|
'scan.startup.mode' = 'earliest-offset',
|
|
'value.format' = 'debezium-json'
|
|
);</pre>
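<p>The mapping in the table above can be illustrated with a short Python sketch that reads each metadata key out of a parsed Debezium JSON payload. This is a simplified model for illustration, not DLI or Flink code: the sample record and the <strong>extract_metadata</strong> and <strong>to_string_or_none</strong> helpers are hypothetical, and the string conversion applied to <strong>source.properties</strong> is an assumption. In the Kafka connector, the same keys carry the <strong>value.</strong> prefix, as in the DDL above.</p>
<pre class="screen">import json

# Hypothetical Debezium JSON payload (no Kafka Connect schema envelope).
RECORD = '''
{
  "before": null,
  "after": {"id": 111, "name": "scooter"},
  "source": {"connector": "mysql", "db": "inventory", "table": "products",
             "ts_ms": 1589362330000, "snapshot": false, "query": null},
  "op": "c",
  "ts_ms": 1589362330904
}
'''


def to_string_or_none(value):
    # Assumption: non-string JSON values are kept as their JSON text and
    # JSON null stays null, so each map entry is a STRING or NULL.
    if value is None:
        return None
    if isinstance(value, str):
        return value
    return json.dumps(value)


def extract_metadata(envelope):
    """Return the read-only metadata keys; absent fields map to None."""
    source = envelope.get("source") or {}
    table = source.get("table")
    if table is None:
        # The Debezium MongoDB connector reports "collection" instead.
        table = source.get("collection")
    source_ts = source.get("ts_ms")
    return {
        # Both timestamps are epoch milliseconds here; Flink exposes them
        # as TIMESTAMP_LTZ(3) values.
        "ingestion-timestamp": envelope.get("ts_ms"),
        "source.timestamp": None if source_ts is None else int(source_ts),
        "source.database": source.get("db"),
        "source.schema": source.get("schema"),
        "source.table": table,
        "source.properties": {k: to_string_or_none(v) for k, v in source.items()},
    }


if __name__ == "__main__":
    for key, value in extract_metadata(json.loads(RECORD)).items():
        print(key, "=", value)</pre>
<p>For a payload whose <strong>source</strong> block carries <strong>ts_sec</strong> but no <strong>ts_ms</strong>, such as the update event in the Example section, this model returns None for <strong>source.timestamp</strong>, which matches the NULL-able type listed in the table.</p>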
|
|
</div>
|
|
<div class="section" id="dli_08_15020__section241815222713"><h4 class="sectiontitle">Example</h4><p id="dli_08_15020__p15881132116016">Use the Kafka connector to read Debezium JSON data and write the result to the Print connector.</p>
|
|
<ol id="dli_08_15020__ol192013231710"><li id="dli_08_15020__li04031578234"><span>Create a datasource connection to the VPC and subnet where Kafka is located, and bind the connection to the queue. Configure a security group and inbound rules that allow access from the queue, and test the connectivity between the queue and Kafka using the Kafka IP address. For example, locate the general-purpose queue where the job runs and choose <strong id="dli_08_15020__b11913342473">More</strong> > <strong id="dli_08_15020__b8191534194716">Test Address Connectivity</strong> in the <strong id="dli_08_15020__b131921434194717">Operation</strong> column. If the test succeeds, the datasource connection is bound to the queue. Otherwise, the binding has failed.</span></li><li id="dli_08_15020__li18401164741714"><span>Create a Flink OpenSource SQL job, select Flink 1.15, and then copy the following statements and submit the job:</span><p><pre class="screen" id="dli_08_15020__screen6126183681810">CREATE TABLE kafkaSource (
|
|
id bigint,
|
|
name string,
|
|
description string,
|
|
weight DECIMAL(10, 2)
|
|
) WITH (
|
|
'connector' = 'kafka',
|
|
'topic' = '<em id="dli_08_15020__i148393412104"><strong id="dli_08_15020__b297873361016">kafkaTopic</strong></em>',
|
|
'properties.bootstrap.servers' = '<em id="dli_08_15020__i387653131014"><strong id="dli_08_15020__b19871053111017">KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort</strong></em>',
|
|
'properties.group.id' = '<em id="dli_08_15020__i14755185631012"><strong id="dli_08_15020__b1828855651019">GroupId</strong></em>',
|
|
'scan.startup.mode' = 'latest-offset',
|
|
'format' = 'debezium-json'
|
|
);
|
|
CREATE TABLE printSink (
|
|
id bigint,
|
|
name string,
|
|
description string,
|
|
weight DECIMAL(10, 2)
|
|
) WITH (
|
|
'connector' = 'print'
|
|
);
|
|
insert into printSink select * from kafkaSource;</pre>
|
|
</p></li><li id="dli_08_15020__li1285084111812"><span>Insert the following data into the Kafka topic configured for the source table. The data shows that the MySQL products table has four columns: <strong id="dli_08_15020__b1152241171112">id</strong>, <strong id="dli_08_15020__b715311416112">name</strong>, <strong id="dli_08_15020__b1715324131116">description</strong>, and <strong id="dli_08_15020__b0153164114119">weight</strong>. This JSON message represents an update event on the products table, in which the <strong id="dli_08_15020__b1376110569133">weight</strong> value of the row with id = 111 changes from 5.18 to 5.15.</span><p><pre class="screen" id="dli_08_15020__screen1525315566188">{
|
|
"before": {
|
|
"id": 111,
|
|
"name": "scooter",
|
|
"description": "Big 2-wheel scooter",
|
|
"weight": 5.18
|
|
},
|
|
"after": {
|
|
"id": 111,
|
|
"name": "scooter",
|
|
"description": "Big 2-wheel scooter",
|
|
"weight": 5.15
|
|
},
|
|
"source": {
|
|
"version": "0.9.5.Final",
|
|
"connector": "mysql",
|
|
"name": "fullfillment",
|
|
"server_id": 1,
|
|
"ts_sec": 1629607909,
|
|
"gtid": "mysql-bin.000001",
|
|
"pos": 2238,
"row": 0,
|
|
"snapshot": false,
|
|
"thread": 7,
|
|
"db": "inventory",
|
|
"table": "test",
|
|
"query": null
},
|
|
"op": "u",
|
|
"ts_ms": 1589362330904,
|
|
"transaction": null
|
|
}</pre>
|
|
</p></li><li id="dli_08_15020__li4353143193117"><span>Perform the following steps to view the result in the <strong id="dli_08_15020__b17254150039270">taskmanager.out</strong> file:</span><p><ol type="a" id="dli_08_15020__ol864115198285"><li id="dli_08_15020__li10901621122819">Log in to the DLI console. In the navigation pane, choose <strong id="dli_08_15020__b26019737391922">Job Management</strong> > <strong id="dli_08_15020__b212842336691922">Flink Jobs</strong>.</li><li id="dli_08_15020__li1912163912282">Click the name of the Flink job, choose <strong id="dli_08_15020__b778348891932">Run Log</strong>, click <strong id="dli_08_15020__b80975252291932">OBS Bucket</strong>, and locate the log folder for the date you want to view.</li><li id="dli_08_15020__li0641191914285">Open the folder for that date, find the folder whose name contains <strong id="dli_08_15020__b26660391542754">taskmanager</strong>, download the <strong id="dli_08_15020__b108263016442754">.out</strong> file, and view the result logs.</li></ol>
|
|
<pre class="screen" id="dli_08_15020__screen184391521191911">-U[111, scooter, Big 2-wheel scooter, 5.18]
|
|
+U[111, scooter, Big 2-wheel scooter, 5.15]</pre>
|
|
</p></li></ol>
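<p>The conversion of the update event in step 3 into the two rows shown in step 4 can be sketched in Python as follows. This is a simplified model written for illustration, not the Flink implementation: the <strong>decode</strong> and <strong>print_style</strong> helpers and the fixed column list are hypothetical, and only the <strong>op</strong>, <strong>before</strong>, and <strong>after</strong> fields are read. It follows the debezium-json behavior of turning <strong>c</strong> (create) and <strong>r</strong> (snapshot read) events into inserts, a <strong>u</strong> event into an UPDATE_BEFORE/UPDATE_AFTER pair, and a <strong>d</strong> event into a delete.</p>
<pre class="screen">import json

# Update event from step 3, trimmed to the fields this sketch reads.
EVENT = '''
{
  "before": {"id": 111, "name": "scooter", "description": "Big 2-wheel scooter", "weight": 5.18},
  "after": {"id": 111, "name": "scooter", "description": "Big 2-wheel scooter", "weight": 5.15},
  "op": "u",
  "ts_ms": 1589362330904
}
'''

# Hypothetical column list matching the printSink table.
COLUMNS = ["id", "name", "description", "weight"]


def decode(envelope):
    """Return a list of (row_kind, values) pairs for one Debezium JSON envelope."""
    op = envelope.get("op")
    before = envelope.get("before")
    after = envelope.get("after")
    if op in ("u", "d") and before is None:
        # Updates and deletes need the old row image; for the Debezium
        # PostgreSQL connector this typically means REPLICA IDENTITY FULL.
        raise ValueError("%s event has no 'before' image" % op)
    if op in ("c", "r"):
        # Creates and snapshot reads become inserts of the new row image.
        images = [("+I", after)]
    elif op == "u":
        # One UPDATE becomes an UPDATE_BEFORE/UPDATE_AFTER pair.
        images = [("-U", before), ("+U", after)]
    elif op == "d":
        images = [("-D", before)]
    else:
        raise ValueError("unknown op: %r" % op)
    # Project each image onto the sink columns; absent fields become None.
    return [(kind, [image.get(col) for col in COLUMNS]) for kind, image in images]


def print_style(kind, values):
    # Mimics the shape of the Print connector output shown in step 4.
    return "%s[%s]" % (kind, ", ".join(str(v) for v in values))


if __name__ == "__main__":
    for kind, values in decode(json.loads(EVENT)):
        print(print_style(kind, values))</pre>
<p>Running the sketch prints the same two lines as the <strong>taskmanager.out</strong> excerpt above. Because one Debezium update carries both row images, the sink receives a retraction of the old row followed by the new row, rather than a single in-place update.</p>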
|
|
</div>
|
|
</div>
|
|
<div>
|
|
<div class="familylinks">
|
|
<div class="parentlink"><strong>Parent topic:</strong> <a href="dli_08_15014.html">Formats</a></div>
|
|
</div>
|
|
</div>
|
|
|