forked from docs/doc-exports
Reviewed-by: Hasko, Vladimir <vladimir.hasko@t-systems.com> Co-authored-by: Yang, Tong <yangtong2@huawei.com> Co-committed-by: Yang, Tong <yangtong2@huawei.com>
43 lines
12 KiB
HTML
43 lines
12 KiB
HTML
<a name="mrs_01_1580"></a><a name="mrs_01_1580"></a>
|
|
|
|
<h1 class="topictitle1">Configuring Kafka</h1>
|
|
<div id="body8662426"><p id="mrs_01_1580__en-us_topic_0000001219230857_a3bc83341b26d4081b38a633c33ea0eb6">Sample project data of Flink is stored in Kafka. A user with Kafka permission can send data to Kafka and receive data from it.</p>
|
|
<ol id="mrs_01_1580__en-us_topic_0000001219230857_ol774114513169"><li id="mrs_01_1580__en-us_topic_0000001219230857_li20741135201612"><span>Ensure that clusters, including HDFS, Yarn, Flink, and Kafka are installed.</span></li><li id="mrs_01_1580__en-us_topic_0000001219230857_li3741258161"><span>Create a topic.</span><p><ul id="mrs_01_1580__en-us_topic_0000001219230857_ue65d10b6661a4e98b18ab5a81ff179f1"><li id="mrs_01_1580__en-us_topic_0000001219230857_ld59e85e76e7a4059bc0ff5b022bc7ae7">Run Linux command line to create a topic. Before running commands, ensure that the kinit command, for example, <b><span class="cmdname" id="mrs_01_1580__en-us_topic_0000001219230857_cmdname9861781464">kinit flinkuser</span></b>, is run for authentication.<div class="note" id="mrs_01_1580__en-us_topic_0000001219230857_n001b765e03e5493a91f359fcfe864866"><img src="public_sys-resources/note_3.0-en-us.png"><span class="notetitle"> </span><div class="notebody"><p id="mrs_01_1580__en-us_topic_0000001219230857_p1949341413286">To create a Flink user, you need to have the permission to create Kafka topics.</p>
|
|
</div></div>
|
|
<p id="mrs_01_1580__en-us_topic_0000001219230857_a09f0c45f1b9f4b848a54366aa318271b">The format of the command is shown as follows, in which <strong id="mrs_01_1580__en-us_topic_0000001219230857_b1285922615418">{zkQuorum}</strong> indicates ZooKeeper cluster information and the format is <em id="mrs_01_1580__en-us_topic_0000001219230857_i1759163211548">IP</em>:<em id="mrs_01_1580__en-us_topic_0000001219230857_i5121336155413">port</em>, and <strong id="mrs_01_1580__en-us_topic_0000001219230857_b94281541175420">{Topic}</strong> indicates the topic name.</p>
|
|
<p id="mrs_01_1580__en-us_topic_0000001219230857_a90a4a4e932a44292aa16e51bf13e22cb"><b><span class="cmdname" id="mrs_01_1580__en-us_topic_0000001219230857_cmdname2011718064716">bin/kafka-topics.sh --create --zookeeper {zkQuorum}/kafka --replication-factor 1 --partitions 5 --topic {Topic}</span></b></p>
|
|
<div class="p" id="mrs_01_1580__en-us_topic_0000001219230857_ab89fdb062e6744c28897b63296dd0316">Assume the topic name is <strong id="mrs_01_1580__en-us_topic_0000001219230857_b65457798710039">topic 1</strong>. The command for creating this topic is displayed as follows:<pre class="screen" id="mrs_01_1580__en-us_topic_0000001219230857_s1cfc491c6d7d4a3691f1440c98de5b6d">/opt/client/Kafka/kafka/bin/kafka-topics.sh --create --zookeeper 10.96.101.32:2181,10.96.101.251:2181,10.96.101.177:2181,10.91.8.160:2181/kafka --replication-factor 1 --partitions 5 --topic topic1</pre>
|
|
</div>
|
|
</li><li id="mrs_01_1580__en-us_topic_0000001219230857_lb80be27d9c9347a890b0bdf583c204fe">Configure the permission of the topic on the server.<p id="mrs_01_1580__en-us_topic_0000001219230857_a51f2aa7f18e5486f9eae2998c510d45f"><a name="mrs_01_1580__en-us_topic_0000001219230857_lb80be27d9c9347a890b0bdf583c204fe"></a><a name="en-us_topic_0000001219230857_lb80be27d9c9347a890b0bdf583c204fe"></a>Set the <strong id="mrs_01_1580__en-us_topic_0000001219230857_b34808704110039">allow.everyone.if.no.acl.found</strong> parameter of Kafka Broker to <strong id="mrs_01_1580__en-us_topic_0000001219230857_b1285629810039">true</strong>.</p>
|
|
</li></ul>
|
|
</p></li><li id="mrs_01_1580__en-us_topic_0000001219230857_li17427531616"><span>Perform the security authentication.</span><p><p id="mrs_01_1580__en-us_topic_0000001219230857_aa6efa6fe350c48a9bf36cd5ff955366e">The Kerberos authentication, SSL encryption authentication, or Kerberos + SSL authentication mode can be used.</p>
|
|
<ul id="mrs_01_1580__en-us_topic_0000001219230857_uda72767d9a344cb7abc24aeba4e53662"><li id="mrs_01_1580__en-us_topic_0000001219230857_l709508b095f74de6be3b7d16ca5fcb98"><strong id="mrs_01_1580__en-us_topic_0000001219230857_a60d2535af40d4b11997a470df31a406e">Kerberos authentication</strong><ul id="mrs_01_1580__en-us_topic_0000001219230857_ue144e402a2b54478af2fda100c7e3535"><li id="mrs_01_1580__en-us_topic_0000001219230857_lc79746189b45425388e5d05ec52acbf0">Client configuration<p id="mrs_01_1580__en-us_topic_0000001219230857_a27295504086945aa9678d4ffa9c1353c"><a name="mrs_01_1580__en-us_topic_0000001219230857_lc79746189b45425388e5d05ec52acbf0"></a><a name="en-us_topic_0000001219230857_lc79746189b45425388e5d05ec52acbf0"></a>In the Flink configuration file <strong id="mrs_01_1580__en-us_topic_0000001219230857_b183121337143517">flink-conf.yaml</strong>, add configurations about Kerberos authentication. For example, add <strong id="mrs_01_1580__en-us_topic_0000001219230857_b731816376353">KafkaClient</strong> in <strong id="mrs_01_1580__en-us_topic_0000001219230857_b23184375356">contexts</strong> as follows:</p>
|
|
<pre class="screen" id="mrs_01_1580__en-us_topic_0000001219230857_s0ac758566e6a48d9adcee1749df59133">security.kerberos.login.keytab: /home/demo//keytab/flinkuser.keytab
|
|
security.kerberos.login.principal: flinkuser
|
|
security.kerberos.login.contexts: Client,KafkaClient
|
|
security.kerberos.login.use-ticket-cache: false</pre>
|
|
</li><li id="mrs_01_1580__en-us_topic_0000001219230857_l546bc4fa30024ad1ac54779fdd0dca91">Running parameter<p id="mrs_01_1580__en-us_topic_0000001219230857_afc928e22c6784372acacf8ece5549e58"><a name="mrs_01_1580__en-us_topic_0000001219230857_l546bc4fa30024ad1ac54779fdd0dca91"></a><a name="en-us_topic_0000001219230857_l546bc4fa30024ad1ac54779fdd0dca91"></a>Running parameters about the <span class="parmname" id="mrs_01_1580__en-us_topic_0000001219230857_pacd9bd042f564935961d16580e715dcf"><b>SASL_PLAINTEXT</b></span> protocol are as follows:</p>
|
|
<pre class="screen" id="mrs_01_1580__en-us_topic_0000001219230857_sd9ea4ea6fb1b4ed3916c82adf966b6d4">--topic topic1 --bootstrap.servers 10.96.101.32:21007 --security.protocol SASL_PLAINTEXT --sasl.kerberos.service.name kafka //10.96.101.32:21007 indicates the <em id="mrs_01_1580__en-us_topic_0000001219230857_i199738281708">IP</em>:<em id="mrs_01_1580__en-us_topic_0000001219230857_i1766312321209">port</em> of the Kafka server.</pre>
|
|
</li></ul>
|
|
</li><li id="mrs_01_1580__en-us_topic_0000001219230857_lf691d7dcd4f14532b618c4c6f83da8c9"><strong id="mrs_01_1580__en-us_topic_0000001219230857_a0bc27d41f5ae4ec49942c045e9e8edaf">SSL encryption</strong><ul id="mrs_01_1580__en-us_topic_0000001219230857_uc00cde88549b4385bd63593ef627de47"><li id="mrs_01_1580__en-us_topic_0000001219230857_l5b3cfe832bf94c049e1365bc47f2a820">Configuration on the server<p id="mrs_01_1580__en-us_topic_0000001219230857_p145127192210"><a name="mrs_01_1580__en-us_topic_0000001219230857_l5b3cfe832bf94c049e1365bc47f2a820"></a><a name="en-us_topic_0000001219230857_l5b3cfe832bf94c049e1365bc47f2a820"></a>Log in to FusionInsight Manager, choose <strong id="mrs_01_1580__en-us_topic_0000001219230857_b06931424185311">Cluster</strong> > <strong id="mrs_01_1580__en-us_topic_0000001219230857_b167791752205">Services</strong> > <strong id="mrs_01_1580__en-us_topic_0000001219230857_b2995155420019">Kafka</strong> > <strong id="mrs_01_1580__en-us_topic_0000001219230857_b164201574016">Configurations</strong>, and set <strong id="mrs_01_1580__en-us_topic_0000001219230857_b770810712116">Type</strong> to <strong id="mrs_01_1580__en-us_topic_0000001219230857_b141911210112">All</strong>. Search for <strong id="mrs_01_1580__en-us_topic_0000001219230857_b185714152113">ssl.mode.enable</strong> and set it to <strong id="mrs_01_1580__en-us_topic_0000001219230857_b79268201115">true</strong>.</p>
|
|
</li><li id="mrs_01_1580__en-us_topic_0000001219230857_l75894405f6db4adea0e92f151c849b86">Configuration on the client<ol type="a" id="mrs_01_1580__en-us_topic_0000001219230857_obb1f8b2fa9f7416aa2e0fd40edf7a60f"><li id="mrs_01_1580__en-us_topic_0000001219230857_l4d1d938106954e69813a6a5a16a21b07">Log in to FusionInsight Manager, choose <strong id="mrs_01_1580__en-us_topic_0000001219230857_b197507608310039">Cluster > <em id="mrs_01_1580__en-us_topic_0000001219230857_i96421974810039">Name of the desired cluster</em> > Services > Kafka > More > Download Client</strong> to download Kafka client.</li><li id="mrs_01_1580__en-us_topic_0000001219230857_l7d4ed49a1ea24de0ba996eb3eae9c736">Use the <span class="parmname" id="mrs_01_1580__en-us_topic_0000001219230857_parmname182851561810039"><b>ca.crt</b></span> certificate file in the client root directory to generate the <span class="parmname" id="mrs_01_1580__en-us_topic_0000001219230857_parmname75481113510039"><b>truststore</b></span> file for the client.<div class="p" id="mrs_01_1580__en-us_topic_0000001219230857_a3e0e732924d34bc9884f3685aad826e6">Run the following command:<pre class="screen" id="mrs_01_1580__en-us_topic_0000001219230857_sc434d2f340694670926730ccf8c28c8d">keytool -noprompt -import -alias myservercert -file ca.crt -keystore truststore.jks </pre>
|
|
</div>
|
|
<p id="mrs_01_1580__en-us_topic_0000001219230857_a27187e56b1af4e3ebe0fff39d8b49a4c">The command execution result is similar to the following:</p>
|
|
<p id="mrs_01_1580__en-us_topic_0000001219230857_a7f681e92e3d84b27be5b62bef5f88e74"><span><img id="mrs_01_1580__en-us_topic_0000001219230857_i7767064234834d3380fdcb628f68e1c4" src="en-us_image_0000001349139389.png"></span></p>
|
|
</li><li id="mrs_01_1580__en-us_topic_0000001219230857_lcb660a996df54c7693a4c1b23054021f">Run parameters.<p id="mrs_01_1580__en-us_topic_0000001219230857_a7f9c9c7985c04354ad9582bebec68d4c"><a name="mrs_01_1580__en-us_topic_0000001219230857_lcb660a996df54c7693a4c1b23054021f"></a><a name="en-us_topic_0000001219230857_lcb660a996df54c7693a4c1b23054021f"></a>The value of <strong id="mrs_01_1580__en-us_topic_0000001219230857_b124521947310039">ssl.truststore.password</strong> must be the same as the password you entered when creating <strong id="mrs_01_1580__en-us_topic_0000001219230857_b180121281910039">truststore</strong>. Run the following command to run parameters:</p>
|
|
<pre class="screen" id="mrs_01_1580__en-us_topic_0000001219230857_saf821d32efcc472e9a0ecdb22f3197e1">--topic topic1 --bootstrap.servers 10.96.101.32:9093 --security.protocol SSL --ssl.truststore.location /home/zgd/software/FusionInsight_Kafka_ClientConfig/truststore.jks --ssl.truststore.password <em id="mrs_01_1580__en-us_topic_0000001219230857_i121015810531">XXX</em></pre>
|
|
<p id="mrs_01_1580__en-us_topic_0000001219230857_p246750101610"></p>
|
|
</li></ol>
|
|
</li></ul>
|
|
</li><li id="mrs_01_1580__en-us_topic_0000001219230857_l7b01ca2a7e83487b80f1eef37f476fb2"><strong id="mrs_01_1580__en-us_topic_0000001219230857_b24111690210039">Kerberos+SSL</strong> <strong id="mrs_01_1580__en-us_topic_0000001219230857_b173431923410039">encryption</strong><p id="mrs_01_1580__en-us_topic_0000001219230857_ad011ae6cdbe143d889a43b1d461f52ad">After completing preceding configurations of the client and server of Kerberos and SSL, modify the port number and protocol type in running parameters to enable the Kerberos+SSL encryption mode.</p>
|
|
<pre class="screen" id="mrs_01_1580__en-us_topic_0000001219230857_sce43a0b6b66d48ae89b9160521ecb9a7">--topic topic1 --bootstrap.servers 10.96.101.32:21009 --security.protocol SASL_SSL --sasl.kerberos.service.name kafka --ssl.truststore.location /home/zgd/software/FusionInsight_Kafka_ClientConfig/truststore.jks --ssl.truststore.password <em id="mrs_01_1580__en-us_topic_0000001219230857_i125920599522">XXX</em></pre>
|
|
</li></ul>
|
|
</p></li></ol>
|
|
</div>
|
|
<div>
|
|
<div class="familylinks">
|
|
<div class="parentlink"><strong>Parent topic:</strong> <a href="mrs_01_0593.html">Security Configuration</a></div>
|
|
</div>
|
|
</div>
|
|
|