doc-exports/docs/dli/dev/dli_09_0011.html
Hasko, Vladimir cfc48b3aed dli_dev_0104_version
Reviewed-by: Pruthi, Vineet <vineet.pruthi@t-systems.com>
Co-authored-by: Hasko, Vladimir <vladimir.hasko@t-systems.com>
Co-committed-by: Hasko, Vladimir <vladimir.hasko@t-systems.com>
2024-05-06 09:14:57 +00:00


<a name="dli_09_0011"></a><a name="dli_09_0011"></a>
<h1 class="topictitle1">Reading Data from Kafka and Writing Data to Elasticsearch</h1>
<div id="body8662426"><div class="notice" id="dli_09_0011__en-us_topic_0000001318422057_note04712015123019"><span class="noticetitle"><img src="public_sys-resources/notice_3.0-en-us.png"> </span><div class="noticebody"><p id="dli_09_0011__en-us_topic_0000001318422057_p87351118103316">This guide provides reference for Flink 1.12 only.</p>
</div></div>
<div class="section" id="dli_09_0011__en-us_topic_0000001318422057_section10920163411416"><h4 class="sectiontitle">Description</h4><p id="dli_09_0011__en-us_topic_0000001318422057_p128093392048">This example analyzes offering purchase data and collects statistics on results that meet specific conditions. The purchase data is read from a Kafka source table, and the analysis result is written to Elasticsearch.</p>
<p id="dli_09_0011__en-us_topic_0000001318422057_p17817311367">For example, enter the following sample data:</p>
<pre class="screen" id="dli_09_0011__en-us_topic_0000001318422057_screen1275535143115">{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0002", "user_name":"Jason", "area_id":"330106"}</pre>
<p id="dli_09_0011__en-us_topic_0000001318422057_p13373161717716">DLI reads data from Kafka and writes the data to Elasticsearch. You can view the result in Kibana of the Elasticsearch cluster.</p>
</div>
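<p>The sample records above can also be generated programmatically. The following is a minimal Python sketch, not part of the DLI tooling: the helper name <code>make_order</code> and the constant <code>ORDER_FIELDS</code> are hypothetical, and every value is kept as a string only because the sample data is written that way.</p>

```python
import json

# Field names and order follow the sample records in this guide.
ORDER_FIELDS = (
    "order_id", "order_channel", "order_time", "pay_amount",
    "real_pay", "pay_time", "user_id", "user_name", "area_id",
)


def make_order(**values):
    """Return one order as a JSON line (hypothetical helper).

    Values are serialized as strings, mirroring the sample data.
    """
    missing = [name for name in ORDER_FIELDS if name not in values]
    if missing:
        raise ValueError(f"missing fields: {missing}")
    return json.dumps({name: str(values[name]) for name in ORDER_FIELDS})


line = make_order(
    order_id="202103241000000001", order_channel="webShop",
    order_time="2021-03-24 10:00:00", pay_amount="100.00",
    real_pay="100.00", pay_time="2021-03-24 10:02:03",
    user_id="0001", user_name="Alice", area_id="330106",
)
print(line)
```

<p>Each printed line is one message body that a Kafka client could send to the topic used in this example.</p>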
<div class="section" id="dli_09_0011__en-us_topic_0000001318422057_section99670144912"><h4 class="sectiontitle">Prerequisites</h4><ol id="dli_09_0011__en-us_topic_0000001318422057_ol188213811126"><li id="dli_09_0011__en-us_topic_0000001318422057_li12282225113612">You have created a DMS for Kafka instance.<div class="caution" id="dli_09_0011__en-us_topic_0000001318422057_note12300232141314"><span class="cautiontitle"><img src="public_sys-resources/caution_3.0-en-us.png"> </span><div class="cautionbody"><p id="dli_09_0011__en-us_topic_0000001318422057_p14301143212130">When you create a DMS Kafka instance, do not enable <strong id="dli_09_0011__en-us_topic_0000001318422057_b3987162624412">Kafka SASL_SSL</strong>.</p>
</div></div>
</li><li id="dli_09_0011__en-us_topic_0000001318422057_li611719544378">You have created a CSS Elasticsearch cluster.<p id="dli_09_0011__en-us_topic_0000001318422057_p139161233131815"><a name="dli_09_0011__en-us_topic_0000001318422057_li611719544378"></a><a name="en-us_topic_0000001318422057_li611719544378"></a>In this example, the version of the created CSS cluster is 7.6.2, and security mode is disabled for the cluster.</p>
</li></ol>
</div>
<div class="section" id="dli_09_0011__en-us_topic_0000001318422057_section12587518310"><h4 class="sectiontitle">Overall Process</h4><div class="p" id="dli_09_0011__en-us_topic_0000001318422057_p7741285314"><a href="#dli_09_0011__en-us_topic_0000001318422057_fig1691441652">Figure 1</a> shows the overall development process.<div class="fignone" id="dli_09_0011__en-us_topic_0000001318422057_fig1691441652"><a name="dli_09_0011__en-us_topic_0000001318422057_fig1691441652"></a><a name="en-us_topic_0000001318422057_fig1691441652"></a><span class="figcap"><b>Figure 1 </b>Job development process</span><br><span><img id="dli_09_0011__en-us_topic_0000001318422057_image691841454" src="en-us_image_0000001318422061.png"></span></div>
</div>
<p id="dli_09_0011__en-us_topic_0000001318422057_p16413127863"><a href="#dli_09_0011__en-us_topic_0000001318422057_section792923214216">Step 1: Create a Queue</a></p>
<p id="dli_09_0011__en-us_topic_0000001318422057_p0567192986"><a href="#dli_09_0011__en-us_topic_0000001318422057_section78516116518">Step 2: Create a Kafka Topic</a></p>
<p id="dli_09_0011__en-us_topic_0000001318422057_p1259352616818"><a href="#dli_09_0011__en-us_topic_0000001318422057_section1627154113018">Step 3: Create an Elasticsearch Index</a></p>
<p id="dli_09_0011__en-us_topic_0000001318422057_p11783144819818"><a href="#dli_09_0011__en-us_topic_0000001318422057_section074025752119">Step 4: Create an Enhanced Datasource Connection</a></p>
<p id="dli_09_0011__en-us_topic_0000001318422057_p102618381914"><a href="#dli_09_0011__en-us_topic_0000001318422057_section12448959174212">Step 5: Run a Job</a></p>
<p id="dli_09_0011__en-us_topic_0000001318422057_p491452441010"><a href="#dli_09_0011__en-us_topic_0000001318422057_section4387527162418">Step 6: Send Data and Query Results</a></p>
</div>
<div class="section" id="dli_09_0011__en-us_topic_0000001318422057_section792923214216"><a name="dli_09_0011__en-us_topic_0000001318422057_section792923214216"></a><a name="en-us_topic_0000001318422057_section792923214216"></a><h4 class="sectiontitle">Step 1: Create a Queue</h4><ol id="dli_09_0011__en-us_topic_0000001318422057_ol193907145108"><li id="dli_09_0011__en-us_topic_0000001318422057_li3390161431020">Log in to the DLI console. In the navigation pane on the left, choose <strong id="dli_09_0011__en-us_topic_0000001318422057_b14249115911462">Resources</strong> &gt; <strong id="dli_09_0011__en-us_topic_0000001318422057_b124911599467">Queue Management</strong>.</li><li id="dli_09_0011__en-us_topic_0000001318422057_li5390171401016">On the displayed page, click <strong id="dli_09_0011__en-us_topic_0000001318422057_b164984034716">Buy Queue</strong> in the upper right corner.</li><li id="dli_09_0011__en-us_topic_0000001318422057_li133901314191011">On the <strong id="dli_09_0011__en-us_topic_0000001318422057_b104704118471">Buy Queue</strong> page, set queue parameters as follows:<ul id="dli_09_0011__en-us_topic_0000001318422057_ul161521581016"><li id="dli_09_0011__en-us_topic_0000001318422057_li5992112615541"><strong id="dli_09_0011__en-us_topic_0000001318422057_b57260570405">Billing Mode</strong>: . 
</li><li id="dli_09_0011__en-us_topic_0000001318422057_li17566251558"><strong id="dli_09_0011__en-us_topic_0000001318422057_b14700218124717">Region</strong> and <strong id="dli_09_0011__en-us_topic_0000001318422057_b8707718114711">Project</strong>: Retain the default values.</li><li id="dli_09_0011__en-us_topic_0000001318422057_li9378124110319"><strong id="dli_09_0011__en-us_topic_0000001318422057_b10689172254713">Name</strong>: Enter a queue name.<div class="note" id="dli_09_0011__en-us_topic_0000001318422057_note1523218284569"><img src="public_sys-resources/note_3.0-en-us.png"><span class="notetitle"> </span><div class="notebody"><p id="dli_09_0011__en-us_topic_0000001318422057_en-us_topic_0069078607_en-us_topic_0069077926_p61185513">The queue name can contain only digits, letters, and underscores (_), but cannot contain only digits or start with an underscore (_). The name must contain 1 to 128 characters.</p>
<p id="dli_09_0011__en-us_topic_0000001318422057_p6288122915199"><strong id="dli_09_0011__en-us_topic_0000001318422057_b18351844164716">The queue name is case-insensitive. Uppercase letters will be automatically converted to lowercase letters.</strong></p>
</div></div>
</li><li id="dli_09_0011__en-us_topic_0000001318422057_li1761814682119"><strong id="dli_09_0011__en-us_topic_0000001318422057_b6569845104712">Type</strong>: Select <strong id="dli_09_0011__en-us_topic_0000001318422057_b3569154544710">For general purpose</strong>. Select the <strong id="dli_09_0011__en-us_topic_0000001318422057_b9831647164713">Dedicated Resource Mode</strong>.</li><li id="dli_09_0011__en-us_topic_0000001318422057_li360481635014"><strong id="dli_09_0011__en-us_topic_0000001318422057_b206149217417">AZ Mode</strong> and <strong id="dli_09_0011__en-us_topic_0000001318422057_b1261415217411">Specifications</strong>: Retain the default values.</li><li id="dli_09_0011__en-us_topic_0000001318422057_li32421882515"><strong id="dli_09_0011__en-us_topic_0000001318422057_b73227505478">Enterprise Project</strong>: Select <strong id="dli_09_0011__en-us_topic_0000001318422057_b532245014712">default</strong>.</li><li id="dli_09_0011__en-us_topic_0000001318422057_li6862026155115"><strong id="dli_09_0011__en-us_topic_0000001318422057_b99931759124712">Advanced Settings</strong>: Select <strong id="dli_09_0011__en-us_topic_0000001318422057_b49948599474">Custom</strong>.</li><li id="dli_09_0011__en-us_topic_0000001318422057_li615215819015"><strong id="dli_09_0011__en-us_topic_0000001318422057_b9968110154817">CIDR Block</strong>: Specify the queue network segment. For example, <strong id="dli_09_0011__en-us_topic_0000001318422057_b1540812214483">10.0.0.0/16</strong>.<div class="caution" id="dli_09_0011__en-us_topic_0000001318422057_note243428112912"><span class="cautiontitle"><img src="public_sys-resources/caution_3.0-en-us.png"> </span><div class="cautionbody"><p id="dli_09_0011__en-us_topic_0000001318422057_p114344818296">The CIDR block of a queue cannot overlap with the CIDR blocks of the data sources it connects to (in this example, the DMS Kafka instance and the CSS Elasticsearch cluster). Otherwise, datasource connections will fail to be created.</p>
</div></div>
</li><li id="dli_09_0011__en-us_topic_0000001318422057_li1066019193813">Set other parameters as required.</li></ul>
</li><li id="dli_09_0011__en-us_topic_0000001318422057_li83901314181011">Click <strong id="dli_09_0011__en-us_topic_0000001318422057_b4134818489">Buy</strong>. Confirm the configuration and click <strong id="dli_09_0011__en-us_topic_0000001318422057_b41418810486">Submit</strong>.</li></ol>
</div>
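<p>The queue naming rules and the CIDR overlap restriction described in this step can be checked locally before you buy the queue. The following Python sketch uses only the standard library. The function names are hypothetical, and the subnet CIDR blocks in the usage lines are assumed values chosen for illustration; read the real ones from the Kafka and CSS consoles.</p>

```python
import ipaddress
import re

# Letters, digits, and underscores; 1 to 128 characters; no leading underscore.
_NAME_RE = re.compile(r"[A-Za-z0-9][A-Za-z0-9_]{0,127}")


def normalize_queue_name(name):
    """Validate a queue name against the rules in the note and lowercase it."""
    if not _NAME_RE.fullmatch(name) or name.isdigit():
        raise ValueError(f"invalid queue name: {name!r}")
    # Queue names are case-insensitive; uppercase letters become lowercase.
    return name.lower()


def overlapping(queue_cidr, other_cidrs):
    """Return the CIDR blocks in other_cidrs that overlap queue_cidr."""
    queue_net = ipaddress.ip_network(queue_cidr)
    return [c for c in other_cidrs
            if queue_net.overlaps(ipaddress.ip_network(c))]


print(normalize_queue_name("Flink_Queue1"))
# Assumed subnets for the Kafka instance and the CSS cluster.
print(overlapping("10.0.0.0/16", ["10.128.0.0/24", "192.168.168.0/24"]))
```

<p>An empty list from <code>overlapping</code> means that the queue CIDR block does not collide with any of the listed subnets.</p>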
<div class="section" id="dli_09_0011__en-us_topic_0000001318422057_section78516116518"><a name="dli_09_0011__en-us_topic_0000001318422057_section78516116518"></a><a name="en-us_topic_0000001318422057_section78516116518"></a><h4 class="sectiontitle">Step 2: Create a Kafka Topic</h4><ol id="dli_09_0011__en-us_topic_0000001318422057_ol68922017114"><li id="dli_09_0011__en-us_topic_0000001318422057_li48921006112">On the Kafka management console, click an instance name on the <strong id="dli_09_0011__en-us_topic_0000001318422057_b19371111016486">DMS for Kafka</strong> page. Basic information of the Kafka instance is displayed.</li><li id="dli_09_0011__en-us_topic_0000001318422057_li6892202113">Choose <strong id="dli_09_0011__en-us_topic_0000001318422057_b476461144819">Topics</strong> in the navigation pane on the left. On the displayed page, click <strong id="dli_09_0011__en-us_topic_0000001318422057_b14764101114482">Create Topic</strong>. Configure the following parameters:<ul id="dli_09_0011__en-us_topic_0000001318422057_ul114131915412"><li id="dli_09_0011__en-us_topic_0000001318422057_li105301717185414"><strong id="dli_09_0011__en-us_topic_0000001318422057_b11769613184819">Topic Name</strong>: For this example, enter <strong id="dli_09_0011__en-us_topic_0000001318422057_b84016156489">testkafkatopic</strong>.</li><li id="dli_09_0011__en-us_topic_0000001318422057_li20948104275411"><strong id="dli_09_0011__en-us_topic_0000001318422057_b12881141518486">Partitions</strong>: Set the value to <strong id="dli_09_0011__en-us_topic_0000001318422057_b14881121514810">1</strong>.</li><li id="dli_09_0011__en-us_topic_0000001318422057_li15662134885416"><strong id="dli_09_0011__en-us_topic_0000001318422057_b16687116164811">Replicas</strong>: Set the value to <strong id="dli_09_0011__en-us_topic_0000001318422057_b1968771611484">1</strong>.</li></ul>
<p id="dli_09_0011__en-us_topic_0000001318422057_p11749490558">Retain default values for other parameters.</p>
</li></ol>
</div>
<div class="section" id="dli_09_0011__en-us_topic_0000001318422057_section1627154113018"><a name="dli_09_0011__en-us_topic_0000001318422057_section1627154113018"></a><a name="en-us_topic_0000001318422057_section1627154113018"></a><h4 class="sectiontitle">Step 3: Create an Elasticsearch Index</h4><ol id="dli_09_0011__en-us_topic_0000001318422057_ol28545581764"><li id="dli_09_0011__en-us_topic_0000001318422057_li1644413197483">Log in to the CSS management console and choose <strong id="dli_09_0011__en-us_topic_0000001318422057_b464310294480">Clusters</strong> &gt; <strong id="dli_09_0011__en-us_topic_0000001318422057_b7644329104810">Elasticsearch</strong> from the navigation pane on the left.</li><li id="dli_09_0011__en-us_topic_0000001318422057_li1366821895417">On the <strong id="dli_09_0011__en-us_topic_0000001318422057_b1158163219488">Clusters</strong> page, click <strong id="dli_09_0011__en-us_topic_0000001318422057_b161581532154818">Access Kibana</strong> in the <strong id="dli_09_0011__en-us_topic_0000001318422057_b111581332184815">Operation</strong> column of the created CSS cluster.</li><li id="dli_09_0011__en-us_topic_0000001318422057_li195784711550">On the displayed page, choose <strong id="dli_09_0011__en-us_topic_0000001318422057_b1448219429503">Dev Tools</strong> in the navigation pane on the left. The <strong id="dli_09_0011__en-us_topic_0000001318422057_b34895422502">Console</strong> page is displayed.</li><li id="dli_09_0011__en-us_topic_0000001318422057_li44569220573">On the displayed page, run the following command to create index <strong id="dli_09_0011__en-us_topic_0000001318422057_b55731022195119">shoporders</strong>:<pre class="screen" id="dli_09_0011__en-us_topic_0000001318422057_screen014757141119">PUT /shoporders
{
"settings": {
"number_of_shards": 1
},
"mappings": {
"properties": {
"order_id": {
"type": "text"
},
"order_channel": {
"type": "text"
},
"order_time": {
"type": "text"
},
"pay_amount": {
"type": "double"
},
"real_pay": {
"type": "double"
},
"pay_time": {
"type": "text"
},
"user_id": {
"type": "text"
},
"user_name": {
"type": "text"
},
"area_id": {
"type": "text"
}
}
}
}</pre>
</li></ol>
</div>
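<p>If you prefer to create the index from a script rather than from the Kibana console, the same request body can be built and sent over HTTP. This is a sketch under stated assumptions: the cluster has security mode disabled (as in this example), its private address is reachable from the machine running the script, and <code>build_index_body</code> and <code>create_index</code> are hypothetical helper names.</p>

```python
import json
import urllib.request

# Field types copied from the PUT /shoporders command in this step.
TEXT_FIELDS = ("order_id", "order_channel", "order_time", "pay_time",
               "user_id", "user_name", "area_id")
DOUBLE_FIELDS = ("pay_amount", "real_pay")


def build_index_body(shards=1):
    """Build the settings and mappings body used to create the index."""
    props = {name: {"type": "text"} for name in TEXT_FIELDS}
    props.update({name: {"type": "double"} for name in DOUBLE_FIELDS})
    return {"settings": {"number_of_shards": shards},
            "mappings": {"properties": props}}


def create_index(base_url, index, body, timeout=10):
    """Send PUT <base_url>/<index>; base_url is a placeholder like http://host:9200."""
    request = urllib.request.Request(
        f"{base_url.rstrip('/')}/{index}",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="PUT",
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        return json.loads(response.read().decode("utf-8"))


print(json.dumps(build_index_body(), indent=2))
```

<p>Calling <code>create_index("http://192.168.168.125:9200", "shoporders", build_index_body())</code> would issue the request; the address is the sample value used later in this guide and must be replaced with your own.</p>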
<div class="section" id="dli_09_0011__en-us_topic_0000001318422057_section074025752119"><a name="dli_09_0011__en-us_topic_0000001318422057_section074025752119"></a><a name="en-us_topic_0000001318422057_section074025752119"></a><h4 class="sectiontitle">Step 4: Create an Enhanced Datasource Connection</h4><ul id="dli_09_0011__en-us_topic_0000001318422057_ul1231663714228"><li id="dli_09_0011__en-us_topic_0000001318422057_li193161137122213"><strong id="dli_09_0011__en-us_topic_0000001318422057_b597105517515">Connecting DLI to Kafka</strong><ol id="dli_09_0011__en-us_topic_0000001318422057_ol24611049949"><li id="dli_09_0011__en-us_topic_0000001318422057_li71971337017">On the Kafka management console, click an instance name on the <strong id="dli_09_0011__en-us_topic_0000001318422057_b1275115616519">DMS for Kafka</strong> page. Basic information of the Kafka instance is displayed.</li><li id="dli_09_0011__en-us_topic_0000001318422057_li19197133109">In the <strong id="dli_09_0011__en-us_topic_0000001318422057_b255757175113">Connection</strong> pane, obtain the <strong id="dli_09_0011__en-us_topic_0000001318422057_b5554576514">Instance Address (Private Network)</strong>. In the <strong id="dli_09_0011__en-us_topic_0000001318422057_b1656157105113">Network</strong> pane, obtain the VPC and subnet of the instance.</li><li id="dli_09_0011__en-us_topic_0000001318422057_li557658113910">Click the security group name in the <strong id="dli_09_0011__en-us_topic_0000001318422057_b49025577512">Network</strong> pane. On the displayed page, click the <strong id="dli_09_0011__en-us_topic_0000001318422057_b7902857145110">Inbound Rules</strong> tab and add a rule to allow access from DLI queues. 
For example, if the CIDR block of the queue is 10.0.0.0/16, set <strong id="dli_09_0011__en-us_topic_0000001318422057_b19318591511">Priority</strong> to <strong id="dli_09_0011__en-us_topic_0000001318422057_b103115912516">1</strong>, <strong id="dli_09_0011__en-us_topic_0000001318422057_b143165955112">Action</strong> to <strong id="dli_09_0011__en-us_topic_0000001318422057_b193205905112">Allow</strong>, <strong id="dli_09_0011__en-us_topic_0000001318422057_b5413597518">Protocol</strong> to <strong id="dli_09_0011__en-us_topic_0000001318422057_b141659105118">TCP</strong>, <strong id="dli_09_0011__en-us_topic_0000001318422057_b34115955115">Type</strong> to <strong id="dli_09_0011__en-us_topic_0000001318422057_b164175910515">IPv4</strong>, <strong id="dli_09_0011__en-us_topic_0000001318422057_b104145965119">Source</strong> to <strong id="dli_09_0011__en-us_topic_0000001318422057_b164165985110">10.0.0.0/16</strong>, and click <strong id="dli_09_0011__en-us_topic_0000001318422057_b448595519">OK</strong>.</li><li id="dli_09_0011__en-us_topic_0000001318422057_li10751165711389">Log in to the DLI management console. In the navigation pane on the left, choose <strong id="dli_09_0011__en-us_topic_0000001318422057_b613418065213">Datasource Connections</strong>. On the displayed page, click <strong id="dli_09_0011__en-us_topic_0000001318422057_b2134110175213">Create</strong> on the <strong id="dli_09_0011__en-us_topic_0000001318422057_b41348035215">Enhanced</strong> tab.</li><li id="dli_09_0011__en-us_topic_0000001318422057_li10469451946">In the displayed dialog box, set the following parameters:<ul id="dli_09_0011__en-us_topic_0000001318422057_ul1032935016521"><li id="dli_09_0011__en-us_topic_0000001318422057_li1332914503526"><strong id="dli_09_0011__en-us_topic_0000001318422057_b16416143155219">Connection Name</strong>: Enter a name for the enhanced datasource connection. 
For this example, enter <strong id="dli_09_0011__en-us_topic_0000001318422057_b10423756528">dli_kafka</strong>.</li><li id="dli_09_0011__en-us_topic_0000001318422057_li432905017524"><strong id="dli_09_0011__en-us_topic_0000001318422057_b127471287525">Resource Pool</strong>: Select the name of the queue created in <a href="#dli_09_0011__en-us_topic_0000001318422057_section792923214216">Step 1: Create a Queue</a>.</li><li id="dli_09_0011__en-us_topic_0000001318422057_li1932945012529"><strong id="dli_09_0011__en-us_topic_0000001318422057_b21221014175215">VPC</strong>: Select the VPC of the Kafka instance.</li><li id="dli_09_0011__en-us_topic_0000001318422057_li5329750115213"><strong id="dli_09_0011__en-us_topic_0000001318422057_b13167121514527">Subnet</strong>: Select the subnet of the Kafka instance.</li><li id="dli_09_0011__en-us_topic_0000001318422057_li16590071163">Set other parameters as needed.</li></ul>
<p id="dli_09_0011__en-us_topic_0000001318422057_p3174856105519">Click <strong id="dli_09_0011__en-us_topic_0000001318422057_b11702161719520">OK</strong>. Click the name of the created datasource connection to view its status. You can perform subsequent steps only after the connection status changes to <strong id="dli_09_0011__en-us_topic_0000001318422057_b9379181985219">Active</strong>.</p>
</li><li id="dli_09_0011__en-us_topic_0000001318422057_li18197531400">Choose <strong id="dli_09_0011__en-us_topic_0000001318422057_b16714621125218">Resources</strong> &gt; <strong id="dli_09_0011__en-us_topic_0000001318422057_b1171519211522">Queue Management</strong> from the navigation pane, locate the queue you created in <a href="#dli_09_0011__en-us_topic_0000001318422057_section792923214216">Step 1: Create a Queue</a>. In the <strong id="dli_09_0011__en-us_topic_0000001318422057_b15715182113520">Operation</strong> column, click <strong id="dli_09_0011__en-us_topic_0000001318422057_b157153218521">More</strong> &gt; <strong id="dli_09_0011__en-us_topic_0000001318422057_b11715821185214">Test Address Connectivity</strong>.</li><li id="dli_09_0011__en-us_topic_0000001318422057_li171971831506">In the displayed dialog box, enter <em id="dli_09_0011__en-us_topic_0000001318422057_i2066714379524">Kafka instance address (private network)</em><strong id="dli_09_0011__en-us_topic_0000001318422057_b13667143711522">:</strong><em id="dli_09_0011__en-us_topic_0000001318422057_i366843712524">port</em> in the <strong id="dli_09_0011__en-us_topic_0000001318422057_b7668837115215">Address</strong> box and click <strong id="dli_09_0011__en-us_topic_0000001318422057_b1668337135214">Test</strong> to check whether the instance is reachable.</li></ol>
</li><li id="dli_09_0011__en-us_topic_0000001318422057_li474495213221"><strong id="dli_09_0011__en-us_topic_0000001318422057_b1416250145213">Connecting DLI to CSS</strong><ol id="dli_09_0011__en-us_topic_0000001318422057_ol17815135442310"><li id="dli_09_0011__en-us_topic_0000001318422057_li14815454112312">On the CSS management console, choose <strong id="dli_09_0011__en-us_topic_0000001318422057_b7414706539">Clusters</strong> &gt; <strong id="dli_09_0011__en-us_topic_0000001318422057_b841512035316">Elasticsearch</strong>. On the displayed page, click the name of the created CSS cluster to view basic information.</li><li id="dli_09_0011__en-us_topic_0000001318422057_li19666016361"><a name="dli_09_0011__en-us_topic_0000001318422057_li19666016361"></a><a name="en-us_topic_0000001318422057_li19666016361"></a>On the <strong id="dli_09_0011__en-us_topic_0000001318422057_b62861516145311">Cluster Information</strong> page, obtain the <strong id="dli_09_0011__en-us_topic_0000001318422057_b10286616125319">Private Network Address</strong>, <strong id="dli_09_0011__en-us_topic_0000001318422057_b6286131635317">VPC</strong>, and <strong id="dli_09_0011__en-us_topic_0000001318422057_b202861316115316">Subnet</strong>.</li><li id="dli_09_0011__en-us_topic_0000001318422057_li4233434123610">Click the security group name. On the displayed page, click the <strong id="dli_09_0011__en-us_topic_0000001318422057_b1676513245538">Inbound Rules</strong> tab and add a rule to allow access from DLI queues. 
For example, if the CIDR block of the queue is 10.0.0.0/16, set <strong id="dli_09_0011__en-us_topic_0000001318422057_b1491617289537">Priority</strong> to <strong id="dli_09_0011__en-us_topic_0000001318422057_b8916132815311">1</strong>, <strong id="dli_09_0011__en-us_topic_0000001318422057_b1491792817539">Action</strong> to <strong id="dli_09_0011__en-us_topic_0000001318422057_b119171128185313">Allow</strong>, <strong id="dli_09_0011__en-us_topic_0000001318422057_b1917828115312">Protocol</strong> to <strong id="dli_09_0011__en-us_topic_0000001318422057_b1191822805314">TCP</strong>, <strong id="dli_09_0011__en-us_topic_0000001318422057_b1191892895314">Type</strong> to <strong id="dli_09_0011__en-us_topic_0000001318422057_b12918628135313">IPv4</strong>, <strong id="dli_09_0011__en-us_topic_0000001318422057_b991982812532">Source</strong> to <strong id="dli_09_0011__en-us_topic_0000001318422057_b6919128135312">10.0.0.0/16</strong>, and click <strong id="dli_09_0011__en-us_topic_0000001318422057_b1491962816539">OK</strong>.</li><li id="dli_09_0011__en-us_topic_0000001318422057_li14803182620216">Check whether the Kafka instance and the Elasticsearch instance are in the same VPC and subnet.<ol type="a" id="dli_09_0011__en-us_topic_0000001318422057_ol184431423137"><li id="dli_09_0011__en-us_topic_0000001318422057_li957111932">If they are, go to <a href="#dli_09_0011__en-us_topic_0000001318422057_li9816175412318">7</a>. You do not need to create an enhanced datasource connection again.</li><li id="dli_09_0011__en-us_topic_0000001318422057_li1086652519411">If they are not, go to <a href="#dli_09_0011__en-us_topic_0000001318422057_li11976319011">5</a>. Create an enhanced datasource connection to connect DLI to the subnet where the Elasticsearch instance is located.</li></ol>
</li><li id="dli_09_0011__en-us_topic_0000001318422057_li11976319011"><a name="dli_09_0011__en-us_topic_0000001318422057_li11976319011"></a><a name="en-us_topic_0000001318422057_li11976319011"></a>Log in to the DLI management console. In the navigation pane on the left, choose <strong id="dli_09_0011__en-us_topic_0000001318422057_b1963435565419">Datasource Connections</strong>. On the displayed page, click <strong id="dli_09_0011__en-us_topic_0000001318422057_b11635175505418">Create</strong> on the <strong id="dli_09_0011__en-us_topic_0000001318422057_b363595520549">Enhanced</strong> tab.</li><li id="dli_09_0011__en-us_topic_0000001318422057_li198151354192319">In the displayed dialog box, set the following parameters:<ul id="dli_09_0011__en-us_topic_0000001318422057_ul17815125415233"><li id="dli_09_0011__en-us_topic_0000001318422057_li1181518543233"><strong id="dli_09_0011__en-us_topic_0000001318422057_b199516375516">Connection Name</strong>: Enter a name for the enhanced datasource connection. For this example, enter <strong id="dli_09_0011__en-us_topic_0000001318422057_b196186155516">dli_css</strong>.</li><li id="dli_09_0011__en-us_topic_0000001318422057_li681518542232"><strong id="dli_09_0011__en-us_topic_0000001318422057_b192851310105514">Resource Pool</strong>: Select the name of the queue created in <a href="#dli_09_0011__en-us_topic_0000001318422057_section792923214216">Step 1: Create a Queue</a>.</li><li id="dli_09_0011__en-us_topic_0000001318422057_li58162542235"><strong id="dli_09_0011__en-us_topic_0000001318422057_b551313155556">VPC</strong>: Select the VPC of the Elasticsearch instance.</li><li id="dli_09_0011__en-us_topic_0000001318422057_li11816105432317"><strong id="dli_09_0011__en-us_topic_0000001318422057_b93915275559">Subnet</strong>: Select the subnet of the Elasticsearch instance.</li><li id="dli_09_0011__en-us_topic_0000001318422057_li781675472310">Set other parameters as needed.</li></ul>
<p id="dli_09_0011__en-us_topic_0000001318422057_p1081617549235">Click <strong id="dli_09_0011__en-us_topic_0000001318422057_b1341994516553">OK</strong>. Click the name of the created datasource connection to view its status. You can perform subsequent steps only after the connection status changes to <strong id="dli_09_0011__en-us_topic_0000001318422057_b3683144615520">Active</strong>.</p>
</li><li id="dli_09_0011__en-us_topic_0000001318422057_li9816175412318"><a name="dli_09_0011__en-us_topic_0000001318422057_li9816175412318"></a><a name="en-us_topic_0000001318422057_li9816175412318"></a>Choose <strong id="dli_09_0011__en-us_topic_0000001318422057_b10875548185510">Resources</strong> &gt; <strong id="dli_09_0011__en-us_topic_0000001318422057_b1887594814558">Queue Management</strong> from the navigation pane and locate the queue you created in <a href="#dli_09_0011__en-us_topic_0000001318422057_section792923214216">Step 1: Create a Queue</a>. In the <strong id="dli_09_0011__en-us_topic_0000001318422057_b158755489558">Operation</strong> column, click <strong id="dli_09_0011__en-us_topic_0000001318422057_b88754483557">More</strong> &gt; <strong id="dli_09_0011__en-us_topic_0000001318422057_b887618485559">Test Address Connectivity</strong>.</li><li id="dli_09_0011__en-us_topic_0000001318422057_li7816454162319">In the displayed dialog box, enter the <em id="dli_09_0011__en-us_topic_0000001318422057_i7613185218559">private network address</em><strong id="dli_09_0011__en-us_topic_0000001318422057_b106131152145516">:</strong><em id="dli_09_0011__en-us_topic_0000001318422057_i361325295510">port</em> of the Elasticsearch cluster that you obtained in <a href="#dli_09_0011__en-us_topic_0000001318422057_li19666016361">2</a> in the <strong id="dli_09_0011__en-us_topic_0000001318422057_b1161415520556">Address</strong> box and click <strong id="dli_09_0011__en-us_topic_0000001318422057_b146141052105510">Test</strong> to check whether the cluster is reachable.</li></ol>
</li></ul>
</div>
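<p>The addresses entered in <strong>Test Address Connectivity</strong> use the <em>host</em>:<em>port</em> form. The following Python sketch splits such an address list and probes each endpoint over TCP. It is only a rough local aid: the probe runs from the machine that executes the script, not from the DLI queue, so the console test remains the authoritative check. The function names are hypothetical, and the addresses are the sample values used elsewhere in this guide.</p>

```python
import socket


def parse_endpoints(addresses):
    """Split 'host:port,host:port' into a list of (host, port) tuples."""
    endpoints = []
    for item in addresses.split(","):
        host, sep, port = item.strip().rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {item!r}")
        endpoints.append((host, int(port)))
    return endpoints


def is_reachable(host, port, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


# Sample Kafka private addresses from this guide; replace with your own.
kafka = parse_endpoints("10.128.0.120:9092,10.128.0.89:9092,10.128.0.83:9092")
print(kafka)
```

<p>To probe the endpoints, call <code>is_reachable(host, port)</code> for each tuple from a host that sits in a network with access to the instance.</p>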
<div class="section" id="dli_09_0011__en-us_topic_0000001318422057_section12448959174212"><a name="dli_09_0011__en-us_topic_0000001318422057_section12448959174212"></a><a name="en-us_topic_0000001318422057_section12448959174212"></a><h4 class="sectiontitle">Step 5: Run a Job</h4><ol id="dli_09_0011__en-us_topic_0000001318422057_ol1313811362437"><li id="dli_09_0011__en-us_topic_0000001318422057_li219661215114">On the DLI management console, choose <strong id="dli_09_0011__en-us_topic_0000001318422057_b208722126563">Job Management</strong> &gt; <strong id="dli_09_0011__en-us_topic_0000001318422057_b1587261235614">Flink Jobs</strong>. On the <strong id="dli_09_0011__en-us_topic_0000001318422057_b19873131218569">Flink Jobs</strong> page, click <strong id="dli_09_0011__en-us_topic_0000001318422057_b687331205611">Create Job</strong>.</li><li id="dli_09_0011__en-us_topic_0000001318422057_li18197181225114">In the <strong id="dli_09_0011__en-us_topic_0000001318422057_b1826091545616">Create Job</strong> dialog box, set <strong id="dli_09_0011__en-us_topic_0000001318422057_b1261115125610">Type</strong> to <strong id="dli_09_0011__en-us_topic_0000001318422057_b32610157569">Flink OpenSource SQL</strong> and <strong id="dli_09_0011__en-us_topic_0000001318422057_b19261191595613">Name</strong> to <strong id="dli_09_0011__en-us_topic_0000001318422057_b13261111585617">FlinkKafkaES</strong>. 
Click <strong id="dli_09_0011__en-us_topic_0000001318422057_b9739423105619">OK</strong>.</li><li id="dli_09_0011__en-us_topic_0000001318422057_li119731295112">On the job editing page, set the following parameters and retain the default values of other parameters.<ul id="dli_09_0011__en-us_topic_0000001318422057_ul1970291612112"><li id="dli_09_0011__en-us_topic_0000001318422057_li6702216152118"><strong id="dli_09_0011__en-us_topic_0000001318422057_b1160152817567">Queue</strong>: Select the queue created in <a href="#dli_09_0011__en-us_topic_0000001318422057_section792923214216">Step 1: Create a Queue</a>.</li><li id="dli_09_0011__en-us_topic_0000001318422057_li1629221563615"><strong id="dli_09_0011__en-us_topic_0000001318422057_b74441230105615">Flink Version</strong>: Select <strong id="dli_09_0011__en-us_topic_0000001318422057_b3444930135614">1.12</strong>.</li><li id="dli_09_0011__en-us_topic_0000001318422057_li84401118192110"><strong id="dli_09_0011__en-us_topic_0000001318422057_b122822326561">Save Job Log</strong>: Enable this function.</li><li id="dli_09_0011__en-us_topic_0000001318422057_li12684193718212"><strong id="dli_09_0011__en-us_topic_0000001318422057_b14115203417562">OBS Bucket</strong>: Select an OBS bucket for storing job logs and grant access permissions of the OBS bucket as prompted.</li><li id="dli_09_0011__en-us_topic_0000001318422057_li1382275713215"><strong id="dli_09_0011__en-us_topic_0000001318422057_b16621835175618">Enable Checkpointing</strong>: Enable this function.</li><li id="dli_09_0011__en-us_topic_0000001318422057_li1479717252318">Enter a SQL statement in the editing pane. The following is an example. 
Modify the commented parameters as needed.<div class="note" id="dli_09_0011__en-us_topic_0000001318422057_note16434951143717"><img src="public_sys-resources/note_3.0-en-us.png"><span class="notetitle"> </span><div class="notebody"><p id="dli_09_0011__en-us_topic_0000001318422057_p261743144111">This example uses Flink OpenSource SQL 1.12 syntax. The data source is Kafka, and the result data is written to Elasticsearch.</p>
</div></div>
</li><li id="dli_09_0011__en-us_topic_0000001318422057_li9547521705">Create a Kafka source table and connect DLI to the Kafka data source.<pre class="screen" id="dli_09_0011__en-us_topic_0000001318422057_screen18920326515">CREATE TABLE kafkaSource (
order_id string,
order_channel string,
order_time string,
pay_amount double,
real_pay double,
pay_time string,
user_id string,
user_name string,
area_id string
) WITH (
'connector' = 'kafka',
'properties.bootstrap.servers' = '10.128.0.120:9092,10.128.0.89:9092,10.128.0.83:9092', -- Internal network address and port number of the Kafka instance
'properties.group.id' = 'click',
'topic' = 'testkafkatopic', -- Created Kafka topic
'format' = 'json',
'scan.startup.mode' = 'latest-offset'
);</pre>
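<p>As a rough illustration of how a JSON message lines up with the columns declared in kafkaSource, the following Python sketch checks that a message carries all nine fields and that pay_amount and real_pay can be read as numbers. KAFKA_SOURCE_COLUMNS and check_message are hypothetical helper names introduced here. They are not part of DLI or Flink, and the check only approximates what the JSON format does when it reads a record.</p>

```python
import json

# Column names and types copied from the kafkaSource DDL above.
# Hypothetical helper table: string columns map to str, double columns to float.
KAFKA_SOURCE_COLUMNS = {
    "order_id": str,
    "order_channel": str,
    "order_time": str,
    "pay_amount": float,
    "real_pay": float,
    "pay_time": str,
    "user_id": str,
    "user_name": str,
    "area_id": str,
}


def check_message(raw):
    """Parse one JSON message and return its values converted to column types.

    Raises ValueError if the text is not valid JSON, a column is missing,
    or a double column cannot be read as a number.
    """
    record = json.loads(raw)
    missing = [name for name in KAFKA_SOURCE_COLUMNS if name not in record]
    if missing:
        raise ValueError("missing fields: " + ", ".join(missing))
    typed = {}
    for name, column_type in KAFKA_SOURCE_COLUMNS.items():
        # float("100.00") yields 100.0; str() leaves string values unchanged.
        typed[name] = column_type(record[name])
    return typed
```

<p>Running a message through check_message before sending it is one way to catch a misspelled field name or a non-numeric amount early, before the job reads the topic.</p>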
</li><li id="dli_09_0011__en-us_topic_0000001318422057_li114689571903">Create an Elasticsearch result table to display the data analyzed by DLI.<pre class="screen" id="dli_09_0011__en-us_topic_0000001318422057_screen10254154111115">CREATE TABLE elasticsearchSink (
order_id string,
order_channel string,
order_time string,
pay_amount double,
real_pay double,
pay_time string,
user_id string,
user_name string,
area_id string
) WITH (
'connector' = 'elasticsearch-7',
'hosts' = '192.168.168.125:9200', --Private IP address and port of the CSS cluster
'index' = 'shoporders' --Created Elasticsearch index
);
--Write Kafka data to Elasticsearch indexes
insert into
elasticsearchSink
select
*
from
kafkaSource;</pre>
</li></ul>
</li><li id="dli_09_0011__en-us_topic_0000001318422057_li219761217517">Click <strong id="dli_09_0011__en-us_topic_0000001318422057_b173145335597">Check Semantic</strong> and ensure that the SQL statement passes the check. Click <strong id="dli_09_0011__en-us_topic_0000001318422057_b47511235105915">Save</strong>. Click <strong id="dli_09_0011__en-us_topic_0000001318422057_b182025377592">Start</strong>, confirm the job parameters, and click <strong id="dli_09_0011__en-us_topic_0000001318422057_b13202637195919">Start Now</strong> to execute the job. Wait until the job status changes to <strong id="dli_09_0011__en-us_topic_0000001318422057_b076747205918">Running</strong>.</li></ol>
</div>
<div class="section" id="dli_09_0011__en-us_topic_0000001318422057_section4387527162418"><a name="dli_09_0011__en-us_topic_0000001318422057_section4387527162418"></a><a name="en-us_topic_0000001318422057_section4387527162418"></a><h4 class="sectiontitle">Step 6: Send Data and Query Results</h4><ol id="dli_09_0011__en-us_topic_0000001318422057_ol0558165272410"><li id="dli_09_0011__en-us_topic_0000001318422057_li14615191961612">Send data to Kafka.<p id="dli_09_0011__en-us_topic_0000001318422057_p764982531616"><a name="dli_09_0011__en-us_topic_0000001318422057_li14615191961612"></a><a name="en-us_topic_0000001318422057_li14615191961612"></a>Use the Kafka client to send data to the topic created in <a href="#dli_09_0011__en-us_topic_0000001318422057_section78516116518">Step 2: Create a Kafka Topic</a> to simulate a real-time data stream.</p>
<p id="dli_09_0011__en-us_topic_0000001318422057_p17221639713">The sample data is as follows:</p>
<pre class="screen" id="dli_09_0011__en-us_topic_0000001318422057_screen1940249155610">{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0002", "user_name":"Jason", "area_id":"330106"}</pre>
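<p>If you prefer to generate the messages instead of typing them, a minimal Python sketch such as the following can print one JSON object per line in the same layout as the sample data. SAMPLE_ORDERS and to_json_lines are hypothetical names used only for illustration. The sketch does not connect to Kafka; it only prepares the message text.</p>

```python
import json

# The two sample orders from this step, kept as plain dictionaries.
SAMPLE_ORDERS = [
    {"order_id": "202103241000000001", "order_channel": "webShop",
     "order_time": "2021-03-24 10:00:00", "pay_amount": "100.00",
     "real_pay": "100.00", "pay_time": "2021-03-24 10:02:03",
     "user_id": "0001", "user_name": "Alice", "area_id": "330106"},
    {"order_id": "202103241606060001", "order_channel": "appShop",
     "order_time": "2021-03-24 16:06:06", "pay_amount": "200.00",
     "real_pay": "180.00", "pay_time": "2021-03-24 16:10:06",
     "user_id": "0002", "user_name": "Jason", "area_id": "330106"},
]


def to_json_lines(orders):
    """Serialize each order as a single-line JSON object (one message each)."""
    return [json.dumps(order, separators=(", ", ":")) for order in orders]


if __name__ == "__main__":
    # Print the messages so they can be passed to a Kafka producer client.
    for line in to_json_lines(SAMPLE_ORDERS):
        print(line)
```

<p>Each printed line is one message. Paste or pipe the lines into the Kafka producer client that you use for the topic.</p>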
</li><li id="dli_09_0011__en-us_topic_0000001318422057_li66601135172">View the data processing result in Elasticsearch.<div class="p" id="dli_09_0011__en-us_topic_0000001318422057_p19781151811173"><a name="dli_09_0011__en-us_topic_0000001318422057_li66601135172"></a><a name="en-us_topic_0000001318422057_li66601135172"></a>After the messages are sent to Kafka, run the following statement in Kibana of the CSS cluster and check the result:<pre class="screen" id="dli_09_0011__en-us_topic_0000001318422057_screen1035620187317">GET shoporders/_search</pre>
</div>
<div class="p" id="dli_09_0011__en-us_topic_0000001318422057_p14840141132719">The query result is as follows:<pre class="screen" id="dli_09_0011__en-us_topic_0000001318422057_screen977814284272">{
"took" : 0,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 2,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "shoporders",
"_type" : "_doc",
"_id" : "6fswzIAByVjqg3_qAyM1",
"_score" : 1.0,
"_source" : {
"order_id" : "202103241000000001",
"order_channel" : "webShop",
"order_time" : "2021-03-24 10:00:00",
"pay_amount" : 100.0,
"real_pay" : 100.0,
"pay_time" : "2021-03-24 10:02:03",
"user_id" : "0001",
"user_name" : "Alice",
"area_id" : "330106"
}
},
{
"_index" : "shoporders",
"_type" : "_doc",
"_id" : "6vs1zIAByVjqg3_qyyPp",
"_score" : 1.0,
"_source" : {
"order_id" : "202103241606060001",
"order_channel" : "appShop",
"order_time" : "2021-03-24 16:06:06",
"pay_amount" : 200.0,
"real_pay" : 180.0,
"pay_time" : "2021-03-24 16:10:06",
"user_id" : "0002",
"user_name" : "Jason",
"area_id" : "330106"
}
}
]
}
}</pre>
</div>
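<p>To check the result programmatically rather than by eye, the response body can be parsed with a few lines of Python. The following sketch assumes the response has the shape shown above, with hits.total given as an object that contains a value field. extract_orders and total_real_pay are hypothetical helper names, not part of any Elasticsearch client.</p>

```python
import json


def extract_orders(response_text):
    """Return (total hit count, list of _source documents) from a _search response."""
    body = json.loads(response_text)
    hits = body["hits"]
    total = hits["total"]["value"]
    orders = [hit["_source"] for hit in hits["hits"]]
    return total, orders


def total_real_pay(orders):
    """Sum the real_pay column over the returned orders."""
    return sum(order["real_pay"] for order in orders)
```

<p>For the two sample messages, extract_orders is expected to report a total of 2, and the order_id values in the returned documents should match the ones that were sent.</p>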
</li></ol>
</div>
</div>