doc-exports/docs/dli/sqlreference/dli_08_15023.html
Su, Xiaomeng be9eabe464 dli_sqlreference_20250305
Reviewed-by: Pruthi, Vineet <vineet.pruthi@t-systems.com>
Co-authored-by: Su, Xiaomeng <suxiaomeng1@huawei.com>
Co-committed-by: Su, Xiaomeng <suxiaomeng1@huawei.com>
2025-03-25 09:06:21 +00:00


<a name="dli_08_15023"></a>
<h1 class="topictitle1">Ogg</h1>
<div id="body0000001736196742"><div class="section" id="dli_08_15023__section18476312085"><h4 class="sectiontitle">Function</h4><p id="dli_08_15023__p2036201513816"><a href="https://www.oracle.com/integration/goldengate/" target="_blank" rel="noopener noreferrer">Oracle GoldenGate</a> (also known as Ogg) is a comprehensive software package for real-time data capture and replication in heterogeneous IT environments. The product set enables high availability solutions, real-time data integration, transactional change data capture, data replication, transformations, and verification between operational and analytical enterprise systems. Ogg provides a format schema for changelogs and supports serializing messages in JSON.</p>
<p id="dli_08_15023__p43619157810">Flink can interpret Ogg JSON messages as INSERT, UPDATE, and DELETE messages in the Flink SQL system. This feature is useful in many scenarios, such as:</p>
<ul id="dli_08_15023__ul10361615188"><li id="dli_08_15023__li19365151684">Synchronizing incremental data from databases to other systems</li><li id="dli_08_15023__li93614151589">Auditing logs</li><li id="dli_08_15023__li936181519815">Real-time materialized views on databases</li><li id="dli_08_15023__li16366158811">Temporal joins against the changing history of a database table</li></ul>
<p id="dli_08_15023__p13691520813">Flink can also encode the INSERT, UPDATE, and DELETE messages in Flink SQL as Ogg JSON and emit them to external systems such as Kafka. However, Flink currently cannot combine UPDATE_BEFORE and UPDATE_AFTER into a single UPDATE message. Therefore, Flink encodes UPDATE_BEFORE and UPDATE_AFTER as DELETE and INSERT Ogg messages, respectively.</p>
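<p>The encoding direction described above can be sketched as follows. This is a minimal, illustrative example rather than a verified job: the table names <strong>oggSource</strong> and <strong>oggSink</strong>, the topic names, and the column list are assumptions, and the Kafka addresses are placeholders.</p>
<pre class="screen">-- Hypothetical changelog source that decodes Ogg JSON records.
CREATE TABLE oggSource (
  id BIGINT,
  name STRING,
  weight DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'kafkaSourceTopic',
  'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
  'properties.group.id' = 'GroupId',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'ogg-json'
);
-- Hypothetical sink that encodes the changelog as Ogg JSON again.
CREATE TABLE oggSink (
  id BIGINT,
  name STRING,
  weight DECIMAL(10, 2)
) WITH (
  'connector' = 'kafka',
  'topic' = 'kafkaSinkTopic',
  'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
  'format' = 'ogg-json'
);
-- An update in oggSource is expected to reach the sink topic as two
-- Ogg messages: a DELETE for the old row and an INSERT for the new row.
INSERT INTO oggSink SELECT id, name, weight FROM oggSource;</pre>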
</div>
<div class="section" id="dli_08_15023__section122491371116"><h4 class="sectiontitle">Supported Connectors</h4><ul id="dli_08_15023__ul188074312166"><li id="dli_08_15023__li93961142143420">Kafka</li><li id="dli_08_15023__li14357112884017">FileSystem</li></ul>
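<p>As a rough sketch of using the format with the FileSystem connector, the following statement reads Ogg JSON records from files. The table name <strong>oggFileSource</strong> and the <strong>path</strong> value are placeholders chosen for illustration. Replace them with values that match your environment.</p>
<pre class="screen">-- Hypothetical table that reads Ogg JSON records from files.
CREATE TABLE oggFileSource (
  id BIGINT,
  name STRING,
  description STRING,
  weight DECIMAL(10, 2)
) WITH (
  'connector' = 'filesystem',
  -- Placeholder path. Point it to the directory that stores the records.
  'path' = 'obs://bucketName/path/to/ogg-records',
  'format' = 'ogg-json'
);</pre>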
</div>
<div class="section" id="dli_08_15023__section1544182953515"><h4 class="sectiontitle">Parameter Description</h4>
<div class="tablenoborder"><table cellpadding="4" cellspacing="0" summary="" id="dli_08_15023__table12424912353" frame="border" border="1" rules="all"><caption><b>Table 1 </b>Parameters</caption><thead align="left"><tr id="dli_08_15023__row154449183515"><th align="left" class="cellrowborder" valign="top" width="14.650000000000002%" id="mcps1.3.3.2.2.6.1.1"><p id="dli_08_15023__p1041449123517">Parameter</p>
</th>
<th align="left" class="cellrowborder" valign="top" width="7.800000000000002%" id="mcps1.3.3.2.2.6.1.2"><p id="dli_08_15023__p104649103515">Mandatory</p>
</th>
<th align="left" class="cellrowborder" valign="top" width="10.06%" id="mcps1.3.3.2.2.6.1.3"><p id="dli_08_15023__p54174918353">Default Value</p>
</th>
<th align="left" class="cellrowborder" valign="top" width="9.100000000000001%" id="mcps1.3.3.2.2.6.1.4"><p id="dli_08_15023__p341349123510">Data Type</p>
</th>
<th align="left" class="cellrowborder" valign="top" width="58.39000000000001%" id="mcps1.3.3.2.2.6.1.5"><p id="dli_08_15023__p164134923514">Description</p>
</th>
</tr>
</thead>
<tbody><tr id="dli_08_15023__row174164920352"><td class="cellrowborder" valign="top" width="14.650000000000002%" headers="mcps1.3.3.2.2.6.1.1 "><p id="dli_08_15023__p5630161113372">format</p>
</td>
<td class="cellrowborder" valign="top" width="7.800000000000002%" headers="mcps1.3.3.2.2.6.1.2 "><p id="dli_08_15023__p6416493352">Yes</p>
</td>
<td class="cellrowborder" valign="top" width="10.06%" headers="mcps1.3.3.2.2.6.1.3 "><p id="dli_08_15023__p840497353">(none)</p>
</td>
<td class="cellrowborder" valign="top" width="9.100000000000001%" headers="mcps1.3.3.2.2.6.1.4 "><p id="dli_08_15023__p1951549153518">String</p>
</td>
<td class="cellrowborder" valign="top" width="58.39000000000001%" headers="mcps1.3.3.2.2.6.1.5 "><p id="dli_08_15023__p1051949143513">Format to use. Set this parameter to <strong id="dli_08_15023__b203262026717">ogg-json</strong>.</p>
</td>
</tr>
<tr id="dli_08_15023__row1451849133519"><td class="cellrowborder" valign="top" width="14.650000000000002%" headers="mcps1.3.3.2.2.6.1.1 "><p id="dli_08_15023__p755864713710">ogg-json.ignore-parse-errors</p>
</td>
<td class="cellrowborder" valign="top" width="7.800000000000002%" headers="mcps1.3.3.2.2.6.1.2 "><p id="dli_08_15023__p165104993510">No</p>
</td>
<td class="cellrowborder" valign="top" width="10.06%" headers="mcps1.3.3.2.2.6.1.3 "><p id="dli_08_15023__p175194953518">false</p>
</td>
<td class="cellrowborder" valign="top" width="9.100000000000001%" headers="mcps1.3.3.2.2.6.1.4 "><p id="dli_08_15023__p951649163516">Boolean</p>
</td>
<td class="cellrowborder" valign="top" width="58.39000000000001%" headers="mcps1.3.3.2.2.6.1.5 "><p id="dli_08_15023__p1651949173511">Whether to skip fields and rows with parse errors instead of failing. The default value is <strong id="dli_08_15023__b43121719974">false</strong>, indicating that an error is thrown. If parse errors are skipped, the affected fields are set to null.</p>
</td>
</tr>
<tr id="dli_08_15023__row1551949163517"><td class="cellrowborder" valign="top" width="14.650000000000002%" headers="mcps1.3.3.2.2.6.1.1 "><p id="dli_08_15023__p1597572810381">ogg-json.timestamp-format.standard</p>
</td>
<td class="cellrowborder" valign="top" width="7.800000000000002%" headers="mcps1.3.3.2.2.6.1.2 "><p id="dli_08_15023__p1951949173519">No</p>
</td>
<td class="cellrowborder" valign="top" width="10.06%" headers="mcps1.3.3.2.2.6.1.3 "><p id="dli_08_15023__p13594914353">'SQL'</p>
</td>
<td class="cellrowborder" valign="top" width="9.100000000000001%" headers="mcps1.3.3.2.2.6.1.4 "><p id="dli_08_15023__p145249193516">String</p>
</td>
<td class="cellrowborder" valign="top" width="58.39000000000001%" headers="mcps1.3.3.2.2.6.1.5 "><p id="dli_08_15023__p131714472387">Input and output timestamp formats. Currently supported values are <strong id="dli_08_15023__b140677733462755">SQL</strong> and <strong id="dli_08_15023__b211956111062755">ISO-8601</strong>:</p>
<ul id="dli_08_15023__ul1831744718384"><li id="dli_08_15023__li143174472385"><strong id="dli_08_15023__b19693699946518">SQL</strong> parses input timestamps in the "yyyy-MM-dd HH:mm:ss.s{precision}" format, for example, <strong id="dli_08_15023__b2669325686518">2020-12-30 12:13:14.123</strong>, and outputs timestamps in the same format.</li><li id="dli_08_15023__li1318124773816"><strong id="dli_08_15023__b10349310156518">ISO-8601</strong> parses input timestamps in the "yyyy-MM-ddTHH:mm:ss.s{precision}" format, for example, <strong id="dli_08_15023__b7602854516518">2020-12-30T12:13:14.123</strong>, and outputs timestamps in the same format.</li></ul>
</td>
</tr>
<tr id="dli_08_15023__row155194983519"><td class="cellrowborder" valign="top" width="14.650000000000002%" headers="mcps1.3.3.2.2.6.1.1 "><p id="dli_08_15023__p923110923915">ogg-json.map-null-key.mode</p>
</td>
<td class="cellrowborder" valign="top" width="7.800000000000002%" headers="mcps1.3.3.2.2.6.1.2 "><p id="dli_08_15023__p12512491357">No</p>
</td>
<td class="cellrowborder" valign="top" width="10.06%" headers="mcps1.3.3.2.2.6.1.3 "><p id="dli_08_15023__p956497353">'FAIL'</p>
</td>
<td class="cellrowborder" valign="top" width="9.100000000000001%" headers="mcps1.3.3.2.2.6.1.4 "><p id="dli_08_15023__p2052049153516">String</p>
</td>
<td class="cellrowborder" valign="top" width="58.39000000000001%" headers="mcps1.3.3.2.2.6.1.5 "><p id="dli_08_15023__p17542202493910">Handling mode when serializing null keys for map data. Currently supported values are <strong id="dli_08_15023__b1933252121">FAIL</strong>, <strong id="dli_08_15023__b44581330131218">DROP</strong>, and <strong id="dli_08_15023__b10813183813120">LITERAL</strong>:</p>
<ul id="dli_08_15023__ul85425244394"><li id="dli_08_15023__li1054292463911"><strong id="dli_08_15023__b38432575125">FAIL</strong> throws an exception when a map with a null key is encountered.</li><li id="dli_08_15023__li054262473919"><strong id="dli_08_15023__b129221115151319">DROP</strong> drops the null key entries of map data.</li><li id="dli_08_15023__li754202473914"><strong id="dli_08_15023__b476410348137">LITERAL</strong> replaces each null key with a string literal. The string literal is defined by <strong id="dli_08_15023__b6994194961317">ogg-json.map-null-key.literal</strong>.</li></ul>
</td>
</tr>
<tr id="dli_08_15023__row175204915359"><td class="cellrowborder" valign="top" width="14.650000000000002%" headers="mcps1.3.3.2.2.6.1.1 "><p id="dli_08_15023__p7584123763919">ogg-json.map-null-key.literal</p>
</td>
<td class="cellrowborder" valign="top" width="7.800000000000002%" headers="mcps1.3.3.2.2.6.1.2 "><p id="dli_08_15023__p205124912355">No</p>
</td>
<td class="cellrowborder" valign="top" width="10.06%" headers="mcps1.3.3.2.2.6.1.3 "><p id="dli_08_15023__p951349123519">'null'</p>
</td>
<td class="cellrowborder" valign="top" width="9.100000000000001%" headers="mcps1.3.3.2.2.6.1.4 "><p id="dli_08_15023__p115449143518">String</p>
</td>
<td class="cellrowborder" valign="top" width="58.39000000000001%" headers="mcps1.3.3.2.2.6.1.5 "><p id="dli_08_15023__p1524919352">String literal that replaces null keys when <strong id="dli_08_15023__b11774172144">ogg-json.map-null-key.mode</strong> is set to <strong id="dli_08_15023__b3340142016145">LITERAL</strong>.</p>
</td>
</tr>
</tbody>
</table>
</div>
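<p>The optional parameters in Table 1 are set in the <strong>WITH</strong> clause next to <strong>format</strong>. The following sketch shows one possible combination. The table names, topic names, the <strong>attributes</strong> column, and the literal <strong>unknown</strong> are hypothetical values chosen for illustration.</p>
<pre class="screen">-- Hypothetical source: skip records that cannot be parsed instead of failing the job.
CREATE TABLE oggSource (
  id BIGINT,
  name STRING,
  attributes MAP&lt;STRING, STRING&gt;
) WITH (
  'connector' = 'kafka',
  'topic' = 'kafkaSourceTopic',
  'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
  'properties.group.id' = 'GroupId',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'ogg-json',
  'ogg-json.ignore-parse-errors' = 'true'
);
-- Hypothetical sink: write null map keys as the string literal 'unknown'.
CREATE TABLE oggSink (
  id BIGINT,
  name STRING,
  attributes MAP&lt;STRING, STRING&gt;
) WITH (
  'connector' = 'kafka',
  'topic' = 'kafkaSinkTopic',
  'properties.bootstrap.servers' = 'KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort',
  'format' = 'ogg-json',
  'ogg-json.map-null-key.mode' = 'LITERAL',
  'ogg-json.map-null-key.literal' = 'unknown'
);
INSERT INTO oggSink SELECT id, name, attributes FROM oggSource;</pre>
<p>In this sketch, the map-related parameters are set only on the sink because they control how null keys are serialized, whereas <strong>ogg-json.ignore-parse-errors</strong> is set on the source because it controls how records are parsed.</p>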
</div>
<div class="section" id="dli_08_15023__section125692974011"><h4 class="sectiontitle">Metadata</h4>
<div class="tablenoborder"><table cellpadding="4" cellspacing="0" summary="" id="dli_08_15023__table183481235401" frame="border" border="1" rules="all"><caption><b>Table 2 </b>Metadata</caption><thead align="left"><tr id="dli_08_15023__row1534815234401"><th align="left" class="cellrowborder" valign="top" width="22.762276227622763%" id="mcps1.3.4.2.2.4.1.1"><p id="dli_08_15023__p153483233409">Key</p>
</th>
<th align="left" class="cellrowborder" valign="top" width="22.522252225222523%" id="mcps1.3.4.2.2.4.1.2"><p id="dli_08_15023__p12348162311404">Data Type</p>
</th>
<th align="left" class="cellrowborder" valign="top" width="54.71547154715471%" id="mcps1.3.4.2.2.4.1.3"><p id="dli_08_15023__p8349123164012">Description</p>
</th>
</tr>
</thead>
<tbody><tr id="dli_08_15023__row203491723104011"><td class="cellrowborder" valign="top" width="22.762276227622763%" headers="mcps1.3.4.2.2.4.1.1 "><p id="dli_08_15023__p834914234405">table</p>
</td>
<td class="cellrowborder" valign="top" width="22.522252225222523%" headers="mcps1.3.4.2.2.4.1.2 "><p id="dli_08_15023__p103491723184010">STRING NULL</p>
</td>
<td class="cellrowborder" valign="top" width="54.71547154715471%" headers="mcps1.3.4.2.2.4.1.3 "><p id="dli_08_15023__p173494235405">Contains fully qualified table name. The format of the fully qualified table name is <strong id="dli_08_15023__b1157153613156">CATALOG NAME.SCHEMA NAME.TABLE NAME</strong>.</p>
</td>
</tr>
<tr id="dli_08_15023__row15349162344011"><td class="cellrowborder" valign="top" width="22.762276227622763%" headers="mcps1.3.4.2.2.4.1.1 "><p id="dli_08_15023__p934919231402">primary-keys</p>
</td>
<td class="cellrowborder" valign="top" width="22.522252225222523%" headers="mcps1.3.4.2.2.4.1.2 "><p id="dli_08_15023__p133491723194015">ARRAY&lt;STRING&gt; NULL</p>
</td>
<td class="cellrowborder" valign="top" width="54.71547154715471%" headers="mcps1.3.4.2.2.4.1.3 "><p id="dli_08_15023__p1953110227456">An array variable holding the column names of the primary keys of the source table.</p>
<p id="dli_08_15023__p16349192317406">The <strong id="dli_08_15023__b17996171621612">primary-keys</strong> field is only included in the JSON output if the <strong id="dli_08_15023__b465118333165">includePrimaryKeys</strong> configuration property is set to <strong id="dli_08_15023__b944373615168">true</strong>.</p>
</td>
</tr>
<tr id="dli_08_15023__row16349172312409"><td class="cellrowborder" valign="top" width="22.762276227622763%" headers="mcps1.3.4.2.2.4.1.1 "><p id="dli_08_15023__p19349923194015">ingestion-timestamp</p>
</td>
<td class="cellrowborder" valign="top" width="22.522252225222523%" headers="mcps1.3.4.2.2.4.1.2 "><p id="dli_08_15023__p6349142319405">TIMESTAMP_LTZ(6) NULL</p>
</td>
<td class="cellrowborder" valign="top" width="54.71547154715471%" headers="mcps1.3.4.2.2.4.1.3 "><p id="dli_08_15023__p1534912232405">The timestamp at which the connector processed the event. Corresponds to the <strong id="dli_08_15023__b3758254131611">current_ts</strong> field in the Ogg record.</p>
</td>
</tr>
<tr id="dli_08_15023__row1734918231405"><td class="cellrowborder" valign="top" width="22.762276227622763%" headers="mcps1.3.4.2.2.4.1.1 "><p id="dli_08_15023__p16349623134018">event-timestamp</p>
</td>
<td class="cellrowborder" valign="top" width="22.522252225222523%" headers="mcps1.3.4.2.2.4.1.2 "><p id="dli_08_15023__p33499235404">TIMESTAMP_LTZ(6) NULL</p>
</td>
<td class="cellrowborder" valign="top" width="54.71547154715471%" headers="mcps1.3.4.2.2.4.1.3 "><p id="dli_08_15023__p1734972384012">The timestamp at which the source system created the event. Corresponds to the <strong id="dli_08_15023__b83964247174">op_ts</strong> field in the Ogg record.</p>
</td>
</tr>
</tbody>
</table>
</div>
<p id="dli_08_15023__p2080664024817">The following example shows how to access Ogg metadata fields in Kafka:</p>
<pre class="screen" id="dli_08_15023__screen13902433154711">CREATE TABLE KafkaTable (
origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,
event_time TIMESTAMP(3) METADATA FROM 'value.event-timestamp' VIRTUAL,
origin_table STRING METADATA FROM 'value.table' VIRTUAL,
primary_keys ARRAY&lt;STRING&gt; METADATA FROM 'value.primary-keys' VIRTUAL,
user_id BIGINT,
item_id BIGINT,
behavior STRING
) WITH (
'connector' = 'kafka',
'topic' = '<em id="dli_08_15023__i462981854917"><strong id="dli_08_15023__b1664181810497">kafkaTopic</strong></em>',
'properties.bootstrap.servers' = '<em id="dli_08_15023__i387653131014"><strong id="dli_08_15023__b19871053111017">KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort</strong></em>',
'properties.group.id' = '<em id="dli_08_15023__i10632184018499"><strong id="dli_08_15023__b419564015492">GroupId</strong></em>',
'scan.startup.mode' = 'earliest-offset',
'value.format' = 'ogg-json'
);</pre>
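<p>Once declared, the metadata columns can be queried like regular columns. The following query is a minimal sketch that assumes the <strong>KafkaTable</strong> definition above. The selected columns are chosen for illustration.</p>
<pre class="screen">-- Read the source table name and the event time together with the payload columns.
SELECT origin_table, event_time, origin_ts, user_id, item_id, behavior
FROM KafkaTable;</pre>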
</div>
<div class="section" id="dli_08_15023__section14892577502"><h4 class="sectiontitle">Example</h4><p id="dli_08_15023__p15881132116016">Use ogg-json to read Ogg records from Kafka and output them to the Print connector.</p>
<ol id="dli_08_15023__ol06367232518"><li id="dli_08_15023__li04031578234"><span>Create a datasource connection to the VPC and subnet where Kafka is located and bind the connection to the queue. Configure a security group with an inbound rule that allows access from the queue, and test the connectivity of the queue using the Kafka IP address. For example, locate a general-purpose queue where the job runs and choose <strong id="dli_08_15023__b151048220242623">More</strong> &gt; <strong id="dli_08_15023__b52759973542623">Test Address Connectivity</strong> in the <strong id="dli_08_15023__b123115241942623">Operation</strong> column. If the connection is successful, the datasource is bound to the queue. Otherwise, the binding fails.</span></li><li id="dli_08_15023__li1599913011242"><span>Create a Flink OpenSource SQL job and select Flink 1.15. Copy the following statements and submit the job:</span><p><pre class="screen" id="dli_08_15023__screen177262050142516">CREATE TABLE kafkaSource (
id bigint,
name string,
description string,
weight DECIMAL(10, 2)
) WITH (
'connector' = 'kafka',
'topic' = '<em id="dli_08_15023__i1018018501627"><strong id="dli_08_15023__b5412149928">kafkaTopic</strong></em>',
'properties.bootstrap.servers' = '<em id="dli_08_15023__i44191052793"><strong id="dli_08_15023__b144196521996">KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort</strong></em>',
'properties.group.id' = '<em id="dli_08_15023__i830645838"><strong id="dli_08_15023__b11806348316">GroupId</strong></em>',
'scan.startup.mode' = 'latest-offset',
'format' = 'ogg-json'
);
CREATE TABLE printSink (
id bigint,
name string,
description string,
weight DECIMAL(10, 2)
) WITH (
'connector' = 'print'
);
insert into printSink select * from kafkaSource;
</pre>
</p></li><li id="dli_08_15023__li185918297252"><span>Insert the data below into the appropriate Kafka topics. The data shows that the Oracle PRODUCTS table has four columns: <strong id="dli_08_15023__b15950114961515">id</strong>, <strong id="dli_08_15023__b119501449171518">name</strong>, <strong id="dli_08_15023__b17951114919151">description</strong>, and <strong id="dli_08_15023__b10951194917156">weight</strong>. This JSON message represents an update event on the PRODUCTS table, where the <strong id="dli_08_15023__b19896202781618">weight</strong> value of the row with id = 111 has been changed from 5.18 to 5.15.</span><p><pre class="screen" id="dli_08_15023__screen2451108112611">{
"before": {
"id": 111,
"name": "scooter",
"description": "Big 2-wheel scooter",
"weight": 5.18
},
"after": {
"id": 111,
"name": "scooter",
"description": "Big 2-wheel scooter",
"weight": 5.15
},
"op_type": "U",
"op_ts": "2020-05-13 15:40:06.000000",
"current_ts": "2020-05-13 15:40:07.000000",
"primary_keys": [
"id"
],
"pos": "00000000000000000000143",
"table": "PRODUCTS"
}</pre>
</p></li><li id="dli_08_15023__li4353143193117"><span>Perform the following operations to view the data result in the <strong id="dli_08_15023__b21049182289274">taskmanager.out</strong> file:</span><p><ol type="a" id="dli_08_15023__ol864115198285"><li id="dli_08_15023__li10901621122819">Log in to the DLI console. In the navigation pane, choose <strong id="dli_08_15023__b64061806591926">Job Management</strong> &gt; <strong id="dli_08_15023__b35278812591926">Flink Jobs</strong>.</li><li id="dli_08_15023__li1912163912282">Click the name of the corresponding Flink job, choose <strong id="dli_08_15023__b20131078491936">Run Log</strong>, click <strong id="dli_08_15023__b195971653591936">OBS Bucket</strong>, and locate the folder of the log you want to view according to the date.</li><li id="dli_08_15023__li0641191914285">Go to the folder of the date, find the folder whose name contains <strong id="dli_08_15023__b6651341564494">taskmanager</strong>, download the <strong id="dli_08_15023__b5417834114494">.out</strong> file, and view result logs.</li></ol>
<pre class="screen" id="dli_08_15023__screen1526419222268">-U[111, scooter, Big 2-wheel scooter, 5.18]
+U[111, scooter, Big 2-wheel scooter, 5.15]</pre>
</p></li></ol>
</div>
</div>
<div>
<div class="familylinks">
<div class="parentlink"><strong>Parent topic:</strong> <a href="dli_08_15014.html">Formats</a></div>
</div>
</div>