doc-exports/docs/dli/sqlreference/dli_08_15058.html
Su, Xiaomeng be9eabe464 dli_sqlreference_20250305
Reviewed-by: Pruthi, Vineet <vineet.pruthi@t-systems.com>
Co-authored-by: Su, Xiaomeng <suxiaomeng1@huawei.com>
Co-committed-by: Su, Xiaomeng <suxiaomeng1@huawei.com>
2025-03-25 09:06:21 +00:00

<a name="dli_08_15058"></a>
<h1 class="topictitle1">Kafka</h1>
<div id="body0000001772046997"><div class="section" id="dli_08_15058__dli_08_0237_en-us_topic_0111555123_section17358104193813"><h4 class="sectiontitle">Function</h4><p id="dli_08_15058__dli_08_0239_en-us_topic_0111499973_p18963184212118">The Kafka connector allows for reading data from and writing data into Kafka topics.</p>
<p id="dli_08_15058__dli_08_0239_en-us_topic_0111499973_p433435316341">Apache Kafka is a fast, scalable, and fault-tolerant distributed publish-subscribe messaging system. It delivers high throughput, supports built-in partitioning, and replicates data for fault tolerance, which makes it suitable for processing large volumes of messages.</p>
<div class="tablenoborder"><table cellpadding="4" cellspacing="0" summary="" id="dli_08_15058__table3954102713514" frame="border" border="1" rules="all"><caption><b>Table 1 </b>Supported types</caption><thead align="left"><tr id="dli_08_15058__row139551727153515"><th align="left" class="cellrowborder" valign="top" width="50%" id="mcps1.3.1.4.2.3.1.1"><p id="dli_08_15058__p169550272355">Type</p>
</th>
<th align="left" class="cellrowborder" valign="top" width="50%" id="mcps1.3.1.4.2.3.1.2"><p id="dli_08_15058__p9955172713520">Description</p>
</th>
</tr>
</thead>
<tbody><tr id="dli_08_15058__row595518271358"><td class="cellrowborder" valign="top" width="50%" headers="mcps1.3.1.4.2.3.1.1 "><p id="dli_08_15058__p4955182716353">Supported Table Types</p>
</td>
<td class="cellrowborder" valign="top" width="50%" headers="mcps1.3.1.4.2.3.1.2 "><p id="dli_08_15058__p1595518273356">Source table and result table</p>
</td>
</tr>
<tr id="dli_08_15058__row395582710356"><td class="cellrowborder" valign="top" width="50%" headers="mcps1.3.1.4.2.3.1.1 "><p id="dli_08_15058__p1795572733511">Supported Data Formats</p>
</td>
<td class="cellrowborder" valign="top" width="50%" headers="mcps1.3.1.4.2.3.1.2 "><p id="dli_08_15058__p12955172783510"><a href="dli_08_15019.html">CSV</a></p>
<p id="dli_08_15058__p293523613713"><a href="dli_08_15021.html">JSON</a></p>
<p id="dli_08_15058__p1880505293717"><a href="dli_08_15016.html">Apache Avro</a></p>
<p id="dli_08_15058__p1383181623819"><a href="dli_08_15018.html">Confluent Avro</a></p>
<p id="dli_08_15058__p10413194133817"><a href="dli_08_15020.html">Debezium CDC</a></p>
<p id="dli_08_15058__p3453113733919"><a href="dli_08_15017.html">Canal CDC</a></p>
<p id="dli_08_15058__p122607535391"><a href="dli_08_15022.html">Maxwell CDC</a></p>
<p id="dli_08_15058__p343361124019"><a href="dli_08_15023.html">OGG CDC</a></p>
<p id="dli_08_15058__p293342784017"><a href="dli_08_15026.html">Raw</a></p>
</td>
</tr>
</tbody>
</table>
</div>
</div>
<div class="section" id="dli_08_15058__dli_08_0237_section1144171011544"><h4 class="sectiontitle">Prerequisites</h4><ul id="dli_08_15058__ul578517503176"><li id="dli_08_15058__li2785750141711">You have created a Kafka cluster.</li><li id="dli_08_15058__li1627119813515">An enhanced datasource connection has been created for DLI to connect to Kafka clusters, so that jobs can run on the dedicated queue of DLI and you can set the security group rules as required.
</li></ul>
</div>
<div class="section" id="dli_08_15058__section495615388216"><h4 class="sectiontitle">Caveats</h4><ul id="dli_08_15058__ul17647145194019"><li id="dli_08_15058__li421613293215">For details about the connector, see <a href="https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/kafka/" target="_blank" rel="noopener noreferrer">Apache Kafka SQL Connector</a>.</li><li id="dli_08_15058__li13608118132418">When you create a Flink OpenSource SQL job, set <strong id="dli_08_15058__dli_08_15029_b163001353185217">Flink Version</strong> to <strong id="dli_08_15058__dli_08_15029_b1430115539523">1.15</strong> in the <strong id="dli_08_15058__dli_08_15029_b1030175315523">Running Parameters</strong> tab. Select <strong id="dli_08_15058__dli_08_15029_b430135325212">Save Job Log</strong>, and specify the OBS bucket for saving job logs.</li><li id="dli_08_15058__li1089410417226">Storing authentication credentials such as usernames and passwords in code or plaintext poses significant security risks. You are advised to use DEW to manage credentials instead. Storing credentials encrypted in configuration files or environment variables and decrypting them only when needed also improves security. For details, see .</li><li id="dli_08_15058__li15792148183316">Fields in the <strong id="dli_08_15058__b167384553312">with</strong> parameter can only be enclosed in single quotes.</li><li id="dli_08_15058__li10841731415">For details about how to use data types when creating tables, see <a href="dli_08_15014.html">Format</a>.</li></ul>
</div>
<div class="section" id="dli_08_15058__dli_08_0237_en-us_topic_0111555123_section1832173783817"><h4 class="sectiontitle">Syntax</h4><div class="codecoloring" codetype="Sql" id="dli_08_15058__dli_08_0237_screen16634186122414"><div class="highlight"><table class="highlighttable"><tr><td class="linenos"><div class="linenodiv"><pre><span class="normal"> 1</span>
<span class="normal"> 2</span>
<span class="normal"> 3</span>
<span class="normal"> 4</span>
<span class="normal"> 5</span>
<span class="normal"> 6</span>
<span class="normal"> 7</span>
<span class="normal"> 8</span>
<span class="normal"> 9</span>
<span class="normal">10</span>
<span class="normal">11</span>
<span class="normal">12</span>
<span class="normal">13</span>
<span class="normal">14</span></pre></div></td><td class="code"><div><pre><span></span><span class="k">create</span><span class="w"> </span><span class="k">table</span><span class="w"> </span><span class="n">kafkaSource</span><span class="p">(</span>
<span class="w"> </span><span class="n">attr_name</span><span class="w"> </span><span class="n">attr_type</span><span class="w"> </span>
<span class="w"> </span><span class="p">(</span><span class="s1">','</span><span class="w"> </span><span class="n">attr_name</span><span class="w"> </span><span class="n">attr_type</span><span class="p">)</span><span class="o">*</span><span class="w"> </span>
<span class="w"> </span><span class="p">(</span><span class="s1">','</span><span class="k">PRIMARY</span><span class="w"> </span><span class="k">KEY</span><span class="w"> </span><span class="p">(</span><span class="n">attr_name</span><span class="p">,</span><span class="w"> </span><span class="p">...)</span><span class="w"> </span><span class="k">NOT</span><span class="w"> </span><span class="n">ENFORCED</span><span class="p">)</span>
<span class="w"> </span><span class="p">(</span><span class="s1">','</span><span class="w"> </span><span class="n">WATERMARK</span><span class="w"> </span><span class="k">FOR</span><span class="w"> </span><span class="n">rowtime_column_name</span><span class="w"> </span><span class="k">AS</span><span class="w"> </span><span class="n">watermark</span><span class="o">-</span><span class="n">strategy_expression</span><span class="p">)</span>
<span class="p">)</span>
<span class="k">with</span><span class="w"> </span><span class="p">(</span>
<span class="w"> </span><span class="s1">'connector'</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">'kafka'</span><span class="p">,</span>
<span class="w"> </span><span class="s1">'topic'</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">''</span><span class="p">,</span>
<span class="w"> </span><span class="s1">'properties.bootstrap.servers'</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">''</span><span class="p">,</span>
<span class="w"> </span><span class="s1">'properties.group.id'</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">''</span><span class="p">,</span>
<span class="w"> </span><span class="s1">'scan.startup.mode'</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">''</span><span class="p">,</span>
<span class="w"> </span><span class="s1">'format'</span><span class="w"> </span><span class="o">=</span><span class="w"> </span><span class="s1">''</span>
<span class="p">);</span>
</pre></div></td></tr></table></div>
</div>
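<p>As a minimal sketch of the syntax above, the following statement defines a source table that reads JSON messages from a single topic. The column names, topic name, broker addresses, and consumer group ID are hypothetical placeholders that do not come from this document. Replace them with the values of your own Kafka cluster.</p>
<div class="codecoloring" codetype="Sql"><div class="highlight"><pre>-- Hypothetical example: all names and addresses below are placeholders.
CREATE TABLE kafkaSource (
  order_id      STRING,
  order_channel STRING,
  pay_amount    DOUBLE,
  order_time    TIMESTAMP(3),
  -- Tolerate events that arrive up to 5 seconds out of order.
  WATERMARK FOR order_time AS order_time - INTERVAL '5' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  -- Comma-separated list of broker addresses.
  'properties.bootstrap.servers' = '192.168.0.10:9092,192.168.0.11:9092',
  'properties.group.id' = 'order-reader',
  -- Read the topic from the earliest available offset.
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json'
);
</pre></div>
</div>
<p>Every option value is enclosed in single quotes, as required for fields in the <strong>with</strong> parameter.</p>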
</div>
<div class="section" id="dli_08_15058__section13754915155510"><h4 class="sectiontitle">Source Table Parameters</h4>
<div class="tablenoborder"><table cellpadding="4" cellspacing="0" summary="" id="dli_08_15058__table185541831102213" frame="border" border="1" rules="all"><caption><b>Table 2 </b>Source table parameters</caption><thead align="left"><tr id="dli_08_15058__row17554113122215"><th align="left" class="cellrowborder" valign="top" width="17.98%" id="mcps1.3.5.2.2.6.1.1"><p id="dli_08_15058__p19554231162219">Parameter</p>
</th>
<th align="left" class="cellrowborder" valign="top" width="9.74%" id="mcps1.3.5.2.2.6.1.2"><p id="dli_08_15058__p85541931152210">Mandatory</p>
</th>
<th align="left" class="cellrowborder" valign="top" width="9.13%" id="mcps1.3.5.2.2.6.1.3"><p id="dli_08_15058__p1355420315224">Default Value</p>
</th>
<th align="left" class="cellrowborder" valign="top" width="11.57%" id="mcps1.3.5.2.2.6.1.4"><p id="dli_08_15058__p855443142218">Data Type</p>
</th>
<th align="left" class="cellrowborder" valign="top" width="51.580000000000005%" id="mcps1.3.5.2.2.6.1.5"><p id="dli_08_15058__p1755423182218">Description</p>
</th>
</tr>
</thead>
<tbody><tr id="dli_08_15058__row12554203162212"><td class="cellrowborder" valign="top" width="17.98%" headers="mcps1.3.5.2.2.6.1.1 "><p id="dli_08_15058__p455415311227">connector</p>
</td>
<td class="cellrowborder" valign="top" width="9.74%" headers="mcps1.3.5.2.2.6.1.2 "><p id="dli_08_15058__p105541431182216">Yes</p>
</td>
<td class="cellrowborder" valign="top" width="9.13%" headers="mcps1.3.5.2.2.6.1.3 "><p id="dli_08_15058__p1751255011569">None</p>
</td>
<td class="cellrowborder" valign="top" width="11.57%" headers="mcps1.3.5.2.2.6.1.4 "><p id="dli_08_15058__p1554143182210">String</p>
</td>
<td class="cellrowborder" valign="top" width="51.580000000000005%" headers="mcps1.3.5.2.2.6.1.5 "><p id="dli_08_15058__p13554103162218">Connector to use. Set this parameter to <strong id="dli_08_15058__b2236161513291">kafka</strong> for Kafka.</p>
</td>
</tr>
<tr id="dli_08_15058__row7554123117224"><td class="cellrowborder" valign="top" width="17.98%" headers="mcps1.3.5.2.2.6.1.1 "><p id="dli_08_15058__p755433111222">topic</p>
</td>
<td class="cellrowborder" valign="top" width="9.74%" headers="mcps1.3.5.2.2.6.1.2 "><p id="dli_08_15058__p12554133122219">No</p>
</td>
<td class="cellrowborder" valign="top" width="9.13%" headers="mcps1.3.5.2.2.6.1.3 "><p id="dli_08_15058__p1355420312222">None</p>
</td>
<td class="cellrowborder" valign="top" width="11.57%" headers="mcps1.3.5.2.2.6.1.4 "><p id="dli_08_15058__p1055483111222">String</p>
</td>
<td class="cellrowborder" valign="top" width="51.580000000000005%" headers="mcps1.3.5.2.2.6.1.5 "><p id="dli_08_15058__p6711125112372">Name of the topic to read data from when the table is used as a source. To read from multiple topics, separate the topic names with semicolons, for example, <strong id="dli_08_15058__b2192172953015">topic-1;topic-2</strong>.</p>
<p id="dli_08_15058__p10259253173720">Only one of <strong id="dli_08_15058__b1295124463017">topic-pattern</strong> and <strong id="dli_08_15058__b7323151103017">topic</strong> can be specified for a source.</p>
<p id="dli_08_15058__p6820184763716">When the table is used as a sink, this parameter specifies the topic to write data to. Topic lists are not supported for sinks.</p>
</td>
</tr>
<tr id="dli_08_15058__row12554103122215"><td class="cellrowborder" valign="top" width="17.98%" headers="mcps1.3.5.2.2.6.1.1 "><p id="dli_08_15058__p17554731112214">topic-pattern</p>
</td>
<td class="cellrowborder" valign="top" width="9.74%" headers="mcps1.3.5.2.2.6.1.2 "><p id="dli_08_15058__p355483132220">No</p>
</td>
<td class="cellrowborder" valign="top" width="9.13%" headers="mcps1.3.5.2.2.6.1.3 "><p id="dli_08_15058__p012095655615">None</p>
</td>
<td class="cellrowborder" valign="top" width="11.57%" headers="mcps1.3.5.2.2.6.1.4 "><p id="dli_08_15058__p455493116222">String</p>
</td>
<td class="cellrowborder" valign="top" width="51.580000000000005%" headers="mcps1.3.5.2.2.6.1.5 "><p id="dli_08_15058__p221979143810">Regular expression that matches the names of the topics to read from.</p>
<p id="dli_08_15058__p1892261133811">When the job starts running, the consumer subscribes to all topics whose names match the specified regular expression.</p>
<p id="dli_08_15058__p324317673811">Only one of <strong id="dli_08_15058__b208431716321">topic-pattern</strong> and <strong id="dli_08_15058__b3339649326">topic</strong> can be specified for a source.</p>
<p id="dli_08_15058__p461591212353">For more information, see <a href="#dli_08_15058__section12233124102">Topic and Partition Discovery</a>.</p>
</td>
</tr>
<tr id="dli_08_15058__row1855423122217"><td class="cellrowborder" valign="top" width="17.98%" headers="mcps1.3.5.2.2.6.1.1 "><p id="dli_08_15058__p6554113162211">properties.bootstrap.servers</p>
</td>
<td class="cellrowborder" valign="top" width="9.74%" headers="mcps1.3.5.2.2.6.1.2 "><p id="dli_08_15058__p1554163122214">Yes</p>
</td>
<td class="cellrowborder" valign="top" width="9.13%" headers="mcps1.3.5.2.2.6.1.3 "><p id="dli_08_15058__p10582657155610">None</p>
</td>
<td class="cellrowborder" valign="top" width="11.57%" headers="mcps1.3.5.2.2.6.1.4 "><p id="dli_08_15058__p555473182217">String</p>
</td>
<td class="cellrowborder" valign="top" width="51.580000000000005%" headers="mcps1.3.5.2.2.6.1.5 "><p id="dli_08_15058__p3554183182218">Comma-separated list of Kafka brokers.</p>
</td>
</tr>
<tr id="dli_08_15058__row8554113119225"><td class="cellrowborder" valign="top" width="17.98%" headers="mcps1.3.5.2.2.6.1.1 "><p id="dli_08_15058__p955417319221">properties.group.id</p>
</td>
<td class="cellrowborder" valign="top" width="9.74%" headers="mcps1.3.5.2.2.6.1.2 "><p id="dli_08_15058__p3554163112223">No</p>
</td>
<td class="cellrowborder" valign="top" width="9.13%" headers="mcps1.3.5.2.2.6.1.3 "><p id="dli_08_15058__p10125155985617">None</p>
</td>
<td class="cellrowborder" valign="top" width="11.57%" headers="mcps1.3.5.2.2.6.1.4 "><p id="dli_08_15058__p755493192210">String</p>
</td>
<td class="cellrowborder" valign="top" width="51.580000000000005%" headers="mcps1.3.5.2.2.6.1.5 "><p id="dli_08_15058__p1455412319223">ID of the consumer group for the Kafka source. This parameter is optional for source tables and does not apply to result tables. If no group ID is specified, the automatically generated ID <strong id="dli_08_15058__b16141232143414">KafkaSource-</strong><em id="dli_08_15058__i15199173513418">{tableIdentifier}</em> is used.</p>
</td>
</tr>
<tr id="dli_08_15058__row18554113114228"><td class="cellrowborder" valign="top" width="17.98%" headers="mcps1.3.5.2.2.6.1.1 "><p id="dli_08_15058__p1755413314229">properties.*</p>
</td>
<td class="cellrowborder" valign="top" width="9.74%" headers="mcps1.3.5.2.2.6.1.2 "><p id="dli_08_15058__p115541531182212">No</p>
</td>
<td class="cellrowborder" valign="top" width="9.13%" headers="mcps1.3.5.2.2.6.1.3 "><p id="dli_08_15058__p35547318225">None</p>
</td>
<td class="cellrowborder" valign="top" width="11.57%" headers="mcps1.3.5.2.2.6.1.4 "><p id="dli_08_15058__p10554113119225">String</p>
</td>
<td class="cellrowborder" valign="top" width="51.580000000000005%" headers="mcps1.3.5.2.2.6.1.5 "><p id="dli_08_15058__p97929517327">Sets and passes arbitrary Kafka configurations.</p>
<ul id="dli_08_15058__ul0491787522"><li id="dli_08_15058__li199931630195813">Suffix names must match the configuration keys defined in <a href="https://kafka.apache.org/documentation/#configuration" target="_blank" rel="noopener noreferrer">Apache Kafka</a>.<p id="dli_08_15058__p948323185810">Flink removes the <strong id="dli_08_15058__b826713913364">properties.</strong> key prefix and passes the transformed keys and values to the underlying Kafka client. For example, you can disable automatic topic creation using <strong id="dli_08_15058__b11395195717205">'properties.allow.auto.create.topics' = 'false'</strong>.</p>
</li><li id="dli_08_15058__li1678319119157">Some configurations cannot be set because Flink overrides them, for example, <strong id="dli_08_15058__b1487917953714">key.deserializer</strong> and <strong id="dli_08_15058__b66329161375">value.deserializer</strong>.</li></ul>
</td>
</tr>
<tr id="dli_08_15058__row4554123182211"><td class="cellrowborder" valign="top" width="17.98%" headers="mcps1.3.5.2.2.6.1.1 "><p id="dli_08_15058__p8554231182218">format</p>
</td>
<td class="cellrowborder" valign="top" width="9.74%" headers="mcps1.3.5.2.2.6.1.2 "><p id="dli_08_15058__p855410313225">Yes</p>
</td>
<td class="cellrowborder" valign="top" width="9.13%" headers="mcps1.3.5.2.2.6.1.3 "><p id="dli_08_15058__p155453112221">None</p>
</td>
<td class="cellrowborder" valign="top" width="11.57%" headers="mcps1.3.5.2.2.6.1.4 "><p id="dli_08_15058__p65551531202220">String</p>
</td>
<td class="cellrowborder" valign="top" width="51.580000000000005%" headers="mcps1.3.5.2.2.6.1.5 "><p id="dli_08_15058__p10324161765517">The format used to deserialize and serialize the value part of Kafka messages.</p>
<p id="dli_08_15058__p990012310114">Either this parameter or the <strong id="dli_08_15058__b1536193763811">value.format</strong> parameter is required.</p>
<ul id="dli_08_15058__ul151896215555"><li id="dli_08_15058__li17189102110554">For details about the message key and body of Kafka messages, see <a href="#dli_08_15058__section9256199230">Key and Value Formats</a>.</li><li id="dli_08_15058__li418910217556">Refer to <a href="dli_08_15014.html">Format</a> for more details and format parameters.</li></ul>
</td>
</tr>
<tr id="dli_08_15058__row115558318229"><td class="cellrowborder" valign="top" width="17.98%" headers="mcps1.3.5.2.2.6.1.1 "><p id="dli_08_15058__p45551331122214">key.format</p>
</td>
<td class="cellrowborder" valign="top" width="9.74%" headers="mcps1.3.5.2.2.6.1.2 "><p id="dli_08_15058__p1555593115225">No</p>
</td>
<td class="cellrowborder" valign="top" width="9.13%" headers="mcps1.3.5.2.2.6.1.3 "><p id="dli_08_15058__p1685117175713">None</p>
</td>
<td class="cellrowborder" valign="top" width="11.57%" headers="mcps1.3.5.2.2.6.1.4 "><p id="dli_08_15058__p1555831112210">String</p>
</td>
<td class="cellrowborder" valign="top" width="51.580000000000005%" headers="mcps1.3.5.2.2.6.1.5 "><p id="dli_08_15058__p13344156135116">The format used to deserialize and serialize the key part of Kafka messages.</p>
<ul id="dli_08_15058__ul14125816519"><li id="dli_08_15058__li1812531195116">If a key format is defined, the <strong id="dli_08_15058__b1871714279421">key.fields</strong> parameter is required as well. Otherwise, the Kafka records will have an empty key.</li><li id="dli_08_15058__li134171651171711">Refer to <a href="dli_08_15014.html">Format</a> for more details and format parameters.</li></ul>
</td>
</tr>
<tr id="dli_08_15058__row1055523142219"><td class="cellrowborder" valign="top" width="17.98%" headers="mcps1.3.5.2.2.6.1.1 "><p id="dli_08_15058__p65559316223">key.fields</p>
</td>
<td class="cellrowborder" valign="top" width="9.74%" headers="mcps1.3.5.2.2.6.1.2 "><p id="dli_08_15058__p1955515318222">No</p>
</td>
<td class="cellrowborder" valign="top" width="9.13%" headers="mcps1.3.5.2.2.6.1.3 "><p id="dli_08_15058__p1355515319223">[]</p>
</td>
<td class="cellrowborder" valign="top" width="11.57%" headers="mcps1.3.5.2.2.6.1.4 "><p id="dli_08_15058__p155563192211">List&lt;String&gt;</p>
</td>
<td class="cellrowborder" valign="top" width="51.580000000000005%" headers="mcps1.3.5.2.2.6.1.5 "><p id="dli_08_15058__p769342515488">Explicit list of physical columns from the table schema that define the data type of the key format.</p>
<p id="dli_08_15058__p1275275133218">By default, this list is empty and no key is defined. Separate column names with semicolons, for example, <strong id="dli_08_15058__b57041480432">field1;field2</strong>.</p>
</td>
</tr>
<tr id="dli_08_15058__row115557317227"><td class="cellrowborder" valign="top" width="17.98%" headers="mcps1.3.5.2.2.6.1.1 "><p id="dli_08_15058__p1555519310225">key.fields-prefix</p>
</td>
<td class="cellrowborder" valign="top" width="9.74%" headers="mcps1.3.5.2.2.6.1.2 "><p id="dli_08_15058__p125552031152220">No</p>
</td>
<td class="cellrowborder" valign="top" width="9.13%" headers="mcps1.3.5.2.2.6.1.3 "><p id="dli_08_15058__p1655523116225">None</p>
</td>
<td class="cellrowborder" valign="top" width="11.57%" headers="mcps1.3.5.2.2.6.1.4 "><p id="dli_08_15058__p20555231112219">String</p>
</td>
<td class="cellrowborder" valign="top" width="51.580000000000005%" headers="mcps1.3.5.2.2.6.1.5 "><p id="dli_08_15058__p9980165494816">Defines a custom prefix for all fields of the key format to avoid name clashes with fields of the value format. By default, the prefix is empty.</p>
<p id="dli_08_15058__p12301767493">If a custom prefix is defined, both the table schema and <strong id="dli_08_15058__b1638445844411">key.fields</strong> will work with prefixed names.</p>
<p id="dli_08_15058__p14976185512120">When constructing the data type of the key format, the prefix will be removed and the non-prefixed names will be used within the key format.</p>
<p id="dli_08_15058__p6318135024815">To use this parameter, <strong id="dli_08_15058__b18634481459">value.fields-include</strong> must be set to <strong id="dli_08_15058__b77041546451">EXCEPT_KEY</strong>.</p>
</td>
</tr>
<tr id="dli_08_15058__row1555133117226"><td class="cellrowborder" valign="top" width="17.98%" headers="mcps1.3.5.2.2.6.1.1 "><p id="dli_08_15058__p10555163112220">value.format</p>
</td>
<td class="cellrowborder" valign="top" width="9.74%" headers="mcps1.3.5.2.2.6.1.2 "><p id="dli_08_15058__p1055593118223">No</p>
</td>
<td class="cellrowborder" valign="top" width="9.13%" headers="mcps1.3.5.2.2.6.1.3 "><p id="dli_08_15058__p1892114128578">None</p>
</td>
<td class="cellrowborder" valign="top" width="11.57%" headers="mcps1.3.5.2.2.6.1.4 "><p id="dli_08_15058__p7555193116229">String</p>
</td>
<td class="cellrowborder" valign="top" width="51.580000000000005%" headers="mcps1.3.5.2.2.6.1.5 "><p id="dli_08_15058__p8314133141511">The format used to deserialize and serialize the value part of Kafka messages.</p>
<ul id="dli_08_15058__ul166571238111518"><li id="dli_08_15058__li667718114163">Either this parameter or the <strong id="dli_08_15058__b209042298393129">format</strong> parameter is required. If both parameters are configured, a conflict occurs.</li><li id="dli_08_15058__li7846255121711">Refer to <a href="dli_08_15014.html">Format</a> for more details and format parameters.</li></ul>
</td>
</tr>
<tr id="dli_08_15058__row15555123102217"><td class="cellrowborder" valign="top" width="17.98%" headers="mcps1.3.5.2.2.6.1.1 "><p id="dli_08_15058__p145552031142215">value.fields-include</p>
</td>
<td class="cellrowborder" valign="top" width="9.74%" headers="mcps1.3.5.2.2.6.1.2 "><p id="dli_08_15058__p1955503122213">No</p>
</td>
<td class="cellrowborder" valign="top" width="9.13%" headers="mcps1.3.5.2.2.6.1.3 "><p id="dli_08_15058__p13555133172214">ALL</p>
</td>
<td class="cellrowborder" valign="top" width="11.57%" headers="mcps1.3.5.2.2.6.1.4 "><p id="dli_08_15058__p134898438282">Enum</p>
<p id="dli_08_15058__p548954310282">Possible values: [ALL, EXCEPT_KEY]</p>
</td>
<td class="cellrowborder" valign="top" width="51.580000000000005%" headers="mcps1.3.5.2.2.6.1.5 "><p id="dli_08_15058__p393217303500">Defines how key columns are handled in the data type of the value format.</p>
<p id="dli_08_15058__p14555193120228">By default (<strong id="dli_08_15058__b4184839104715">ALL</strong>), all physical columns of the table schema are included in the value format, so key columns appear in the data type of both the key format and the value format. With <strong>EXCEPT_KEY</strong>, key columns are excluded from the value format.</p>
</td>
</tr>
<tr id="dli_08_15058__row955523152217"><td class="cellrowborder" valign="top" width="17.98%" headers="mcps1.3.5.2.2.6.1.1 "><p id="dli_08_15058__p1555631202210">scan.startup.mode</p>
</td>
<td class="cellrowborder" valign="top" width="9.74%" headers="mcps1.3.5.2.2.6.1.2 "><p id="dli_08_15058__p195551231142217">No</p>
</td>
<td class="cellrowborder" valign="top" width="9.13%" headers="mcps1.3.5.2.2.6.1.3 "><p id="dli_08_15058__p20555431122219">group-offsets</p>
</td>
<td class="cellrowborder" valign="top" width="11.57%" headers="mcps1.3.5.2.2.6.1.4 "><p id="dli_08_15058__p17555163112216">String</p>
</td>
<td class="cellrowborder" valign="top" width="51.580000000000005%" headers="mcps1.3.5.2.2.6.1.5 "><p id="dli_08_15058__p1955515319224">Startup mode for Kafka consumer.</p>
<p id="dli_08_15058__p18140182923611">Valid values are:</p>
<ul id="dli_08_15058__ul1414013299363"><li id="dli_08_15058__li15140202914369"><strong id="dli_08_15058__b15711631834">earliest-offset</strong>: starts from the earliest available offset.</li><li id="dli_08_15058__li2140029193615"><strong id="dli_08_15058__b748011447315">latest-offset</strong>: starts from the latest offset.</li><li id="dli_08_15058__li91401329193610"><strong id="dli_08_15058__b143221244413">group-offsets</strong>: starts from the offsets committed in ZooKeeper or Kafka brokers by a specific consumer group.</li><li id="dli_08_15058__li1914032910365"><strong id="dli_08_15058__b3497527842">timestamp</strong>: starts from a user-supplied timestamp for each partition. The timestamp is specified by <strong>scan.startup.timestamp-millis</strong>.</li><li id="dli_08_15058__li2093083916533"><strong id="dli_08_15058__b84215441347">specific-offsets</strong>: starts from user-supplied offsets for each partition. The offsets are specified by <strong id="dli_08_15058__b16266125216504">scan.startup.specific-offsets</strong>.</li></ul>
</td>
</tr>
<tr id="dli_08_15058__row455511315228"><td class="cellrowborder" valign="top" width="17.98%" headers="mcps1.3.5.2.2.6.1.1 "><p id="dli_08_15058__p455573192210">scan.startup.specific-offsets</p>
</td>
<td class="cellrowborder" valign="top" width="9.74%" headers="mcps1.3.5.2.2.6.1.2 "><p id="dli_08_15058__p135551031102218">No</p>
</td>
<td class="cellrowborder" valign="top" width="9.13%" headers="mcps1.3.5.2.2.6.1.3 "><p id="dli_08_15058__p32321118175714">None</p>
</td>
<td class="cellrowborder" valign="top" width="11.57%" headers="mcps1.3.5.2.2.6.1.4 "><p id="dli_08_15058__p7555133192215">String</p>
</td>
<td class="cellrowborder" valign="top" width="51.580000000000005%" headers="mcps1.3.5.2.2.6.1.5 "><p id="dli_08_15058__p386918723611">Offsets for each partition when the startup mode is <strong id="dli_08_15058__b149961144104914">specific-offsets</strong>, for example, <strong id="dli_08_15058__b197715754911">partition:0,offset:42;partition:1,offset:300</strong>.</p>
</td>
</tr>
<tr id="dli_08_15058__row7555193113226"><td class="cellrowborder" valign="top" width="17.98%" headers="mcps1.3.5.2.2.6.1.1 "><p id="dli_08_15058__p12555531162219">scan.startup.timestamp-millis</p>
</td>
<td class="cellrowborder" valign="top" width="9.74%" headers="mcps1.3.5.2.2.6.1.2 "><p id="dli_08_15058__p175551312226">No</p>
</td>
<td class="cellrowborder" valign="top" width="9.13%" headers="mcps1.3.5.2.2.6.1.3 "><p id="dli_08_15058__p7668191985713">None</p>
</td>
<td class="cellrowborder" valign="top" width="11.57%" headers="mcps1.3.5.2.2.6.1.4 "><p id="dli_08_15058__p19555173122215">Long</p>
</td>
<td class="cellrowborder" valign="top" width="51.580000000000005%" headers="mcps1.3.5.2.2.6.1.5 "><p id="dli_08_15058__p1425614893714">Epoch timestamp (in milliseconds) to start from when the startup mode is <strong id="dli_08_15058__b1773918255500">timestamp</strong>.</p>
</td>
</tr>
<tr id="dli_08_15058__row1955512313223"><td class="cellrowborder" valign="top" width="17.98%" headers="mcps1.3.5.2.2.6.1.1 "><p id="dli_08_15058__p8555193162211">scan.topic-partition-discovery.interval</p>
</td>
<td class="cellrowborder" valign="top" width="9.74%" headers="mcps1.3.5.2.2.6.1.2 "><p id="dli_08_15058__p17555103120224">No</p>
</td>
<td class="cellrowborder" valign="top" width="9.13%" headers="mcps1.3.5.2.2.6.1.3 "><p id="dli_08_15058__p11997142095710">None</p>
</td>
<td class="cellrowborder" valign="top" width="11.57%" headers="mcps1.3.5.2.2.6.1.4 "><p id="dli_08_15058__p055553115229">Duration</p>
</td>
<td class="cellrowborder" valign="top" width="51.580000000000005%" headers="mcps1.3.5.2.2.6.1.5 "><p id="dli_08_15058__p1355543119227">Interval at which the consumer checks for dynamically created Kafka topics and partitions.</p>
</td>
</tr>
</tbody>
</table>
</div>
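<p>The interaction of <strong>key.format</strong>, <strong>key.fields</strong>, <strong>key.fields-prefix</strong>, and <strong>value.fields-include</strong> described in Table 2 can be sketched as follows. This is an illustrative example only: the column names, topic name, broker address, and group ID are hypothetical. It assumes that both the key and the value of each Kafka message are JSON objects and that the key and the value both contain a field named <strong>version</strong>, which is why a prefix is needed to avoid a name clash.</p>
<div class="codecoloring" codetype="Sql"><div class="highlight"><pre>-- Hypothetical example: all names and addresses below are placeholders.
CREATE TABLE kafkaSource (
  -- Columns read from the message key. The k_ prefix is removed when the
  -- key data type is built, so the key JSON fields are version and user_id.
  k_version INT,
  k_user_id BIGINT,
  -- Columns read from the message value.
  version   INT,
  behavior  STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'user_behavior',
  'properties.bootstrap.servers' = '192.168.0.10:9092',
  'properties.group.id' = 'behavior-reader',
  'scan.startup.mode' = 'latest-offset',
  'key.format' = 'json',
  'key.fields-prefix' = 'k_',
  -- key.fields uses the prefixed column names from the table schema.
  'key.fields' = 'k_version;k_user_id',
  -- value.format is used instead of format; do not configure both.
  'value.format' = 'json',
  -- Required when key.fields-prefix is set: exclude key columns from the value.
  'value.fields-include' = 'EXCEPT_KEY'
);
</pre></div>
</div>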
</div>
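<div class="section"><p>The startup parameters in Table 2 work in pairs: <strong>scan.startup.mode</strong> selects the strategy, and a second parameter supplies the position when the mode requires one. The statement below is a sketch under assumed values; the column names, topic, broker address, group ID, offsets, and timestamp are hypothetical.</p>
<div class="codecoloring" codetype="Sql"><div class="highlight"><pre>-- Hypothetical example: all names, addresses, and positions are placeholders.
CREATE TABLE kafkaSource (
  order_id   STRING,
  pay_amount DOUBLE
) WITH (
  'connector' = 'kafka',
  'topic' = 'orders',
  'properties.bootstrap.servers' = '192.168.0.10:9092',
  'properties.group.id' = 'order-replayer',
  'format' = 'json',
  -- Start partition 0 at offset 42 and partition 1 at offset 300.
  'scan.startup.mode' = 'specific-offsets',
  'scan.startup.specific-offsets' = 'partition:0,offset:42;partition:1,offset:300'
  -- To start from a point in time instead, replace the two options above with:
  --   'scan.startup.mode' = 'timestamp',
  --   'scan.startup.timestamp-millis' = '1735689600000'  (2025-01-01 00:00:00 UTC)
);
</pre></div>
</div>
</div>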
<div class="section" id="dli_08_15058__section19581121995819"><h4 class="sectiontitle">Result Table Parameters</h4>
<div class="tablenoborder"><table cellpadding="4" cellspacing="0" summary="" id="dli_08_15058__table31121843531" frame="border" border="1" rules="all"><caption><b>Table 3 </b>Result table parameters</caption><thead align="left"><tr id="dli_08_15058__row1411234145311"><th align="left" class="cellrowborder" valign="top" width="17.98%" id="mcps1.3.6.2.2.6.1.1"><p id="dli_08_15058__p5112444535">Parameter</p>
</th>
<th align="left" class="cellrowborder" valign="top" width="9.74%" id="mcps1.3.6.2.2.6.1.2"><p id="dli_08_15058__p14113164145319">Mandatory</p>
</th>
<th align="left" class="cellrowborder" valign="top" width="9.13%" id="mcps1.3.6.2.2.6.1.3"><p id="dli_08_15058__p1111384175319">Default Value</p>
</th>
<th align="left" class="cellrowborder" valign="top" width="11.57%" id="mcps1.3.6.2.2.6.1.4"><p id="dli_08_15058__p4113194145312">Data Type</p>
</th>
<th align="left" class="cellrowborder" valign="top" width="51.580000000000005%" id="mcps1.3.6.2.2.6.1.5"><p id="dli_08_15058__p19113144185318">Description</p>
</th>
</tr>
</thead>
<tbody><tr id="dli_08_15058__row1711315435312"><td class="cellrowborder" valign="top" width="17.98%" headers="mcps1.3.6.2.2.6.1.1 "><p id="dli_08_15058__p181132043539">connector</p>
</td>
<td class="cellrowborder" valign="top" width="9.74%" headers="mcps1.3.6.2.2.6.1.2 "><p id="dli_08_15058__p151131413535">Yes</p>
</td>
<td class="cellrowborder" valign="top" width="9.13%" headers="mcps1.3.6.2.2.6.1.3 "><p id="dli_08_15058__p1011318425313">None</p>
</td>
<td class="cellrowborder" valign="top" width="11.57%" headers="mcps1.3.6.2.2.6.1.4 "><p id="dli_08_15058__p11131141536">String</p>
</td>
<td class="cellrowborder" valign="top" width="51.580000000000005%" headers="mcps1.3.6.2.2.6.1.5 "><p id="dli_08_15058__p1011354175315">Connector to use. Set this parameter to <strong id="dli_08_15058__b860162055119">kafka</strong> for Kafka.</p>
</td>
</tr>
<tr id="dli_08_15058__row1211317414537"><td class="cellrowborder" valign="top" width="17.98%" headers="mcps1.3.6.2.2.6.1.1 "><p id="dli_08_15058__p21134415313">topic</p>
</td>
<td class="cellrowborder" valign="top" width="9.74%" headers="mcps1.3.6.2.2.6.1.2 "><p id="dli_08_15058__p1113134115310">No</p>
</td>
<td class="cellrowborder" valign="top" width="9.13%" headers="mcps1.3.6.2.2.6.1.3 "><p id="dli_08_15058__p1711313455316">None</p>
</td>
<td class="cellrowborder" valign="top" width="11.57%" headers="mcps1.3.6.2.2.6.1.4 "><p id="dli_08_15058__p17113144175314">String</p>
</td>
<td class="cellrowborder" valign="top" width="51.580000000000005%" headers="mcps1.3.6.2.2.6.1.5 "><p id="dli_08_15058__p711314417536">Name of the topic to read data from when the table is used as a source. To read from multiple topics, separate the topic names with semicolons, for example, <strong id="dli_08_15058__b1299622955112">topic-1;topic-2</strong>.</p>
<p id="dli_08_15058__p8113164185313">Only one of <strong id="dli_08_15058__b6909632165117">topic-pattern</strong> and <strong id="dli_08_15058__b1091018324517">topic</strong> can be specified for a source.</p>
<p id="dli_08_15058__p16113244538">When the table is used as a sink, this parameter specifies the topic to write data to. Topic lists are not supported for sinks.</p>
</td>
</tr>
<tr id="dli_08_15058__row11113144165310"><td class="cellrowborder" valign="top" width="17.98%" headers="mcps1.3.6.2.2.6.1.1 "><p id="dli_08_15058__p81133465316">properties.bootstrap.servers</p>
</td>
<td class="cellrowborder" valign="top" width="9.74%" headers="mcps1.3.6.2.2.6.1.2 "><p id="dli_08_15058__p13113124145313">Yes</p>
</td>
<td class="cellrowborder" valign="top" width="9.13%" headers="mcps1.3.6.2.2.6.1.3 "><p id="dli_08_15058__p3113440533">None</p>
</td>
<td class="cellrowborder" valign="top" width="11.57%" headers="mcps1.3.6.2.2.6.1.4 "><p id="dli_08_15058__p1311334155317">String</p>
</td>
<td class="cellrowborder" valign="top" width="51.580000000000005%" headers="mcps1.3.6.2.2.6.1.5 "><p id="dli_08_15058__p1611316410535">Comma-separated list of Kafka brokers.</p>
</td>
</tr>
<tr id="dli_08_15058__row10113344533"><td class="cellrowborder" valign="top" width="17.98%" headers="mcps1.3.6.2.2.6.1.1 "><p id="dli_08_15058__p811354125311">properties.*</p>
</td>
<td class="cellrowborder" valign="top" width="9.74%" headers="mcps1.3.6.2.2.6.1.2 "><p id="dli_08_15058__p511317405311">No</p>
</td>
<td class="cellrowborder" valign="top" width="9.13%" headers="mcps1.3.6.2.2.6.1.3 "><p id="dli_08_15058__p1711314195316">None</p>
</td>
<td class="cellrowborder" valign="top" width="11.57%" headers="mcps1.3.6.2.2.6.1.4 "><p id="dli_08_15058__p911311414531">String</p>
</td>
<td class="cellrowborder" valign="top" width="51.580000000000005%" headers="mcps1.3.6.2.2.6.1.5 "><p id="dli_08_15058__p711394125313">Sets and passes arbitrary Kafka configurations.</p>
<ul id="dli_08_15058__ul1511315414535"><li id="dli_08_15058__li9113124105319">Suffix names must match the configuration key defined in <a href="https://kafka.apache.org/documentation/#configuration" target="_blank" rel="noopener noreferrer">Apache Kafka</a>.<p id="dli_08_15058__p81131548539">Flink will remove the <strong id="dli_08_15058__b13392211526">properties.</strong> key prefix and pass the transformed key and values to the underlying KafkaClient. For example, you can disable automatic topic creation via <strong id="dli_08_15058__b14779659182010">'properties.allow.auto.create.topics' = 'false'</strong>.</p>
</li><li id="dli_08_15058__li18114244539">Some configurations cannot be set because Flink overrides them, for example, <strong id="dli_08_15058__b20452117145214">key.deserializer</strong> and <strong id="dli_08_15058__b104531975526">value.deserializer</strong>.</li></ul>
</td>
</tr>
<tr id="dli_08_15058__row101141042534"><td class="cellrowborder" valign="top" width="17.98%" headers="mcps1.3.6.2.2.6.1.1 "><p id="dli_08_15058__p21140411537">format</p>
</td>
<td class="cellrowborder" valign="top" width="9.74%" headers="mcps1.3.6.2.2.6.1.2 "><p id="dli_08_15058__p20114194125313">Yes</p>
</td>
<td class="cellrowborder" valign="top" width="9.13%" headers="mcps1.3.6.2.2.6.1.3 "><p id="dli_08_15058__p91141041538">None</p>
</td>
<td class="cellrowborder" valign="top" width="11.57%" headers="mcps1.3.6.2.2.6.1.4 "><p id="dli_08_15058__p91146445313">String</p>
</td>
<td class="cellrowborder" valign="top" width="51.580000000000005%" headers="mcps1.3.6.2.2.6.1.5 "><p id="dli_08_15058__p151141444532">The format used to deserialize and serialize the value part of Kafka messages. Either this parameter or the <strong id="dli_08_15058__b198811183526">value.format</strong> parameter is required.</p>
<ul id="dli_08_15058__ul8221744828"><li id="dli_08_15058__li19223441827">For details about the message key and body of Kafka messages, see <a href="#dli_08_15058__section9256199230">Key and Value Formats</a>.</li><li id="dli_08_15058__li1322544223">Refer to <a href="dli_08_15014.html">Format</a> for more details and format parameters.</li></ul>
</td>
</tr>
<tr id="dli_08_15058__row811417465313"><td class="cellrowborder" valign="top" width="17.98%" headers="mcps1.3.6.2.2.6.1.1 "><p id="dli_08_15058__p11114114205310">key.format</p>
</td>
<td class="cellrowborder" valign="top" width="9.74%" headers="mcps1.3.6.2.2.6.1.2 "><p id="dli_08_15058__p17114134195311">No</p>
</td>
<td class="cellrowborder" valign="top" width="9.13%" headers="mcps1.3.6.2.2.6.1.3 "><p id="dli_08_15058__p18114242538">None</p>
</td>
<td class="cellrowborder" valign="top" width="11.57%" headers="mcps1.3.6.2.2.6.1.4 "><p id="dli_08_15058__p1611417417539">String</p>
</td>
<td class="cellrowborder" valign="top" width="51.580000000000005%" headers="mcps1.3.6.2.2.6.1.5 "><p id="dli_08_15058__p61144415534">The format used to deserialize and serialize the key part of Kafka messages.</p>
<ul id="dli_08_15058__ul181147495311"><li id="dli_08_15058__li201141440538">If a key format is defined, the <strong id="dli_08_15058__b18852852155212">key.fields</strong> parameter is required as well. Otherwise the Kafka records will have an empty key.</li><li id="dli_08_15058__li51141485314">Refer to <a href="dli_08_15014.html">Format</a> for more details and format parameters.</li></ul>
</td>
</tr>
<tr id="dli_08_15058__row141148465317"><td class="cellrowborder" valign="top" width="17.98%" headers="mcps1.3.6.2.2.6.1.1 "><p id="dli_08_15058__p131141248537">key.fields</p>
</td>
<td class="cellrowborder" valign="top" width="9.74%" headers="mcps1.3.6.2.2.6.1.2 "><p id="dli_08_15058__p161141425317">No</p>
</td>
<td class="cellrowborder" valign="top" width="9.13%" headers="mcps1.3.6.2.2.6.1.3 "><p id="dli_08_15058__p21143419537">[]</p>
</td>
<td class="cellrowborder" valign="top" width="11.57%" headers="mcps1.3.6.2.2.6.1.4 "><p id="dli_08_15058__p111419465317">List&lt;String&gt;</p>
</td>
<td class="cellrowborder" valign="top" width="51.580000000000005%" headers="mcps1.3.6.2.2.6.1.5 "><p id="dli_08_15058__p181147418530">Defines an explicit list of physical columns from the table schema that configure the data type for the key format.</p>
<p id="dli_08_15058__p91141945538">By default, this list is empty and thus a key is undefined. The list should look like <strong id="dli_08_15058__b103801478537">field1;field2</strong>.</p>
</td>
</tr>
<tr id="dli_08_15058__row1114134185316"><td class="cellrowborder" valign="top" width="17.98%" headers="mcps1.3.6.2.2.6.1.1 "><p id="dli_08_15058__p131142419532">key.fields-prefix</p>
</td>
<td class="cellrowborder" valign="top" width="9.74%" headers="mcps1.3.6.2.2.6.1.2 "><p id="dli_08_15058__p211414495315">No</p>
</td>
<td class="cellrowborder" valign="top" width="9.13%" headers="mcps1.3.6.2.2.6.1.3 "><p id="dli_08_15058__p161141342536">None</p>
</td>
<td class="cellrowborder" valign="top" width="11.57%" headers="mcps1.3.6.2.2.6.1.4 "><p id="dli_08_15058__p141141748534">String</p>
</td>
<td class="cellrowborder" valign="top" width="51.580000000000005%" headers="mcps1.3.6.2.2.6.1.5 "><p id="dli_08_15058__p11148455314">Defines a custom prefix for all fields of the key format to avoid name clashes with fields of the value format. By default, the prefix is empty.</p>
<p id="dli_08_15058__p15114643539">If a custom prefix is defined, both the table schema and <strong id="dli_08_15058__b20811018165318">key.fields</strong> will work with prefixed names.</p>
<p id="dli_08_15058__p71141042537">When constructing the data type of the key format, the prefix will be removed and the non-prefixed names will be used within the key format. Note that this parameter requires <strong id="dli_08_15058__b19201182905311">value.fields-include</strong> to be set to <strong id="dli_08_15058__b4202182917537">EXCEPT_KEY</strong>.</p>
</td>
</tr>
<tr id="dli_08_15058__row1011404165313"><td class="cellrowborder" valign="top" width="17.98%" headers="mcps1.3.6.2.2.6.1.1 "><p id="dli_08_15058__p2114340535">value.format</p>
</td>
<td class="cellrowborder" valign="top" width="9.74%" headers="mcps1.3.6.2.2.6.1.2 "><p id="dli_08_15058__p911414425314">No</p>
</td>
<td class="cellrowborder" valign="top" width="9.13%" headers="mcps1.3.6.2.2.6.1.3 "><p id="dli_08_15058__p011494125318">None</p>
</td>
<td class="cellrowborder" valign="top" width="11.57%" headers="mcps1.3.6.2.2.6.1.4 "><p id="dli_08_15058__p5114154185311">String</p>
</td>
<td class="cellrowborder" valign="top" width="51.580000000000005%" headers="mcps1.3.6.2.2.6.1.5 "><p id="dli_08_15058__p11144455310">The format used to deserialize and serialize the value part of Kafka messages.</p>
<ul id="dli_08_15058__ul411424115313"><li id="dli_08_15058__li191141547535">Either this parameter or the <strong id="dli_08_15058__b2014434513538">format</strong> parameter is required. If both parameters are configured, a conflict occurs.</li><li id="dli_08_15058__li6114154105314">Refer to <a href="dli_08_15014.html">Format</a> for more details and format parameters.</li></ul>
</td>
</tr>
<tr id="dli_08_15058__row1611484175314"><td class="cellrowborder" valign="top" width="17.98%" headers="mcps1.3.6.2.2.6.1.1 "><p id="dli_08_15058__p161152419530">value.fields-include</p>
</td>
<td class="cellrowborder" valign="top" width="9.74%" headers="mcps1.3.6.2.2.6.1.2 "><p id="dli_08_15058__p171152415537">No</p>
</td>
<td class="cellrowborder" valign="top" width="9.13%" headers="mcps1.3.6.2.2.6.1.3 "><p id="dli_08_15058__p141156412533">ALL</p>
</td>
<td class="cellrowborder" valign="top" width="11.57%" headers="mcps1.3.6.2.2.6.1.4 "><p id="dli_08_15058__p71151640533">Enum</p>
<p id="dli_08_15058__p7115748533">Possible values: [ALL, EXCEPT_KEY]</p>
</td>
<td class="cellrowborder" valign="top" width="51.580000000000005%" headers="mcps1.3.6.2.2.6.1.5 "><p id="dli_08_15058__p9115741535">Defines a strategy for handling key columns in the data type of the value format.</p>
<p id="dli_08_15058__p611514416532">By default, <strong id="dli_08_15058__b144280576533">ALL</strong> physical columns of the table schema will be included in the value format, which means that key columns appear in the data type for both the key and value format.</p>
</td>
</tr>
<tr id="dli_08_15058__row61161743532"><td class="cellrowborder" valign="top" width="17.98%" headers="mcps1.3.6.2.2.6.1.1 "><p id="dli_08_15058__p811610418536">sink.partitioner</p>
</td>
<td class="cellrowborder" valign="top" width="9.74%" headers="mcps1.3.6.2.2.6.1.2 "><p id="dli_08_15058__p131161543531">No</p>
</td>
<td class="cellrowborder" valign="top" width="9.13%" headers="mcps1.3.6.2.2.6.1.3 "><p id="dli_08_15058__p1711612416539">'default'</p>
</td>
<td class="cellrowborder" valign="top" width="11.57%" headers="mcps1.3.6.2.2.6.1.4 "><p id="dli_08_15058__p1711664185320">String</p>
</td>
<td class="cellrowborder" valign="top" width="51.580000000000005%" headers="mcps1.3.6.2.2.6.1.5 "><p id="dli_08_15058__p51163485311">Output partitioning from Flink's partitions into Kafka's partitions. Valid values are:</p>
<ul id="dli_08_15058__ul711619411537"><li id="dli_08_15058__li15116194175317"><strong id="dli_08_15058__b66391846205416">default</strong>: uses the Kafka default partitioner to partition records.</li><li id="dli_08_15058__li2116144530"><strong id="dli_08_15058__b1559256195410">fixed</strong>: each Flink partition ends up in at most one Kafka partition.</li><li id="dli_08_15058__li16116741538"><strong id="dli_08_15058__b31571915105514">round-robin</strong>: records of a Flink partition are distributed to Kafka partitions in a sticky round-robin manner. This only works when record keys are not specified.</li><li id="dli_08_15058__li131162445317">Custom <strong id="dli_08_15058__b852015479556">FlinkKafkaPartitioner</strong> subclass, for example, <strong id="dli_08_15058__b710210597555">org.mycompany.MyPartitioner</strong>.</li></ul>
</td>
</tr>
<tr id="dli_08_15058__row141161841533"><td class="cellrowborder" valign="top" width="17.98%" headers="mcps1.3.6.2.2.6.1.1 "><p id="dli_08_15058__p411611419537">sink.semantic</p>
</td>
<td class="cellrowborder" valign="top" width="9.74%" headers="mcps1.3.6.2.2.6.1.2 "><p id="dli_08_15058__p711620455311">No</p>
</td>
<td class="cellrowborder" valign="top" width="9.13%" headers="mcps1.3.6.2.2.6.1.3 "><p id="dli_08_15058__p1411615465319">at-least-once</p>
</td>
<td class="cellrowborder" valign="top" width="11.57%" headers="mcps1.3.6.2.2.6.1.4 "><p id="dli_08_15058__p1711614412533">String</p>
</td>
<td class="cellrowborder" valign="top" width="51.580000000000005%" headers="mcps1.3.6.2.2.6.1.5 "><p id="dli_08_15058__p11167420538">Defines the delivery semantic for the Kafka sink. Valid values are <strong id="dli_08_15058__b17939183577">at-least-once</strong>, <strong id="dli_08_15058__b185673236570">exactly-once</strong>, and <strong id="dli_08_15058__b1991622910575">none</strong>.</p>
</td>
</tr>
<tr id="dli_08_15058__row1711612415538"><td class="cellrowborder" valign="top" width="17.98%" headers="mcps1.3.6.2.2.6.1.1 "><p id="dli_08_15058__p14116344538">sink.parallelism</p>
</td>
<td class="cellrowborder" valign="top" width="9.74%" headers="mcps1.3.6.2.2.6.1.2 "><p id="dli_08_15058__p12116114135314">No</p>
</td>
<td class="cellrowborder" valign="top" width="9.13%" headers="mcps1.3.6.2.2.6.1.3 "><p id="dli_08_15058__p16116444539">None</p>
</td>
<td class="cellrowborder" valign="top" width="11.57%" headers="mcps1.3.6.2.2.6.1.4 "><p id="dli_08_15058__p171161542538">Integer</p>
</td>
<td class="cellrowborder" valign="top" width="51.580000000000005%" headers="mcps1.3.6.2.2.6.1.5 "><p id="dli_08_15058__p121160412530">Defines the parallelism of the Kafka sink operator. By default, the parallelism is determined by the framework, which uses the same parallelism as the upstream chained operator.</p>
</td>
</tr>
</tbody>
</table>
</div>
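<p>As a minimal sketch of how several of the parameters above combine, the following result table forwards a Kafka client setting through the <strong>properties.</strong> prefix and sets the partitioner, delivery semantic, and sink parallelism explicitly. The table name, column names, topic, broker addresses, and the chosen values are illustrative assumptions, not recommended settings.</p>
<pre class="screen">CREATE TABLE kafkaSinkSketch (
  `user_id` BIGINT,
  `behavior` STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'SinkKafkaTopic',        -- placeholder; a sink accepts a single topic only
  'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
  'properties.acks' = 'all',         -- the prefix is stripped and acks=all is passed to the Kafka client
  'format' = 'json',
  'sink.partitioner' = 'fixed',      -- each Flink partition writes to at most one Kafka partition
  'sink.semantic' = 'at-least-once',
  'sink.parallelism' = '2'           -- assumed value; omit to inherit the upstream parallelism
);</pre>
<p>Setting <strong>format</strong> here means <strong>value.format</strong> must not be set as well, because configuring both causes a conflict.</p>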
</div>
<div class="section" id="dli_08_15058__section9326019161710"><a name="dli_08_15058__section9326019161710"></a><a name="section9326019161710"></a><h4 class="sectiontitle">Metadata</h4><p id="dli_08_15058__p717317446584">You can define metadata in the source table to obtain the metadata of Kafka messages.</p>
<p id="dli_08_15058__p1631183325318">For example, if multiple topics are defined in the <strong id="dli_08_15058__b201104972993129">WITH</strong> parameter and metadata is defined in the Kafka source table, the data read by Flink is labeled with the topic from which the data is read.</p>
</div>
<div class="tablenoborder"><table cellpadding="4" cellspacing="0" summary="" id="dli_08_15058__table7595115635314" frame="border" border="1" rules="all"><caption><b>Table 4 </b>Metadata</caption><thead align="left"><tr id="dli_08_15058__row55955561539"><th align="left" class="cellrowborder" valign="top" width="17.41%" id="mcps1.3.8.2.5.1.1"><p id="dli_08_15058__p14595556115317">Key</p>
</th>
<th align="left" class="cellrowborder" valign="top" width="36.27%" id="mcps1.3.8.2.5.1.2"><p id="dli_08_15058__p5595175612539">Data Type</p>
</th>
<th align="left" class="cellrowborder" valign="top" width="16.34%" id="mcps1.3.8.2.5.1.3"><p id="dli_08_15058__p1859585655319">R/W</p>
</th>
<th align="left" class="cellrowborder" valign="top" width="29.98%" id="mcps1.3.8.2.5.1.4"><p id="dli_08_15058__p145953563537">Description</p>
</th>
</tr>
</thead>
<tbody><tr id="dli_08_15058__row18595156115315"><td class="cellrowborder" valign="top" width="17.41%" headers="mcps1.3.8.2.5.1.1 "><p id="dli_08_15058__p9595956145318">topic</p>
</td>
<td class="cellrowborder" valign="top" width="36.27%" headers="mcps1.3.8.2.5.1.2 "><p id="dli_08_15058__p1259516563534">STRING NOT NULL</p>
</td>
<td class="cellrowborder" valign="top" width="16.34%" headers="mcps1.3.8.2.5.1.3 "><p id="dli_08_15058__p75957563538">R</p>
</td>
<td class="cellrowborder" valign="top" width="29.98%" headers="mcps1.3.8.2.5.1.4 "><p id="dli_08_15058__p341123195710">Topic name of the Kafka record.</p>
</td>
</tr>
<tr id="dli_08_15058__row115951156165312"><td class="cellrowborder" valign="top" width="17.41%" headers="mcps1.3.8.2.5.1.1 "><p id="dli_08_15058__p135951456165320">partition</p>
</td>
<td class="cellrowborder" valign="top" width="36.27%" headers="mcps1.3.8.2.5.1.2 "><p id="dli_08_15058__p15595135615538">INT NOT NULL</p>
</td>
<td class="cellrowborder" valign="top" width="16.34%" headers="mcps1.3.8.2.5.1.3 "><p id="dli_08_15058__p95951956115313">R</p>
</td>
<td class="cellrowborder" valign="top" width="29.98%" headers="mcps1.3.8.2.5.1.4 "><p id="dli_08_15058__p259517562538">Partition ID of the Kafka record.</p>
</td>
</tr>
<tr id="dli_08_15058__row1159585620531"><td class="cellrowborder" valign="top" width="17.41%" headers="mcps1.3.8.2.5.1.1 "><p id="dli_08_15058__p5595456185315">headers</p>
</td>
<td class="cellrowborder" valign="top" width="36.27%" headers="mcps1.3.8.2.5.1.2 "><p id="dli_08_15058__p2059525635313">MAP&lt;STRING, BYTES&gt; NOT NULL</p>
</td>
<td class="cellrowborder" valign="top" width="16.34%" headers="mcps1.3.8.2.5.1.3 "><p id="dli_08_15058__p17595756185319">R/W</p>
</td>
<td class="cellrowborder" valign="top" width="29.98%" headers="mcps1.3.8.2.5.1.4 "><p id="dli_08_15058__p9480165812579">Headers of the Kafka record as a map of raw bytes.</p>
</td>
</tr>
<tr id="dli_08_15058__row124337451411"><td class="cellrowborder" valign="top" width="17.41%" headers="mcps1.3.8.2.5.1.1 "><p id="dli_08_15058__p194331145447">leader-epoch</p>
</td>
<td class="cellrowborder" valign="top" width="36.27%" headers="mcps1.3.8.2.5.1.2 "><p id="dli_08_15058__p6433445644">INT NULL</p>
</td>
<td class="cellrowborder" valign="top" width="16.34%" headers="mcps1.3.8.2.5.1.3 "><p id="dli_08_15058__p104334451643">R</p>
</td>
<td class="cellrowborder" valign="top" width="29.98%" headers="mcps1.3.8.2.5.1.4 "><p id="dli_08_15058__p114331745346">Leader epoch of the Kafka record if available.</p>
</td>
</tr>
<tr id="dli_08_15058__row859520562533"><td class="cellrowborder" valign="top" width="17.41%" headers="mcps1.3.8.2.5.1.1 "><p id="dli_08_15058__p759518566538">offset</p>
</td>
<td class="cellrowborder" valign="top" width="36.27%" headers="mcps1.3.8.2.5.1.2 "><p id="dli_08_15058__p459515569535">BIGINT NOT NULL</p>
</td>
<td class="cellrowborder" valign="top" width="16.34%" headers="mcps1.3.8.2.5.1.3 "><p id="dli_08_15058__p1959565625311">R</p>
</td>
<td class="cellrowborder" valign="top" width="29.98%" headers="mcps1.3.8.2.5.1.4 "><p id="dli_08_15058__p659517566534">Offset of the Kafka record in the partition.</p>
</td>
</tr>
<tr id="dli_08_15058__row1859516567535"><td class="cellrowborder" valign="top" width="17.41%" headers="mcps1.3.8.2.5.1.1 "><p id="dli_08_15058__p15959563532">timestamp</p>
</td>
<td class="cellrowborder" valign="top" width="36.27%" headers="mcps1.3.8.2.5.1.2 "><p id="dli_08_15058__p6595125619532">TIMESTAMP(3) WITH LOCAL TIME ZONE NOT NULL</p>
</td>
<td class="cellrowborder" valign="top" width="16.34%" headers="mcps1.3.8.2.5.1.3 "><p id="dli_08_15058__p3595256145317">R/W</p>
</td>
<td class="cellrowborder" valign="top" width="29.98%" headers="mcps1.3.8.2.5.1.4 "><p id="dli_08_15058__p135958561531">Timestamp of the Kafka record.</p>
</td>
</tr>
<tr id="dli_08_15058__row51111649345"><td class="cellrowborder" valign="top" width="17.41%" headers="mcps1.3.8.2.5.1.1 "><p id="dli_08_15058__p211119493420">timestamp-type</p>
</td>
<td class="cellrowborder" valign="top" width="36.27%" headers="mcps1.3.8.2.5.1.2 "><p id="dli_08_15058__p1511110491244">STRING NOT NULL</p>
</td>
<td class="cellrowborder" valign="top" width="16.34%" headers="mcps1.3.8.2.5.1.3 "><p id="dli_08_15058__p811111495410">R</p>
</td>
<td class="cellrowborder" valign="top" width="29.98%" headers="mcps1.3.8.2.5.1.4 "><p id="dli_08_15058__p1412210418512">Timestamp type of the Kafka record.</p>
<ul id="dli_08_15058__ul712284110514"><li id="dli_08_15058__li161223411955"><strong id="dli_08_15058__b204044988893129">NoTimestampType</strong>: No timestamp is defined in the message.</li><li id="dli_08_15058__li2122144116514"><strong id="dli_08_15058__b62529402793129">CreateTime</strong>: time when the message is generated.</li><li id="dli_08_15058__li4995445465"><strong id="dli_08_15058__b131772780593129">LogAppendTime</strong>: time when the message is added to the Kafka broker.</li></ul>
</td>
</tr>
</tbody>
</table>
</div>
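<p>The difference between read-only (R) and writable (R/W) metadata keys can be sketched as follows. Read-only keys such as <strong>topic</strong> and <strong>offset</strong> are declared as <strong>VIRTUAL</strong> columns in the source table, while a writable key such as <strong>timestamp</strong> can also be declared in a result table so that the column value is written to the Kafka record. The table and column names below are hypothetical, and the remaining <strong>WITH</strong> parameters are elided.</p>
<pre class="screen">CREATE TABLE kafkaSourceSketch (
  `src_topic` STRING METADATA FROM 'topic' VIRTUAL,     -- read-only metadata
  `src_offset` BIGINT METADATA FROM 'offset' VIRTUAL,   -- read-only metadata
  `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',  -- readable and writable metadata
  `message` STRING
) WITH (
  'connector' = 'kafka',
  ...
);
CREATE TABLE kafkaTimestampSinkSketch (
  `event_time` TIMESTAMP(3) METADATA FROM 'timestamp',  -- written as the Kafka record timestamp
  `message` STRING
) WITH (
  'connector' = 'kafka',
  ...
);
INSERT INTO kafkaTimestampSinkSketch SELECT `event_time`, `message` FROM kafkaSourceSketch;</pre>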
<div class="section" id="dli_08_15058__section9256199230"><a name="dli_08_15058__section9256199230"></a><a name="section9256199230"></a><h4 class="sectiontitle">Key and Value Formats</h4><p id="dli_08_15058__p986261811411">Both the key and value part of a Kafka record can be serialized to and deserialized from raw bytes using one of the given <a href="https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/formats/overview/" target="_blank" rel="noopener noreferrer">formats</a>.</p>
<ul id="dli_08_15058__ul182051654442"><li id="dli_08_15058__li20205105410418"><strong id="dli_08_15058__b204923259131">Value Format</strong><p id="dli_08_15058__p58766585417">Since a key is optional in Kafka records, the following statement reads and writes records with a configured value format but without a key format. The <strong id="dli_08_15058__b208371955141320">format</strong> parameter is a synonym for <strong id="dli_08_15058__b1517961421419">value.format</strong>. All format options are prefixed with the format identifier.</p>
<pre class="screen" id="dli_08_15058__screen102204191450">CREATE TABLE KafkaTable (
`ts` TIMESTAMP(3) METADATA FROM 'timestamp',
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING
) WITH (
'connector' = 'kafka',
...
'format' = 'json',
'json.ignore-parse-errors' = 'true'
)</pre>
<p id="dli_08_15058__p13121037250">The value format will be configured with the following data type:</p>
<pre class="screen" id="dli_08_15058__screen620315531151">ROW&lt;`user_id` BIGINT, `item_id` BIGINT, `behavior` STRING&gt;</pre>
</li><li id="dli_08_15058__li11727313614"><strong id="dli_08_15058__b109180403146">Key and Value Format</strong><p id="dli_08_15058__p168500231565">The following example shows how to specify and configure key and value formats. The format options are prefixed with either the <strong id="dli_08_15058__b4436921519">key</strong> or <strong id="dli_08_15058__b1655019139155">value</strong> plus format identifier.</p>
<pre class="screen" id="dli_08_15058__screen1354113545615">CREATE TABLE KafkaTable (
`ts` TIMESTAMP(3) METADATA FROM 'timestamp',
`user_id` BIGINT,
`item_id` BIGINT,
`behavior` STRING
) WITH (
'connector' = 'kafka',
...
'key.format' = 'json',
'key.json.ignore-parse-errors' = 'true',
'key.fields' = 'user_id;item_id',
'value.format' = 'json',
'value.json.fail-on-missing-field' = 'false',
'value.fields-include' = 'ALL'
)</pre>
<p id="dli_08_15058__p9019538616">The key format includes the fields listed in <strong id="dli_08_15058__b155722034131515">key.fields</strong> (using <strong id="dli_08_15058__b31381840121518">;</strong> as the delimiter) in the same order. Thus, it will be configured with the following data type:</p>
<pre class="screen" id="dli_08_15058__screen97252561376">ROW&lt;`user_id` BIGINT, `item_id` BIGINT&gt;</pre>
<p id="dli_08_15058__p196808471710">Since the value format is configured with <strong id="dli_08_15058__b1093311718168">'value.fields-include' = 'ALL'</strong>, key fields will also end up in the value format's data type:</p>
<pre class="screen" id="dli_08_15058__screen131114203812">ROW&lt;`user_id` BIGINT, `item_id` BIGINT, `behavior` STRING&gt;</pre>
</li><li id="dli_08_15058__li770113555415"><strong id="dli_08_15058__b1422612911161">Overlapping Format Fields</strong><p id="dli_08_15058__p126961015911">The connector cannot split the table's columns into key and value fields based on schema information if both key and value formats contain fields of the same name. The <strong id="dli_08_15058__b1391614514188">key.fields-prefix</strong> parameter lets you give key columns a unique name in the table schema while keeping the original names when configuring the key format.</p>
<p id="dli_08_15058__p14696701395">The following example shows a key and value format that both contain a version field:</p>
<pre class="screen" id="dli_08_15058__screen1774113217912">CREATE TABLE KafkaTable (
`k_version` INT,
`k_user_id` BIGINT,
`k_item_id` BIGINT,
`version` INT,
`behavior` STRING
) WITH (
'connector' = 'kafka',
...
'key.format' = 'json',
'key.fields-prefix' = 'k_',
'key.fields' = 'k_version;k_user_id;k_item_id',
'value.format' = 'json',
'value.fields-include' = 'EXCEPT_KEY'
)</pre>
<p id="dli_08_15058__p55079290918">The value format must be configured in <strong id="dli_08_15058__b177846422185">EXCEPT_KEY</strong> mode. The formats will be configured with the following data types:</p>
<pre class="screen" id="dli_08_15058__screen2051874915918">Key format:
ROW&lt;`version` INT, `user_id` BIGINT, `item_id` BIGINT&gt;
Value format:
ROW&lt;`version` INT, `behavior` STRING&gt;</pre>
</li></ul>
</div>
<div class="section" id="dli_08_15058__section12233124102"><a name="dli_08_15058__section12233124102"></a><a name="section12233124102"></a><h4 class="sectiontitle">Topic and Partition Discovery</h4><p id="dli_08_15058__p5997430105">The <strong id="dli_08_15058__b11886245141916">topic</strong> and <strong id="dli_08_15058__b1818185013192">topic-pattern</strong> parameters specify the topics or topic pattern that a source consumes. The <strong id="dli_08_15058__b16464153272010">topic</strong> parameter accepts a list of topics separated by semicolons, for example, <strong id="dli_08_15058__b12645442102012">topic-1;topic-2</strong>. The <strong id="dli_08_15058__b637237219">topic-pattern</strong> parameter uses a regular expression to discover matching topics. For example, if <strong id="dli_08_15058__b2449214202117">topic-pattern</strong> is <strong id="dli_08_15058__b20432418152110">test-topic-[0-9]</strong>, the consumer subscribes to all topics whose names match the regular expression (starting with test-topic- and ending with a single digit) when the job starts running.</p>
<p id="dli_08_15058__p10991243181015">To allow the consumer to discover dynamically created topics after the job starts running, set a non-negative value for <strong id="dli_08_15058__b84771850182116">scan.topic-partition-discovery.interval</strong>. This allows the consumer to discover partitions of new topics with names that also match the specified pattern.</p>
<div class="note" id="dli_08_15058__note58731728115910"><img src="public_sys-resources/note_3.0-en-us.png"><span class="notetitle"> </span><div class="notebody"><p id="dli_08_15058__p1287302810592">Note that topic list and topic pattern only work in sources. In sinks, Flink currently only supports a single topic.</p>
</div></div>
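<p>A source table that subscribes by pattern and periodically looks for new topics might be declared as in the following sketch. The table name, column names, group ID, and broker addresses are placeholders, and the interval value <strong>60 s</strong> is an assumed example; check the value format accepted by your Flink version before using it.</p>
<pre class="screen">CREATE TABLE kafkaPatternSourceSketch (
  `src_topic` STRING METADATA FROM 'topic' VIRTUAL,    -- records which matched topic each row came from
  `message` STRING
) WITH (
  'connector' = 'kafka',
  'topic-pattern' = 'test-topic-[0-9]',                -- do not set 'topic' at the same time
  'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
  'properties.group.id' = 'GroupId',
  'scan.startup.mode' = 'latest-offset',
  'scan.topic-partition-discovery.interval' = '60 s',  -- assumed value; enables discovery of new topics and partitions
  'format' = 'json'
);</pre>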
</div>
<div class="section" id="dli_08_15058__dli_08_0237_section11421328152716"><h4 class="sectiontitle">Example 1: Reading DMS Kafka Metadata in CSV Format and Outputting It to a Kafka Sink (Applicable for Kafka Clusters Without SASL_SSL Enabled)</h4><ol id="dli_08_15058__ol14771132475"><li id="dli_08_15058__li6127741153217">Create an enhanced datasource connection in the VPC and subnet where Kafka is located, and bind the connection to the required Flink elastic resource pool.</li><li id="dli_08_15058__li1233002211514">Set Kafka security groups and add inbound rules to allow access from the Flink queue. Test the connectivity using the Kafka address. If the connection passes the test, it is bound to the queue.</li><li id="dli_08_15058__li1077111334713">Create a Flink OpenSource SQL job. Enter the following job script and submit the job.<div class="p" id="dli_08_15058__p39352912333"><a name="dli_08_15058__li1077111334713"></a><a name="li1077111334713"></a>When you create a job, set <strong id="dli_08_15058__b7256398122731">Flink Version</strong> to <strong id="dli_08_15058__b174201695122731">1.15</strong> on the <strong id="dli_08_15058__b2941065522731">Running Parameters</strong> tab. Select <strong id="dli_08_15058__b152702732722731">Save Job Log</strong>, and specify the OBS bucket for saving job logs. <strong id="dli_08_15058__b72618834291126">Change the values of the parameters in bold as needed in the following script.</strong><pre class="screen" id="dli_08_15058__screen73275223261">CREATE TABLE kafkaSource(
`topic` String metadata virtual,
`partition` int metadata virtual,
`headers` MAP&lt;STRING, BYTES&gt; metadata virtual,
`leader-epoch` INT metadata virtual,
`offset` bigint metadata virtual,
`timestamp-type` string metadata virtual,
`event_time` TIMESTAMP(3) metadata FROM 'timestamp',
`message` string
) WITH (
'connector' = 'kafka',
'topic' = '<em id="dli_08_15058__i496614336446"><strong id="dli_08_15058__b19668334448">SourceKafka</strong></em><em id="dli_08_15058__i1696653344418"><strong id="dli_08_15058__b696673374419">Topic</strong></em>',
'properties.bootstrap.servers' = '<em id="dli_08_15058__i366724417446"><strong id="dli_08_15058__b1466716449449">KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort</strong></em>',
'properties.group.id' = '<em id="dli_08_15058__i18890145118441"><strong id="dli_08_15058__b208903515444">GroupId</strong></em>',
'scan.startup.mode' = 'latest-offset',
'format' = 'csv',
'csv.field-delimiter' = '\u0001',
'csv.quote-character' = ''''
);
CREATE TABLE kafkaSink (
`topic` String,
`partition` int,
`headers` MAP&lt;STRING, BYTES&gt;,
`leader-epoch` INT,
`offset` bigint,
`timestampType` string,
`event_time` TIMESTAMP(3),
`message` string -- Indicates that data written by users is read from Kafka.
) WITH (
'connector' = 'kafka',
'topic' = '<em id="dli_08_15058__i11511627193513"><strong id="dli_08_15058__b9511927133516">SinkKafka</strong></em><em id="dli_08_15058__i1651627133518"><strong id="dli_08_15058__b251162718354">Topic</strong></em>',
'properties.bootstrap.servers' = '<em id="dli_08_15058__i45102717356"><strong id="dli_08_15058__b165172716353">KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort</strong></em>',
'format' = 'json'
);
insert into kafkaSink select * from kafkaSource;</pre>
</div>
</li><li id="dli_08_15058__li122711798216">Send the following data to the topic of the source table in Kafka. The Kafka topic is kafkaSource.<p id="dli_08_15058__p113085101325"><a name="dli_08_15058__li122711798216"></a><a name="li122711798216"></a></p>
<pre class="screen" id="dli_08_15058__screen29631127162115">{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"}</pre>
</li><li id="dli_08_15058__li185511218316">Read the topic of the Kafka result table. The Kafka topic is kafkaSink.<pre class="screen" id="dli_08_15058__screen3907125312210">{"topic":"kafkaSource","partition":1,"headers":{},"leader-epoch":0,"offset":4,"timestampType":"LogAppendTime","event_time":"2023-11-16 11:16:30.369","message":"{\"order_id\":\"202103251202020001\", \"order_channel\":\"miniAppShop\", \"order_time\":\"2021-03-25 12:02:02\", \"pay_amount\":\"60.00\", \"real_pay\":\"60.00\", \"pay_time\":\"2021-03-25 12:03:00\", \"user_id\":\"0002\", \"user_name\":\"Bob\", \"area_id\":\"330110\"}"}
{"topic":"kafkaSource","partition":0,"headers":{},"leader-epoch":0,"offset":6,"timestampType":"LogAppendTime","event_time":"2023-11-16 11:16:30.367","message":"{\"order_id\":\"202103241000000001\",\"order_channel\":\"webShop\",\"order_time\":\"2021-03-24 10:00:00\",\"pay_amount\":100.0,\"real_pay\":100.0,\"pay_time\":\"2021-03-24 10:02:03\",\"user_id\":\"0001\",\"user_name\":\"Alice\",\"area_id\":\"330106\"}"}
{"topic":"kafkaSource","partition":2,"headers":{},"leader-epoch":0,"offset":5,"timestampType":"LogAppendTime","event_time":"2023-11-16 11:16:30.368","message":"{\"order_id\":\"202103241606060001\",\"order_channel\":\"appShop\",\"order_time\":\"2021-03-24 16:06:06\",\"pay_amount\":200.0,\"real_pay\":180.0,\"pay_time\":\"2021-03-24 16:10:06\",\"user_id\":\"0001\",\"user_name\":\"Alice\",\"area_id\":\"330106\"}"}</pre>
</li></ol>
</div>
<div class="section" id="dli_08_15058__section14937135293113"><h4 class="sectiontitle">Example 2: Using DMS Kafka in JSON Format as the Source Table and Outputting It to a Kafka Sink (Applicable for Kafka Clusters Without SASL_SSL Enabled)</h4><div class="p" id="dli_08_15058__p974717371324"><strong id="dli_08_15058__b1336838136">Use the Kafka source table and Kafka result table to read JSON data from one Kafka topic and write it to another Kafka topic.</strong><ol id="dli_08_15058__ol17226191620486"><li id="dli_08_15058__li10278194718557">Create an enhanced datasource connection in the VPC and subnet where Kafka is located, and bind the connection to the required Flink elastic resource pool.</li><li id="dli_08_15058__li182781747185517">Set Kafka security groups and add inbound rules to allow access from the Flink queue. Test the connectivity using the Kafka address. If the connection passes the test, it is bound to the queue.</li><li id="dli_08_15058__li722618161489">Create a Flink OpenSource SQL job. Enter the following job script and submit the job.<div class="p" id="dli_08_15058__p1790134833214"><a name="dli_08_15058__li722618161489"></a><a name="li722618161489"></a>When you create a job, set <strong id="dli_08_15058__b71365874822731">Flink Version</strong> to <strong id="dli_08_15058__b170711342922731">1.15</strong> in the <strong id="dli_08_15058__b48347465822731">Running Parameters</strong> tab. Select <strong id="dli_08_15058__b56572100522731">Save Job Log</strong>, and specify the OBS bucket for saving job logs. <strong id="dli_08_15058__b74827823391126">Change the values of the parameters in bold as needed in the following script.</strong><pre class="screen" id="dli_08_15058__screen116021338175619">CREATE TABLE kafkaSource(
order_id string,
order_channel string,
order_time timestamp(3),
pay_amount double,
real_pay double,
pay_time string,
user_id string,
user_name string,
area_id string
) WITH (
'connector' = 'kafka',
'topic' = '<em id="dli_08_15058__i1852971016241"><strong id="dli_08_15058__b1529111019246">KafkaSource</strong></em><em id="dli_08_15058__i95291010112413"><strong id="dli_08_15058__b13529151032417">Topic</strong></em>',
'properties.bootstrap.servers' = '<em id="dli_08_15058__i1455522182415"><strong id="dli_08_15058__b1145532262417">KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort</strong></em>',
'properties.group.id' = '<em id="dli_08_15058__i117181930132417"><strong id="dli_08_15058__b1771833013246">GroupId</strong></em>',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
);
CREATE TABLE kafkaSink (
order_id string,
order_channel string,
order_time timestamp(3),
pay_amount double,
real_pay double,
pay_time string,
user_id string,
user_name string,
area_id string
) WITH (
'connector' = 'kafka',
'topic' = '<em id="dli_08_15058__i108751536142412"><strong id="dli_08_15058__b7875113618244">KafkaSink</strong></em><em id="dli_08_15058__i1587563622413"><strong id="dli_08_15058__b587563613243">Topic</strong></em>',
'properties.bootstrap.servers' = '<em id="dli_08_15058__i3603175016243"><strong id="dli_08_15058__b17603155072419">KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort</strong></em>',
'format' = 'json'
);
insert into kafkaSink select * from kafkaSource;</pre>
</div>
</li><li id="dli_08_15058__li3226161674814">Send the following data to the topic of the source table in Kafka:<pre class="screen" id="dli_08_15058__screen16679723173112">{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"}</pre>
</li><li id="dli_08_15058__li72262162488">Read the topic of the Kafka result table. The data results are as follows:<pre class="screen" id="dli_08_15058__screen24791121123516">{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103251202020001", "order_channel":"miniAppShop", "order_time":"2021-03-25 12:02:02", "pay_amount":"60.00", "real_pay":"60.00", "pay_time":"2021-03-25 12:03:00", "user_id":"0002", "user_name":"Bob", "area_id":"330110"}</pre>
</li></ol>
</div>
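<p>With <strong>scan.startup.mode</strong> set to <strong>latest-offset</strong>, the job starts reading at the end of the topic, so messages sent before the job is running are not expected to reach the result topic. The following is a minimal sketch of a variation, not part of the original example: it assumes you want to replay data already in the topic and forward only one order channel. It uses the standard Flink Kafka connector value <strong>earliest-offset</strong> and a <strong>WHERE</strong> filter. The table name <strong>kafkaSourceReplay</strong> and the <strong>webShop</strong> filter are illustrative assumptions; the placeholders have the same meaning as in the script above.</p>
<pre class="screen">-- Sketch only: kafkaSourceReplay is a hypothetical table name.
CREATE TABLE kafkaSourceReplay (
  order_id string,
  order_channel string,
  order_time timestamp(3),
  pay_amount double,
  real_pay double,
  pay_time string,
  user_id string,
  user_name string,
  area_id string
) WITH (
  'connector' = 'kafka',
  'topic' = 'KafkaSourceTopic',
  'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
  'properties.group.id' = 'GroupId',
  'scan.startup.mode' = 'earliest-offset', -- start from the earliest available offset instead of the latest
  'format' = 'json'
);
CREATE TABLE kafkaSink (
  order_id string,
  order_channel string,
  order_time timestamp(3),
  pay_amount double,
  real_pay double,
  pay_time string,
  user_id string,
  user_name string,
  area_id string
) WITH (
  'connector' = 'kafka',
  'topic' = 'KafkaSinkTopic',
  'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
  'format' = 'json'
);
-- Forward only the rows of one channel (illustrative filter).
insert into kafkaSink select * from kafkaSourceReplay where order_channel = 'webShop';</pre>
<p>With the sample data above, only the order placed through <strong>webShop</strong> would match this filter.</p>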
</div>
<div class="section" id="dli_08_15058__section1376121165311"><h4 class="sectiontitle">Example 3: Using DMS Kafka as the Source Table and Print as the Result Table (Applicable for Kafka Clusters with SASL_SSL Enabled)</h4><p id="dli_08_15058__p5450046113612">Create a Kafka cluster for DMS, enable SASL_SSL, download the SSL certificate, and upload the downloaded certificate <strong id="dli_08_15058__b321118145364">client.jks</strong> to an OBS bucket.</p>
<p id="dli_08_15058__p160454572115">The <strong id="dli_08_15058__b1194725417291">properties.sasl.jaas.config</strong> field contains account passwords encrypted using DEW.</p>
<pre class="screen" id="dli_08_15058__screen691531701511">CREATE TABLE ordersSource (
order_id string,
order_channel string,
order_time timestamp(3),
pay_amount double,
real_pay double,
pay_time string,
user_id string,
user_name string,
area_id string
) WITH (
'connector' = 'kafka',
'topic' = '<em id="dli_08_15058__i11250649149"><strong id="dli_08_15058__b156722481410">KafkaTopic</strong></em>',
'properties.bootstrap.servers' = '<em id="dli_08_15058__i125069597418"><strong id="dli_08_15058__b105061759347">KafkaAddress1</strong></em>:9093,<em id="dli_08_15058__i729319217516"><strong id="dli_08_15058__b329332052">KafkaAddress2</strong></em>:9093',
'properties.group.id' = '<em id="dli_08_15058__i62481015514"><strong id="dli_08_15058__b0413291651">GroupId</strong></em>',
'scan.startup.mode' = 'latest-offset',
'properties.connector.auth.open' = 'true',
'properties.ssl.truststore.location' = '<em id="dli_08_15058__i1247516510209"><strong id="dli_08_15058__b163376472019">obs://xx/client.jks</strong></em>', -- OBS path to which the certificate was uploaded
'properties.sasl.mechanism' = '<em id="dli_08_15058__i1718229184518"><strong id="dli_08_15058__b044918298459">PLAIN</strong></em>',
'properties.security.protocol' = '<em id="dli_08_15058__i34633124520"><strong id="dli_08_15058__b19771173224515">SASL_SSL</strong></em>',
'properties.sasl.jaas.config' = '<em id="dli_08_15058__i2062413163209"><strong id="dli_08_15058__b192781916182011">xx</strong></em>', -- Key in DEW secret management, whose value is like <strong id="dli_08_15058__b5562857165911">org.apache.kafka.common.security.plain.PlainLoginModule required username=xx password=xx</strong>;
'format' = 'json',
'dew.endpoint' = '<strong id="dli_08_15058__b193571336154611"><em id="dli_08_15058__i798613417467">kms.xx.com</em></strong>', --<em id="dli_08_15058__i94633592598">Endpoint information for the DEW service being used</em>
'dew.csms.secretName' = '<strong id="dli_08_15058__b0319143164615"><em id="dli_08_15058__i87919438467">xx</em></strong>', --<em id="dli_08_15058__i75071619015">Name of the DEW shared secret</em>
'dew.csms.decrypt.fields' = 'properties.sasl.jaas.config', --The <strong id="dli_08_15058__b22361242019">properties.sasl.jaas.config</strong> field value must be decrypted and replaced using DEW secret management.
'dew.csms.version' = 'v1'
);
CREATE TABLE ordersSink (
order_id string,
order_channel string,
order_time timestamp(3),
pay_amount double,
real_pay double,
pay_time string,
user_id string,
user_name string,
area_id string
) WITH (
'connector' = 'print'
);
insert into ordersSink select * from ordersSource;</pre>
</div>
<div class="section" id="dli_08_15058__section0883191453419"><h4 class="sectiontitle">Example 4: Using Kafka (MRS Cluster) as the Source Table and Print as the Result Table (Applicable for Kafka with SASL_SSL Enabled and MRS Using Kerberos Authentication)</h4><ul id="dli_08_15058__ul1526213104347"><li id="dli_08_15058__li1726220102344">Enable Kerberos authentication for the MRS cluster.</li><li id="dli_08_15058__li192626101347">Click the <strong id="dli_08_15058__b19180183433715">Components</strong> tab and click <strong id="dli_08_15058__b81811534103719">Kafka</strong>. On the displayed page, click the <strong id="dli_08_15058__b5182934183715">Service Configuration</strong> tab, locate <strong id="dli_08_15058__b1183123411374">ssl.mode.enable</strong>, set it to <strong id="dli_08_15058__b17184834203716">true</strong>, and restart Kafka.</li><li id="dli_08_15058__li32629108343">Download the user credential. Log in to the MRS Manager of the MRS cluster and choose <strong id="dli_08_15058__b1531164217212">System</strong> &gt; <strong id="dli_08_15058__b15311342112116">Permission</strong> &gt; <strong id="dli_08_15058__b18532742172112">User</strong>. Locate the row that contains the target user, click <strong id="dli_08_15058__b1053314232117">More</strong>, and select <strong id="dli_08_15058__b1853374242113">Download Authentication Credential</strong>.<p id="dli_08_15058__p13262101019344">Obtain the <strong id="dli_08_15058__b108788542593129">truststore.jks</strong> file using the authentication credential and store the credential and <strong id="dli_08_15058__b203988587793129">truststore.jks</strong> file in OBS.</p>
</li><li id="dli_08_15058__li17262151015342">If the error message "Message stream modified (41)" is displayed, the JDK version may be incorrect. Change the JDK version in the sample code to a version earlier than 8u242 or delete the <strong id="dli_08_15058__b147852978893129">renew_lifetime = 0m</strong> configuration item from the <strong id="dli_08_15058__b195206023093129">krb5.conf</strong> configuration file.</li><li id="dli_08_15058__li10262121053413">Set the port to the <strong id="dli_08_15058__b3400192973814">sasl_ssl.port</strong> configured in the Kafka service configuration. The default value is <strong id="dli_08_15058__b1743975719386">21009</strong>.</li><li id="dli_08_15058__li026211105348">In the following statements, set <strong id="dli_08_15058__b37502018593129">security.protocol</strong> to <strong id="dli_08_15058__b25532671193129">SASL_SSL</strong>.</li><li id="dli_08_15058__li4437203142110">The <strong id="dli_08_15058__b414164893018">properties.ssl.truststore.password</strong> field in the <strong id="dli_08_15058__b1968115515300">with</strong> parameter is encrypted using DEW.</li></ul>
<pre class="screen" id="dli_08_15058__screen1526317103341">CREATE TABLE ordersSource (
order_id string,
order_channel string,
order_time timestamp(3),
pay_amount double,
real_pay double,
pay_time string,
user_id string,
user_name string,
area_id string
) WITH (
'connector' = 'kafka',
'topic' = '<strong id="dli_08_15058__b192636102348"><em id="dli_08_15058__i12263131083415">kafkaTopic</em></strong>',
'properties.bootstrap.servers' = '<em id="dli_08_15058__i9263010203416"><strong id="dli_08_15058__b326391063414">KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort</strong></em>',
'properties.group.id' = '<strong id="dli_08_15058__b5263710163417"><em id="dli_08_15058__i5263161018342">GroupId</em></strong>',
'scan.startup.mode' = 'latest-offset',
'properties.sasl.kerberos.service.name' = 'kafka', -- <em id="dli_08_15058__i262961112016">Value configured in the MRS cluster</em>
'properties.connector.auth.open' = 'true',
'properties.connector.kerberos.principal' = 'xx', --Username
'properties.connector.kerberos.krb5' = 'obs://xx/krb5.conf',
'properties.connector.kerberos.keytab' = 'obs://xx/user.keytab',
'properties.security.protocol' = 'SASL_SSL',
'properties.ssl.truststore.location' = 'obs://xx/truststore.jks',
'properties.ssl.truststore.password' = 'xx', -- <em id="dli_08_15058__i14770163322916">Key in the DEW secret</em>
'properties.sasl.mechanism' = 'GSSAPI',
'format' = 'json',
'dew.endpoint'='kms.xx.xx.com', --<em id="dli_08_15058__i11518514123920">Endpoint information for the DEW service being used</em>
'dew.csms.secretName'='xx', --<em id="dli_08_15058__i6305161920391">Name of the DEW shared secret</em>
'dew.csms.decrypt.fields'='properties.ssl.truststore.password', --The <strong id="dli_08_15058__b915282283910">properties.ssl.truststore.password</strong> field value must be decrypted and replaced using DEW secret management.
'dew.csms.version'='v1'
);
CREATE TABLE ordersSink (
order_id string,
order_channel string,
order_time timestamp(3),
pay_amount double,
real_pay double,
pay_time string,
user_id string,
user_name string,
area_id string
) WITH (
'connector' = 'print'
);
insert into ordersSink select * from ordersSource;</pre>
</div>
<div class="section" id="dli_08_15058__section158020489345"><h4 class="sectiontitle">Example 5: Using Kafka (MRS Cluster) as the Source Table and Print as the Result Table (Applicable for Kafka with SASL_SSL Enabled and MRS Using SASL_PLAINTEXT with Kerberos Authentication)</h4><ul id="dli_08_15058__ul20263810173412"><li id="dli_08_15058__li126391023415">Enable Kerberos authentication for the MRS cluster.</li><li id="dli_08_15058__li7263171017344">Click the <strong id="dli_08_15058__b9527135318391">Components</strong> tab and click <strong id="dli_08_15058__b652815319399">Kafka</strong>. On the displayed page, click the <strong id="dli_08_15058__b165299532397">Service Configuration</strong> tab, locate <strong id="dli_08_15058__b14530145353918">ssl.mode.enable</strong>, set it to <strong id="dli_08_15058__b1953145311392">true</strong>, and restart Kafka.</li><li id="dli_08_15058__li8263151003417">Log in to the MRS Manager of the MRS cluster and download the user credential. Choose <strong id="dli_08_15058__b71893183393129">System</strong> &gt; <strong id="dli_08_15058__b84090987493129">Permission</strong> &gt; <strong id="dli_08_15058__b171855413493129">User</strong>. Locate the row that contains the target user, and choose <strong id="dli_08_15058__b144907871193129">More</strong> &gt; <strong id="dli_08_15058__b75104967493129">Download Authentication Credential</strong>. Upload the credential to OBS.</li><li id="dli_08_15058__li182635108344">If the error message "Message stream modified (41)" is displayed, the JDK version may be incorrect. 
Change the JDK version in the sample code to a version earlier than 8u242 or delete the <strong id="dli_08_15058__b1811399593129">renew_lifetime = 0m</strong> configuration item from the <strong id="dli_08_15058__b195309277993129">krb5.conf</strong> configuration file.</li><li id="dli_08_15058__li15263121013348">Set the port to the <strong id="dli_08_15058__b2475151618402">sasl.port</strong> configured in the Kafka service configuration. The default value is <strong id="dli_08_15058__b18476101610403">21007</strong>.</li><li id="dli_08_15058__li526321083414">In the following statements, set <strong id="dli_08_15058__b108722009993129">security.protocol</strong> to <strong id="dli_08_15058__b198762499293129">SASL_PLAINTEXT</strong>.</li></ul>
<pre class="screen" id="dli_08_15058__screen726511109345">CREATE TABLE ordersSource (
order_id string,
order_channel string,
order_time timestamp(3),
pay_amount double,
real_pay double,
pay_time string,
user_id string,
user_name string,
area_id string
) WITH (
'connector' = 'kafka',
'topic' = '<em id="dli_08_15058__i11264171053413"><strong id="dli_08_15058__b1926431003414">Kafka</strong></em><em id="dli_08_15058__i2264710143410"><strong id="dli_08_15058__b026413106345">Topic</strong></em>',
'properties.bootstrap.servers' = '<em id="dli_08_15058__i1926413102346"><strong id="dli_08_15058__b1926491023416">KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort</strong></em>',
'properties.group.id' = '<strong id="dli_08_15058__b152641310163412"><em id="dli_08_15058__i226431083418">GroupId</em></strong>',
'scan.startup.mode' = 'latest-offset',
'properties.sasl.kerberos.service.name' = 'kafka', -- <em id="dli_08_15058__i1690114410297">Value configured in the MRS cluster</em>
'properties.connector.auth.open' = 'true',
'properties.connector.kerberos.principal' = '<em id="dli_08_15058__i633282018527"><strong id="dli_08_15058__b16438181935210">xx</strong></em>',
'properties.connector.kerberos.krb5' = '<em id="dli_08_15058__i1115062414524"><strong id="dli_08_15058__b198721923185212">obs://xx/krb5.conf</strong></em>',
'properties.connector.kerberos.keytab' = '<em id="dli_08_15058__i17751202714524"><strong id="dli_08_15058__b18557142717529">obs://xx/user.keytab</strong></em>',
'properties.security.protocol' = 'SASL_PLAINTEXT',
'properties.sasl.mechanism' = 'GSSAPI',
'format' = 'json'
);
CREATE TABLE ordersSink (
order_id string,
order_channel string,
order_time timestamp(3),
pay_amount double,
real_pay double,
pay_time string,
user_id string,
user_name string,
area_id string
) WITH (
'connector' = 'print'
);
insert into ordersSink select * from ordersSource;</pre>
</div>
<div class="section" id="dli_08_15058__section9534710123511"><h4 class="sectiontitle">Example 6: Using Kafka (MRS Cluster) as the Source Table and Print as the Result Table (Applicable for Kafka with SSL Enabled and MRS Without Kerberos Authentication Enabled)</h4><ul id="dli_08_15058__ul32661610163410"><li id="dli_08_15058__li1326519106343">Do not enable Kerberos authentication for the MRS cluster.</li><li id="dli_08_15058__li1526516109343">Download the user credential. Log in to the MRS Manager of the MRS cluster and choose <strong id="dli_08_15058__b194261050143319">System</strong> &gt; <strong id="dli_08_15058__b114271650193317">Permission</strong> &gt; <strong id="dli_08_15058__b5428155016339">User</strong>. Locate the row that contains the target user, click <strong id="dli_08_15058__b642818503339">More</strong>, and select <strong id="dli_08_15058__b34291450143320">Download Authentication Credential</strong>.<p id="dli_08_15058__p152658100348">Obtain the <strong id="dli_08_15058__b89141071393129">truststore.jks</strong> file using the authentication credential and store the credential and <strong id="dli_08_15058__b146042723893129">truststore.jks</strong> file in OBS.</p>
</li><li id="dli_08_15058__li15265131014344">Set the port to the <strong id="dli_08_15058__b11101244134016">ssl.port</strong> configured in the Kafka service configuration. The default value is <strong id="dli_08_15058__b15111344134014">9093</strong>.</li><li id="dli_08_15058__li132655100349">Set <strong id="dli_08_15058__b196111544113414">security.protocol</strong> in the <strong id="dli_08_15058__b956784812348">with</strong> parameter to <strong id="dli_08_15058__b1548765013417">SSL</strong>.</li><li id="dli_08_15058__li152655109341">In the Kafka configuration of the MRS cluster, set <strong id="dli_08_15058__b82018263244">ssl.mode.enable</strong> to <strong id="dli_08_15058__b19158172914243">true</strong> and restart Kafka.</li><li id="dli_08_15058__li6785181312217">The <strong id="dli_08_15058__b1391819633120">properties.ssl.truststore.password</strong> field in the <strong id="dli_08_15058__b991917620314">with</strong> parameter is encrypted using DEW.<pre class="screen" id="dli_08_15058__screen8266610113411">CREATE TABLE ordersSource (
order_id string,
order_channel string,
order_time timestamp(3),
pay_amount double,
real_pay double,
pay_time string,
user_id string,
user_name string,
area_id string
) WITH (
'connector' = 'kafka',
'topic' = '<em id="dli_08_15058__i1726581083418"><strong id="dli_08_15058__b826581063410">kafkaTopic</strong></em>',
'properties.bootstrap.servers' = '<em id="dli_08_15058__i1726521017344"><strong id="dli_08_15058__b152651810133419">KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort</strong></em>',
'properties.group.id' = '<em id="dli_08_15058__i426511107345"><strong id="dli_08_15058__b1226551011341">GroupId</strong></em>',
'scan.startup.mode' = 'latest-offset',
'properties.connector.auth.open' = 'true',
'properties.ssl.truststore.location' = '<em id="dli_08_15058__i13943102114534"><strong id="dli_08_15058__b196031621145319">o</strong><strong id="dli_08_15058__b17468216125317">bs://xx/truststore.jks</strong></em>',
'properties.ssl.truststore.password' = '<em id="dli_08_15058__i8117104332318"><strong id="dli_08_15058__b137152042192310">xx</strong></em>', -- Key for DEW secret management, whose value is the password set when generating <strong id="dli_08_15058__b1158104003613">truststore.jks</strong>
'properties.security.protocol' = '<strong id="dli_08_15058__b2040112332537"><em id="dli_08_15058__i1652914328536">SSL</em></strong>',
'format' = 'json',
'dew.endpoint' = '<strong id="dli_08_15058__b16427647162314"><em id="dli_08_15058__i1042784710231">kms.xx.com</em></strong>', --<em id="dli_08_15058__i1842718474239">Endpoint information for the DEW service being used</em>
'dew.csms.secretName' = '<strong id="dli_08_15058__b71009509233"><em id="dli_08_15058__i19100950172317">xx</em></strong>', --<em id="dli_08_15058__i310011502233">Name of the DEW shared secret</em>
'dew.csms.decrypt.fields' = '<strong id="dli_08_15058__b87634917532"><em id="dli_08_15058__i12757174819536">properties.ssl.truststore.password</em></strong>', --The <strong id="dli_08_15058__b1659485720399">properties.ssl.truststore.password</strong> field value must be decrypted and replaced using DEW secret management.
'dew.csms.version' = 'v1'
);
CREATE TABLE ordersSink (
order_id string,
order_channel string,
order_time timestamp(3),
pay_amount double,
real_pay double,
pay_time string,
user_id string,
user_name string,
area_id string
) WITH (
'connector' = 'print'
);
insert into ordersSink select * from ordersSource;</pre>
</li></ul>
</div>
<div class="section" id="dli_08_15058__section6615132175310"><h4 class="sectiontitle">FAQ</h4><ul id="dli_08_15058__ul2134161411254"><li id="dli_08_15058__li12490182212593"><strong id="dli_08_15058__b149901948113615">Q: What should I do if the Flink job execution fails and the log contains the following error information?</strong><pre class="screen" id="dli_08_15058__screen1325344945818">org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata</pre>
<p id="dli_08_15058__p169161432181415">A: The datasource connection is not bound, the binding fails, or the security group of the Kafka cluster is not configured to allow access from the network segment of the DLI queue. Reconfigure the datasource connection or configure the security group of the Kafka cluster to allow access from the DLI queue.</p>
</li><li id="dli_08_15058__li1604152318012"><strong id="dli_08_15058__b15674125111369">Q: What should I do if the Flink job execution fails and the log contains the following error information?</strong><pre class="screen" id="dli_08_15058__screen78029291301">Caused by: java.lang.RuntimeException: RealLine:45;Table 'default_catalog.default_database.printSink' declares persistable metadata columns, but the underlying DynamicTableSink doesn't implement the SupportsWritingMetadata interface. If the column should not be persisted, it can be declared with the VIRTUAL keyword.</pre>
<p id="dli_08_15058__p14021031803">A: The sink table defines a metadata column, but the Print connector does not support writing metadata. Delete the metadata column from the sink table.</p>
</li></ul>
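<p>The second error above can be avoided as sketched below. This is a hedged illustration, not the job that produced the log: the two-column schema is a hypothetical simplification, and the placeholders need to be replaced with your own values. The source table reads the Kafka record timestamp as a metadata column, while the Print result table declares the same field as a regular column, because the Print connector cannot persist metadata.</p>
<pre class="screen">-- Sketch only: a reduced schema is assumed for brevity.
CREATE TABLE kafkaSource (
  order_id string,
  event_time TIMESTAMP_LTZ(3) METADATA FROM 'timestamp' -- Kafka record timestamp, read as metadata
) WITH (
  'connector' = 'kafka',
  'topic' = 'KafkaTopic',
  'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
  'properties.group.id' = 'GroupId',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);
-- Declaring "event_time TIMESTAMP_LTZ(3) METADATA" in this table would trigger the error,
-- because the Print sink does not implement the SupportsWritingMetadata interface.
CREATE TABLE printSink (
  order_id string,
  event_time TIMESTAMP_LTZ(3) -- regular column that receives the value selected from the source
) WITH (
  'connector' = 'print'
);
insert into printSink select order_id, event_time from kafkaSource;</pre>
<p>The metadata value is still printed; it simply travels as an ordinary column of the result table.</p>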
</div>
</div>
<div>
<div class="familylinks">
<div class="parentlink"><strong>Parent topic:</strong> <a href="dli_08_15027.html">Connectors</a></div>
</div>
</div>