Reviewed-by: Pruthi, Vineet <vineet.pruthi@t-systems.com> Co-authored-by: Hasko, Vladimir <vladimir.hasko@t-systems.com> Co-committed-by: Hasko, Vladimir <vladimir.hasko@t-systems.com>
<a name="dli_09_0013"></a><a name="dli_09_0013"></a>
<h1 class="topictitle1">Reading Data from PostgreSQL CDC and Writing Data to GaussDB(DWS)</h1>
<div id="body8662426"><div class="notice" id="dli_09_0013__en-us_topic_0000001269022188_note04712015123019"><span class="noticetitle"><img src="public_sys-resources/notice_3.0-en-us.png"> </span><div class="noticebody"><p id="dli_09_0013__en-us_topic_0000001269022188_p19225112543315">This guide provides reference for Flink 1.12 only.</p>
</div></div>
<div class="section" id="dli_09_0013__en-us_topic_0000001269022188_section10920163411416"><h4 class="sectiontitle">Description</h4><p id="dli_09_0013__en-us_topic_0000001269022188_p128093392048">Change Data Capture (CDC) synchronizes incremental changes from a source database to one or more destinations. During synchronization, the data can also be processed, for example, grouped (GROUP BY) or joined across multiple tables (JOIN).</p>
<p id="dli_09_0013__en-us_topic_0000001269022188_p519175932112">This example creates a PostgreSQL CDC source table to monitor PostgreSQL data changes and insert the changed data into a GaussDB(DWS) database.</p>
</div>
<div class="section" id="dli_09_0013__en-us_topic_0000001269022188_section99670144912"><h4 class="sectiontitle">Prerequisites</h4><ol id="dli_09_0013__en-us_topic_0000001269022188_ol188213811126"><li id="dli_09_0013__en-us_topic_0000001269022188_li1087117102575">You have created an RDS for PostgreSQL DB instance. In this example, the RDS for PostgreSQL database version is 11.<div class="note" id="dli_09_0013__en-us_topic_0000001269022188_note158215267521"><img src="public_sys-resources/note_3.0-en-us.png"><span class="notetitle"> </span><div class="notebody"><p id="dli_09_0013__en-us_topic_0000001269022188_p2821102611528">The version of the RDS PostgreSQL database cannot be earlier than 11.</p>
</div></div>
</li><li id="dli_09_0013__en-us_topic_0000001269022188_li82101214183620">You have created a GaussDB(DWS) instance.</li></ol>
</div>
<div class="section" id="dli_09_0013__en-us_topic_0000001269022188_section12587518310"><h4 class="sectiontitle">Overall Process</h4><div class="p" id="dli_09_0013__en-us_topic_0000001269022188_p7741285314"><a href="#dli_09_0013__en-us_topic_0000001269022188_fig1691441652">Figure 1</a> shows the overall development process.<div class="fignone" id="dli_09_0013__en-us_topic_0000001269022188_fig1691441652"><a name="dli_09_0013__en-us_topic_0000001269022188_fig1691441652"></a><a name="en-us_topic_0000001269022188_fig1691441652"></a><span class="figcap"><b>Figure 1 </b>Job development process</span><br><span><img id="dli_09_0013__en-us_topic_0000001269022188_image691841454" src="en-us_image_0000001318542105.png"></span></div>
</div>
<p id="dli_09_0013__en-us_topic_0000001269022188_p16413127863"><a href="#dli_09_0013__en-us_topic_0000001269022188_section792923214216">Step 1: Create a Queue</a></p>
<p id="dli_09_0013__en-us_topic_0000001269022188_p1259352616818"><a href="#dli_09_0013__en-us_topic_0000001269022188_section1627154113018">Step 2: Create an RDS PostgreSQL Database and Table</a></p>
<p id="dli_09_0013__en-us_topic_0000001269022188_p112432312316"><a href="#dli_09_0013__en-us_topic_0000001269022188_section1744572443816">Step 3: Create a GaussDB(DWS) Database and Table</a></p>
<p id="dli_09_0013__en-us_topic_0000001269022188_p11783144819818"><a href="#dli_09_0013__en-us_topic_0000001269022188_section074025752119">Step 4: Create an Enhanced Datasource Connection</a></p>
<p id="dli_09_0013__en-us_topic_0000001269022188_p102618381914"><a href="#dli_09_0013__en-us_topic_0000001269022188_section12448959174212">Step 5: Run a Job</a></p>
<p id="dli_09_0013__en-us_topic_0000001269022188_p491452441010"><a href="#dli_09_0013__en-us_topic_0000001269022188_section4387527162418">Step 6: Send Data and Query Results</a></p>
</div>
<div class="section" id="dli_09_0013__en-us_topic_0000001269022188_section792923214216"><a name="dli_09_0013__en-us_topic_0000001269022188_section792923214216"></a><a name="en-us_topic_0000001269022188_section792923214216"></a><h4 class="sectiontitle">Step 1: Create a Queue</h4><ol id="dli_09_0013__en-us_topic_0000001269022188_ol193907145108"><li id="dli_09_0013__en-us_topic_0000001269022188_li3390161431020">Log in to the DLI console. In the navigation pane on the left, choose <strong id="dli_09_0013__en-us_topic_0000001269022188_b168615819316">Resources</strong> > <strong id="dli_09_0013__en-us_topic_0000001269022188_b16686165823118">Queue Management</strong>.</li><li id="dli_09_0013__en-us_topic_0000001269022188_li5390171401016">On the displayed page, click <strong id="dli_09_0013__en-us_topic_0000001269022188_b0303170193214">Buy Queue</strong> in the upper right corner.</li><li id="dli_09_0013__en-us_topic_0000001269022188_li133901314191011">On the <strong id="dli_09_0013__en-us_topic_0000001269022188_b539611112320">Buy Queue</strong> page, set queue parameters as follows:<ul id="dli_09_0013__en-us_topic_0000001269022188_ul161521581016"><li id="dli_09_0013__en-us_topic_0000001269022188_li5992112615541"><strong id="dli_09_0013__en-us_topic_0000001269022188_b1254154685012">Billing Mode</strong>: . 
</li><li id="dli_09_0013__en-us_topic_0000001269022188_li17566251558"><strong id="dli_09_0013__en-us_topic_0000001269022188_b109021116173216">Region</strong> and <strong id="dli_09_0013__en-us_topic_0000001269022188_b1090841620323">Project</strong>: Retain the default values.</li><li id="dli_09_0013__en-us_topic_0000001269022188_li9378124110319"><strong id="dli_09_0013__en-us_topic_0000001269022188_b13138142173219">Name</strong>: Enter a queue name.<div class="note" id="dli_09_0013__en-us_topic_0000001269022188_note1523218284569"><img src="public_sys-resources/note_3.0-en-us.png"><span class="notetitle"> </span><div class="notebody"><p id="dli_09_0013__en-us_topic_0000001269022188_en-us_topic_0069078607_en-us_topic_0069077926_p61185513">The queue name can contain only digits, letters, and underscores (_), but cannot contain only digits or start with an underscore (_). The name must contain 1 to 128 characters.</p>
<p id="dli_09_0013__en-us_topic_0000001269022188_p6288122915199"><strong id="dli_09_0013__en-us_topic_0000001269022188_b12931823153213">The queue name is case-insensitive. Uppercase letters will be automatically converted to lowercase letters.</strong></p>
</div></div>
</li><li id="dli_09_0013__en-us_topic_0000001269022188_li1761814682119"><strong id="dli_09_0013__en-us_topic_0000001269022188_b2853172453211">Type</strong>: Select <strong id="dli_09_0013__en-us_topic_0000001269022188_b17853102453217">For general purpose</strong>. Select the <strong id="dli_09_0013__en-us_topic_0000001269022188_b9801526143214">Dedicated Resource Mode</strong>.</li><li id="dli_09_0013__en-us_topic_0000001269022188_li360481635014"><strong id="dli_09_0013__en-us_topic_0000001269022188_b4637144913509">AZ Mode</strong> and <strong id="dli_09_0013__en-us_topic_0000001269022188_b18637174914507">Specifications</strong>: Retain the default values.</li><li id="dli_09_0013__en-us_topic_0000001269022188_li32421882515"><strong id="dli_09_0013__en-us_topic_0000001269022188_b1451292973215">Enterprise Project</strong>: Select <strong id="dli_09_0013__en-us_topic_0000001269022188_b115137297327">default</strong>.</li><li id="dli_09_0013__en-us_topic_0000001269022188_li6862026155115"><strong id="dli_09_0013__en-us_topic_0000001269022188_b82991630143216">Advanced Settings</strong>: Select <strong id="dli_09_0013__en-us_topic_0000001269022188_b830063012322">Custom</strong>.</li><li id="dli_09_0013__en-us_topic_0000001269022188_li615215819015"><strong id="dli_09_0013__en-us_topic_0000001269022188_b1920133112324">CIDR Block</strong>: Specify the queue network segment. For example, <strong id="dli_09_0013__en-us_topic_0000001269022188_b11291532153212">10.0.0.0/16</strong>.<div class="caution" id="dli_09_0013__en-us_topic_0000001269022188_note243428112912"><span class="cautiontitle"><img src="public_sys-resources/caution_3.0-en-us.png"> </span><div class="cautionbody"><p id="dli_09_0013__en-us_topic_0000001269022188_p114344818296">The CIDR block of a queue cannot overlap with the CIDR blocks of the data sources it connects to, which in this example are the RDS for PostgreSQL DB instance and the GaussDB(DWS) cluster. Otherwise, datasource connections will fail to be created.</p>
</div></div>
</li><li id="dli_09_0013__en-us_topic_0000001269022188_li1066019193813">Set other parameters as required.</li></ul>
</li><li id="dli_09_0013__en-us_topic_0000001269022188_li83901314181011">Click <strong id="dli_09_0013__en-us_topic_0000001269022188_b20948143616328">Buy</strong>. Confirm the configuration and click <strong id="dli_09_0013__en-us_topic_0000001269022188_b194810363321">Submit</strong>.</li></ol>
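<p>The CIDR rule in the caution above can be checked before the queue is created. The following is a minimal sketch, assuming the queries are run on a PostgreSQL instance (for example, the RDS for PostgreSQL instance from the prerequisites), where the <strong>&amp;&amp;</strong> operator on <strong>inet</strong> values reports whether two networks overlap. The second CIDR block in each query is a hypothetical subnet used for illustration only, not a value from this guide.</p>
<pre class="screen">-- Queue CIDR block vs. a hypothetical data source subnet: no overlap, returns false.
SELECT inet '10.0.0.0/16' &amp;&amp; inet '192.168.0.0/24' AS overlaps;

-- Queue CIDR block vs. a hypothetical subnet inside it: overlap, returns true.
SELECT inet '10.0.0.0/16' &amp;&amp; inet '10.0.1.0/24' AS overlaps;</pre>
<p>If a query returns true for the subnet of a data source, choose a different CIDR block for the queue.</p>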
</div>
<div class="section" id="dli_09_0013__en-us_topic_0000001269022188_section1627154113018"><a name="dli_09_0013__en-us_topic_0000001269022188_section1627154113018"></a><a name="en-us_topic_0000001269022188_section1627154113018"></a><h4 class="sectiontitle">Step 2: Create an RDS PostgreSQL Database and Table</h4><ol id="dli_09_0013__en-us_topic_0000001269022188_ol622615816111"><li id="dli_09_0013__en-us_topic_0000001269022188_li16226198013">Log in to the RDS console. On the displayed page, locate the target PostgreSQL DB instance and choose <strong id="dli_09_0013__en-us_topic_0000001269022188_b5701195625020">More</strong> > <strong id="dli_09_0013__en-us_topic_0000001269022188_b270235617502">Log In</strong> in the <strong id="dli_09_0013__en-us_topic_0000001269022188_b670218564500">Operation</strong> column.</li><li id="dli_09_0013__en-us_topic_0000001269022188_li1371112481236">In the login dialog box displayed, enter the username and password and click <strong id="dli_09_0013__en-us_topic_0000001269022188_b1684221963312">Log In</strong>.</li><li id="dli_09_0013__en-us_topic_0000001269022188_li2298131648">Create a database named <strong id="dli_09_0013__en-us_topic_0000001269022188_b115734652714">testrdsdb</strong>.</li><li id="dli_09_0013__en-us_topic_0000001269022188_li18256148105717">Create a schema named <strong id="dli_09_0013__en-us_topic_0000001269022188_b13215823132911">test</strong> in the <strong id="dli_09_0013__en-us_topic_0000001269022188_b10817728112914">testrdsdb</strong> database.</li><li id="dli_09_0013__en-us_topic_0000001269022188_li75524472088">Choose <strong id="dli_09_0013__en-us_topic_0000001269022188_b20894325172514">SQL Operations</strong> > <strong id="dli_09_0013__en-us_topic_0000001269022188_b158127294255">SQL Query</strong>. On the page displayed, create an RDS for PostgreSQL table.<pre class="screen" id="dli_09_0013__en-us_topic_0000001269022188_screen16327121991213">create table test.cdc_order(
  order_id VARCHAR,
  order_channel VARCHAR,
  order_time VARCHAR,
  pay_amount FLOAT8,
  real_pay FLOAT8,
  pay_time VARCHAR,
  user_id VARCHAR,
  user_name VARCHAR,
  area_id VARCHAR,
primary key(order_id));</pre>
<div class="p" id="dli_09_0013__en-us_topic_0000001269022188_p169158131312">Run the following statement in the PostgreSQL instance:<pre class="screen" id="dli_09_0013__en-us_topic_0000001269022188_screen4397125413105">ALTER TABLE test.cdc_order REPLICA IDENTITY FULL;</pre>
</div>
</li></ol>
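<p>Before moving on, the replication settings used in this step can be checked with two catalog queries. This is a minimal sketch, assuming the queries are run in the <strong>testrdsdb</strong> database. PostgreSQL logical decoding, which CDC tools generally build on, requires <strong>wal_level</strong> to be <strong>logical</strong>. Whether the DLI connector needs any further instance settings is not covered by this sketch.</p>
<pre class="screen">-- Logical decoding requires this setting to be 'logical'.
SHOW wal_level;

-- 'f' means REPLICA IDENTITY FULL is in effect for the table.
SELECT relreplident FROM pg_class WHERE oid = 'test.cdc_order'::regclass;</pre>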
</div>
<div class="section" id="dli_09_0013__en-us_topic_0000001269022188_section1744572443816"><a name="dli_09_0013__en-us_topic_0000001269022188_section1744572443816"></a><a name="en-us_topic_0000001269022188_section1744572443816"></a><h4 class="sectiontitle">Step 3: Create a GaussDB(DWS) Database and Table</h4><ol id="dli_09_0013__en-us_topic_0000001269022188_ol5444161915482"><li id="dli_09_0013__en-us_topic_0000001269022188_li1644413197483">Connect to the created GaussDB(DWS) cluster.</li><li id="dli_09_0013__en-us_topic_0000001269022188_li13452124716514">Connect to the default database <strong id="dli_09_0013__en-us_topic_0000001269022188_b3654153917403">gaussdb</strong> of a GaussDB(DWS) cluster.<pre class="screen" id="dli_09_0013__en-us_topic_0000001269022188_screen83931272818">gsql -d gaussdb -h <em id="dli_09_0013__en-us_topic_0000001269022188_i158461540184015">Connection address of the GaussDB(DWS) cluster</em> -U dbadmin -p 8000 -W <em id="dli_09_0013__en-us_topic_0000001269022188_i084619407404">password</em> -r</pre>
<ul id="dli_09_0013__en-us_topic_0000001269022188_ul174831718681"><li id="dli_09_0013__en-us_topic_0000001269022188_li15692151710103"><strong id="dli_09_0013__en-us_topic_0000001269022188_b1879454114018">gaussdb</strong>: Default database of the GaussDB(DWS) cluster</li><li id="dli_09_0013__en-us_topic_0000001269022188_li124831018983"><strong id="dli_09_0013__en-us_topic_0000001269022188_b69694317403">Connection address of the GaussDB(DWS) cluster</strong>: If a public network address is used for connection, set this parameter to the public network IP address or domain name. If a private network address is used for connection, set this parameter to the private network IP address or domain name. If an ELB is used for connection, set this parameter to the ELB address.</li><li id="dli_09_0013__en-us_topic_0000001269022188_li727319471581"><strong id="dli_09_0013__en-us_topic_0000001269022188_b412854654020">dbadmin</strong>: Default administrator username used during cluster creation</li><li id="dli_09_0013__en-us_topic_0000001269022188_li4715191411119"><strong id="dli_09_0013__en-us_topic_0000001269022188_b10257154712404">-W</strong>: Password of the administrator</li></ul>
</li><li id="dli_09_0013__en-us_topic_0000001269022188_li1024132193916">Run the following command to create the <strong id="dli_09_0013__en-us_topic_0000001269022188_b14268481408">testdwsdb</strong> database:<pre class="screen" id="dli_09_0013__en-us_topic_0000001269022188_screen6700124819131">CREATE DATABASE testdwsdb;</pre>
</li><li id="dli_09_0013__en-us_topic_0000001269022188_li18700124812130">Run the following command to exit the <strong id="dli_09_0013__en-us_topic_0000001269022188_b1345114503401">gaussdb</strong> database and connect to <strong id="dli_09_0013__en-us_topic_0000001269022188_b24512503408">testdwsdb</strong>:<pre class="screen" id="dli_09_0013__en-us_topic_0000001269022188_screen184211821618">\q
gsql -d testdwsdb -h <em id="dli_09_0013__en-us_topic_0000001269022188_i323835244014">Connection address of the GaussDB(DWS) cluster</em> -U dbadmin -p 8000 -W <em id="dli_09_0013__en-us_topic_0000001269022188_i152387522401">password</em> -r</pre>
</li><li id="dli_09_0013__en-us_topic_0000001269022188_li1996848151611">Run the following commands to create a table:<pre class="screen" id="dli_09_0013__en-us_topic_0000001269022188_screen1610120280179">create schema test;
set current_schema = test;
drop table if exists dws_order;
CREATE TABLE dws_order
(
  order_id VARCHAR,
  order_channel VARCHAR,
  order_time VARCHAR,
  pay_amount FLOAT8,
  real_pay FLOAT8,
  pay_time VARCHAR,
  user_id VARCHAR,
  user_name VARCHAR,
  area_id VARCHAR
);</pre>
</li></ol>
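<p>Because the job copies rows from one table to the other, it can help to confirm that the sink table has the same columns as the source table. The following is a minimal sketch, assuming GaussDB(DWS) exposes the standard <strong>information_schema</strong> views in the same way PostgreSQL does. Run it in <strong>testdwsdb</strong> as written. To compare, run it in <strong>testrdsdb</strong> with the table name changed to <strong>cdc_order</strong>.</p>
<pre class="screen">-- List the columns of the sink table in definition order.
SELECT column_name, data_type
FROM information_schema.columns
WHERE table_schema = 'test' AND table_name = 'dws_order'
ORDER BY ordinal_position;</pre>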
</div>
<div class="section" id="dli_09_0013__en-us_topic_0000001269022188_section074025752119"><a name="dli_09_0013__en-us_topic_0000001269022188_section074025752119"></a><a name="en-us_topic_0000001269022188_section074025752119"></a><h4 class="sectiontitle">Step 4: Create an Enhanced Datasource Connection</h4><ul id="dli_09_0013__en-us_topic_0000001269022188_ul1231663714228"><li id="dli_09_0013__en-us_topic_0000001269022188_li193161137122213"><strong id="dli_09_0013__en-us_topic_0000001269022188_b1377610596407">Connecting DLI to RDS</strong><ol id="dli_09_0013__en-us_topic_0000001269022188_ol24611049949"><li id="dli_09_0013__en-us_topic_0000001269022188_li876915469390">Go to the RDS console, click the name of the target RDS DB instance on the <strong id="dli_09_0013__en-us_topic_0000001269022188_b492620019415">Instances</strong> page. Basic information of the instance is displayed.</li><li id="dli_09_0013__en-us_topic_0000001269022188_li1976917464396"><a name="dli_09_0013__en-us_topic_0000001269022188_li1976917464396"></a><a name="en-us_topic_0000001269022188_li1976917464396"></a>In the <strong id="dli_09_0013__en-us_topic_0000001269022188_b169333111412">Connection Information</strong> pane, obtain the floating IP address, database port, VPC, and subnet.</li><li id="dli_09_0013__en-us_topic_0000001269022188_li11769194618399">Click the security group name. On the displayed page, click the <strong id="dli_09_0013__en-us_topic_0000001269022188_b184019394119">Inbound Rules</strong> tab and add a rule to allow access from DLI queues. 
For example, if the CIDR block of the queue is 10.0.0.0/16, set <strong id="dli_09_0013__en-us_topic_0000001269022188_b14989436417">Priority</strong> to <strong id="dli_09_0013__en-us_topic_0000001269022188_b10989113194112">1</strong>, <strong id="dli_09_0013__en-us_topic_0000001269022188_b1098943184118">Action</strong> to <strong id="dli_09_0013__en-us_topic_0000001269022188_b209898314111">Allow</strong>, <strong id="dli_09_0013__en-us_topic_0000001269022188_b109901339411">Protocol</strong> to <strong id="dli_09_0013__en-us_topic_0000001269022188_b7990163154119">TCP</strong>, <strong id="dli_09_0013__en-us_topic_0000001269022188_b109903314417">Type</strong> to <strong id="dli_09_0013__en-us_topic_0000001269022188_b139906312416">IPv4</strong>, <strong id="dli_09_0013__en-us_topic_0000001269022188_b49913319414">Source</strong> to <strong id="dli_09_0013__en-us_topic_0000001269022188_b19991123134111">10.0.0.0/16</strong>, and click <strong id="dli_09_0013__en-us_topic_0000001269022188_b29910313417">OK</strong>.</li><li id="dli_09_0013__en-us_topic_0000001269022188_li27691446183918">Log in to the DLI management console. In the navigation pane on the left, choose <strong id="dli_09_0013__en-us_topic_0000001269022188_b1095954144118">Datasource Connections</strong>. On the displayed page, click <strong id="dli_09_0013__en-us_topic_0000001269022188_b12960146415">Create</strong> on the <strong id="dli_09_0013__en-us_topic_0000001269022188_b1696010419414">Enhanced</strong> tab.</li><li id="dli_09_0013__en-us_topic_0000001269022188_li77695469394">In the displayed dialog box, set the following parameters:<ul id="dli_09_0013__en-us_topic_0000001269022188_ul9769174683913"><li id="dli_09_0013__en-us_topic_0000001269022188_li4769134623917"><strong id="dli_09_0013__en-us_topic_0000001269022188_b092419711414">Connection Name</strong>: Enter a name for the enhanced datasource connection. 
For this example, enter <strong id="dli_09_0013__en-us_topic_0000001269022188_b081219864110">dli_rds</strong>.</li><li id="dli_09_0013__en-us_topic_0000001269022188_li18769114618392"><strong id="dli_09_0013__en-us_topic_0000001269022188_b159981013419">Resource Pool</strong>: Select the name of the queue created in <a href="#dli_09_0013__en-us_topic_0000001269022188_section792923214216">Step 1: Create a Queue</a>.</li><li id="dli_09_0013__en-us_topic_0000001269022188_li1776916465398"><strong id="dli_09_0013__en-us_topic_0000001269022188_b18880812124118">VPC</strong>: Select the VPC of the RDS DB instance.</li><li id="dli_09_0013__en-us_topic_0000001269022188_li11769134616395"><strong id="dli_09_0013__en-us_topic_0000001269022188_b7916121394119">Subnet</strong>: Select the subnet of the RDS DB instance.</li><li id="dli_09_0013__en-us_topic_0000001269022188_li276944623911">Set other parameters as required.</li></ul>
<p id="dli_09_0013__en-us_topic_0000001269022188_p11769194618394">Click <strong id="dli_09_0013__en-us_topic_0000001269022188_b14160161654115">OK</strong>. Click the name of the created datasource connection to view its status. You can perform subsequent steps only after the connection status changes to <strong id="dli_09_0013__en-us_topic_0000001269022188_b199421714114">Active</strong>.</p>
</li><li id="dli_09_0013__en-us_topic_0000001269022188_li1376924683912">In the navigation pane on the left, choose <strong id="dli_09_0013__en-us_topic_0000001269022188_b112821118194111">Resources</strong> > <strong id="dli_09_0013__en-us_topic_0000001269022188_b202831918154114">Queue Management</strong>. On the page displayed, locate the queue you created in <a href="#dli_09_0013__en-us_topic_0000001269022188_section792923214216">Step 1: Create a Queue</a>, click <strong id="dli_09_0013__en-us_topic_0000001269022188_b5283101894118">More</strong> in the <strong id="dli_09_0013__en-us_topic_0000001269022188_b15676126152017">Operation</strong> column, and select <strong id="dli_09_0013__en-us_topic_0000001269022188_b2028461854119">Test Address Connectivity</strong>.</li><li id="dli_09_0013__en-us_topic_0000001269022188_li976911460392">In the displayed dialog box, enter <em id="dli_09_0013__en-us_topic_0000001269022188_i1935222013413">floating IP address</em><strong id="dli_09_0013__en-us_topic_0000001269022188_b14352172064114">:</strong><em id="dli_09_0013__en-us_topic_0000001269022188_i13532207418">database port</em> of the RDS DB instance you have obtained in <a href="#dli_09_0013__en-us_topic_0000001269022188_li1976917464396">2</a> in the <strong id="dli_09_0013__en-us_topic_0000001269022188_b1235332012412">Address</strong> box and click <strong id="dli_09_0013__en-us_topic_0000001269022188_b23536207414">Test</strong> to check whether the database is reachable.</li></ol>
</li><li id="dli_09_0013__en-us_topic_0000001269022188_li474495213221"><strong id="dli_09_0013__en-us_topic_0000001269022188_b165561021184110">Connecting DLI to GaussDB(DWS)</strong><ol id="dli_09_0013__en-us_topic_0000001269022188_ol17815135442310"><li id="dli_09_0013__en-us_topic_0000001269022188_li14815454112312">On the GaussDB(DWS) management console, choose <strong id="dli_09_0013__en-us_topic_0000001269022188_b155381922164111">Clusters</strong>. On the displayed page, click the name of the created GaussDB(DWS) cluster to view basic information.</li><li id="dli_09_0013__en-us_topic_0000001269022188_li19666016361"><a name="dli_09_0013__en-us_topic_0000001269022188_li19666016361"></a><a name="en-us_topic_0000001269022188_li19666016361"></a>On the Basic Information tab, locate the <strong id="dli_09_0013__en-us_topic_0000001269022188_b18744182344119">Database Attributes</strong> pane and obtain the private IP address and port number of the cluster. In the <strong id="dli_09_0013__en-us_topic_0000001269022188_b2074518235412">Network</strong> pane, obtain the VPC and subnet information.</li><li id="dli_09_0013__en-us_topic_0000001269022188_li4233434123610">Click the security group name. On the displayed page, click the <strong id="dli_09_0013__en-us_topic_0000001269022188_b15812624124119">Inbound Rules</strong> tab and add a rule to allow access from DLI queues. 
For example, if the CIDR block of the queue is 10.0.0.0/16, set <strong id="dli_09_0013__en-us_topic_0000001269022188_b185626104111">Priority</strong> to <strong id="dli_09_0013__en-us_topic_0000001269022188_b78552614113">1</strong>, <strong id="dli_09_0013__en-us_topic_0000001269022188_b148620260414">Action</strong> to <strong id="dli_09_0013__en-us_topic_0000001269022188_b1986526144112">Allow</strong>, <strong id="dli_09_0013__en-us_topic_0000001269022188_b1186112612418">Protocol</strong> to <strong id="dli_09_0013__en-us_topic_0000001269022188_b786102664110">TCP</strong>, <strong id="dli_09_0013__en-us_topic_0000001269022188_b386526104110">Type</strong> to <strong id="dli_09_0013__en-us_topic_0000001269022188_b787132611413">IPv4</strong>, <strong id="dli_09_0013__en-us_topic_0000001269022188_b1887112614417">Source</strong> to <strong id="dli_09_0013__en-us_topic_0000001269022188_b1687226144110">10.0.0.0/16</strong>, and click <strong id="dli_09_0013__en-us_topic_0000001269022188_b18875267416">OK</strong>.</li><li id="dli_09_0013__en-us_topic_0000001269022188_li14803182620216">Check whether the RDS instance and GaussDB(DWS) instance are in the same VPC and subnet.<ol type="a" id="dli_09_0013__en-us_topic_0000001269022188_ol184431423137"><li id="dli_09_0013__en-us_topic_0000001269022188_li957111932">If they are, go to <a href="#dli_09_0013__en-us_topic_0000001269022188_li9816175412318">7</a>. You do not need to create an enhanced datasource connection again.</li><li id="dli_09_0013__en-us_topic_0000001269022188_li1086652519411">If they are not, go to <a href="#dli_09_0013__en-us_topic_0000001269022188_li11976319011">5</a>. Create an enhanced datasource connection to connect the queue to the subnet where the GaussDB(DWS) instance is located.</li></ol>
</li><li id="dli_09_0013__en-us_topic_0000001269022188_li11976319011"><a name="dli_09_0013__en-us_topic_0000001269022188_li11976319011"></a><a name="en-us_topic_0000001269022188_li11976319011"></a>Log in to the DLI management console. In the navigation pane on the left, choose <strong id="dli_09_0013__en-us_topic_0000001269022188_b164221432174120">Datasource Connections</strong>. On the displayed page, click <strong id="dli_09_0013__en-us_topic_0000001269022188_b1142216320412">Create</strong> on the <strong id="dli_09_0013__en-us_topic_0000001269022188_b11423432144114">Enhanced</strong> tab.</li><li id="dli_09_0013__en-us_topic_0000001269022188_li198151354192319">In the displayed dialog box, set the following parameters:<ul id="dli_09_0013__en-us_topic_0000001269022188_ul17815125415233"><li id="dli_09_0013__en-us_topic_0000001269022188_li1181518543233"><strong id="dli_09_0013__en-us_topic_0000001269022188_b122261237114118">Connection Name</strong>: Enter a name for the enhanced datasource connection. For this example, enter <strong id="dli_09_0013__en-us_topic_0000001269022188_b32001388411">dli_dws</strong>.</li><li id="dli_09_0013__en-us_topic_0000001269022188_li681518542232"><strong id="dli_09_0013__en-us_topic_0000001269022188_b1712193914417">Resource Pool</strong>: Select the name of the queue created in <a href="#dli_09_0013__en-us_topic_0000001269022188_section792923214216">Step 1: Create a Queue</a>.</li><li id="dli_09_0013__en-us_topic_0000001269022188_li58162542235"><strong id="dli_09_0013__en-us_topic_0000001269022188_b1561540154113">VPC</strong>: Select the VPC of the GaussDB(DWS) instance.</li><li id="dli_09_0013__en-us_topic_0000001269022188_li11816105432317"><strong id="dli_09_0013__en-us_topic_0000001269022188_b44148414413">Subnet</strong>: Select the subnet of the GaussDB(DWS) instance.</li><li id="dli_09_0013__en-us_topic_0000001269022188_li781675472310">Set other parameters as required.</li></ul>
<p id="dli_09_0013__en-us_topic_0000001269022188_p1081617549235">Click <strong id="dli_09_0013__en-us_topic_0000001269022188_b17601744104119">OK</strong>. Click the name of the created datasource connection to view its status. You can perform subsequent steps only after the connection status changes to <strong id="dli_09_0013__en-us_topic_0000001269022188_b161081745144118">Active</strong>.</p>
</li><li id="dli_09_0013__en-us_topic_0000001269022188_li9816175412318"><a name="dli_09_0013__en-us_topic_0000001269022188_li9816175412318"></a><a name="en-us_topic_0000001269022188_li9816175412318"></a>In the navigation pane on the left, choose <strong id="dli_09_0013__en-us_topic_0000001269022188_b231817203206">Resources</strong> > <strong id="dli_09_0013__en-us_topic_0000001269022188_b031913207205">Queue Management</strong>. On the page displayed, locate the queue you created in <a href="#dli_09_0013__en-us_topic_0000001269022188_section792923214216">Step 1: Create a Queue</a>, click <strong id="dli_09_0013__en-us_topic_0000001269022188_b63204207207">More</strong> in the <strong id="dli_09_0013__en-us_topic_0000001269022188_b18321620112019">Operation</strong> column, and select <strong id="dli_09_0013__en-us_topic_0000001269022188_b1332102052013">Test Address Connectivity</strong>.</li><li id="dli_09_0013__en-us_topic_0000001269022188_li7816454162319">In the displayed dialog box, enter <em id="dli_09_0013__en-us_topic_0000001269022188_i15228144819417">private IP address</em><strong id="dli_09_0013__en-us_topic_0000001269022188_b1229144812417">:</strong><em id="dli_09_0013__en-us_topic_0000001269022188_i3229104814113">port number</em> of the GaussDB(DWS) cluster you have obtained in <a href="#dli_09_0013__en-us_topic_0000001269022188_li19666016361">2</a> in the <strong id="dli_09_0013__en-us_topic_0000001269022188_b1422915484417">Address</strong> box and click <strong id="dli_09_0013__en-us_topic_0000001269022188_b22301248134115">Test</strong> to check whether the database is reachable.</li></ol>
</li></ul>
</div>
<div class="section" id="dli_09_0013__en-us_topic_0000001269022188_section12448959174212"><a name="dli_09_0013__en-us_topic_0000001269022188_section12448959174212"></a><a name="en-us_topic_0000001269022188_section12448959174212"></a><h4 class="sectiontitle">Step 5: Run a Job</h4><ol id="dli_09_0013__en-us_topic_0000001269022188_ol1313811362437"><li id="dli_09_0013__en-us_topic_0000001269022188_li219661215114">On the DLI management console, choose <strong id="dli_09_0013__en-us_topic_0000001269022188_b163161502417">Job Management</strong> > <strong id="dli_09_0013__en-us_topic_0000001269022188_b73164505412">Flink Jobs</strong>. On the <strong id="dli_09_0013__en-us_topic_0000001269022188_b33176502417">Flink Jobs</strong> page, click <strong id="dli_09_0013__en-us_topic_0000001269022188_b2317165074112">Create Job</strong>.</li><li id="dli_09_0013__en-us_topic_0000001269022188_li18197181225114">In the <strong id="dli_09_0013__en-us_topic_0000001269022188_b112511525411"> Create Job</strong> dialog box, set <strong id="dli_09_0013__en-us_topic_0000001269022188_b112518523419">Type</strong> to <strong id="dli_09_0013__en-us_topic_0000001269022188_b122527524414">Flink OpenSource SQL</strong> and <strong id="dli_09_0013__en-us_topic_0000001269022188_b02521252154117">Name</strong> to <strong id="dli_09_0013__en-us_topic_0000001269022188_b132529529411">FlinkCDCPostgreDWS</strong>. 
Click <strong id="dli_09_0013__en-us_topic_0000001269022188_b10279195344112">OK</strong>.</li><li id="dli_09_0013__en-us_topic_0000001269022188_li119731295112">On the job editing page, set the following parameters and retain the default values of other parameters.<ul id="dli_09_0013__en-us_topic_0000001269022188_ul1970291612112"><li id="dli_09_0013__en-us_topic_0000001269022188_li6702216152118"><strong id="dli_09_0013__en-us_topic_0000001269022188_b126592416424">Queue</strong>: Select the queue created in <a href="#dli_09_0013__en-us_topic_0000001269022188_section792923214216">Step 1: Create a Queue</a>.</li><li id="dli_09_0013__en-us_topic_0000001269022188_li1629221563615"><strong id="dli_09_0013__en-us_topic_0000001269022188_b1774112515422">Flink Version</strong>: Select <strong id="dli_09_0013__en-us_topic_0000001269022188_b4741359421">1.12</strong>.</li><li id="dli_09_0013__en-us_topic_0000001269022188_li84401118192110"><strong id="dli_09_0013__en-us_topic_0000001269022188_b1041117184220">Save Job Log</strong>: Enable this function.</li><li id="dli_09_0013__en-us_topic_0000001269022188_li12684193718212"><strong id="dli_09_0013__en-us_topic_0000001269022188_b109603764211">OBS Bucket</strong>: Select an OBS bucket for storing job logs and grant access permissions of the OBS bucket as prompted.</li><li id="dli_09_0013__en-us_topic_0000001269022188_li1382275713215"><strong id="dli_09_0013__en-us_topic_0000001269022188_b1289614844212">Enable Checkpointing</strong>: Enable this function.</li><li id="dli_09_0013__en-us_topic_0000001269022188_li1479717252318">Enter a SQL statement in the editing pane. The following is an example. 
Modify the parameters in bold as you need.<div class="note" id="dli_09_0013__en-us_topic_0000001269022188_note16434951143717"><img src="public_sys-resources/note_3.0-en-us.png"><span class="notetitle"> </span><div class="notebody"><p id="dli_09_0013__en-us_topic_0000001269022188_p1268724365519">In this example, the syntax version of Flink OpenSource SQL is 1.12. The data source is PostgreSQL CDC and the result data is written to GaussDB(DWS).</p>
</div></div>
<div class="p" id="dli_09_0013__en-us_topic_0000001269022188_p14169151163511">
<div class="tablenoborder"><table cellpadding="4" cellspacing="0" summary="" id="dli_09_0013__en-us_topic_0000001269022188_table1925783111356" frame="border" border="1" rules="all"><caption><b>Table 1 </b>Job running parameters</caption><thead align="left"><tr id="dli_09_0013__en-us_topic_0000001269022188_row1254731123515"><th align="left" class="cellrowborder" valign="top" width="22.189999999999998%" id="mcps1.3.9.2.3.1.6.2.1.2.3.1.1"><p id="dli_09_0013__en-us_topic_0000001269022188_p5254203110353">Parameter</p>
</th>
<th align="left" class="cellrowborder" valign="top" width="77.81%" id="mcps1.3.9.2.3.1.6.2.1.2.3.1.2"><p id="dli_09_0013__en-us_topic_0000001269022188_p132542317359">Description</p>
</th>
</tr>
</thead>
<tbody><tr id="dli_09_0013__en-us_topic_0000001269022188_row19255143183519"><td class="cellrowborder" valign="top" width="22.189999999999998%" headers="mcps1.3.9.2.3.1.6.2.1.2.3.1.1 "><p id="dli_09_0013__en-us_topic_0000001269022188_p1254031173516">Queue</p>
</td>
<td class="cellrowborder" valign="top" width="77.81%" headers="mcps1.3.9.2.3.1.6.2.1.2.3.1.2 "><p id="dli_09_0013__en-us_topic_0000001269022188_p725473133515">A shared queue is selected by default. You can select a CCE queue with dedicated resources and configure the following parameters:</p>
<p id="dli_09_0013__en-us_topic_0000001269022188_p122545319350"><strong id="dli_09_0013__en-us_topic_0000001269022188_b75498457617">UDF Jar</strong>: UDF Jar file. Before selecting such a file, upload the corresponding JAR file to the OBS bucket and choose <strong id="dli_09_0013__en-us_topic_0000001269022188_b1141343013234">Data Management</strong> > <strong id="dli_09_0013__en-us_topic_0000001269022188_b14880103432313">Package Management</strong> to create a package. For details, see .</p>
<p id="dli_09_0013__en-us_topic_0000001269022188_p14254153133510">In SQL, you can call a UDF that is packaged in the JAR file.</p>
<div class="note" id="dli_09_0013__en-us_topic_0000001269022188_note92551631133510"><span class="notetitle"> NOTE: </span><div class="notebody"><p id="dli_09_0013__en-us_topic_0000001269022188_p1325416316353">When creating a job, a sub-user can only select the queue that has been allocated to the user.</p>
<p id="dli_09_0013__en-us_topic_0000001269022188_p2255331103514">If the remaining capacity of the selected queue cannot meet the job requirements, the system automatically scales up the capacity and you will be billed based on the increased capacity. When a queue is idle, the system automatically scales in its capacity.</p>
</div></div>
</td>
</tr>
<tr id="dli_09_0013__en-us_topic_0000001269022188_row122559311359"><td class="cellrowborder" valign="top" width="22.189999999999998%" headers="mcps1.3.9.2.3.1.6.2.1.2.3.1.1 "><p id="dli_09_0013__en-us_topic_0000001269022188_p10255193113511">CUs</p>
</td>
<td class="cellrowborder" valign="top" width="77.81%" headers="mcps1.3.9.2.3.1.6.2.1.2.3.1.2 "><p id="dli_09_0013__en-us_topic_0000001269022188_p1255163163512">Total number of DLI CUs for the job, that is, the sum of compute units and Job Manager CUs. One CU equals 1 vCPU and 4 GB of memory.</p>
<p id="dli_09_0013__en-us_topic_0000001269022188_p1225583173512">The value is the number of CUs required for job running and cannot exceed the number of CUs in the bound queue.</p>
</td>
</tr>
<tr id="dli_09_0013__en-us_topic_0000001269022188_row1725523133519"><td class="cellrowborder" valign="top" width="22.189999999999998%" headers="mcps1.3.9.2.3.1.6.2.1.2.3.1.1 "><p id="dli_09_0013__en-us_topic_0000001269022188_p1255131123512">Job Manager CUs</p>
</td>
<td class="cellrowborder" valign="top" width="77.81%" headers="mcps1.3.9.2.3.1.6.2.1.2.3.1.2 "><p id="dli_09_0013__en-us_topic_0000001269022188_p19255183117357">Number of CUs of the management unit.</p>
</td>
</tr>
<tr id="dli_09_0013__en-us_topic_0000001269022188_row825511310357"><td class="cellrowborder" valign="top" width="22.189999999999998%" headers="mcps1.3.9.2.3.1.6.2.1.2.3.1.1 "><p id="dli_09_0013__en-us_topic_0000001269022188_p182556311353">Parallelism</p>
</td>
<td class="cellrowborder" valign="top" width="77.81%" headers="mcps1.3.9.2.3.1.6.2.1.2.3.1.2 "><p id="dli_09_0013__en-us_topic_0000001269022188_p925514316359">Number of tasks that each operator of the Flink OpenSource SQL job runs concurrently.</p>
<div class="note" id="dli_09_0013__en-us_topic_0000001269022188_note32551631153518"><span class="notetitle"> NOTE: </span><div class="notebody"><p id="dli_09_0013__en-us_topic_0000001269022188_p825593111356">This value cannot be greater than four times the compute units (number of CUs minus the number of JobManager CUs).</p>
</div></div>
</td>
</tr>
<tr id="dli_09_0013__en-us_topic_0000001269022188_row1225553112352"><td class="cellrowborder" valign="top" width="22.189999999999998%" headers="mcps1.3.9.2.3.1.6.2.1.2.3.1.1 "><p id="dli_09_0013__en-us_topic_0000001269022188_p725514313359">Task Manager Configuration</p>
</td>
<td class="cellrowborder" valign="top" width="77.81%" headers="mcps1.3.9.2.3.1.6.2.1.2.3.1.2 "><p id="dli_09_0013__en-us_topic_0000001269022188_p11255173113510">Whether to set Task Manager resource parameters.</p>
<p id="dli_09_0013__en-us_topic_0000001269022188_p10255131123514">If this option is selected, you need to set the following parameters:</p>
<ul id="dli_09_0013__en-us_topic_0000001269022188_ul1625513314356"><li id="dli_09_0013__en-us_topic_0000001269022188_li22551331193514"><strong id="dli_09_0013__en-us_topic_0000001269022188_b349943863413">CU(s) per TM</strong>: Number of resources occupied by each Task Manager.</li><li id="dli_09_0013__en-us_topic_0000001269022188_li5255123193514"><strong id="dli_09_0013__en-us_topic_0000001269022188_b78591253203412">Slot(s) per TM</strong>: Number of slots contained in each Task Manager.</li></ul>
</td>
</tr>
<tr id="dli_09_0013__en-us_topic_0000001269022188_row625512319352"><td class="cellrowborder" valign="top" width="22.189999999999998%" headers="mcps1.3.9.2.3.1.6.2.1.2.3.1.1 "><p id="dli_09_0013__en-us_topic_0000001269022188_p18255113119359">OBS Bucket</p>
</td>
<td class="cellrowborder" valign="top" width="77.81%" headers="mcps1.3.9.2.3.1.6.2.1.2.3.1.2 "><p id="dli_09_0013__en-us_topic_0000001269022188_p6255133116358">OBS bucket to store job logs and checkpoint information. If the selected OBS bucket is not authorized, click <strong id="dli_09_0013__en-us_topic_0000001269022188_b282165215351">Authorize</strong>.</p>
</td>
</tr>
<tr id="dli_09_0013__en-us_topic_0000001269022188_row6255931163519"><td class="cellrowborder" valign="top" width="22.189999999999998%" headers="mcps1.3.9.2.3.1.6.2.1.2.3.1.1 "><p id="dli_09_0013__en-us_topic_0000001269022188_p0255173112359">Save Job Log</p>
</td>
<td class="cellrowborder" valign="top" width="77.81%" headers="mcps1.3.9.2.3.1.6.2.1.2.3.1.2 "><p id="dli_09_0013__en-us_topic_0000001269022188_p325563115357">Whether to save job run logs to OBS. The logs are saved in <em id="dli_09_0013__en-us_topic_0000001269022188_i36572038123616">Bucket name</em><strong id="dli_09_0013__en-us_topic_0000001269022188_b1865873813362">/jobs/logs/</strong><em id="dli_09_0013__en-us_topic_0000001269022188_i1465873811369">Directory starting with the job ID</em>.</p>
<div class="caution" id="dli_09_0013__en-us_topic_0000001269022188_note16255113133514"><span class="cautiontitle"> CAUTION: </span><div class="cautionbody"><p id="dli_09_0013__en-us_topic_0000001269022188_p152551831193514">You are advised to configure this parameter. Otherwise, no run log is generated after the job is executed. If the job fails, the run log cannot be obtained for fault locating.</p>
</div></div>
<p id="dli_09_0013__en-us_topic_0000001269022188_p1825503119358">If this option is selected, you need to set the following parameters:</p>
<div class="p" id="dli_09_0013__en-us_topic_0000001269022188_p1025553110354"><strong id="dli_09_0013__en-us_topic_0000001269022188_b18212124523718">OBS Bucket</strong>: Select an OBS bucket to store user job logs. If the selected OBS bucket is not authorized, click <strong id="dli_09_0013__en-us_topic_0000001269022188_b6457952173513">Authorize</strong>.<div class="note" id="dli_09_0013__en-us_topic_0000001269022188_note14255133183519"><span class="notetitle"> NOTE: </span><div class="notebody"><p id="dli_09_0013__en-us_topic_0000001269022188_p5255153114356">If <strong id="dli_09_0013__en-us_topic_0000001269022188_b8596205373712">Enable Checkpointing</strong> and <strong id="dli_09_0013__en-us_topic_0000001269022188_b65965533371">Save Job Log</strong> are both selected, you only need to authorize OBS once.</p>
</div></div>
</div>
</td>
</tr>
<tr id="dli_09_0013__en-us_topic_0000001269022188_row025693133516"><td class="cellrowborder" valign="top" width="22.189999999999998%" headers="mcps1.3.9.2.3.1.6.2.1.2.3.1.1 "><p id="dli_09_0013__en-us_topic_0000001269022188_p19255133118359">Alarm Generation upon Job Exception</p>
</td>
<td class="cellrowborder" valign="top" width="77.81%" headers="mcps1.3.9.2.3.1.6.2.1.2.3.1.2 "><p id="dli_09_0013__en-us_topic_0000001269022188_p172561531123513">Whether to notify users of any job exceptions, such as running exceptions or arrears, via SMS or email.</p>
<p id="dli_09_0013__en-us_topic_0000001269022188_p72561331133510">If this option is selected, you need to set the following parameters:</p>
<p id="dli_09_0013__en-us_topic_0000001269022188_p2256113153518"><strong id="dli_09_0013__en-us_topic_0000001269022188_b1079393834810">SMN Topic</strong></p>
<p id="dli_09_0013__en-us_topic_0000001269022188_p3256131143518">Select a user-defined SMN topic. For details about how to create a custom SMN topic, see "Creating a Topic" in <em id="dli_09_0013__en-us_topic_0000001269022188_i169419461485">Simple Message Notification User Guide</em>.</p>
</td>
</tr>
<tr id="dli_09_0013__en-us_topic_0000001269022188_row102561331123515"><td class="cellrowborder" valign="top" width="22.189999999999998%" headers="mcps1.3.9.2.3.1.6.2.1.2.3.1.1 "><p id="dli_09_0013__en-us_topic_0000001269022188_p02561131113518">Enable Checkpointing</p>
</td>
<td class="cellrowborder" valign="top" width="77.81%" headers="mcps1.3.9.2.3.1.6.2.1.2.3.1.2 "><p id="dli_09_0013__en-us_topic_0000001269022188_p82561431183519">Whether to enable job snapshots. If this function is enabled, jobs can be restored based on checkpoints.</p>
<div class="p" id="dli_09_0013__en-us_topic_0000001269022188_p22561631183513">If this option is selected, you need to set the following parameters:<ul id="dli_09_0013__en-us_topic_0000001269022188_ul1925616315350"><li id="dli_09_0013__en-us_topic_0000001269022188_li17256103117357"><strong id="dli_09_0013__en-us_topic_0000001269022188_b2484135195314">Checkpoint Interval</strong>: interval for creating checkpoints, in seconds. The value ranges from 1 to 999999, and the default value is <strong id="dli_09_0013__en-us_topic_0000001269022188_b148435165315">30</strong>.</li><li id="dli_09_0013__en-us_topic_0000001269022188_li825673113354"><strong id="dli_09_0013__en-us_topic_0000001269022188_b1830031225410">Checkpoint Mode</strong>: checkpointing mode, which can be set to either of the following values:<ul id="dli_09_0013__en-us_topic_0000001269022188_ul17256113153518"><li id="dli_09_0013__en-us_topic_0000001269022188_li182561331113516"><strong id="dli_09_0013__en-us_topic_0000001269022188_b132713015512">At least once</strong>: Events are processed at least once.</li><li id="dli_09_0013__en-us_topic_0000001269022188_li1725623110350"><strong id="dli_09_0013__en-us_topic_0000001269022188_b42556405512">Exactly once</strong>: Events are processed only once.</li></ul>
</li></ul>
<ul id="dli_09_0013__en-us_topic_0000001269022188_ul2256331193516"><li id="dli_09_0013__en-us_topic_0000001269022188_li1225693173519"><strong id="dli_09_0013__en-us_topic_0000001269022188_b1821931345516">OBS Bucket</strong>: Select an OBS bucket to store your checkpoints. If the selected OBS bucket is not authorized, click <strong id="dli_09_0013__en-us_topic_0000001269022188_b114601852203511">Authorize</strong>.<div class="p" id="dli_09_0013__en-us_topic_0000001269022188_p625610316357">Checkpoints are saved in <em id="dli_09_0013__en-us_topic_0000001269022188_i15248162055513">Bucket name</em><strong id="dli_09_0013__en-us_topic_0000001269022188_b52491020105511">/jobs/checkpoint/</strong><em id="dli_09_0013__en-us_topic_0000001269022188_i11249520105519">Directory starting with the job ID</em>.<div class="note" id="dli_09_0013__en-us_topic_0000001269022188_note7256531193517"><span class="notetitle"> NOTE: </span><div class="notebody"><p id="dli_09_0013__en-us_topic_0000001269022188_p1256123118352">If <strong id="dli_09_0013__en-us_topic_0000001269022188_b65961292468">Enable Checkpointing</strong> and <strong id="dli_09_0013__en-us_topic_0000001269022188_b8596129114615">Save Job Log</strong> are both selected, you only need to authorize OBS once.</p>
</div></div>
</div>
</li></ul>
</div>
</td>
</tr>
<tr id="dli_09_0013__en-us_topic_0000001269022188_row225717315357"><td class="cellrowborder" valign="top" width="22.189999999999998%" headers="mcps1.3.9.2.3.1.6.2.1.2.3.1.1 "><p id="dli_09_0013__en-us_topic_0000001269022188_p725683120355">Auto Restart upon Exception</p>
</td>
<td class="cellrowborder" valign="top" width="77.81%" headers="mcps1.3.9.2.3.1.6.2.1.2.3.1.2 "><p id="dli_09_0013__en-us_topic_0000001269022188_p72561311352">Whether to enable automatic restart. If this function is enabled, jobs will be automatically restarted and restored when exceptions occur.</p>
<p id="dli_09_0013__en-us_topic_0000001269022188_p2256113153515">If this option is selected, you need to set the following parameters:</p>
<ul id="dli_09_0013__en-us_topic_0000001269022188_ul142574316354"><li id="dli_09_0013__en-us_topic_0000001269022188_li132571031133516"><strong id="dli_09_0013__en-us_topic_0000001269022188_b147517401302">Max. Retry Attempts</strong>: maximum number of retries upon an exception. The unit is times/hour.<ul id="dli_09_0013__en-us_topic_0000001269022188_ul22563319355"><li id="dli_09_0013__en-us_topic_0000001269022188_li1425643113357"><strong id="dli_09_0013__en-us_topic_0000001269022188_b13902130115">Unlimited</strong>: The number of retries is unlimited.</li><li id="dli_09_0013__en-us_topic_0000001269022188_li11256931193514"><strong id="dli_09_0013__en-us_topic_0000001269022188_b152101549408">Limited</strong>: The number of retries is user-defined.</li></ul>
</li><li id="dli_09_0013__en-us_topic_0000001269022188_li172571231103515"><strong id="dli_09_0013__en-us_topic_0000001269022188_b7931161914610">Restore Job from Checkpoint</strong>: This parameter is available only when <strong id="dli_09_0013__en-us_topic_0000001269022188_b12931619124615">Enable Checkpointing</strong> is selected.</li></ul>
</td>
</tr>
<tr id="dli_09_0013__en-us_topic_0000001269022188_row112571031113511"><td class="cellrowborder" valign="top" width="22.189999999999998%" headers="mcps1.3.9.2.3.1.6.2.1.2.3.1.1 "><p id="dli_09_0013__en-us_topic_0000001269022188_p20257531103512">Idle State Retention Time</p>
</td>
<td class="cellrowborder" valign="top" width="77.81%" headers="mcps1.3.9.2.3.1.6.2.1.2.3.1.2 "><p id="dli_09_0013__en-us_topic_0000001269022188_p11257123117353">How long the state of a key is retained without being updated before it is removed in <strong id="dli_09_0013__en-us_topic_0000001269022188_b151161761227">GroupBy</strong> or <strong id="dli_09_0013__en-us_topic_0000001269022188_b0116961722">Window</strong>. The default value is 1 hour.</p>
</td>
</tr>
<tr id="dli_09_0013__en-us_topic_0000001269022188_row1225719311359"><td class="cellrowborder" valign="top" width="22.189999999999998%" headers="mcps1.3.9.2.3.1.6.2.1.2.3.1.1 "><p id="dli_09_0013__en-us_topic_0000001269022188_p82579313352">Dirty Data Policy</p>
</td>
<td class="cellrowborder" valign="top" width="77.81%" headers="mcps1.3.9.2.3.1.6.2.1.2.3.1.2 "><p id="dli_09_0013__en-us_topic_0000001269022188_p15257731143519">Policy for processing dirty data. The following policies are supported: <span class="parmvalue" id="dli_09_0013__en-us_topic_0000001269022188_parmvalue37645421328"><b>Ignore</b></span>, <span class="parmvalue" id="dli_09_0013__en-us_topic_0000001269022188_parmvalue1276416428215"><b>Trigger a job exception</b></span>, and <span class="parmvalue" id="dli_09_0013__en-us_topic_0000001269022188_parmvalue187646420218"><b>Save</b></span>.</p>
<p id="dli_09_0013__en-us_topic_0000001269022188_p225753110359">If you set this field to <strong id="dli_09_0013__en-us_topic_0000001269022188_b16546749127">Save</strong>, <strong id="dli_09_0013__en-us_topic_0000001269022188_b75466491322">Dirty Data Dump Address</strong> must be set. Click the address box to select the OBS path for storing dirty data.</p>
</td>
</tr>
</tbody>
</table>
</div>
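<p>As a worked example of the <strong>Parallelism</strong> limit described in Table 1, using illustrative values that are assumptions rather than defaults: if <strong>CUs</strong> is set to 3 and <strong>Job Manager CUs</strong> is set to 1, the job has 3 - 1 = 2 compute units, so <strong>Parallelism</strong> can be at most 4 x 2 = 8.</p>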
<pre class="screen" id="dli_09_0013__en-us_topic_0000001269022188_screen453112325017">create table PostgreCdcSource(
  order_id string,
  order_channel string,
  order_time string,
  pay_amount double,
  real_pay double,
  pay_time string,
  user_id string,
  user_name string,
  area_id string,
  primary key (order_id) not enforced
) with (
  'connector' = 'postgres-cdc',
  'hostname' = '<em id="dli_09_0013__en-us_topic_0000001269022188_i15219163134214"><strong id="dli_09_0013__en-us_topic_0000001269022188_b202126312427">192.168.15.153</strong></em>', -- IP address of the PostgreSQL instance
  'port' = '<em id="dli_09_0013__en-us_topic_0000001269022188_i199531844104212"><strong id="dli_09_0013__en-us_topic_0000001269022188_b10953164419423">5432</strong></em>', -- Port number of the PostgreSQL instance
  'pwd_auth_name' = '<em id="dli_09_0013__en-us_topic_0000001269022188_i1593933296114526"><strong id="dli_09_0013__en-us_topic_0000001269022188_b1200313730114526">xxxxx</strong></em>', -- Name of the datasource authentication of the password type created on DLI. If datasource authentication is used, you do not need to set the username and password for the job.
  'database-name' = '<em id="dli_09_0013__en-us_topic_0000001269022188_i156061419184317"><strong id="dli_09_0013__en-us_topic_0000001269022188_b16061919184314">testrdsdb</strong></em>', -- Database name of the PostgreSQL instance
  'schema-name' = '<strong id="dli_09_0013__en-us_topic_0000001269022188_b144461643155114">test</strong>', -- Schema in the PostgreSQL database
  'table-name' = '<strong id="dli_09_0013__en-us_topic_0000001269022188_b1243514555113">cdc_order</strong>' -- Table name in the PostgreSQL database
);

create table dwsSink(
  order_id string,
  order_channel string,
  order_time string,
  pay_amount double,
  real_pay double,
  pay_time string,
  user_id string,
  user_name string,
  area_id string,
  primary key (order_id) not enforced
) with (
  'connector' = 'gaussdb',
  'driver' = 'com.gauss200.jdbc.Driver',
  'url' = 'jdbc:gaussdb://<strong id="dli_09_0013__en-us_topic_0000001269022188_b286118324416"><em id="dli_09_0013__en-us_topic_0000001269022188_i198541035445">192.168.168.16:8000</em>/testdwsdb</strong>', -- 192.168.168.16:8000 indicates the internal IP address and port of the GaussDB(DWS) instance. testdwsdb indicates the name of the created GaussDB(DWS) database.
  'table-name' = '<strong id="dli_09_0013__en-us_topic_0000001269022188_b167231870441">test\".\"dws_order</strong>', -- test indicates the schema of the created GaussDB(DWS) table, and dws_order indicates the GaussDB(DWS) table name.
  'username' = '<em id="dli_09_0013__en-us_topic_0000001269022188_i1083622154414"><strong id="dli_09_0013__en-us_topic_0000001269022188_b083611211443">xxx</strong></em><em id="dli_09_0013__en-us_topic_0000001269022188_i153493725210"><strong id="dli_09_0013__en-us_topic_0000001269022188_b1434153775218">xx</strong></em>', -- Username of the GaussDB(DWS) instance
  'password' = '<em id="dli_09_0013__en-us_topic_0000001269022188_i16817143344413"><strong id="dli_09_0013__en-us_topic_0000001269022188_b781115334448">xxx</strong></em><em id="dli_09_0013__en-us_topic_0000001269022188_i1239163995212"><strong id="dli_09_0013__en-us_topic_0000001269022188_b439113394526">xx</strong></em>', -- Password of the GaussDB(DWS) instance
  'write.mode' = 'insert'
);

insert into dwsSink select * from PostgreCdcSource where pay_amount > 100;
</pre>
</div>
</li></ul>
</li><li id="dli_09_0013__en-us_topic_0000001269022188_li219761217517">Click <strong id="dli_09_0013__en-us_topic_0000001269022188_b1987610447446">Check Semantic</strong> and ensure that the SQL statement passes the check. Click <strong id="dli_09_0013__en-us_topic_0000001269022188_b168715453444">Save</strong>. Click <strong id="dli_09_0013__en-us_topic_0000001269022188_b2084394674416">Start</strong>, confirm the job parameters, and click <strong id="dli_09_0013__en-us_topic_0000001269022188_b1384414620445">Start Now</strong> to execute the job. Wait until the job status changes to <strong id="dli_09_0013__en-us_topic_0000001269022188_b18650547184413">Running</strong>.</li></ol>
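<p>The <strong>insert</strong> statement in the example job uses <strong>select *</strong>, so the values written to the sink follow the column order of the source table definition. As a minimal sketch of an equivalent statement, assuming the <strong>PostgreCdcSource</strong> and <strong>dwsSink</strong> definitions shown in the example are unchanged, the columns can be listed explicitly so that the order written to the sink is visible in the statement and does not depend on the source definition:</p>
<pre class="screen">-- Same filter as the example job, with the selected columns named explicitly.
-- The select list follows the column order declared for dwsSink.
insert into dwsSink
select
  order_id,
  order_channel,
  order_time,
  pay_amount,
  real_pay,
  pay_time,
  user_id,
  user_name,
  area_id
from PostgreCdcSource
where pay_amount > 100;</pre>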
</div>
<div class="section" id="dli_09_0013__en-us_topic_0000001269022188_section4387527162418"><a name="dli_09_0013__en-us_topic_0000001269022188_section4387527162418"></a><a name="en-us_topic_0000001269022188_section4387527162418"></a><h4 class="sectiontitle">Step 6: Send Data and Query Results</h4><ol id="dli_09_0013__en-us_topic_0000001269022188_ol0558165272410"><li id="dli_09_0013__en-us_topic_0000001269022188_li1942135894212">Log in to the RDS console. On the displayed page, locate the target PostgreSQL DB instance and choose <strong id="dli_09_0013__en-us_topic_0000001269022188_b1349814499445">More</strong> > <strong id="dli_09_0013__en-us_topic_0000001269022188_b549844924410">Log In</strong> in the <strong id="dli_09_0013__en-us_topic_0000001269022188_b2499134904412">Operation</strong> column.</li><li id="dli_09_0013__en-us_topic_0000001269022188_li3202121184911">In the displayed login dialog box, enter the username and password and click <strong id="dli_09_0013__en-us_topic_0000001269022188_b1752815110455">Log In</strong>.</li><li id="dli_09_0013__en-us_topic_0000001269022188_li1247361815504">In the <strong id="dli_09_0013__en-us_topic_0000001269022188_b13287115210449">Operation</strong> column of the row that contains the created database, click <strong id="dli_09_0013__en-us_topic_0000001269022188_b1528713528449">SQL Window</strong> and enter the following statement to insert data into the table:<pre class="screen" id="dli_09_0013__en-us_topic_0000001269022188_screen15137185717515">insert into test.cdc_order values
('202103241000000001','webShop','2021-03-24 10:00:00','50.00','100.00','2021-03-24 10:02:03','0001','Alice','330106'),
('202103251606060001','appShop','2021-03-24 12:06:06','200.00','180.00','2021-03-24 16:10:06','0002','Jason','330106'),
('202103261000000001','webShop','2021-03-24 14:03:00','300.00','100.00','2021-03-24 10:02:03','0003','Lily','330106'),
('202103271606060001','appShop','2021-03-24 16:36:06','99.00','150.00','2021-03-24 16:10:06','0001','Henry','330106');</pre>
</li><li id="dli_09_0013__en-us_topic_0000001269022188_li1113716579515">Connect to the created GaussDB(DWS) cluster.</li><li id="dli_09_0013__en-us_topic_0000001269022188_li1047472222014">Connect to the default database <strong id="dli_09_0013__en-us_topic_0000001269022188_b15308915174516">testdwsdb</strong> of a GaussDB(DWS) cluster.<pre class="screen" id="dli_09_0013__en-us_topic_0000001269022188_screen58207310201">gsql -d testdwsdb -h <em id="dli_09_0013__en-us_topic_0000001269022188_i1259291684515">Connection address of the GaussDB(DWS) cluster</em> -U dbadmin -p 8000 -W <em id="dli_09_0013__en-us_topic_0000001269022188_i185921316164518">password</em> -r</pre>
</li><li id="dli_09_0013__en-us_topic_0000001269022188_li4931152514209">Run the following statement to query table data:<pre class="screen" id="dli_09_0013__en-us_topic_0000001269022188_screen893492516205">select * from test.dws_order;</pre>
<div class="p" id="dli_09_0013__en-us_topic_0000001269022188_p13934102532017">The query result is as follows:<pre class="screen" id="dli_09_0013__en-us_topic_0000001269022188_screen4934325122014">order_id order_channel order_time pay_amount real_pay pay_time user_id user_name area_id
202103251606060001 appShop 2021-03-24 12:06:06 200.0 180.0 2021-03-24 16:10:06 0002 Jason 330106
202103261000000001 webShop 2021-03-24 14:03:00 300.0 100.0 2021-03-24 10:02:03 0003 Lily 330106</pre>
</div>
</li></ol>
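<p>Only two of the four inserted rows appear in the query result because the job filters on <strong>pay_amount > 100</strong>; the rows with pay amounts of 50.00 and 99.00 are not written to GaussDB(DWS). As a rough cross-check, a query like the following can be run in the SQL window of the RDS for PostgreSQL instance. It is a sketch that assumes <strong>pay_amount</strong> is stored in a numeric column of <strong>test.cdc_order</strong>, and it should list the same two order IDs that the GaussDB(DWS) query returns:</p>
<pre class="screen">-- Run against the source PostgreSQL database, not against GaussDB(DWS).
-- Applies the same filter as the Flink job to the source table.
select order_id, pay_amount
from test.cdc_order
where pay_amount > 100
order by order_id;</pre>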
</div>
</div>
<div>
<div class="familylinks">
<div class="parentlink"><strong>Parent topic:</strong> <a href="dli_09_0006.html">Flink OpenSource SQL Jobs</a></div>
</div>
</div>