枫也木胜 發表於 2020-5-17 22:51:00

.Net(c#)使用 Kafka 小结

<h1 id="netc使用-kafka-小结">.Net(c#)使用 Kafka 小结</h1>
<h2 id="1开篇">1.开篇</h2>
<p>由于项目中必须使用 kafka 来作为消息组件,所以使用 kafka 有一段时间了。不得不感叹 kafka 是一个相当优秀的消息系统。下面直接对使用过程做一总结,希望对大家有用。</p>
<h3 id="11kafka-部署">1.1.kafka 部署</h3>
<p>kafka 的简单搭建我们使用 docker 进行,方便快捷单节点。生产环境不推荐这样的单节点 kafka 部署。</p>
<h4 id="111确保安装了-docker-和-docker-compose">1.1.1.确保安装了 docker 和 docker-compose</h4>
<p>网上很多教程,安装也简单,不作为重点赘述。</p>
<h4 id="112编写-docker-composeyml">1.1.2.编写 docker-compose.yml</h4>
<p>将以下内容直接复制到新建空文件<code>docker-compose.yml</code>中。</p>
<pre><code class="language-yml">version: "3"
services:
zookeeper:
    image: wurstmeister/zookeeper
    ports:
      - "2181:2181"
kafka:
    image: wurstmeister/kafka
    depends_on:
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: localhost
      KAFKA_CREATE_TOPICS: "test"
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
</code></pre>
<h4 id="113容器构建提交">1.1.3.容器构建提交</h4>
<p>在<code>docker-compose.yml</code>文件的目录下执行以下命令:</p>
<pre><code class="language-sh">docker-compose build # 打包
docker-compose up # 启动, 添加 -d 可以后台启动。
</code></pre>
<p>看到日志输出:</p>
<pre><code class="language-sh">Creating network "desktop_default" with the default driver
Creating desktop_zookeeper_1 ... done
Creating desktop_kafka_1   ... done
Attaching to desktop_zookeeper_1, desktop_kafka_1
zookeeper_1| ZooKeeper JMX enabled by default
zookeeper_1| Using config: /opt/zookeeper-3.4.13/bin/../conf/zoo.cfg
zookeeper_1| 2020-05-17 03:34:31,794 - INFO - Reading configuration from: /opt/zookeeper-3.4.13/bin/../conf/zoo.cfg
...
zookeeper_1| 2020-05-17 03:34:31,872 - INFO - tickTime set to 2000
...
kafka_1      | Excluding KAFKA_VERSION from broker config
</code></pre>
<p>没有错误输出说明部署成功。</p>
<h2 id="2kafka-客户端选择">2.kafka 客户端选择</h2>
<p>在 github 上能够找到好几个 c#可以使用的 kafka 客户端。大家可以去搜一下,本文就只说明rdkafka-dotnet和confluent-kafka-dotnet。</p>
<h3 id="21rdkafka-dotnet">2.1.rdkafka-dotnet</h3>
<p>我们生产环境中就使用的该客户端。在该项目 github 首页中可以看到:</p>
<pre><code class="language-csharp">var config = new Config() { GroupId = "example-csharp-consumer" };
using (var consumer = new EventConsumer(config, "127.0.0.1:9092"))
{
    consumer.OnMessage += (obj, msg) =&gt;
    {
      //...
    };
}
</code></pre>
<p>没错,使用它的原因就是它提供了<strong>EventConsumer</strong>,可以直接异步订阅消息。整体上来说该客户端非常的稳定,性能优良。使用过程中比较难缠的就是它的配置,比较不直观。它基于librdkafka(C/C++)实现,配置 Config 类中显式配置比较少,大多数是通过字典配置的,比如:</p>
<pre><code class="language-csharp">var config = new Config();
config["auto.offset.reset"] = "earliest";//配置首次消息偏移位置为最早
</code></pre>
<p>这对于新手来说并不是很友好,很难想到去这样配置。当然如果有 <strong>librdkafka</strong> 的使用经验会好很多。大多数配置在 <strong>librdkafka</strong> 项目的CONFIGURATION。</p>
<p>还有一个需要注意的是 Broker 的版本支持Broker version support: &gt;=0.8,也在 librdkafka 项目中可以找到。</p>
<h3 id="22-confluent-kafka-dotnet">2.2 confluent-kafka-dotnet</h3>
<p>confluent-kafka-dotnet 是 rdkafka-dotnet(好几年没有维护了)的官方后续版本。推荐使用 confluent-kafka-dotnet,因为配置相对友好,更加全面。比如:</p>
<pre><code class="language-csharp">var conf = new ConsumerConfig
{
    AutoOffsetReset = AutoOffsetReset.Earliest//显式强类型赋值配置
};
</code></pre>
<p>对于 EventConsumer 怎么办呢?在项目变更记录中已经明确提出移除了 OnMessage 多播委托,而 EventConsumer,也就不存在了。但这不难,我们可以参照基项目写一个:</p>
<pre><code class="language-csharp">public class EventConsumer&lt;TKey, TValue&gt; : IDisposable
{
    private Task _consumerTask;
    private CancellationTokenSource _consumerCts;
    public IConsumer&lt;TKey, TValue&gt; Consumer { get; }
    public ConsumerBuilder&lt;TKey, TValue&gt; Builder { get; set; }
    public EventConsumer(IEnumerable&lt;KeyValuePair&lt;string, string&gt;&gt; config)
    {
      Builder = new ConsumerBuilder&lt;TKey, TValue&gt;(config);
      Consumer = Builder.Build();
    }
    public event EventHandler&lt;ConsumeResult&lt;TKey, TValue&gt;&gt; OnConsumeResult;
    public event EventHandler&lt;ConsumeException&gt; OnConsumeException;
    public void Start()
    {
      if (Consumer.Subscription?.Any() != true)
      {
            throw new InvalidOperationException("Subscribe first using the Consumer.Subscribe() function");
      }
      if (_consumerTask != null)
      {
            return;
      }
      _consumerCts = new CancellationTokenSource();
      var ct = _consumerCts.Token;
      _consumerTask = Task.Factory.StartNew(() =&gt;
      {
            while (!ct.IsCancellationRequested)
            {
                try
                {
                  var cr = Consumer.Consume(TimeSpan.FromSeconds(1));
                  if (cr == null) continue;
                  OnConsumeResult?.Invoke(this, cr);
                }
                catch (ConsumeException e)
                {
                  OnConsumeException?.Invoke(this, e);
                }
            }
      }, ct, TaskCreationOptions.LongRunning, TaskScheduler.Default);
    }
    public async Task Stop()
    {
      if (_consumerCts == null || _consumerTask == null) return;
      _consumerCts.Cancel();
      try
      {
            await _consumerTask;
      }
      finally
      {
            _consumerTask = null;
            _consumerCts = null;
      }
    }
    public void Dispose()
    {
      if (_consumerTask != null)
      {
            Stop().Wait();
      }
      Consumer?.Dispose();
    }
}
</code></pre>
<p>使用测试:</p>
<pre><code class="language-csharp">static async Task Main(string[] args)
{
    Console.WriteLine("Hello World!");
    var conf = new ConsumerConfig
    {
      GroupId = "test-consumer-group",
      BootstrapServers = "localhost:9092",
      AutoOffsetReset = AutoOffsetReset.Earliest,
    };
    var eventConsumer = new EventConsumer&lt;Ignore, string&gt;(conf);
    eventConsumer.Consumer.Subscribe(new[] {"test"});
    eventConsumer.OnConsumeResult += (sen, cr) =&gt;
    {
      Console.WriteLine($"Receive '{cr.Message.Value}' from '{cr.TopicPartitionOffset}'");
    };
    do
    {
      var line = Console.ReadLine();
      switch (line)
      {
            case "stop":
                eventConsumer.Stop();
                break;
            case "start":
                eventConsumer.Start();
                break;
      }
    } while (true);
}
</code></pre>
<h2 id="3功能扩展">3.功能扩展</h2>
<p><strong>!!!以下讨论都是对confluent-kafka-dotnet。</strong></p>
<p>由于用户终端也使用了 kafka 客户端订阅消息。如果终端长时间没有上线,并且消息过期时间也较长,服务端会存有大量消息。终端一上线就会读取到大量的堆积消息,很容易就把内存耗尽了。考虑到客户端不是长期在线的场景,无需不间断的处理所有消息,服务端才适合这个角色(:。所以客户端只需每次从登录时的最新点开始读取就可以了,历史性统计就交给服务器去做。</p>
<p>最便捷的方法是每次客户端连接都使用新的groupid,用时间或者guid撒盐。但这样会使服务端记录大量的group信息(如果终端很多m个,并且终端断开连接重连的次数也会很多随机n次,那么也是m*n个group信息),势必对服务端性能造成影响。</p>
<p>另一种方法是在保持groupid不变的情况下,修改消费偏移。那如何去设置位置偏移为最新点呢?</p>
<h3 id="31-错误思路-autooffsetreset">3.1 错误思路 AutoOffsetReset</h3>
<p>在配置中存在一个让新手容易产生误解的配置项<strong>AutoOffsetReset.Latest</strong>自动偏移到最新位置。当你兴冲冲的准备大干一番时发现只有首次创建<strong>GroupId</strong>时会起作用,当 groupid 已经存在 kafka 记录中时它就不管用了。</p>
<h3 id="32-提交偏移-commit">3.2 提交偏移 Commit</h3>
<p>我们能够在<code>IConsumer&lt;TKey, TValue&gt;</code>中找到该 commit 方法,它有三个重载:</p>
<pre><code>1. 无参函数。就是提交当前客户端`IConsumer&lt;TKey, TValue&gt;.Assignment`记录的偏移。
2. 参数ConsumeResult&lt;TKey, TValue&gt;。一次仅提交一个偏移。当然配置中默认设置为自动提交(`conf.EnableAutoCommit = true;`),无需手动提交。
3. 参数IEnumerable&lt;TopicPartitionOffset&gt; offsets。直接提交到某一个位置。TopicPartitionOffset有三个决定性属性:话题topic、分区:partition、偏移offset。
</code></pre>
<p>第三个函数就是我们想要的,我们只需得到对应参数TopicPartitionOffset的值就可以。</p>
<h4 id="321topicpartition的获取">3.2.1.TopicPartition的获取</h4>
<p>topic 是我们唯一可以确定的。在<code>IConsumer&lt;TKey, TValue&gt;.Assignment</code>中可以得到 topic 和 partition。但遗憾的是它只有不会立即有值。我们只能主动去服务端获取,在<code>IAdminClient</code>中找到了可获取该信息的方法,所以我们做一扩展:</p>
<pre><code class="language-csharp">public static IEnumerable&lt;TopicPartition&gt; GetTopicPartitions(ConsumerConfig config, string topic, TimeSpan timeout)
{
    using var adv = new AdminClientBuilder(config).Build();
    var topPns = adv.GetTopicPartition(topic, timeout);
    return topPns;
}

public static IEnumerable&lt;TopicPartition&gt; GetTopicPartition(this AdminClient client, string topic, TimeSpan timeout)
{
    var mta = client.GetMetadata(timeout);
    var topicPartitions = mta.Topics
      .Where(t =&gt; topic == t.Topic)
      .SelectMany(t =&gt; t.Partitions.Select(tt =&gt; new TopicPartition(t.Topic, tt.PartitionId)))
      .ToList();
    return topicPartitions;
}
</code></pre>
<h4 id="322-topicpartitionoffset获取">3.2.2. TopicPartitionOffset获取</h4>
<p>我们还差 offset 的值,通过<code>IConsumer&lt;TKey, TValue&gt;.QueryWatermarkOffsets</code>方法可以查到当前水位,而其中 High 水位就是最新偏移。</p>
<p>现在我们可以完成我们的任务了吗?问题再次出现,虽然客户端表现得从最新点消费了,但是在此之前的卡顿和类似与内存溢出让人不得心安。Commit 还是消费了所有消息:(,只不过暗搓搓的进行。在所有消息消费期间读取所有未消费,然后拼命提交。客户端哪有这么大的内存和性能呢。最终,找到一个和第三个 commit 方法一样接受参数的方法<code>Assign</code>,一试果然灵验。</p>
<pre><code class="language-csharp">public static void AssignOffsetToHighWatermark&lt;TKey, TValue&gt;(this IConsumer&lt;TKey, TValue&gt; consumer, TopicPartition partition, TimeSpan timeout)
{
    var water = consumer.QueryWatermarkOffsets(partition, timeout);
    if (water == null || water.High == 0) return;
    var offset = new TopicPartitionOffset(partition.Topic, partition.Partition, water.High);
    consumer.Assign(offset);
}
</code></pre>
<h4 id="323实际使用">3.2.3.实际使用</h4>
<p>最终的使用示例:</p>
<pre><code class="language-csharp">//...
var topicPartitions = ConsumerEx.GetTopicPartitions(conf, "test", TimeSpan.FromSeconds(5));
topicPartitions?.ToList().ForEach(t =&gt;
{
    eventConsumer.Consumer.AssignOffsetToHighWatermark(t, TimeSpan.FromSeconds(5));
});
eventConsumer.Start();//在消费事件开始之前就可以进行偏移设置
//...
</code></pre>
<p>请注意,如果您关闭了自动提交功能,并且不主动提交任何偏移信息,那么服务端对该 group 的偏移记录将一直不变,Assign 函数并不会改变任何服务的偏移记录。</p>
<h2 id="4总结">4.总结</h2>
<p>这一圈下来整个 kafka 的基本消费流程也就搞清楚了。kafka 消费者需要对消费的消息进行提交。事实上,每个消息体里都有偏移信息。不提交对于服务端来说就是客户端没有处理过该消息,将不会更改已消费偏移。以此来保证消息消费的可靠性。这和 tcp 中三次握手有异曲同工之妙。</p>
<p>服务端保存着每一个 groupid 对应的已经提交偏移<code>Committed Offset</code>。当然客户端不提交它是不会变更的(不考虑直接操作服务端的形式)。</p>
<p>客户端保存自己的当前偏移<code>Current Offset</code>,可以通过<code>Assign</code>和<code>Commit</code>进行更改,二者区别是<code>Commit</code>将连同提交到服务端对应的偏移中进行更改,而<code>Assign</code>仅改变客户端偏移,这一更改记录在<code>IConsumer&lt;TKey, TValue&gt;.Assignment</code>中,首次启动时客户端异步向服务端请求<code>Committed Offset</code>来对其赋值。这就是在 3.2 节中我们没有立即得到该值的的原因,该值将在可能在几秒中后被赋值,所以写了一个主动获取的方法<code>GetTopicPartition</code>。客户端下一次消费将根据<code>IConsumer&lt;TKey, TValue&gt;.Assignment</code>进行。</p>
<p>使用<code>AdminClientBuilder.GetMetadata</code>函数可以得到对应话题的元数据,包括:topic、partition、Brokers 等。</p>
<p>使用<code>IConsumer&lt;TKey, TValue&gt;.QueryWatermarkOffsets</code>函数可以得到当前服务端的水位,low 为最早的偏移(可能不是 0,考虑消息过期被删除的情况),high 为最新的偏移。</p><br><br>
来源:https://www.cnblogs.com/hsxian/p/12907542.html
頁: [1]
查看完整版本: .Net(c#)使用 Kafka 小结