Files
doc-exports/docs/dli/sqlreference/dli_08_15018.html
Su, Xiaomeng be9eabe464 dli_sqlreference_20250305
Reviewed-by: Pruthi, Vineet <vineet.pruthi@t-systems.com>
Co-authored-by: Su, Xiaomeng <suxiaomeng1@huawei.com>
Co-committed-by: Su, Xiaomeng <suxiaomeng1@huawei.com>
2025-03-25 09:06:21 +00:00

220 lines
19 KiB
HTML

<a name="dli_08_15018"></a>
<h1 class="topictitle1">Confluent Avro</h1>
<div id="body0000001310095793"><div class="section" id="dli_08_15018__section152911439362"><h4 class="sectiontitle">Function</h4><p id="dli_08_15018__p10105172018016">The Avro Schema Registry (<strong id="dli_08_15018__b22351291516">avro-confluent</strong>) format allows you to read records that were serialized by the <strong id="dli_08_15018__b102311271512">io.confluent.kafka.serializers.KafkaAvroSerializer</strong> and to write records that can in turn be read by the <strong id="dli_08_15018__b112381231512">io.confluent.kafka.serializers.KafkaAvroDeserializer</strong>.</p>
<p id="dli_08_15018__p21051320605">When reading (deserializing) a record with this format, the Avro writer schema is fetched from the configured Confluent Schema Registry based on the schema version ID encoded in the record, while the reader schema is inferred from the table schema.</p>
<p id="dli_08_15018__p1110512201402">When writing (serializing) a record with this format, the Avro schema is inferred from the table schema and used to retrieve a schema ID to be encoded with the data. The lookup is performed within the configured Confluent Schema Registry under the <a href="https://docs.confluent.io/current/schema-registry/index.html#schemas-subjects-and-topics" target="_blank" rel="noopener noreferrer">subject</a>. The subject is specified by the avro-confluent.subject parameter.</p>
</div>
<div class="section" id="dli_08_15018__section382515542379"><h4 class="sectiontitle">Supported Connectors</h4><ul id="dli_08_15018__ul123811916163812"><li id="dli_08_15018__li203823165389">kafka</li><li id="dli_08_15018__li115222184514">upsert kafka</li></ul>
</div>
<div class="section" id="dli_08_15018__section17656257385"><h4 class="sectiontitle">Parameters</h4>
<div class="tablenoborder"><table cellpadding="4" cellspacing="0" summary="" id="dli_08_15018__table1035742171510" frame="border" border="1" rules="all"><caption><b>Table 1 </b>Parameter description</caption><thead align="left"><tr id="dli_08_15018__row836164215156"><th align="left" class="cellrowborder" valign="top" width="20%" id="mcps1.3.3.2.2.6.1.1"><p id="dli_08_15018__p11366429154">Parameter</p>
</th>
<th align="left" class="cellrowborder" valign="top" width="10.68%" id="mcps1.3.3.2.2.6.1.2"><p id="dli_08_15018__p3365427156">Mandatory</p>
</th>
<th align="left" class="cellrowborder" valign="top" width="8.83%" id="mcps1.3.3.2.2.6.1.3"><p id="dli_08_15018__p63654215155">Default Value</p>
</th>
<th align="left" class="cellrowborder" valign="top" width="8.81%" id="mcps1.3.3.2.2.6.1.4"><p id="dli_08_15018__p1536174212155">Type</p>
</th>
<th align="left" class="cellrowborder" valign="top" width="51.68000000000001%" id="mcps1.3.3.2.2.6.1.5"><p id="dli_08_15018__p193674241515">Description</p>
</th>
</tr>
</thead>
<tbody><tr id="dli_08_15018__row3363421157"><td class="cellrowborder" valign="top" width="20%" headers="mcps1.3.3.2.2.6.1.1 "><p id="dli_08_15018__p15761135519166">format</p>
</td>
<td class="cellrowborder" valign="top" width="10.68%" headers="mcps1.3.3.2.2.6.1.2 "><p id="dli_08_15018__p6361842181519">Yes</p>
</td>
<td class="cellrowborder" valign="top" width="8.83%" headers="mcps1.3.3.2.2.6.1.3 "><p id="dli_08_15018__p43674214154">None</p>
</td>
<td class="cellrowborder" valign="top" width="8.81%" headers="mcps1.3.3.2.2.6.1.4 "><p id="dli_08_15018__p2036104231511">String</p>
</td>
<td class="cellrowborder" valign="top" width="51.68000000000001%" headers="mcps1.3.3.2.2.6.1.5 "><p id="dli_08_15018__p2360426155">Format to be used. Set this parameter to <strong id="dli_08_15018__b714532114156">'avro-confluent'</strong>.</p>
</td>
</tr>
<tr id="dli_08_15018__row103616421152"><td class="cellrowborder" valign="top" width="20%" headers="mcps1.3.3.2.2.6.1.1 "><p id="dli_08_15018__p88823464213">avro-confluent.basic-auth.credentials-source</p>
</td>
<td class="cellrowborder" valign="top" width="10.68%" headers="mcps1.3.3.2.2.6.1.2 "><p id="dli_08_15018__p636124214156">No</p>
</td>
<td class="cellrowborder" valign="top" width="8.83%" headers="mcps1.3.3.2.2.6.1.3 "><p id="dli_08_15018__p388517227307">None</p>
</td>
<td class="cellrowborder" valign="top" width="8.81%" headers="mcps1.3.3.2.2.6.1.4 "><p id="dli_08_15018__p1236642161514">String</p>
</td>
<td class="cellrowborder" valign="top" width="51.68000000000001%" headers="mcps1.3.3.2.2.6.1.5 "><p id="dli_08_15018__p19361442151517">Basic auth credentials source for Schema Registry</p>
</td>
</tr>
<tr id="dli_08_15018__row16366424156"><td class="cellrowborder" valign="top" width="20%" headers="mcps1.3.3.2.2.6.1.1 "><p id="dli_08_15018__p423041436">avro-confluent.basic-auth.user-info</p>
</td>
<td class="cellrowborder" valign="top" width="10.68%" headers="mcps1.3.3.2.2.6.1.2 "><p id="dli_08_15018__p93617427154">No</p>
</td>
<td class="cellrowborder" valign="top" width="8.83%" headers="mcps1.3.3.2.2.6.1.3 "><p id="dli_08_15018__p18851922163012">None</p>
</td>
<td class="cellrowborder" valign="top" width="8.81%" headers="mcps1.3.3.2.2.6.1.4 "><p id="dli_08_15018__p93624211512">String</p>
</td>
<td class="cellrowborder" valign="top" width="51.68000000000001%" headers="mcps1.3.3.2.2.6.1.5 "><p id="dli_08_15018__p63634211157">Basic auth user info for Schema Registry</p>
</td>
</tr>
<tr id="dli_08_15018__row2036114281513"><td class="cellrowborder" valign="top" width="20%" headers="mcps1.3.3.2.2.6.1.1 "><p id="dli_08_15018__p19810917332">avro-confluent.bearer-auth.credentials-source</p>
</td>
<td class="cellrowborder" valign="top" width="10.68%" headers="mcps1.3.3.2.2.6.1.2 "><p id="dli_08_15018__p936134216150">No</p>
</td>
<td class="cellrowborder" valign="top" width="8.83%" headers="mcps1.3.3.2.2.6.1.3 "><p id="dli_08_15018__p6885192210308">None</p>
</td>
<td class="cellrowborder" valign="top" width="8.81%" headers="mcps1.3.3.2.2.6.1.4 "><p id="dli_08_15018__p636194219155">String</p>
</td>
<td class="cellrowborder" valign="top" width="51.68000000000001%" headers="mcps1.3.3.2.2.6.1.5 "><p id="dli_08_15018__p19368420157">Bearer auth credentials source for Schema Registry</p>
</td>
</tr>
<tr id="dli_08_15018__row33613422159"><td class="cellrowborder" valign="top" width="20%" headers="mcps1.3.3.2.2.6.1.1 "><p id="dli_08_15018__p161251531131">avro-confluent.bearer-auth.token</p>
</td>
<td class="cellrowborder" valign="top" width="10.68%" headers="mcps1.3.3.2.2.6.1.2 "><p id="dli_08_15018__p10360422158">No</p>
</td>
<td class="cellrowborder" valign="top" width="8.83%" headers="mcps1.3.3.2.2.6.1.3 "><p id="dli_08_15018__p2088472218304">None</p>
</td>
<td class="cellrowborder" valign="top" width="8.81%" headers="mcps1.3.3.2.2.6.1.4 "><p id="dli_08_15018__p636542181513">String</p>
</td>
<td class="cellrowborder" valign="top" width="51.68000000000001%" headers="mcps1.3.3.2.2.6.1.5 "><p id="dli_08_15018__p1936174210159">Bearer auth token for Schema Registry</p>
</td>
</tr>
<tr id="dli_08_15018__row1536194217158"><td class="cellrowborder" valign="top" width="20%" headers="mcps1.3.3.2.2.6.1.1 "><p id="dli_08_15018__p83261740135">avro-confluent.properties</p>
</td>
<td class="cellrowborder" valign="top" width="10.68%" headers="mcps1.3.3.2.2.6.1.2 "><p id="dli_08_15018__p1036134231510">No</p>
</td>
<td class="cellrowborder" valign="top" width="8.83%" headers="mcps1.3.3.2.2.6.1.3 "><p id="dli_08_15018__p11884132283019">None</p>
</td>
<td class="cellrowborder" valign="top" width="8.81%" headers="mcps1.3.3.2.2.6.1.4 "><p id="dli_08_15018__p536742151512">Map</p>
</td>
<td class="cellrowborder" valign="top" width="51.68000000000001%" headers="mcps1.3.3.2.2.6.1.5 "><p id="dli_08_15018__p8365427158">Properties map that is forwarded to the underlying Schema Registry. This is useful for options that are not officially exposed via Flink config options. However, note that Flink options have higher precedence.</p>
</td>
</tr>
<tr id="dli_08_15018__row1736144212156"><td class="cellrowborder" valign="top" width="20%" headers="mcps1.3.3.2.2.6.1.1 "><p id="dli_08_15018__p1318313587310">avro-confluent.ssl.keystore.location</p>
</td>
<td class="cellrowborder" valign="top" width="10.68%" headers="mcps1.3.3.2.2.6.1.2 "><p id="dli_08_15018__p1736124214154">No</p>
</td>
<td class="cellrowborder" valign="top" width="8.83%" headers="mcps1.3.3.2.2.6.1.3 "><p id="dli_08_15018__p19883222183010">None</p>
</td>
<td class="cellrowborder" valign="top" width="8.81%" headers="mcps1.3.3.2.2.6.1.4 "><p id="dli_08_15018__p1236642131517">String</p>
</td>
<td class="cellrowborder" valign="top" width="51.68000000000001%" headers="mcps1.3.3.2.2.6.1.5 "><p id="dli_08_15018__p12361542131520">Location/File of SSL keystore</p>
</td>
</tr>
<tr id="dli_08_15018__row1036104217154"><td class="cellrowborder" valign="top" width="20%" headers="mcps1.3.3.2.2.6.1.1 "><p id="dli_08_15018__p28584912417">avro-confluent.ssl.keystore.password</p>
</td>
<td class="cellrowborder" valign="top" width="10.68%" headers="mcps1.3.3.2.2.6.1.2 "><p id="dli_08_15018__p63617428159">No</p>
</td>
<td class="cellrowborder" valign="top" width="8.83%" headers="mcps1.3.3.2.2.6.1.3 "><p id="dli_08_15018__p10883192214305">None</p>
</td>
<td class="cellrowborder" valign="top" width="8.81%" headers="mcps1.3.3.2.2.6.1.4 "><p id="dli_08_15018__p113617429153">String</p>
</td>
<td class="cellrowborder" valign="top" width="51.68000000000001%" headers="mcps1.3.3.2.2.6.1.5 "><p id="dli_08_15018__p16361742101518">Password for SSL keystore</p>
</td>
</tr>
<tr id="dli_08_15018__row1536204220158"><td class="cellrowborder" valign="top" width="20%" headers="mcps1.3.3.2.2.6.1.1 "><p id="dli_08_15018__p109491122047">avro-confluent.ssl.truststore.location</p>
</td>
<td class="cellrowborder" valign="top" width="10.68%" headers="mcps1.3.3.2.2.6.1.2 "><p id="dli_08_15018__p143684291512">No</p>
</td>
<td class="cellrowborder" valign="top" width="8.83%" headers="mcps1.3.3.2.2.6.1.3 "><p id="dli_08_15018__p1088382212301">None</p>
</td>
<td class="cellrowborder" valign="top" width="8.81%" headers="mcps1.3.3.2.2.6.1.4 "><p id="dli_08_15018__p123694231515">String</p>
</td>
<td class="cellrowborder" valign="top" width="51.68000000000001%" headers="mcps1.3.3.2.2.6.1.5 "><p id="dli_08_15018__p236104210155">Location/File of SSL truststore</p>
</td>
</tr>
<tr id="dli_08_15018__row936742191511"><td class="cellrowborder" valign="top" width="20%" headers="mcps1.3.3.2.2.6.1.1 "><p id="dli_08_15018__p843111401143">avro-confluent.ssl.truststore.password</p>
</td>
<td class="cellrowborder" valign="top" width="10.68%" headers="mcps1.3.3.2.2.6.1.2 "><p id="dli_08_15018__p436542171513">No</p>
</td>
<td class="cellrowborder" valign="top" width="8.83%" headers="mcps1.3.3.2.2.6.1.3 "><p id="dli_08_15018__p19882522163011">None</p>
</td>
<td class="cellrowborder" valign="top" width="8.81%" headers="mcps1.3.3.2.2.6.1.4 "><p id="dli_08_15018__p12371642181513">String</p>
</td>
<td class="cellrowborder" valign="top" width="51.68000000000001%" headers="mcps1.3.3.2.2.6.1.5 "><p id="dli_08_15018__p437144217155">Password for SSL truststore</p>
</td>
</tr>
<tr id="dli_08_15018__row153712429152"><td class="cellrowborder" valign="top" width="20%" headers="mcps1.3.3.2.2.6.1.1 "><p id="dli_08_15018__p1832213565413">avro-confluent.subject</p>
</td>
<td class="cellrowborder" valign="top" width="10.68%" headers="mcps1.3.3.2.2.6.1.2 "><p id="dli_08_15018__p143764241513">No</p>
</td>
<td class="cellrowborder" valign="top" width="8.83%" headers="mcps1.3.3.2.2.6.1.3 "><p id="dli_08_15018__p158821922133012">None</p>
</td>
<td class="cellrowborder" valign="top" width="8.81%" headers="mcps1.3.3.2.2.6.1.4 "><p id="dli_08_15018__p1837164211511">String</p>
</td>
<td class="cellrowborder" valign="top" width="51.68000000000001%" headers="mcps1.3.3.2.2.6.1.5 "><p id="dli_08_15018__p13704291515">The Confluent Schema Registry subject under which to register the schema used by this format during serialization. By default, the 'kafka' and 'upsert-kafka' connectors use '&lt;topic_name&gt;-value' or '&lt;topic_name&gt;-key' as the default subject name if this format is used as the value or key format, respectively. For other connectors (for example, 'filesystem'), this option is required when the format is used in a sink.</p>
</td>
</tr>
<tr id="dli_08_15018__row1937154241516"><td class="cellrowborder" valign="top" width="20%" headers="mcps1.3.3.2.2.6.1.1 "><p id="dli_08_15018__p10242917252">avro-confluent.url</p>
</td>
<td class="cellrowborder" valign="top" width="10.68%" headers="mcps1.3.3.2.2.6.1.2 "><p id="dli_08_15018__p17371442191515">No</p>
</td>
<td class="cellrowborder" valign="top" width="8.83%" headers="mcps1.3.3.2.2.6.1.3 "><p id="dli_08_15018__p3881132203018">None</p>
</td>
<td class="cellrowborder" valign="top" width="8.81%" headers="mcps1.3.3.2.2.6.1.4 "><p id="dli_08_15018__p337114211518">String</p>
</td>
<td class="cellrowborder" valign="top" width="51.68000000000001%" headers="mcps1.3.3.2.2.6.1.5 "><p id="dli_08_15018__p1137124271517">The URL of the Confluent Schema Registry to fetch/register schemas.</p>
</td>
</tr>
</tbody>
</table>
</div>
</div>
<div class="section" id="dli_08_15018__section202951635560"><h4 class="sectiontitle">Data Type Mapping</h4><p id="dli_08_15018__p125071640668">Currently, Apache Flink always uses the table schema to derive the Avro reader schema during deserialization and Avro writer schema during serialization. Explicitly defining an Avro schema is not supported yet. See <a href="dli_08_15016.html">Avro</a> for the mapping between Avro and Flink DataTypes.</p>
<p id="dli_08_15018__p175071640566">In addition to the types listed there, Flink supports reading/writing nullable types. Flink maps nullable types to Avro <strong id="dli_08_15018__b10967142182816">union(something, null)</strong>, where <strong id="dli_08_15018__b199614852816">something</strong> is the Avro type converted from Flink type.</p>
</div>
<div class="section" id="dli_08_15018__section8159127174419"><h4 class="sectiontitle">Example</h4><p id="dli_08_15018__p5192123014443">Read JSON data from the source topic in Kafka and write the data in Confluent Avro format to the sink topic.</p>
<ol id="dli_08_15018__ol194212211460"><li id="dli_08_15018__li12421152112612">Create a datasource connection to the VPC and subnet where Kafka and the ECS are located, and bind the connection to the queue. Configure a security group and inbound rule to allow access from the queue, and test the connectivity of the queue using the Kafka and ECS IP addresses. For example, locate a general-purpose queue where the job runs and choose <strong id="dli_08_15018__b1858085791517">More</strong> &gt; <strong id="dli_08_15018__b958016579151">Test Address Connectivity</strong> in the <strong id="dli_08_15018__b65811757101514">Operation</strong> column. If the connection is successful, the datasource is bound to the queue. Otherwise, the binding fails.</li><li id="dli_08_15018__li5421102111611">Purchase an ECS cluster, download <a href="https://packages.confluent.io/archive/5.5/" target="_blank" rel="noopener noreferrer">Confluent</a> 5.5.2 and jdk1.8.0_232, and upload them to the ECS cluster. Run the following commands to decompress the packages (assuming that the decompression directories are <strong id="dli_08_15018__b872314619146">confluent-5.5.2</strong> and <strong id="dli_08_15018__b44181549101410">jdk1.8.0_232</strong>):<pre class="screen" id="dli_08_15018__screen1034417402518">tar zxvf confluent-5.5.2-2.11.tar.gz
tar zxvf jdk1.8.0_232.tar.gz</pre>
</li><li id="dli_08_15018__li1042172114614">Run the following commands to install jdk1.8.0_232 in the current ECS cluster. You can run the <strong id="dli_08_15018__b197915276284">pwd</strong> command in the <strong id="dli_08_15018__b88072732819">jdk1.8.0_232 folder</strong> to view the value of <strong id="dli_08_15018__b1180627152818">yourJdkPath</strong>.<pre class="screen" id="dli_08_15018__screen1675211085518">export JAVA_HOME=&lt;yourJdkPath&gt;
export PATH=$JAVA_HOME/bin:$PATH
export CLASSPATH=.:$JAVA_HOME/lib:$JAVA_HOME/jre/lib</pre>
</li><li id="dli_08_15018__li1342116213613">Go to the <strong id="dli_08_15018__b958744421518">confluent-5.5.2/etc/schema-registry/</strong> directory and modify the following configuration items in the <strong id="dli_08_15018__b1734995316156">schema-registry.properties</strong> file:<pre class="screen" id="dli_08_15018__screen1970435215579">listeners=http://&lt;yourEcsIp&gt;:8081
<strong id="dli_08_15018__b0791253166">kafkastore.bootstrap.servers</strong>=&lt;yourKafkaAddress1&gt;:&lt;yourKafkaPort&gt;,&lt;yourKafkaAddress2&gt;:&lt;yourKafkaPort&gt;</pre>
</li><li id="dli_08_15018__li1342112118610">Switch to the <strong id="dli_08_15018__b29231832192812">confluent-5.5.2</strong> directory and run the following command to start the Confluent Schema Registry:<pre class="screen" id="dli_08_15018__screen7999174822310">bin/schema-registry-start etc/schema-registry/schema-registry.properties</pre>
</li><li id="dli_08_15018__li134219213620">Create a Flink OpenSource SQL job, select the Flink 1.15 version, and allow DLI to save job logs in OBS. Add the following statement to the job and submit it:<pre class="screen" id="dli_08_15018__screen3886121512115">CREATE TABLE kafkaSource (
order_id string,
order_channel string,
order_time string,
pay_amount double,
real_pay double,
pay_time string,
user_id string,
user_name string,
area_id string
) WITH (
'connector' = 'kafka',
'topic' = '<em id="dli_08_15018__i1292114410166"><strong id="dli_08_15018__b157441843161614">kafkaSourceTopic</strong></em>',
'properties.bootstrap.servers' = '<em id="dli_08_15018__i366724417446"><strong id="dli_08_15018__b1466716449449">KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort</strong></em>',
'properties.group.id' = '<em id="dli_08_15018__i163677476168"><strong id="dli_08_15018__b482774616166">GroupId</strong></em>',
'scan.startup.mode' = 'latest-offset',
'format' = 'json'
);
CREATE TABLE kafkaSink (
order_id string,
order_channel string,
order_time string,
pay_amount double,
real_pay double,
pay_time string,
user_id string,
user_name string,
area_id string
) WITH (
'connector' = 'kafka',
'topic' = '<em id="dli_08_15018__i142031958111614"><strong id="dli_08_15018__b1281957161615">kafkaSinkTopic</strong></em>',
'properties.bootstrap.servers' = '<em id="dli_08_15018__i1221713131712"><strong id="dli_08_15018__b1821716181711">KafkaAddress1:KafkaPort,KafkaAddress2:KafkaPort</strong></em>',
'format' = 'avro-confluent',
'avro-confluent.url' = 'http://<em id="dli_08_15018__i1478021181712"><strong id="dli_08_15018__b2996142016173">EcsIp</strong></em>:8081'
);
insert into kafkaSink select * from kafkaSource;</pre>
</li><li id="dli_08_15018__li16421112116612">Insert the following data into the source Kafka topic:<pre class="screen" id="dli_08_15018__screen1416212441032">{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2021-03-24 10:00:00", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103241606060001", "order_channel":"appShop", "order_time":"2021-03-24 16:06:06", "pay_amount":"200.00", "real_pay":"180.00", "pay_time":"2021-03-24 16:10:06", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}</pre>
</li><li id="dli_08_15018__li12421421463">Read data from the sink Kafka topic. You will find that the data has been written and that the schema has been registered in the <strong id="dli_08_15018__b214761531912">_schemas</strong> topic of Kafka (the default storage topic of the Schema Registry).</li></ol>
</div>
</div>
<div>
<div class="familylinks">
<div class="parentlink"><strong>Parent topic:</strong> <a href="dli_08_15014.html">Formats</a></div>
</div>
</div>