Posted by 王春亮 on 2020-12-18 17:28:00

golang confluent-kafka-go

<div id="cnblogs_post_body" class="blogpost-body cnblogs-markdown">
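<p>There are many open-source Kafka client packages for Go, such as sarama and confluent-kafka-go. While using sarama, I occasionally ran into crashes under high concurrency, so I switched to confluent-kafka-go, which is simple to use and has been stable.</p>
<p>This article covers how to use confluent-kafka-go.<br>confluent-kafka-go is the Go package recommended on the official Kafka website.</p>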
<blockquote>
<p>confluent-kafka-go is Confluent's Golang client for Apache Kafka and the Confluent Platform.</p>

</blockquote>
<h3 id="编译环境搭建">Setting up the build environment</h3>
<h4 id="安装librdkafka">Installing librdkafka</h4>
<p><strong>Download</strong></p>
<pre><code class="hljs shell">$ git clone https://github.com/edenhill/librdkafka.git
$ cd librdkafka
</code></pre>
<p><strong>Configure, build, and install</strong></p>
<pre><code class="hljs ruby">$ ./configure --prefix /usr
$ make
$ sudo make install
</code></pre>
<p><strong>Set PKG_CONFIG_PATH</strong></p>
<p>Append the following line to the end of ~/.bashrc:</p>
<blockquote>
<p>export PKG_CONFIG_PATH=/usr/lib/pkgconfig</p>
</blockquote>
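<p>To confirm that the setting above points at the installed librdkafka metadata, a small check like the following can help. It is only a sketch: <code>candidatePCFiles</code> is a hypothetical helper written for this article, it assumes the install step placed <code>rdkafka.pc</code> under the configured prefix, and it looks only at PKG_CONFIG_PATH (pkg-config itself also searches its built-in default directories).</p>

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// candidatePCFiles (hypothetical helper) returns the <name>.pc paths implied
// by a PKG_CONFIG_PATH-style list of directories. Empty entries are skipped.
func candidatePCFiles(pkgConfigPath, name string) []string {
	var files []string
	for _, dir := range filepath.SplitList(pkgConfigPath) {
		if dir == "" {
			continue
		}
		files = append(files, filepath.Join(dir, name+".pc"))
	}
	return files
}

func main() {
	found := false
	for _, f := range candidatePCFiles(os.Getenv("PKG_CONFIG_PATH"), "rdkafka") {
		// os.Stat succeeds only if the file exists and is reachable.
		if _, err := os.Stat(f); err == nil {
			fmt.Println("found", f)
			found = true
		}
	}
	if !found {
		fmt.Println("rdkafka.pc not found in PKG_CONFIG_PATH")
	}
}
```

<p>Run it in a new shell after editing ~/.bashrc, so that the exported variable is actually in the environment.</p>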
<h4 id="下载go-client">Downloading the Go client</h4>
<pre><code class="hljs go">$ go get -u github.com/confluentinc/confluent-kafka-go/kafka
</code></pre>
<p>The package is downloaded into your GOPATH automatically. You can also download it from GitHub yourself and place it under GOPATH.</p>
<h3 id="example">Example</h3>
<pre><code class="hljs go">// Example function-based Apache Kafka producer
package main

/**
 * Copyright 2016 Confluent Inc.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import (
        "fmt"
        "os"

        "github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {

        if len(os.Args) != 3 {
                fmt.Fprintf(os.Stderr, "Usage: %s &lt;broker&gt; &lt;topic&gt;\n",
                        os.Args[0])
                os.Exit(1)
        }

        broker := os.Args[1]
        topic := os.Args[2]

        p, err := kafka.NewProducer(&amp;kafka.ConfigMap{"bootstrap.servers": broker})

        if err != nil {
                fmt.Printf("Failed to create producer: %s\n", err)
                os.Exit(1)
        }

        fmt.Printf("Created Producer %v\n", p)

        // Optional delivery channel, if not specified the Producer object's
        // .Events channel is used.
        deliveryChan := make(chan kafka.Event)

        value := "Hello Go!"
        err = p.Produce(&amp;kafka.Message{
                TopicPartition: kafka.TopicPartition{Topic: &amp;topic, Partition: kafka.PartitionAny},
                Value:          []byte(value),
        }, deliveryChan)
        if err != nil {
                // The message was never enqueued, so no delivery report will
                // arrive; waiting on deliveryChan here would block forever.
                fmt.Printf("Failed to enqueue message: %v\n", err)
                os.Exit(1)
        }

        // Block until the delivery report for the message arrives.
        e := &lt;-deliveryChan
        m := e.(*kafka.Message)

        if m.TopicPartition.Error != nil {
                fmt.Printf("Delivery failed: %v\n", m.TopicPartition.Error)
        } else {
                fmt.Printf("Delivered message to topic %s [%d] at offset %v\n",
                        *m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset)
        }

        close(deliveryChan)
}
</code></pre>
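<p>The example waits for a single delivery report on an unbuffered channel. When several messages are produced, one report comes back per message, so the reader side has to collect exactly that many. The sketch below models this with the standard library only: <code>deliveryReport</code>, <code>fakeProduce</code>, and <code>produceAll</code> are hypothetical stand-ins invented for illustration and are not part of confluent-kafka-go.</p>

```go
package main

import (
	"errors"
	"fmt"
)

// deliveryReport is a hypothetical stand-in for the *kafka.Message that
// comes back on a delivery channel.
type deliveryReport struct {
	Value  string
	Offset int64
	Err    error
}

// fakeProduce is a hypothetical stand-in for an asynchronous produce call:
// it returns immediately and reports the outcome later on deliveryChan.
// An empty payload is treated as a delivery failure, purely for the demo.
func fakeProduce(value string, offset int64, deliveryChan chan<- deliveryReport) {
	go func() {
		r := deliveryReport{Value: value, Offset: offset}
		if value == "" {
			r.Err = errors.New("empty payload")
		}
		deliveryChan <- r
	}()
}

// produceAll enqueues every value, then waits for exactly one report per
// enqueued message and returns how many succeeded and how many failed.
func produceAll(values []string) (delivered, failed int) {
	// Buffered so that reporters never block on a slow reader.
	deliveryChan := make(chan deliveryReport, len(values))
	for i, v := range values {
		fakeProduce(v, int64(i), deliveryChan)
	}
	for range values {
		r := <-deliveryChan
		if r.Err != nil {
			failed++
			fmt.Printf("Delivery failed: %v\n", r.Err)
		} else {
			delivered++
			fmt.Printf("Delivered %q at offset %d\n", r.Value, r.Offset)
		}
	}
	return delivered, failed
}

func main() {
	delivered, failed := produceAll([]string{"Hello Go!", "", "Bye"})
	fmt.Printf("delivered=%d failed=%d\n", delivered, failed)
}
```

<p>Reports may arrive in any order, so the loop counts them instead of matching them positionally. With the real client the same counting idea applies, but only messages that were successfully enqueued should be waited for.</p>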
<p><strong>Note:</strong><br>To link against the static library, remove the rdkafka shared libraries (.so files) under /usr/lib/, then pass the -tags static option to go build.<br>For example:</p>
<blockquote>
<p>go build -tags static producer.go</p>

</blockquote>
<p>For more examples, see<br>https://github.com/confluentinc/confluent-kafka-go/tree/master/examples</p>
<h3 id="参考">References</h3>
<p>https://github.com/confluentinc/confluent-kafka-go</p>

</div>
Source: https://www.cnblogs.com/ExMan/p/14156056.html