go kafka group
<p>在以前的文章kafka初探go和C#的实现里面我们用了sarama来消费kafka的消息,但是很遗憾它没有group的概念。没办法 我们只能用sarama-cluster来实现, 注意sarama版本不要太新否则有错误panic: non-positive interval for NewTicker 问题处理,建议大家可以修改go.mod文件如下:</p><div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 0, 1)">require (
github.com</span>/Shopify/sarama v1.<span style="color: rgba(128, 0, 128, 1)">24.1</span><span style="color: rgba(0, 0, 0, 1)">
github.com</span>/bsm/sarama-cluster v2.<span style="color: rgba(128, 0, 128, 1)">1.15</span>+<span style="color: rgba(0, 0, 0, 1)">incompatible<br></span></pre>
</div>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 0, 1)">package main
import (
</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">context</span><span style="color: rgba(128, 0, 0, 1)">"</span>
<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">fmt</span><span style="color: rgba(128, 0, 0, 1)">"</span>
<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">log</span><span style="color: rgba(128, 0, 0, 1)">"</span>
<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">os</span><span style="color: rgba(128, 0, 0, 1)">"</span>
<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">os/signal</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">
_ </span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">regexp</span><span style="color: rgba(128, 0, 0, 1)">"</span>
<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">time</span><span style="color: rgba(128, 0, 0, 1)">"</span>
<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">github.com/Shopify/sarama</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">
cluster </span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">github.com/bsm/sarama-cluster</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">
)
</span><span style="color: rgba(0, 0, 255, 1)">var</span> Address = []<span style="color: rgba(0, 0, 255, 1)">string</span>{<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">192.168.100.30:9092</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">}
</span><span style="color: rgba(0, 0, 255, 1)">var</span> Topic = <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">gavintest</span><span style="color: rgba(128, 0, 0, 1)">"</span>
<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">panic: non-positive interval for NewTicker
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 修改go.mod
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">github.com/Shopify/sarama v1.24.1
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> github.com/bsm/sarama-cluster v2.1.15+incompatible
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">修改</span><span style="color: rgba(0, 128, 0, 1)">
/*</span><span style="color: rgba(0, 128, 0, 1)">*
消费者
</span><span style="color: rgba(0, 128, 0, 1)">*/</span><span style="color: rgba(0, 0, 0, 1)">
func main() {
go syncConsumer(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">demo1</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">)
go syncConsumer(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">demo2</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">)
go ConsumerDemo3()
go syncProducer()
</span><span style="color: rgba(0, 0, 255, 1)">select</span><span style="color: rgba(0, 0, 0, 1)"> {}
}
func syncConsumer(groupName </span><span style="color: rgba(0, 0, 255, 1)">string</span><span style="color: rgba(0, 0, 0, 1)">) {
config :</span>=<span style="color: rgba(0, 0, 0, 1)"> cluster.NewConfig()
config.Consumer.Return.Errors </span>= <span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">
config.Group.Return.Notifications </span>= <span style="color: rgba(0, 0, 255, 1)">true</span>
<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> init consumer
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">可以订阅多个主题</span>
topics := []<span style="color: rgba(0, 0, 255, 1)">string</span><span style="color: rgba(0, 0, 0, 1)">{Topic}
consumer, err :</span>=<span style="color: rgba(0, 0, 0, 1)"> cluster.NewConsumer(Address, groupName, topics, config)
</span><span style="color: rgba(0, 0, 255, 1)">if</span> err !=<span style="color: rgba(0, 0, 0, 1)"> nil {
panic(err)
}
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">这里需要注意的是defer 一定要在panic 之后才能关闭连接</span>
<span style="color: rgba(0, 0, 0, 1)"> defer consumer.Close()
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> trap SIGINT to trigger a shutdown.</span>
signals := make(chan os.Signal, <span style="color: rgba(128, 0, 128, 1)">1</span><span style="color: rgba(0, 0, 0, 1)">)
signal.Notify(signals, os.Interrupt)
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> consume errors</span>
<span style="color: rgba(0, 0, 0, 1)"> go func() {
</span><span style="color: rgba(0, 0, 255, 1)">for</span> err :=<span style="color: rgba(0, 0, 0, 1)"> range consumer.Errors() {
log.Printf(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Error: %s\n</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">, err.Error())
}
}()
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> consume notifications</span>
<span style="color: rgba(0, 0, 0, 1)"> go func() {
</span><span style="color: rgba(0, 0, 255, 1)">for</span> ntf :=<span style="color: rgba(0, 0, 0, 1)"> range consumer.Notifications() {
log.Printf(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Rebalanced: %+v\n</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">, ntf)
}
}()
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 循环从通道中获取message
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">msg.Topic 消息主题
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">msg.Partition消息分区
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">msg.Offset
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">msg.Key
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">msg.Value 消息值</span>
<span style="color: rgba(0, 0, 255, 1)">for</span><span style="color: rgba(0, 0, 0, 1)"> {
</span><span style="color: rgba(0, 0, 255, 1)">select</span><span style="color: rgba(0, 0, 0, 1)"> {
</span><span style="color: rgba(0, 0, 255, 1)">case</span> msg, ok := <-<span style="color: rgba(0, 0, 0, 1)">consumer.Messages():
</span><span style="color: rgba(0, 0, 255, 1)">if</span><span style="color: rgba(0, 0, 0, 1)"> ok {
fmt.Printf(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">%s receive message %s---Partition:%d, Offset:%d, Key:%s, Value:%s\n</span><span style="color: rgba(128, 0, 0, 1)">"</span>, groupName, msg.Topic, msg.Partition, msg.Offset, <span style="color: rgba(0, 0, 255, 1)">string</span>(msg.Key), <span style="color: rgba(0, 0, 255, 1)">string</span><span style="color: rgba(0, 0, 0, 1)">(msg.Value))
consumer.MarkOffset(msg, </span><span style="color: rgba(128, 0, 0, 1)">""</span>) <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 上报offset</span>
<span style="color: rgba(0, 0, 0, 1)"> }
</span><span style="color: rgba(0, 0, 255, 1)">case</span> err := <-<span style="color: rgba(0, 0, 0, 1)">consumer.Errors():
{
fmt.Println(fmt.Sprintf(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">consumer error:%v</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">, err))
}
</span><span style="color: rgba(0, 0, 255, 1)">case</span> <-<span style="color: rgba(0, 0, 0, 1)">signals:
</span><span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)">
}
}
}
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">同步消息模式</span>
<span style="color: rgba(0, 0, 0, 1)">func syncProducer() {
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">指定配置</span>
config :=<span style="color: rgba(0, 0, 0, 1)"> sarama.NewConfig()
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 等待服务器所有副本都保存成功后的响应</span>
config.Producer.RequiredAcks =<span style="color: rgba(0, 0, 0, 1)"> sarama.WaitForAll
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 随机的分区类型:返回一个分区器,该分区器每次选择一个随机分区</span>
config.Producer.Partitioner =<span style="color: rgba(0, 0, 0, 1)"> sarama.NewRandomPartitioner
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 是否等待成功和失败后的响应</span>
config.Producer.Return.Successes = <span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">
config.Producer.Timeout </span>= <span style="color: rgba(128, 0, 128, 1)">5</span> *<span style="color: rgba(0, 0, 0, 1)"> time.Second
producer, err :</span>=<span style="color: rgba(0, 0, 0, 1)"> sarama.NewSyncProducer(Address, config)
</span><span style="color: rgba(0, 0, 255, 1)">if</span> err !=<span style="color: rgba(0, 0, 0, 1)"> nil {
log.Printf(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">sarama.NewSyncProducer err, message=%s \n</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">, err)
</span><span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)">
}
defer producer.Close()
msg :</span>= &<span style="color: rgba(0, 0, 0, 1)">sarama.ProducerMessage{
Topic: Topic,
}
</span><span style="color: rgba(0, 0, 255, 1)">var</span> i = <span style="color: rgba(128, 0, 128, 1)">1100</span>
<span style="color: rgba(0, 0, 255, 1)">for</span><span style="color: rgba(0, 0, 0, 1)"> {
i</span>++
<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">将字符串转换为字节数组</span>
msg.Value = sarama.ByteEncoder(fmt.Sprintf(<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">this is a message:%d</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">, i))
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">SendMessage:该方法是生产者生产给定的消息
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">partition, offset, err := producer.SendMessage(msg)</span>
_, _, err :=<span style="color: rgba(0, 0, 0, 1)"> producer.SendMessage(msg)
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">生产失败的时候返回error</span>
<span style="color: rgba(0, 0, 255, 1)">if</span> err !=<span style="color: rgba(0, 0, 0, 1)"> nil {
fmt.Println(fmt.Sprintf(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Send message Fail %v</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">, err))
}
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">生产成功的时候返回该消息的分区和所在的偏移量
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">fmt.Printf("send message Partition = %d, offset=%d\n", partition, offset)</span>
<span style="color: rgba(0, 0, 0, 1)">
time.Sleep(time.Second </span>* <span style="color: rgba(128, 0, 128, 1)">5</span><span style="color: rgba(0, 0, 0, 1)">)
}
}
func ConsumerDemo3() {
config :</span>=<span style="color: rgba(0, 0, 0, 1)"> sarama.NewConfig()
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> Version 必须大于等于V0_10_2_0</span>
config.Version =<span style="color: rgba(0, 0, 0, 1)"> sarama.V0_10_2_1
config.Consumer.Return.Errors </span>= <span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">
fmt.Println(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">start connect kafka</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">)
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 开始连接kafka服务器
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">group, err := sarama.NewConsumerGroup(Address, "demo3", config)</span>
client, err :=<span style="color: rgba(0, 0, 0, 1)"> sarama.NewClient(Address, config)
</span><span style="color: rgba(0, 0, 255, 1)">if</span> err !=<span style="color: rgba(0, 0, 0, 1)"> nil {
fmt.Println(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">connect kafka failed; err</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">, err)
</span><span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)">
}
defer func() { _ </span>=<span style="color: rgba(0, 0, 0, 1)"> client.Close() }()
</span><span style="color: rgba(128, 128, 128, 1)">///</span><span style="color: rgba(0, 128, 0, 1)">/</span>
group, err := sarama.NewConsumerGroupFromClient(<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">demo3</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">, client)
</span><span style="color: rgba(0, 0, 255, 1)">if</span> err !=<span style="color: rgba(0, 0, 0, 1)"> nil {
fmt.Println(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">connect kafka failed; err</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">, err)
</span><span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)">
}
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 检查错误</span>
<span style="color: rgba(0, 0, 0, 1)"> go func() {
</span><span style="color: rgba(0, 0, 255, 1)">for</span> err :=<span style="color: rgba(0, 0, 0, 1)"> range group.Errors() {
fmt.Println(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">group errors : </span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">, err)
}
}()
ctx :</span>=<span style="color: rgba(0, 0, 0, 1)"> context.Background()
fmt.Println(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">start get msg</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">)
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> for 是应对 consumer rebalance</span>
<span style="color: rgba(0, 0, 255, 1)">for</span><span style="color: rgba(0, 0, 0, 1)"> {
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 需要监听的主题</span>
topics := []<span style="color: rgba(0, 0, 255, 1)">string</span><span style="color: rgba(0, 0, 0, 1)">{Topic}
handler :</span>=<span style="color: rgba(0, 0, 0, 1)"> ConsumerGroupHandler{}
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 启动kafka消费组模式,消费的逻辑在上面的 ConsumeClaim 这个方法里</span>
err :=<span style="color: rgba(0, 0, 0, 1)"> group.Consume(ctx, topics, handler)
</span><span style="color: rgba(0, 0, 255, 1)">if</span> err !=<span style="color: rgba(0, 0, 0, 1)"> nil {
fmt.Println(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">consume failed; err : </span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">, err)
</span><span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)">
}
}
}
type ConsumerGroupHandler </span><span style="color: rgba(0, 0, 255, 1)">struct</span><span style="color: rgba(0, 0, 0, 1)">{}
func (ConsumerGroupHandler) Setup(sess sarama.ConsumerGroupSession) error {
</span><span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)"> nil
}
func (ConsumerGroupHandler) Cleanup(sess sarama.ConsumerGroupSession) error {
</span><span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)"> nil
}
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 这个方法用来消费消息的</span>
<span style="color: rgba(0, 0, 0, 1)">func (h ConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 获取消息</span>
<span style="color: rgba(0, 0, 255, 1)">for</span> msg :=<span style="color: rgba(0, 0, 0, 1)"> range claim.Messages() {
fmt.Printf(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">demo3 receive message %s---Partition:%d, Offset:%d, Key:%s, Value:%s\n</span><span style="color: rgba(128, 0, 0, 1)">"</span>, msg.Topic, msg.Partition, msg.Offset, <span style="color: rgba(0, 0, 255, 1)">string</span>(msg.Key), <span style="color: rgba(0, 0, 255, 1)">string</span><span style="color: rgba(0, 0, 0, 1)">(msg.Value))
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 将消息标记为已使用</span>
sess.MarkMessage(msg, <span style="color: rgba(128, 0, 0, 1)">""</span><span style="color: rgba(0, 0, 0, 1)">)
}
</span><span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)"> nil
}</span></pre>
</div>
<p>运行结果: </p>
<p><img src="https://img2020.cnblogs.com/blog/209993/202103/209993-20210316150957561-1538423078.png"></p>
<p><img src="https://img2020.cnblogs.com/blog/209993/202110/209993-20211027215328244-1106016143.png"></p>
<p> https://github.com/bsm/sarama-cluster</p>
</div>
<div id="MySignature" role="contentinfo">
windows技术爱好者<br><br>
来源:https://www.cnblogs.com/majiang/p/14543566.html
頁:
[1]