香菲儿 發表於 2021-3-16 15:01:00

go kafka group

<p>在以前的文章《kafka初探go和C#的实现》里面,我们用了sarama来消费kafka的消息,但是很遗憾它没有group的概念。没办法,我们只能用sarama-cluster来实现。注意sarama的版本不要太新,否则运行时会报错 panic: non-positive interval for NewTicker。处理办法是固定依赖版本,建议大家把go.mod文件修改如下:</p>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 0, 1)">require (
    github.com</span>/Shopify/sarama v1.<span style="color: rgba(128, 0, 128, 1)">24.1</span><span style="color: rgba(0, 0, 0, 1)">
    github.com</span>/bsm/sarama-cluster v2.<span style="color: rgba(128, 0, 128, 1)">1.15</span>+<span style="color: rgba(0, 0, 0, 1)">incompatible
)</span></pre>
</div>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 0, 1)">package main

import (
    </span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">context</span><span style="color: rgba(128, 0, 0, 1)">"</span>
    <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">fmt</span><span style="color: rgba(128, 0, 0, 1)">"</span>
    <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">log</span><span style="color: rgba(128, 0, 0, 1)">"</span>
    <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">os</span><span style="color: rgba(128, 0, 0, 1)">"</span>
    <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">os/signal</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">
    _ </span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">regexp</span><span style="color: rgba(128, 0, 0, 1)">"</span>
    <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">time</span><span style="color: rgba(128, 0, 0, 1)">"</span>

    <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">github.com/Shopify/sarama</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">
    cluster </span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">github.com/bsm/sarama-cluster</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">
)

</span><span style="color: rgba(0, 0, 255, 1)">var</span> Address = []<span style="color: rgba(0, 0, 255, 1)">string</span>{<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">192.168.100.30:9092</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">}
</span><span style="color: rgba(0, 0, 255, 1)">var</span> Topic = <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">gavintest</span><span style="color: rgba(128, 0, 0, 1)">"</span>

<span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 如果运行时出现 panic: non-positive interval for NewTicker,
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 请修改 go.mod,把依赖固定为下面两个版本:
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">   github.com/Shopify/sarama v1.24.1
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">   github.com/bsm/sarama-cluster v2.1.15+incompatible</span><span style="color: rgba(0, 128, 0, 1)">
/*</span><span style="color: rgba(0, 128, 0, 1)">*
消费者
</span><span style="color: rgba(0, 128, 0, 1)">*/</span><span style="color: rgba(0, 0, 0, 1)">
func main() {
    go syncConsumer(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">demo1</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">)
    go syncConsumer(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">demo2</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">)
    go ConsumerDemo3()
    go syncProducer()
    </span><span style="color: rgba(0, 0, 255, 1)">select</span><span style="color: rgba(0, 0, 0, 1)"> {}

}

func syncConsumer(groupName </span><span style="color: rgba(0, 0, 255, 1)">string</span><span style="color: rgba(0, 0, 0, 1)">) {
    config :</span>=<span style="color: rgba(0, 0, 0, 1)"> cluster.NewConfig()
    config.Consumer.Return.Errors </span>= <span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">
    config.Group.Return.Notifications </span>= <span style="color: rgba(0, 0, 255, 1)">true</span>

    <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> init consumer
    </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">可以订阅多个主题</span>
    topics := []<span style="color: rgba(0, 0, 255, 1)">string</span><span style="color: rgba(0, 0, 0, 1)">{Topic}
    consumer, err :</span>=<span style="color: rgba(0, 0, 0, 1)"> cluster.NewConsumer(Address, groupName, topics, config)
    </span><span style="color: rgba(0, 0, 255, 1)">if</span> err !=<span style="color: rgba(0, 0, 0, 1)"> nil {
      panic(err)
    }
    </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">这里需要注意的是defer 一定要在panic 之后才能关闭连接</span>
<span style="color: rgba(0, 0, 0, 1)">    defer consumer.Close()

    </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> trap SIGINT to trigger a shutdown.</span>
    signals := make(chan os.Signal, <span style="color: rgba(128, 0, 128, 1)">1</span><span style="color: rgba(0, 0, 0, 1)">)
    signal.Notify(signals, os.Interrupt)

    </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> consume errors</span>
<span style="color: rgba(0, 0, 0, 1)">    go func() {
      </span><span style="color: rgba(0, 0, 255, 1)">for</span> err :=<span style="color: rgba(0, 0, 0, 1)"> range consumer.Errors() {
            log.Printf(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Error: %s\n</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">, err.Error())
      }
    }()

    </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> consume notifications</span>
<span style="color: rgba(0, 0, 0, 1)">    go func() {
      </span><span style="color: rgba(0, 0, 255, 1)">for</span> ntf :=<span style="color: rgba(0, 0, 0, 1)"> range consumer.Notifications() {
            log.Printf(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Rebalanced: %+v\n</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">, ntf)
      }
    }()

    </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 循环从通道中获取message
    </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">msg.Topic 消息主题
    </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">msg.Partition消息分区
    </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">msg.Offset
    </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">msg.Key
    </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">msg.Value 消息值</span>
    <span style="color: rgba(0, 0, 255, 1)">for</span><span style="color: rgba(0, 0, 0, 1)"> {
      </span><span style="color: rgba(0, 0, 255, 1)">select</span><span style="color: rgba(0, 0, 0, 1)"> {
      </span><span style="color: rgba(0, 0, 255, 1)">case</span> msg, ok := &lt;-<span style="color: rgba(0, 0, 0, 1)">consumer.Messages():
            </span><span style="color: rgba(0, 0, 255, 1)">if</span><span style="color: rgba(0, 0, 0, 1)"> ok {
                fmt.Printf(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">%s receive message %s---Partition:%d, Offset:%d, Key:%s, Value:%s\n</span><span style="color: rgba(128, 0, 0, 1)">"</span>, groupName, msg.Topic, msg.Partition, msg.Offset, <span style="color: rgba(0, 0, 255, 1)">string</span>(msg.Key), <span style="color: rgba(0, 0, 255, 1)">string</span><span style="color: rgba(0, 0, 0, 1)">(msg.Value))
                consumer.MarkOffset(msg, </span><span style="color: rgba(128, 0, 0, 1)">""</span>) <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 上报offset</span>
<span style="color: rgba(0, 0, 0, 1)">            }
      </span><span style="color: rgba(0, 0, 255, 1)">case</span> err := &lt;-<span style="color: rgba(0, 0, 0, 1)">consumer.Errors():
            {
                fmt.Println(fmt.Sprintf(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">consumer error:%v</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">, err))
            }
      </span><span style="color: rgba(0, 0, 255, 1)">case</span> &lt;-<span style="color: rgba(0, 0, 0, 1)">signals:
            </span><span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)">
      }
    }
}

</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">同步消息模式</span>
<span style="color: rgba(0, 0, 0, 1)">func syncProducer() {
    </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">指定配置</span>
    config :=<span style="color: rgba(0, 0, 0, 1)"> sarama.NewConfig()
    </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 等待服务器所有副本都保存成功后的响应</span>
    config.Producer.RequiredAcks =<span style="color: rgba(0, 0, 0, 1)"> sarama.WaitForAll
    </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 随机的分区类型:返回一个分区器,该分区器每次选择一个随机分区</span>
    config.Producer.Partitioner =<span style="color: rgba(0, 0, 0, 1)"> sarama.NewRandomPartitioner
    </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 是否等待成功和失败后的响应</span>
    config.Producer.Return.Successes = <span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">
    config.Producer.Timeout </span>= <span style="color: rgba(128, 0, 128, 1)">5</span> *<span style="color: rgba(0, 0, 0, 1)"> time.Second
    producer, err :</span>=<span style="color: rgba(0, 0, 0, 1)"> sarama.NewSyncProducer(Address, config)
    </span><span style="color: rgba(0, 0, 255, 1)">if</span> err !=<span style="color: rgba(0, 0, 0, 1)"> nil {
      log.Printf(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">sarama.NewSyncProducer err, message=%s \n</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">, err)
      </span><span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)">
    }
    defer producer.Close()
    msg :</span>= &amp;<span style="color: rgba(0, 0, 0, 1)">sarama.ProducerMessage{
      Topic: Topic,
    }

    </span><span style="color: rgba(0, 0, 255, 1)">var</span> i = <span style="color: rgba(128, 0, 128, 1)">1100</span>
    <span style="color: rgba(0, 0, 255, 1)">for</span><span style="color: rgba(0, 0, 0, 1)"> {
      i</span>++
      <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">将字符串转换为字节数组</span>
      msg.Value = sarama.ByteEncoder(fmt.Sprintf(<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">this is a message:%d</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">, i))
      </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">SendMessage:该方法是生产者生产给定的消息
      </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">partition, offset, err := producer.SendMessage(msg)</span>
      _, _, err :=<span style="color: rgba(0, 0, 0, 1)"> producer.SendMessage(msg)
      </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">生产失败的时候返回error</span>
      <span style="color: rgba(0, 0, 255, 1)">if</span> err !=<span style="color: rgba(0, 0, 0, 1)"> nil {
            fmt.Println(fmt.Sprintf(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Send message Fail %v</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">, err))
      }
      </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">生产成功的时候返回该消息的分区和所在的偏移量
      </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">fmt.Printf("send message Partition = %d, offset=%d\n", partition, offset)</span>
<span style="color: rgba(0, 0, 0, 1)">
      time.Sleep(time.Second </span>* <span style="color: rgba(128, 0, 128, 1)">5</span><span style="color: rgba(0, 0, 0, 1)">)
    }
}

func ConsumerDemo3() {
    config :</span>=<span style="color: rgba(0, 0, 0, 1)"> sarama.NewConfig()
    </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> Version 必须大于等于V0_10_2_0</span>
    config.Version =<span style="color: rgba(0, 0, 0, 1)"> sarama.V0_10_2_1
    config.Consumer.Return.Errors </span>= <span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">
    fmt.Println(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">start connect kafka</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">)
    </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 开始连接kafka服务器
    </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">group, err := sarama.NewConsumerGroup(Address, "demo3", config)</span>
    client, err :=<span style="color: rgba(0, 0, 0, 1)"> sarama.NewClient(Address, config)
    </span><span style="color: rgba(0, 0, 255, 1)">if</span> err !=<span style="color: rgba(0, 0, 0, 1)"> nil {
      fmt.Println(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">connect kafka failed; err</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">, err)
      </span><span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)">
    }
    defer func() { _ </span>=<span style="color: rgba(0, 0, 0, 1)"> client.Close() }()
    </span><span style="color: rgba(128, 128, 128, 1)">///</span><span style="color: rgba(0, 128, 0, 1)">/</span>
    group, err := sarama.NewConsumerGroupFromClient(<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">demo3</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">, client)
    </span><span style="color: rgba(0, 0, 255, 1)">if</span> err !=<span style="color: rgba(0, 0, 0, 1)"> nil {
      fmt.Println(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">connect kafka failed; err</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">, err)
      </span><span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)">
    }
    </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 检查错误</span>
<span style="color: rgba(0, 0, 0, 1)">    go func() {
      </span><span style="color: rgba(0, 0, 255, 1)">for</span> err :=<span style="color: rgba(0, 0, 0, 1)"> range group.Errors() {
            fmt.Println(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">group errors : </span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">, err)
      }
    }()

    ctx :</span>=<span style="color: rgba(0, 0, 0, 1)"> context.Background()
    fmt.Println(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">start get msg</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">)
    </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> for 是应对 consumer rebalance</span>
    <span style="color: rgba(0, 0, 255, 1)">for</span><span style="color: rgba(0, 0, 0, 1)"> {
      </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 需要监听的主题</span>
      topics := []<span style="color: rgba(0, 0, 255, 1)">string</span><span style="color: rgba(0, 0, 0, 1)">{Topic}
      handler :</span>=<span style="color: rgba(0, 0, 0, 1)"> ConsumerGroupHandler{}
      </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 启动kafka消费组模式,消费的逻辑在上面的 ConsumeClaim 这个方法里</span>
      err :=<span style="color: rgba(0, 0, 0, 1)"> group.Consume(ctx, topics, handler)

      </span><span style="color: rgba(0, 0, 255, 1)">if</span> err !=<span style="color: rgba(0, 0, 0, 1)"> nil {
            fmt.Println(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">consume failed; err : </span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">, err)
            </span><span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)">
      }
    }
}

type ConsumerGroupHandler </span><span style="color: rgba(0, 0, 255, 1)">struct</span><span style="color: rgba(0, 0, 0, 1)">{}

func (ConsumerGroupHandler) Setup(sess sarama.ConsumerGroupSession) error {
    </span><span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)"> nil
}

func (ConsumerGroupHandler) Cleanup(sess sarama.ConsumerGroupSession) error {
    </span><span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)"> nil
}

</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> ConsumeClaim prints each message received from claim.Messages() and marks it on the session; it returns nil once that range loop ends.</span>
<span style="color: rgba(0, 0, 0, 1)">func (h ConsumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
    </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> Read messages from the claim until its channel is closed.</span>
    <span style="color: rgba(0, 0, 255, 1)">for</span> msg :=<span style="color: rgba(0, 0, 0, 1)"> range claim.Messages() {
      fmt.Printf(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">demo3 receive message %s---Partition:%d, Offset:%d, Key:%s, Value:%s\n</span><span style="color: rgba(128, 0, 0, 1)">"</span>, msg.Topic, msg.Partition, msg.Offset, <span style="color: rgba(0, 0, 255, 1)">string</span>(msg.Key), <span style="color: rgba(0, 0, 255, 1)">string</span><span style="color: rgba(0, 0, 0, 1)">(msg.Value))
      </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> Mark the message as consumed.</span>
      sess.MarkMessage(msg, <span style="color: rgba(128, 0, 0, 1)">""</span><span style="color: rgba(0, 0, 0, 1)">)
    }
    </span><span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)"> nil
}</span></pre>
</div>
<p>运行结果:&nbsp;</p>
<p><img src="https://img2020.cnblogs.com/blog/209993/202103/209993-20210316150957561-1538423078.png"></p>
<p><img src="https://img2020.cnblogs.com/blog/209993/202110/209993-20211027215328244-1106016143.png"></p>
<p>&nbsp;&nbsp;https://github.com/bsm/sarama-cluster</p>

</div>
<div id="MySignature" role="contentinfo">
    windows技术爱好者<br><br>
来源:https://www.cnblogs.com/majiang/p/14543566.html
頁: [1]
查看完整版本: go kafka group