Reviewed-by: Pruthi, Vineet <vineet.pruthi@t-systems.com> Co-authored-by: Su, Xiaomeng <suxiaomeng1@huawei.com> Co-committed-by: Su, Xiaomeng <suxiaomeng1@huawei.com>
<a name="dli_08_15017"></a><a name="dli_08_15017"></a>
<h1 class="topictitle1">Canal</h1>
<div id="body0000001310015813"><div class="section" id="dli_08_15017__section167371042163516"><h4 class="sectiontitle">Function</h4><p id="dli_08_15017__p613994415365">Canal is a Changelog Data Capture (CDC) tool that streams changes from MySQL into other systems in real time. It provides a unified schema for changelog data and can serialize messages using JSON or protobuf (protobuf is the default format for Canal).</p>
<p id="dli_08_15017__p191398442368">Flink can interpret Canal JSON messages as INSERT, UPDATE, and DELETE messages in the Flink SQL system. This is useful in scenarios such as:</p>
<ul id="dli_08_15017__ul7139164493616"><li id="dli_08_15017__li18139114419367">Synchronizing incremental data from databases to other systems</li><li id="dli_08_15017__li1913917445364">Auditing logs</li><li id="dli_08_15017__li0139134416368">Maintaining real-time materialized views on databases</li><li id="dli_08_15017__li181391444153617">Performing temporal joins against the change history of a database table</li></ul>
<p id="dli_08_15017__p063063210210">Flink can also encode INSERT, UPDATE, and DELETE messages in Flink SQL as Canal JSON messages and emit them to storage such as Kafka. However, Flink currently cannot combine UPDATE_BEFORE and UPDATE_AFTER into a single UPDATE message. It therefore encodes UPDATE_BEFORE and UPDATE_AFTER as DELETE and INSERT Canal messages, respectively.</p>
<p id="dli_08_15017__p1068612594214">For details, see <a href="https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/connectors/table/formats/canal/" target="_blank" rel="noopener noreferrer">Canal Format</a>.</p>
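<p>As a minimal sketch of the encoding behavior described above, the following statements forward a Canal changelog from one Kafka topic to another. The table names <strong>productsSource</strong> and <strong>productsSink</strong>, the column list, and the italicized connection values are illustrative assumptions, not values taken from this document. Under these assumptions, an UPDATE event read from the source topic would be written to the sink topic as a DELETE message followed by an INSERT message.</p>
<pre class="screen">-- Hypothetical source table that reads Canal JSON changelog records.
CREATE TABLE productsSource (
  id BIGINT,
  name STRING,
  weight DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = '<em><strong>sourceTopic</strong></em>',
  'properties.bootstrap.servers' = '<em><strong>KafkaAddress:KafkaPort</strong></em>',
  'properties.group.id' = '<em><strong>GroupId</strong></em>',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'canal-json'
);

-- Hypothetical sink table that writes Canal JSON messages back to Kafka.
CREATE TABLE productsSink (
  id BIGINT,
  name STRING,
  weight DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = '<em><strong>sinkTopic</strong></em>',
  'properties.bootstrap.servers' = '<em><strong>KafkaAddress:KafkaPort</strong></em>',
  'format' = 'canal-json'
);

-- Each change row from the source is re-encoded as a Canal JSON message.
INSERT INTO productsSink SELECT id, name, weight FROM productsSource;</pre>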
</div>
<div class="section" id="dli_08_15017__section8503847368"><h4 class="sectiontitle">Supported Connectors</h4><ul id="dli_08_15017__ul11921748173912"><li id="dli_08_15017__li79211348183917">Kafka</li><li id="dli_08_15017__li3545173912114">FileSystem</li></ul>
</div>
<div class="section" id="dli_08_15017__section1392435673512"><h4 class="sectiontitle">Parameters</h4>
<div class="tablenoborder"><table cellpadding="4" cellspacing="0" summary="" id="dli_08_15017__table435879171415" frame="border" border="1" rules="all"><caption><b>Table 1 </b>Parameter description</caption><thead align="left"><tr id="dli_08_15017__row93599914149"><th align="left" class="cellrowborder" valign="top" width="18.34%" id="mcps1.3.3.2.2.6.1.1"><p id="dli_08_15017__p12919181516141">Parameter</p>
</th>
<th align="left" class="cellrowborder" valign="top" width="11.959999999999999%" id="mcps1.3.3.2.2.6.1.2"><p id="dli_08_15017__p4359194143">Mandatory</p>
</th>
<th align="left" class="cellrowborder" valign="top" width="10.93%" id="mcps1.3.3.2.2.6.1.3"><p id="dli_08_15017__p635989181414">Default Value</p>
</th>
<th align="left" class="cellrowborder" valign="top" width="13.780000000000001%" id="mcps1.3.3.2.2.6.1.4"><p id="dli_08_15017__p143594916144">Type</p>
</th>
<th align="left" class="cellrowborder" valign="top" width="44.99%" id="mcps1.3.3.2.2.6.1.5"><p id="dli_08_15017__p93590913145">Description</p>
</th>
</tr>
</thead>
<tbody><tr id="dli_08_15017__row1335959131413"><td class="cellrowborder" valign="top" width="18.34%" headers="mcps1.3.3.2.2.6.1.1 "><p id="dli_08_15017__p15430123321510">format</p>
</td>
<td class="cellrowborder" valign="top" width="11.959999999999999%" headers="mcps1.3.3.2.2.6.1.2 "><p id="dli_08_15017__p144302033111510">Yes</p>
</td>
<td class="cellrowborder" valign="top" width="10.93%" headers="mcps1.3.3.2.2.6.1.3 "><p id="dli_08_15017__p1643043361513">None</p>
</td>
<td class="cellrowborder" valign="top" width="13.780000000000001%" headers="mcps1.3.3.2.2.6.1.4 "><p id="dli_08_15017__p6430103312158">String</p>
</td>
<td class="cellrowborder" valign="top" width="44.99%" headers="mcps1.3.3.2.2.6.1.5 "><p id="dli_08_15017__p643033311519">Format to be used. Set this parameter to <strong id="dli_08_15017__b12755964512">canal-json</strong>.</p>
</td>
</tr>
<tr id="dli_08_15017__row935919951413"><td class="cellrowborder" valign="top" width="18.34%" headers="mcps1.3.3.2.2.6.1.1 "><p id="dli_08_15017__p16430123351510">canal-json.ignore-parse-errors</p>
</td>
<td class="cellrowborder" valign="top" width="11.959999999999999%" headers="mcps1.3.3.2.2.6.1.2 "><p id="dli_08_15017__p154301433141513">No</p>
</td>
<td class="cellrowborder" valign="top" width="10.93%" headers="mcps1.3.3.2.2.6.1.3 "><p id="dli_08_15017__p11430133131513">false</p>
</td>
<td class="cellrowborder" valign="top" width="13.780000000000001%" headers="mcps1.3.3.2.2.6.1.4 "><p id="dli_08_15017__p154301933101516">Boolean</p>
</td>
<td class="cellrowborder" valign="top" width="44.99%" headers="mcps1.3.3.2.2.6.1.5 "><p id="dli_08_15017__p5430333171512">Whether to skip fields and rows with parse errors instead of failing. The default value is <strong id="dli_08_15017__b15814394712">false</strong>, indicating that an error is thrown. When parse errors are skipped, the affected fields are set to null.</p>
</td>
</tr>
<tr id="dli_08_15017__row835959111418"><td class="cellrowborder" valign="top" width="18.34%" headers="mcps1.3.3.2.2.6.1.1 "><p id="dli_08_15017__p1943063321514">canal-json.timestamp-format.standard</p>
</td>
<td class="cellrowborder" valign="top" width="11.959999999999999%" headers="mcps1.3.3.2.2.6.1.2 "><p id="dli_08_15017__p1643023351517">No</p>
</td>
<td class="cellrowborder" valign="top" width="10.93%" headers="mcps1.3.3.2.2.6.1.3 "><p id="dli_08_15017__p835811243316">'SQL'</p>
</td>
<td class="cellrowborder" valign="top" width="13.780000000000001%" headers="mcps1.3.3.2.2.6.1.4 "><p id="dli_08_15017__p7430433101519">String</p>
</td>
<td class="cellrowborder" valign="top" width="44.99%" headers="mcps1.3.3.2.2.6.1.5 "><p id="dli_08_15017__p54301433181514">Input and output timestamp format. Currently supported values are <strong id="dli_08_15017__b20801173974119">SQL</strong> and <strong id="dli_08_15017__b1815545144115">ISO-8601</strong>:</p>
<ul id="dli_08_15017__ul14430123311518"><li id="dli_08_15017__li13430133171512"><strong id="dli_08_15017__b1989117269439">SQL</strong> parses input timestamps in the "yyyy-MM-dd HH:mm:ss.s{precision}" format, for example, <strong id="dli_08_15017__b095610444413">2020-12-30 12:13:14.123</strong>, and outputs timestamps in the same format.</li><li id="dli_08_15017__li19430233111512"><strong id="dli_08_15017__b1696253584317">ISO-8601</strong> parses input timestamps in the "yyyy-MM-ddTHH:mm:ss.s{precision}" format, for example, <strong id="dli_08_15017__b59303490430">2020-12-30T12:13:14.123</strong>, and outputs timestamps in the same format.</li></ul>
</td>
</tr>
<tr id="dli_08_15017__row1736016971410"><td class="cellrowborder" valign="top" width="18.34%" headers="mcps1.3.3.2.2.6.1.1 "><p id="dli_08_15017__p743113391514">canal-json.map-null-key.mode</p>
</td>
<td class="cellrowborder" valign="top" width="11.959999999999999%" headers="mcps1.3.3.2.2.6.1.2 "><p id="dli_08_15017__p4431153361520">No</p>
</td>
<td class="cellrowborder" valign="top" width="10.93%" headers="mcps1.3.3.2.2.6.1.3 "><p id="dli_08_15017__p76701236153311">'FAIL'</p>
</td>
<td class="cellrowborder" valign="top" width="13.780000000000001%" headers="mcps1.3.3.2.2.6.1.4 "><p id="dli_08_15017__p4431123361515">String</p>
</td>
<td class="cellrowborder" valign="top" width="44.99%" headers="mcps1.3.3.2.2.6.1.5 "><p id="dli_08_15017__p74311333151519">Handling mode when serializing null keys for map data. Available values are as follows:</p>
<ul id="dli_08_15017__ul14431533201515"><li id="dli_08_15017__li1143111337157"><strong id="dli_08_15017__b1321417016423">FAIL</strong> throws an exception when a map contains a null key.</li><li id="dli_08_15017__li943113341518"><strong id="dli_08_15017__b473413410422">DROP</strong> drops map entries that have a null key.</li><li id="dli_08_15017__li1543173319157"><strong id="dli_08_15017__b9974171234218">LITERAL</strong> replaces null keys in the map with a string literal. The string literal is defined by the <strong id="dli_08_15017__b1896016276445">canal-json.map-null-key.literal</strong> option.</li></ul>
</td>
</tr>
<tr id="dli_08_15017__row436119971415"><td class="cellrowborder" valign="top" width="18.34%" headers="mcps1.3.3.2.2.6.1.1 "><p id="dli_08_15017__p84316338150">canal-json.map-null-key.literal</p>
</td>
<td class="cellrowborder" valign="top" width="11.959999999999999%" headers="mcps1.3.3.2.2.6.1.2 "><p id="dli_08_15017__p64311339152">No</p>
</td>
<td class="cellrowborder" valign="top" width="10.93%" headers="mcps1.3.3.2.2.6.1.3 "><p id="dli_08_15017__p943173315151">'null'</p>
</td>
<td class="cellrowborder" valign="top" width="13.780000000000001%" headers="mcps1.3.3.2.2.6.1.4 "><p id="dli_08_15017__p164315335154">String</p>
</td>
<td class="cellrowborder" valign="top" width="44.99%" headers="mcps1.3.3.2.2.6.1.5 "><p id="dli_08_15017__p184311133121512">String literal to replace null key when <strong id="dli_08_15017__b63473105523">canal-json.map-null-key.mode</strong> is <strong id="dli_08_15017__b43571212155212">LITERAL</strong>.</p>
</td>
</tr>
<tr id="dli_08_15017__row103634110207"><td class="cellrowborder" valign="top" width="18.34%" headers="mcps1.3.3.2.2.6.1.1 "><p id="dli_08_15017__p1545601262013">canal-json.encode.decimal-as-plain-number</p>
</td>
<td class="cellrowborder" valign="top" width="11.959999999999999%" headers="mcps1.3.3.2.2.6.1.2 "><p id="dli_08_15017__p153632011112017">No</p>
</td>
<td class="cellrowborder" valign="top" width="10.93%" headers="mcps1.3.3.2.2.6.1.3 "><p id="dli_08_15017__p23631611152017">false</p>
</td>
<td class="cellrowborder" valign="top" width="13.780000000000001%" headers="mcps1.3.3.2.2.6.1.4 "><p id="dli_08_15017__p636314115209">Boolean</p>
</td>
<td class="cellrowborder" valign="top" width="44.99%" headers="mcps1.3.3.2.2.6.1.5 "><p id="dli_08_15017__p136317111205">Whether to encode all decimals as plain numbers instead of scientific notation. By default, decimals may be written in scientific notation. For example, <strong id="dli_08_15017__b1450924115616">0.000000027</strong> is encoded as <strong id="dli_08_15017__b1626579105615">2.7E-8</strong> by default, and is written as <strong id="dli_08_15017__b7251152718565">0.000000027</strong> if this parameter is set to <strong id="dli_08_15017__b161823016566">true</strong>.</p>
</td>
</tr>
<tr id="dli_08_15017__row63611795147"><td class="cellrowborder" valign="top" width="18.34%" headers="mcps1.3.3.2.2.6.1.1 "><p id="dli_08_15017__p8431033101518">canal-json.database.include</p>
</td>
<td class="cellrowborder" valign="top" width="11.959999999999999%" headers="mcps1.3.3.2.2.6.1.2 "><p id="dli_08_15017__p124311333181515">No</p>
</td>
<td class="cellrowborder" valign="top" width="10.93%" headers="mcps1.3.3.2.2.6.1.3 "><p id="dli_08_15017__p18431123310157">None</p>
<p id="dli_08_15017__p11431533151520"></p>
</td>
<td class="cellrowborder" valign="top" width="13.780000000000001%" headers="mcps1.3.3.2.2.6.1.4 "><p id="dli_08_15017__p543112336155">String</p>
<p id="dli_08_15017__p1143153318159"></p>
</td>
<td class="cellrowborder" valign="top" width="44.99%" headers="mcps1.3.3.2.2.6.1.5 "><p id="dli_08_15017__p943114330152">An optional regular expression that restricts reading to the changelog rows of specific databases by matching it against the "database" meta field in the Canal record. The pattern string is compatible with Java's <a href="https://docs.oracle.com/javase/8/docs/api/java/util/regex/Pattern.html" target="_blank" rel="noopener noreferrer">Pattern</a>.</p>
</td>
</tr>
<tr id="dli_08_15017__row17129103923716"><td class="cellrowborder" valign="top" width="18.34%" headers="mcps1.3.3.2.2.6.1.1 "><p id="dli_08_15017__p121300392376">canal-json.table.include</p>
</td>
<td class="cellrowborder" valign="top" width="11.959999999999999%" headers="mcps1.3.3.2.2.6.1.2 "><p id="dli_08_15017__p31306396372">No</p>
</td>
<td class="cellrowborder" valign="top" width="10.93%" headers="mcps1.3.3.2.2.6.1.3 "><p id="dli_08_15017__p14130133918371">None</p>
</td>
<td class="cellrowborder" valign="top" width="13.780000000000001%" headers="mcps1.3.3.2.2.6.1.4 "><p id="dli_08_15017__p0130133953718">String</p>
</td>
<td class="cellrowborder" valign="top" width="44.99%" headers="mcps1.3.3.2.2.6.1.5 "><p id="dli_08_15017__p113013953710">An optional regular expression that restricts reading to the changelog rows of specific tables by matching it against the "table" meta field in the Canal record. The pattern string is compatible with Java's <a href="https://docs.oracle.com/javase/8/docs/api/java/util/regex/Pattern.html" target="_blank" rel="noopener noreferrer">Pattern</a>.</p>
</td>
</tr>
</tbody>
</table>
</div>
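<p>The optional parameters in Table 1 are set in the WITH clause alongside <strong>format</strong>. The following is a sketch only: the table name <strong>filteredProducts</strong>, the columns, the italicized connection values, and the filter values <strong>inventory</strong> and <strong>products</strong> are assumptions chosen for illustration.</p>
<pre class="screen">CREATE TABLE filteredProducts (
  id BIGINT,
  name STRING,
  weight DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = '<em><strong>kafkaTopic</strong></em>',
  'properties.bootstrap.servers' = '<em><strong>KafkaAddress:KafkaPort</strong></em>',
  'properties.group.id' = '<em><strong>GroupId</strong></em>',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'canal-json',
  -- Skip records that cannot be parsed instead of failing the job.
  'canal-json.ignore-parse-errors' = 'true',
  -- Parse and write timestamps in the ISO-8601 form.
  'canal-json.timestamp-format.standard' = 'ISO-8601',
  -- Read only rows whose "database" and "table" meta fields match.
  'canal-json.database.include' = 'inventory',
  'canal-json.table.include' = 'products'
);</pre>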
</div>
<div class="section" id="dli_08_15017__section395645211266"><h4 class="sectiontitle">Metadata</h4><p id="dli_08_15017__p102821933121112">The following format metadata can be exposed as read-only (VIRTUAL) columns in DDL.</p>
<p id="dli_08_15017__p1579431711017">Format metadata fields are only available if the corresponding connector forwards format metadata. Currently, only the Kafka connector is able to expose metadata fields for its value format.</p>
<div class="tablenoborder"><table cellpadding="4" cellspacing="0" summary="" id="dli_08_15017__table1268012467122" frame="border" border="1" rules="all"><caption><b>Table 2 </b>Metadata</caption><thead align="left"><tr id="dli_08_15017__row06808469125"><th align="left" class="cellrowborder" valign="top" width="33.33333333333333%" id="mcps1.3.4.4.2.4.1.1"><p id="dli_08_15017__p4680746191214">Key</p>
</th>
<th align="left" class="cellrowborder" valign="top" width="33.33333333333333%" id="mcps1.3.4.4.2.4.1.2"><p id="dli_08_15017__p166809469126">Data Type</p>
</th>
<th align="left" class="cellrowborder" valign="top" width="33.33333333333333%" id="mcps1.3.4.4.2.4.1.3"><p id="dli_08_15017__p17680174631210">Description</p>
</th>
</tr>
</thead>
<tbody><tr id="dli_08_15017__row17680194671214"><td class="cellrowborder" valign="top" width="33.33333333333333%" headers="mcps1.3.4.4.2.4.1.1 "><p id="dli_08_15017__p46801146181216">database</p>
</td>
<td class="cellrowborder" valign="top" width="33.33333333333333%" headers="mcps1.3.4.4.2.4.1.2 "><p id="dli_08_15017__p1768004616124">STRING NULL</p>
</td>
<td class="cellrowborder" valign="top" width="33.33333333333333%" headers="mcps1.3.4.4.2.4.1.3 "><p id="dli_08_15017__p12557544182719">The originating database. Corresponds to the <strong id="dli_08_15017__b11103419919">database</strong> field in the Canal record if available.</p>
</td>
</tr>
<tr id="dli_08_15017__row4680204613124"><td class="cellrowborder" valign="top" width="33.33333333333333%" headers="mcps1.3.4.4.2.4.1.1 "><p id="dli_08_15017__p12681104661216">table</p>
</td>
<td class="cellrowborder" valign="top" width="33.33333333333333%" headers="mcps1.3.4.4.2.4.1.2 "><p id="dli_08_15017__p1071181962820">STRING NULL</p>
</td>
<td class="cellrowborder" valign="top" width="33.33333333333333%" headers="mcps1.3.4.4.2.4.1.3 "><p id="dli_08_15017__p176811146151216">The originating database table. Corresponds to the <strong id="dli_08_15017__b103101416114">table</strong> field in the Canal record if available.</p>
</td>
</tr>
<tr id="dli_08_15017__row1368115467123"><td class="cellrowborder" valign="top" width="33.33333333333333%" headers="mcps1.3.4.4.2.4.1.1 "><p id="dli_08_15017__p18681246111220">sql-type</p>
</td>
<td class="cellrowborder" valign="top" width="33.33333333333333%" headers="mcps1.3.4.4.2.4.1.2 "><p id="dli_08_15017__p23802054162819">MAP&lt;STRING, INT&gt; NULL</p>
</td>
<td class="cellrowborder" valign="top" width="33.33333333333333%" headers="mcps1.3.4.4.2.4.1.3 "><p id="dli_08_15017__p14681124671219">Map of SQL types. Corresponds to the <strong id="dli_08_15017__b15981103815118">sqlType</strong> field in the Canal record if available.</p>
</td>
</tr>
<tr id="dli_08_15017__row66819460123"><td class="cellrowborder" valign="top" width="33.33333333333333%" headers="mcps1.3.4.4.2.4.1.1 "><p id="dli_08_15017__p28351553292">pk-names</p>
</td>
<td class="cellrowborder" valign="top" width="33.33333333333333%" headers="mcps1.3.4.4.2.4.1.2 "><p id="dli_08_15017__p362413313304">ARRAY&lt;STRING&gt; NULL</p>
</td>
<td class="cellrowborder" valign="top" width="33.33333333333333%" headers="mcps1.3.4.4.2.4.1.3 "><p id="dli_08_15017__p13681154671219">Array of primary key names. Corresponds to the <strong id="dli_08_15017__b1692185511110">pkNames</strong> field in the Canal record if available.</p>
</td>
</tr>
<tr id="dli_08_15017__row6681184612124"><td class="cellrowborder" valign="top" width="33.33333333333333%" headers="mcps1.3.4.4.2.4.1.1 "><p id="dli_08_15017__p368118468121">ingestion-timestamp</p>
</td>
<td class="cellrowborder" valign="top" width="33.33333333333333%" headers="mcps1.3.4.4.2.4.1.2 "><p id="dli_08_15017__p11586165243017">TIMESTAMP_LTZ(3) NULL</p>
</td>
<td class="cellrowborder" valign="top" width="33.33333333333333%" headers="mcps1.3.4.4.2.4.1.3 "><p id="dli_08_15017__p1668117466121">The timestamp at which the connector processed the event. Corresponds to the <strong id="dli_08_15017__b5270117529">ts</strong> field in the Canal record.</p>
</td>
</tr>
</tbody>
</table>
</div>
<p id="dli_08_15017__p2080664024817">The following example shows how to access Canal metadata fields in Kafka:</p>
<pre class="screen" id="dli_08_15017__screen953265763119">CREATE TABLE KafkaTable (
  origin_database STRING METADATA FROM 'value.database' VIRTUAL,
  origin_table STRING METADATA FROM 'value.table' VIRTUAL,
  origin_sql_type MAP&lt;STRING, INT&gt; METADATA FROM 'value.sql-type' VIRTUAL,
  origin_pk_names ARRAY&lt;STRING&gt; METADATA FROM 'value.pk-names' VIRTUAL,
  origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
  user_id BIGINT,
  item_id BIGINT,
  behavior STRING
) WITH (
  'connector' = 'kafka',
  'topic' = '<em id="dli_08_15017__i12943983220"><strong id="dli_08_15017__b17390158143218">kafkaTopic</strong></em>',
  'properties.bootstrap.servers' = '<em id="dli_08_15017__i387653131014"><strong id="dli_08_15017__b19871053111017">KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort</strong></em>',
  'properties.group.id' = '<em id="dli_08_15017__i16339113417325"><strong id="dli_08_15017__b17739331328">GroupId</strong></em>',
  'scan.startup.mode' = 'earliest-offset',
  'value.format' = 'canal-json'
);</pre>
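<p>Once declared, the metadata columns can be queried like ordinary columns. The query below is an illustrative sketch against the <strong>KafkaTable</strong> definition above; the filter value <strong>products</strong> is an assumption.</p>
<pre class="screen">-- Return only change rows that originate from one MySQL table.
SELECT origin_database, origin_table, origin_ts, user_id, item_id, behavior
FROM KafkaTable
WHERE origin_table = 'products';</pre>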
</div>
<div class="section" id="dli_08_15017__section1713602117369"><h4 class="sectiontitle">Example</h4><p id="dli_08_15017__p15881132116016">Use canal-json to read Canal records in Kafka and output them to Print.</p>
<ol id="dli_08_15017__ol06367232518"><li id="dli_08_15017__li04031578234"><span>Create a datasource connection to the VPC and subnet where Kafka is located, and bind the connection to the queue. Configure a security group with an inbound rule that allows access from the queue, and then test connectivity from the queue using the Kafka IP address. For example, locate the general-purpose queue where the job runs and choose <strong id="dli_08_15017__b144611019205115">More</strong> &gt; <strong id="dli_08_15017__b846141912515">Test Address Connectivity</strong> in the <strong id="dli_08_15017__b246271912517">Operation</strong> column. If the connection is successful, the datasource is bound to the queue. Otherwise, the binding fails.</span></li><li id="dli_08_15017__li1599913011242"><span>Create a Flink OpenSource SQL job and select Flink 1.15. Copy the following statements and submit the job:</span><p><pre class="screen" id="dli_08_15017__screen177262050142516">create table kafkaSource(
  id bigint,
  name string,
  description string,
  weight DECIMAL(10, 2)
) with (
  'connector' = 'kafka',
  'topic' = '&lt;yourTopic&gt;',
  'properties.group.id' = '&lt;yourGroupId&gt;',
  'properties.bootstrap.servers' = '&lt;yourKafkaAddress&gt;:&lt;yourKafkaPort&gt;',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'canal-json'
);
create table printSink(
  id bigint,
  name string,
  description string,
  weight DECIMAL(10, 2)
) with (
  'connector' = 'print'
);
insert into printSink select * from kafkaSource;</pre>
</p></li><li id="dli_08_15017__li185918297252"><span>Insert the following data into the corresponding Kafka topic. The data shows that the MySQL products table has four columns: <strong id="dli_08_15017__b559618381017">id</strong>, <strong id="dli_08_15017__b145031840813">name</strong>, <strong id="dli_08_15017__b105621241317">description</strong>, and <strong id="dli_08_15017__b199891142711">weight</strong>. This JSON message is an update event on the products table, indicating that the value of the <strong id="dli_08_15017__b187081516728">weight</strong> field changed from 5.15 to 5.18 for the row with id = 111.</span><p><pre class="screen" id="dli_08_15017__screen2451108112611">{
  "data": [
    {
      "id": "111",
      "name": "scooter",
      "description": "Big 2-wheel scooter",
      "weight": "5.18"
    }
  ],
  "database": "inventory",
  "es": 1589373560000,
  "id": 9,
  "isDdl": false,
  "mysqlType": {
    "id": "INTEGER",
    "name": "VARCHAR(255)",
    "description": "VARCHAR(512)",
    "weight": "FLOAT"
  },
  "old": [
    {
      "weight": "5.15"
    }
  ],
  "pkNames": [
    "id"
  ],
  "sql": "",
  "sqlType": {
    "id": 4,
    "name": 12,
    "description": 12,
    "weight": 7
  },
  "table": "products",
  "ts": 1589373560798,
  "type": "UPDATE"
}</pre>
</p></li><li id="dli_08_15017__li4353143193117"><span>Perform the following operations to view the data result in the <strong id="dli_08_15017__b45522900892657">taskmanager.out</strong> file:</span><p><ol type="a" id="dli_08_15017__ol864115198285"><li id="dli_08_15017__li10901621122819">Log in to the DLI console. In the navigation pane, choose <strong id="dli_08_15017__b143601586191919">Job Management</strong> &gt; <strong id="dli_08_15017__b134092248591919">Flink Jobs</strong>.</li><li id="dli_08_15017__li1912163912282">Click the name of the corresponding Flink job, choose <strong id="dli_08_15017__b142577067091930">Run Log</strong>, click <strong id="dli_08_15017__b171898183391930">OBS Bucket</strong>, and locate the folder of the log you want to view according to the date.</li><li id="dli_08_15017__li0641191914285">Go to the folder of the date, find the folder whose name contains <strong id="dli_08_15017__b141450915542751">taskmanager</strong>, download the <strong id="dli_08_15017__b130284135942751">.out</strong> file, and view result logs.</li></ol>
<pre class="screen" id="dli_08_15017__screen1526419222268">-U[111, scooter, Big 2-wheel scooter, 5.15]
+U[111, scooter, Big 2-wheel scooter, 5.18]</pre>
</p></li></ol>
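<p>Because Flink interprets the Canal records as a changelog, aggregations over <strong>kafkaSource</strong> follow the current state of the MySQL table rather than counting every message. As a sketch, the statements below could replace the <strong>printSink</strong> definition and the INSERT statement in the job above; the sink name <strong>avgWeightSink</strong> and its columns are assumptions made for illustration.</p>
<pre class="screen">-- Hypothetical sink that prints the running average weight per product name.
create table avgWeightSink(
  name string,
  avg_weight DECIMAL(10, 2)
) with (
  'connector' = 'print'
);

-- The cast keeps the result type aligned with the sink column.
insert into avgWeightSink
select name, cast(avg(weight) as DECIMAL(10, 2)) from kafkaSource group by name;</pre>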
<p id="dli_08_15017__p13300127285"></p>
</div>
</div>
<div>
<div class="familylinks">
<div class="parentlink"><strong>Parent topic:</strong> <a href="dli_08_15014.html">Formats</a></div>
</div>
</div>