<a name="kafka-go"></a><a name="kafka-go"></a>
|
|
|
|
<h1 class="topictitle1">Go</h1>
|
|
<div id="body0000001081563264"><p id="kafka-go__p189211411174010">This section takes Linux CentOS as an example to describe how to access a Kafka instance using a Kafka client in Go 1.16.5, including how to obtain the demo code, and produce and consume messages.</p>
|
|
<p id="kafka-go__p465419346576">Before getting started, ensure that you have collected the information listed in <a href="Kafka-config.html">Collecting Connection Information</a>.</p>
|
|
<div class="section" id="kafka-go__section4574152863919"><h4 class="sectiontitle">Preparing the Environment</h4><ul id="kafka-go__ul1205424194112"><li id="kafka-go__li162059247412">Run the following command to check whether Go has been installed:<pre class="screen" id="kafka-go__screen549817508437">go version</pre>
<p id="kafka-go__p0879124464319">If the following information is displayed, Go has been installed.</p>
<pre class="screen" id="kafka-go__screen98677221458">[root@ecs-test confluent-kafka-go]# go version
go version go1.16.5 linux/amd64</pre>
<p id="kafka-go__p4425159184510">If Go is not installed, install it as follows:</p>
<pre class="screen" id="kafka-go__screen9667111713221"># Download the Go installation package.
wget https://go.dev/dl/go1.16.5.linux-amd64.tar.gz

# Decompress it to the <strong id="kafka-go__b1649172564415">/usr/local</strong> directory. The <strong id="kafka-go__b207449305441">/usr/local</strong> directory can be changed as required.
sudo tar -C /usr/local -xzf go1.16.5.linux-amd64.tar.gz

# Set the environment variable.
echo 'export PATH=$PATH:/usr/local/go/bin' >> ~/.profile
source ~/.profile</pre>
</li><li id="kafka-go__li791118451193">Run the following command to obtain the code used in the demo:<pre class="screen" id="kafka-go__screen118625185206">go get github.com/confluentinc/confluent-kafka-go/kafka</pre>
</li></ul>
</div>
<div class="section" id="kafka-go__section186152224220"><h4 class="sectiontitle">Producing Messages</h4><ul id="kafka-go__ul72740212348"><li id="kafka-go__li1274821143420">With SASL<pre class="screen" id="kafka-go__screen35531122111218">package main
|
|
|
|
import (
|
|
"bufio"
|
|
"fmt"
|
|
"github.com/confluentinc/confluent-kafka-go/kafka"
|
|
"log"
|
|
"os"
|
|
"os/signal"
|
|
"syscall"
|
|
)
|
|
|
|
var (
|
|
brokers = "<strong id="kafka-go__b4187917133412">ip1:port1</strong>,<strong id="kafka-go__b1485722110341">ip2:port2</strong>,<strong id="kafka-go__b1662172517346">ip3:port3</strong>"
|
|
topics = "<strong id="kafka-go__b202881446203420">topic_name</strong>"
|
|
user = "<strong id="kafka-go__b1483718500344">username</strong>"
|
|
password = "<strong id="kafka-go__b1537065583417">password</strong>"
|
|
caFile = "<strong id="kafka-go__b1726142423614">phy_ca.crt</strong>" // Obtain the SSL certificate by referring to section "Collecting Connection Information". If <strong id="kafka-go__b5211720155218">Security Protocol</strong> is set to <strong id="kafka-go__b78251825135216">SASL_PLAINTEXT</strong>, delete this parameter.
|
|
)
|
|
|
|
func main() {
|
|
log.Println("Starting a new kafka producer")
|
|
|
|
config := &kafka.ConfigMap{
|
|
"bootstrap.servers": brokers,
|
|
"security.protocol": "<strong id="kafka-go__b35618811244">SASL_SSL</strong>",
|
|
"sasl.mechanism": "<strong id="kafka-go__b1771614013315">PLAIN</strong>",
|
|
"sasl.username": user,
|
|
"sasl.password": password,
|
|
"ssl.ca.location": caFile, // If <strong id="kafka-go__b14101131710534">Security Protocol</strong> is set to <strong id="kafka-go__b1910114173537">SASL_PLAINTEXT</strong>, delete this parameter.
|
|
"ssl.endpoint.identification.algorithm": "none"
|
|
}
|
|
producer, err := kafka.NewProducer(config)
|
|
if err != nil {
|
|
log.Panicf("producer error, err: %v", err)
|
|
return
|
|
}
|
|
|
|
go func() {
|
|
for e := range producer.Events() {
|
|
switch ev := e.(type) {
|
|
case *kafka.Message:
|
|
if ev.TopicPartition.Error != nil {
|
|
log.Printf("Delivery failed: %v\n", ev.TopicPartition)
|
|
} else {
|
|
log.Printf("Delivered message to %v\n", ev.TopicPartition)
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
// Produce messages to topic (asynchronously)
|
|
fmt.Println("please enter message:")
|
|
go func() {
|
|
for {
|
|
err := producer.Produce(&kafka.Message{
|
|
TopicPartition: kafka.TopicPartition{Topic: &topics, Partition: kafka.PartitionAny},
|
|
Value: GetInput(),
|
|
}, nil)
|
|
if err != nil {
|
|
log.Panicf("send message fail, err: %v", err)
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
sigterm := make(chan os.Signal, 1)
|
|
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
|
|
select {
|
|
case <-sigterm:
|
|
log.Println("terminating: via signal")
|
|
}
|
|
// Wait for message deliveries before shutting down
|
|
producer.Flush(15 * 1000)
|
|
producer.Close()
|
|
}
|
|
|
|
func GetInput() []byte {
|
|
reader := bufio.NewReader(os.Stdin)
|
|
data, _, _ := reader.ReadLine()
|
|
return data
|
|
}</pre>
<p id="kafka-go__p1123354315615">The parameters in the example code are described as follows. For details about how to obtain the parameter values, see <a href="Kafka-config.html">Collecting Connection Information</a>.</p>
<ul id="kafka-go__ul14961114618578"><li id="kafka-go__li29611746185714"><strong id="kafka-go__b186921533578">brokers</strong>: instance connection address and port</li><li id="kafka-go__li92790567577"><strong id="kafka-go__b14734191911598">topics</strong>: topic name</li><li id="kafka-go__li370312335810"><strong id="kafka-go__b12116140175115">user/password</strong>: The username and password set when ciphertext access is enabled for the first time, or those set when a user was created. For security purposes, you are advised to encrypt the username and password; the sketch at the end of this section shows one way to avoid hardcoding them in the source code.</li><li id="kafka-go__li1471831925819"><strong id="kafka-go__b1608352125919">caFile</strong>: certificate file. This parameter is mandatory if <strong id="kafka-go__b9919115425310">Security Protocol</strong> is set to <strong id="kafka-go__b10672115719530">SASL_SSL</strong>.</li><li id="kafka-go__li103611113142417"><strong id="kafka-go__b8639212115610">security.protocol</strong>: Kafka security protocol. Obtain it from the <strong id="kafka-go__b993801542103927">Basic Information</strong> page on the Kafka console. For Kafka instances that were created much earlier, if <strong id="kafka-go__b1382451599115259">Security Protocol</strong> is not displayed on the instance details page, SASL_SSL is used by default.<ul id="kafka-go__ul14433141182112"><li id="kafka-go__li61051119172115">When <strong id="kafka-go__b3206131811541">Security Protocol</strong> is set to <strong id="kafka-go__b420619185547">SASL_SSL</strong>, SASL is used for authentication. Data is encrypted with SSL certificates for high-security transmission. You need to configure the instance username, password, and certificate file.</li><li id="kafka-go__li1343315111218">When <strong id="kafka-go__b8877142318549">Security Protocol</strong> is set to <strong id="kafka-go__b487892315416">SASL_PLAINTEXT</strong>, SASL is used for authentication. Data is transmitted in plaintext with high performance. You need to configure the instance username and password.</li></ul>
</li><li id="kafka-go__li04101535163315"><strong id="kafka-go__b2076410337367">sasl.mechanism</strong>: SASL authentication mechanism. View it on the <strong id="kafka-go__b1144606194512">Basic Information</strong> page of the Kafka instance console. If both SCRAM-SHA-512 and PLAIN are enabled, use either of them in connection configurations. For instances that were created much earlier, if <strong id="kafka-go__b967992073115253">SASL Mechanism</strong> is not displayed on the instance details page, PLAIN is used by default.</li></ul>
</li><li id="kafka-go__li3815417348">Without SASL<pre class="screen" id="kafka-go__screen53201715243">package main
|
|
|
|
import (
|
|
"bufio"
|
|
"fmt"
|
|
"github.com/confluentinc/confluent-kafka-go/kafka"
|
|
"log"
|
|
"os"
|
|
"os/signal"
|
|
"syscall"
|
|
)
|
|
|
|
var (
|
|
brokers = "<strong id="kafka-go__b21051353173710">ip1:port1</strong>,<strong id="kafka-go__b454315383814">ip2:port2</strong>,<strong id="kafka-go__b953314816383">ip3:port3</strong>"
|
|
topics = "<strong id="kafka-go__b151061558193710">topic_name</strong>"
|
|
)
|
|
|
|
func main() {
|
|
log.Println("Starting a new kafka producer")
|
|
|
|
config := &kafka.ConfigMap{
|
|
"bootstrap.servers": brokers,
|
|
}
|
|
producer, err := kafka.NewProducer(config)
|
|
if err != nil {
|
|
log.Panicf("producer error, err: %v", err)
|
|
return
|
|
}
|
|
|
|
go func() {
|
|
for e := range producer.Events() {
|
|
switch ev := e.(type) {
|
|
case *kafka.Message:
|
|
if ev.TopicPartition.Error != nil {
|
|
log.Printf("Delivery failed: %v\n", ev.TopicPartition)
|
|
} else {
|
|
log.Printf("Delivered message to %v\n", ev.TopicPartition)
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
|
|
// Produce messages to topic (asynchronously)
|
|
fmt.Println("please enter message:")
|
|
go func() {
|
|
for {
|
|
err := producer.Produce(&kafka.Message{
|
|
TopicPartition: kafka.TopicPartition{Topic: &topics, Partition: kafka.PartitionAny},
|
|
Value: GetInput(),
|
|
}, nil)
|
|
if err != nil {
|
|
log.Panicf("send message fail, err: %v", err)
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
sigterm := make(chan os.Signal, 1)
|
|
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
|
|
select {
|
|
case <-sigterm:
|
|
log.Println("terminating: via signal")
|
|
}
|
|
// Wait for message deliveries before shutting down
|
|
producer.Flush(15 * 1000)
|
|
producer.Close()
|
|
}
|
|
|
|
func GetInput() []byte {
|
|
reader := bufio.NewReader(os.Stdin)
|
|
data, _, _ := reader.ReadLine()
|
|
return data
|
|
}</pre>
|
|
<p id="kafka-go__p104291540195313">The parameters in the example code are described as follows. For details about how to obtain the parameter values, see <a href="Kafka-config.html">Collecting Connection Information</a>.</p>
|
|
<ul id="kafka-go__ul12429154012539"><li id="kafka-go__li44294408531"><strong id="kafka-go__b123071103592">brokers</strong>: instance connection address and port</li><li id="kafka-go__li124298407534"><strong id="kafka-go__b10443286594">topics</strong>: topic name</li></ul>
|
|
</li></ul>
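<p>The demo code above hardcodes the connection settings for simplicity. The following is a minimal sketch only (not part of the demo) that shows one way to avoid hardcoding the username and password: load them from environment variables before building the producer configuration. The variable names <strong>KAFKA_USER</strong> and <strong>KAFKA_PASSWORD</strong> are examples; adjust them to your environment, and keep the other parameter values the same as in the examples above.</p>
<pre class="screen">package main

import (
    "log"
    "os"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
    // KAFKA_USER and KAFKA_PASSWORD are example variable names, not names required by the client.
    user := os.Getenv("KAFKA_USER")
    password := os.Getenv("KAFKA_PASSWORD")
    if user == "" || password == "" {
        log.Fatal("KAFKA_USER or KAFKA_PASSWORD is not set")
    }

    config := &kafka.ConfigMap{
        "bootstrap.servers": "ip1:port1,ip2:port2,ip3:port3",
        "security.protocol": "SASL_SSL",
        "sasl.mechanism": "PLAIN",
        "sasl.username": user,
        "sasl.password": password,
        "ssl.ca.location": "phy_ca.crt",
        "ssl.endpoint.identification.algorithm": "none",
    }

    producer, err := kafka.NewProducer(config)
    if err != nil {
        log.Panicf("producer error, err: %v", err)
    }
    defer producer.Close()
    // Produce messages with this producer as shown in the examples above.
}</pre>
<p>This keeps credentials out of the source code; for stronger protection you can additionally encrypt the values before exporting them, as recommended above.</p>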
</div>
<div class="section" id="kafka-go__section12564133114116"><h4 class="sectiontitle">Consuming Messages</h4><ul id="kafka-go__ul79383504417"><li id="kafka-go__li109381250174112">With SASL<pre class="screen" id="kafka-go__screen1590418017254">package main
|
|
|
|
import (
|
|
"fmt"
|
|
"github.com/confluentinc/confluent-kafka-go/kafka"
|
|
"log"
|
|
"os"
|
|
"os/signal"
|
|
"syscall"
|
|
)
|
|
|
|
var (
|
|
brokers = "<strong id="kafka-go__b10914185218399">ip1:port1</strong>,<strong id="kafka-go__b99947577398">ip2:port2</strong>,<strong id="kafka-go__b173711025400">ip3:port3</strong>"
|
|
group = "<strong id="kafka-go__b1364013754013">group-id</strong>"
|
|
topics = "<strong id="kafka-go__b1746141316402">topic_name</strong>"
|
|
user = "<strong id="kafka-go__b1479313179407">username</strong>"
|
|
password = "<strong id="kafka-go__b1270622194015">password</strong>"
|
|
caFile = "<strong id="kafka-go__b10949536145212">phy_ca.crt</strong>" // Obtain the SSL certificate by referring to section "Collecting Connection Information". If <strong id="kafka-go__b1594933645213">Security Protocol</strong> is set to <strong id="kafka-go__b3949203612522">SASL_PLAINTEXT</strong>, delete this parameter.
|
|
)
|
|
|
|
func main() {
|
|
log.Println("Starting a new kafka consumer")
|
|
|
|
config := &kafka.ConfigMap{
|
|
"bootstrap.servers": brokers,
|
|
"group.id": group,
|
|
"auto.offset.reset": "earliest",
|
|
"security.protocol": "<strong id="kafka-go__b134682414269">SASL_SSL</strong>",
|
|
"sasl.mechanism": "<strong id="kafka-go__b5307152214412">PLAIN</strong>",
|
|
"sasl.username": user,
|
|
"sasl.password": password,
|
|
"ssl.ca.location": caFile, // If <strong id="kafka-go__b173101346105415">Security Protocol</strong> is set to <strong id="kafka-go__b17310046135420">SASL_PLAINTEXT</strong>, delete this parameter.
|
|
"ssl.endpoint.identification.algorithm": "none"
|
|
}
|
|
|
|
consumer, err := kafka.NewConsumer(config)
|
|
if err != nil {
|
|
log.Panicf("Error creating consumer: %v", err)
|
|
return
|
|
}
|
|
|
|
err = consumer.SubscribeTopics([]string{topics}, nil)
|
|
if err != nil {
|
|
log.Panicf("Error subscribe consumer: %v", err)
|
|
return
|
|
}
|
|
|
|
go func() {
|
|
for {
|
|
msg, err := consumer.ReadMessage(-1)
|
|
if err != nil {
|
|
log.Printf("Consumer error: %v (%v)", err, msg)
|
|
} else {
|
|
fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
|
|
}
|
|
}
|
|
}()
|
|
|
|
sigterm := make(chan os.Signal, 1)
|
|
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
|
|
select {
|
|
case <-sigterm:
|
|
log.Println("terminating: via signal")
|
|
}
|
|
if err = consumer.Close(); err != nil {
|
|
log.Panicf("Error closing consumer: %v", err)
|
|
}
|
|
}</pre>
<p id="kafka-go__p186531628337">The parameters in the example code are described as follows. For details about how to obtain the parameter values, see <a href="Kafka-config.html">Collecting Connection Information</a>.</p>
<ul id="kafka-go__ul365352813320"><li id="kafka-go__li1865362812313"><strong id="kafka-go__b331030115910">brokers</strong>: instance connection address and port</li><li id="kafka-go__li222105416318"><strong id="kafka-go__b936819329015">group</strong>: custom consumer group name. If the specified consumer group does not exist, Kafka automatically creates one.</li><li id="kafka-go__li46533284310"><strong id="kafka-go__b54722855911">topics</strong>: topic name</li><li id="kafka-go__li4653728735"><strong id="kafka-go__b12617911195120">user/password</strong>: The username and password set when ciphertext access is enabled for the first time, or those set when a user was created. For security purposes, you are advised to encrypt the username and password.</li><li id="kafka-go__li665312818314"><strong id="kafka-go__b177356010017">caFile</strong>: certificate file. This parameter is mandatory if <strong id="kafka-go__b15301129145418">Security Protocol</strong> is set to <strong id="kafka-go__b330116945418">SASL_SSL</strong>.</li><li id="kafka-go__li1360715193273"><strong id="kafka-go__b13887151618566">security.protocol</strong>: Kafka security protocol. Obtain it from the <strong id="kafka-go__b756183154103927">Basic Information</strong> page on the Kafka console. For Kafka instances that were created much earlier, if <strong id="kafka-go__b730003903115259">Security Protocol</strong> is not displayed on the instance details page, SASL_SSL is used by default.<ul id="kafka-go__ul113512172715"><li id="kafka-go__li13532142712">When <strong id="kafka-go__b061517202545">Security Protocol</strong> is set to <strong id="kafka-go__b14615202065410">SASL_SSL</strong>, SASL is used for authentication. Data is encrypted with SSL certificates for high-security transmission. You need to configure the instance username, password, and certificate file.</li><li id="kafka-go__li1635162122719">When <strong id="kafka-go__b623014245549">Security Protocol</strong> is set to <strong id="kafka-go__b42301824165415">SASL_PLAINTEXT</strong>, SASL is used for authentication. Data is transmitted in plaintext with high performance. You need to configure the instance username and password.</li></ul>
</li><li id="kafka-go__li85341154124416"><strong id="kafka-go__b2559133913361">sasl.mechanism</strong>: SASL authentication mechanism. View it on the <strong id="kafka-go__b933610917452">Basic Information</strong> page of the Kafka instance console. If both SCRAM-SHA-512 and PLAIN are enabled, use either of them in connection configurations. For instances that were created much earlier, if <strong id="kafka-go__b970004872115253">SASL Mechanism</strong> is not displayed on the instance details page, PLAIN is used by default.</li></ul>
</li><li id="kafka-go__li1529265364117">Without SASL<pre class="screen" id="kafka-go__screen115581152182712">package main
|
|
|
|
import (
|
|
"fmt"
|
|
"github.com/confluentinc/confluent-kafka-go/kafka"
|
|
"log"
|
|
"os"
|
|
"os/signal"
|
|
"syscall"
|
|
)
|
|
|
|
var (
|
|
brokers = "<strong id="kafka-go__b998019010441">ip1:port1</strong>,<strong id="kafka-go__b1995025204420">ip2:port2</strong>,<strong id="kafka-go__b1696199184416">ip3:port3</strong>"
|
|
group = "<strong id="kafka-go__b8911181417445">group-id</strong>"
|
|
topics = "<strong id="kafka-go__b172951519134418">topic_name</strong>"
|
|
)
|
|
|
|
func main() {
|
|
log.Println("Starting a new kafka consumer")
|
|
|
|
config := &kafka.ConfigMap{
|
|
"bootstrap.servers": brokers,
|
|
"group.id": group,
|
|
"auto.offset.reset": "earliest",
|
|
}
|
|
|
|
consumer, err := kafka.NewConsumer(config)
|
|
if err != nil {
|
|
log.Panicf("Error creating consumer: %v", err)
|
|
return
|
|
}
|
|
|
|
err = consumer.SubscribeTopics([]string{topics}, nil)
|
|
if err != nil {
|
|
log.Panicf("Error subscribe consumer: %v", err)
|
|
return
|
|
}
|
|
|
|
go func() {
|
|
for {
|
|
msg, err := consumer.ReadMessage(-1)
|
|
if err != nil {
|
|
log.Printf("Consumer error: %v (%v)", err, msg)
|
|
} else {
|
|
fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
|
|
}
|
|
}
|
|
}()
|
|
|
|
sigterm := make(chan os.Signal, 1)
|
|
signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)
|
|
select {
|
|
case <-sigterm:
|
|
log.Println("terminating: via signal")
|
|
}
|
|
if err = consumer.Close(); err != nil {
|
|
log.Panicf("Error closing consumer: %v", err)
|
|
}
|
|
}</pre>
|
|
<p id="kafka-go__p11281459519">The parameters in the example code are described as follows. For details about how to obtain the parameter values, see <a href="Kafka-config.html">Collecting Connection Information</a>.</p>
|
|
<ul id="kafka-go__ul101281054519"><li id="kafka-go__li6128851658"><strong id="kafka-go__b831320018591">brokers</strong>: instance connection address and port</li><li id="kafka-go__li012810511514"><strong id="kafka-go__b6154121218210">group</strong>: custom consumer group name. If the specified consumer group does not exist, Kafka automatically creates one.</li><li id="kafka-go__li141291751851"><strong id="kafka-go__b649192815596">topics</strong>: topic name</li></ul>
|
|
</li></ul>
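<p>The demo consumers above rely on the client's automatic offset committing. The following is a minimal sketch only (not part of the demo) that shows how offsets could instead be committed manually after each message is processed, by disabling <strong>enable.auto.commit</strong> and calling <strong>CommitMessage</strong>. The connection values are placeholders, as in the examples above; add the SASL parameters if your instance requires them.</p>
<pre class="screen">package main

import (
    "fmt"
    "log"

    "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
    config := &kafka.ConfigMap{
        "bootstrap.servers": "ip1:port1,ip2:port2,ip3:port3",
        "group.id": "group-id",
        "auto.offset.reset": "earliest",
        // Disable automatic commits so offsets are stored only after processing succeeds.
        "enable.auto.commit": false,
    }

    consumer, err := kafka.NewConsumer(config)
    if err != nil {
        log.Panicf("Error creating consumer: %v", err)
    }
    defer consumer.Close()

    if err = consumer.SubscribeTopics([]string{"topic_name"}, nil); err != nil {
        log.Panicf("Error subscribing consumer: %v", err)
    }

    for {
        msg, err := consumer.ReadMessage(-1)
        if err != nil {
            log.Printf("Consumer error: %v (%v)", err, msg)
            continue
        }
        fmt.Printf("Message on %s: %s\n", msg.TopicPartition, string(msg.Value))
        // Commit the offset of this message only after it has been processed.
        if _, err := consumer.CommitMessage(msg); err != nil {
            log.Printf("Commit error: %v", err)
        }
    }
}</pre>
<p>Manual commits trade a little throughput for an at-least-once guarantee: a message whose processing fails before the commit will be redelivered after a restart.</p>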
</div>
</div>