李丽丽 發表於 2021-3-8 21:14:00

如何在 C# 中使用 Channels

<p>在面对<code>生产者-消费者</code> 的场景下, netcore 提供了一个新的命名空间 <code>System.Threading.Channels</code> 来帮助我们更高效的处理此类问题,有了这个 Channels 存在, <code>生产者</code> 和 <code>消费者</code> 可以各自处理自己的任务而不相互干扰,有利于两方的并发处理,这篇文章我们就来讨论下如何使用 <code>System.Threading.Channels</code>。</p>
<h2 id="dataflow-vs-channel">Dataflow vs Channel</h2>
<p>在 <code>System.Threading.Tasks.Dataflow</code> 命名空间下提供了一个数据流库,主要封装了 <code>存储</code> 和 <code>处理</code> 两大块,该库专注于 pipeline 处理,而 <code>System.Threading.Tasks.Channels</code> 主要专注于 <code>存储</code> 这块,从单一职责上来说,在 <code>生产者-消费者</code> 场景下,Channels 比 Dataflow 性能要高得多。</p>
<h2 id="为什么要使用-channels">为什么要使用 Channels</h2>
<p>可以利用 Channels 来实现 <code>生产者和消费者</code> 之间的解耦,大体上有两个好处:</p>
<ul>
<li>
<p>生产者 和 消费者 是相互独立的,两者可以并行执行。</p>
</li>
<li>
<p>如果生产者不给力,可以创建多个的生产者,如果消费者不给力,可以创建更多的消费者。</p>
</li>
</ul>
<p>总的来说,在 <code>生产者-消费者</code> 模式下可以帮助我们提高应用程序的吞吐率。</p>
<h2 id="安装-systemthreadingchannels">安装 System.Threading.Channels</h2>
<p>要想使用 Channel,需要用 nuget 引用 <code>System.Threading.Channels</code> 包,还可以通过 Visual Studio 2019 的 <code>NuGet package manager</code> 可视化界面安装 或者 通过 <code>NuGet package manager</code> 命令行工具输入以下命令:</p>
<pre><code class="language-C#">
dotnet add package System.Threading.Channels

</code></pre>
<h2 id="创建-channel">创建 channel</h2>
<p>本质上来说,你可以创建两种类型的 channel,一种是有限容量的 <code>bound channel</code>,一种是无限容量的 <code>unbound channel</code>,接下来的问题是,如何创建呢?Channels 提供了两种 <strong>工厂方法</strong> 用于创建,如下代码所示:</p>
<ul>
<li>
<p><code>CreateBounded&lt;T&gt;</code> 创建的 channel 是一个有消息上限的通道。</p>
</li>
<li>
<p><code>CreateUnbounded&lt;T&gt;</code> 创建的 channel 是一个无消息上限的通道。</p>
</li>
</ul>
<p>下面的代码片段展示了如何创建 <code>unbounded channel</code>,并且只能存放 string 类型。</p>
<pre><code class="language-C#">
      static void Main(string[] args)
      {
            var channel = Channel.CreateUnbounded&lt;string&gt;();
      }
</code></pre>
<p>对了,<code>Bounded channel</code> 还提供了一个 FullMode 属性,用于指定当 channel 已满时该如何对插入的 message 进行处理,通常有四种做法。</p>
<ul>
<li>
<p>Wait</p>
</li>
<li>
<p>DropWrite</p>
</li>
<li>
<p>DropNewest</p>
</li>
<li>
<p>DropOldest</p>
</li>
</ul>
<p>下面的代码片段展示了如何在 <code>Bounded channel</code> 上使用 FullMode。</p>
<pre><code class="language-C#">
      static void Main(string[] args)
      {
            var channel = Channel.CreateBounded&lt;string&gt;(new BoundedChannelOptions(1000)
            {
                FullMode = BoundedChannelFullMode.Wait
            });
      }

</code></pre>
<h2 id="将消息写入到-channel">将消息写入到 channel</h2>
<p>要想将 message 写入到 channel,可以使用 <code>WriteAsync()</code> 方法,如下代码所示:</p>
<pre><code class="language-C#">
      static async Task Main(string[] args)
      {
            var channel = Channel.CreateBounded&lt;string&gt;(new BoundedChannelOptions(1000)
            {
                FullMode = BoundedChannelFullMode.Wait
            });

            await channel.Writer.WriteAsync("Hello World!");
      }

</code></pre>
<h2 id="从-channel-中读取消息">从 channel 中读取消息</h2>
<p>要想从 channel 中读取 message,可以使用 <code>ReadAsync()</code>,如下代码所示:</p>
<pre><code class="language-C#">
      static async Task Main(string[] args)
      {
            var channel = Channel.CreateBounded&lt;string&gt;(new BoundedChannelOptions(1000)
            {
                FullMode = BoundedChannelFullMode.Wait
            });

            while (await channel.Reader.WaitToReadAsync())
            {
                if (channel.Reader.TryRead(out var message))
                {
                  Console.WriteLine(message);
                }
            }
      }

</code></pre>
<h2 id="systemthreadingchannels-例子">System.Threading.Channels 例子</h2>
<p>下面是完整的代码清单,展示了如何从 channel 中读写 message。</p>
<pre><code class="language-C#">
    class Program
    {
      static async Task Main(string[] args)
      {
            await SingleProducerSingleConsumer();

            Console.ReadKey();
      }

      public static async Task SingleProducerSingleConsumer()
      {
            var channel = Channel.CreateUnbounded&lt;int&gt;();
            var reader = channel.Reader;
            for (int i = 0; i &lt; 10; i++)
            {
                await channel.Writer.WriteAsync(i + 1);
            }

            while (await reader.WaitToReadAsync())
            {
                if (reader.TryRead(out var number))
                {
                  Console.WriteLine(number);
                }
            }
      }
    }

</code></pre>
<p><img src="https://p3-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/ddd5fbe003fc4b7db33c5ef3faa118be~tplv-k3u1fbpfcp-zoom-1.image"></p>
<p>可以看到,控制台中输出了数字 <code>1-10</code>,这些数字正是 Writer 写入到 channel 中的,对吧。</p>
<p>总的来说,要想使用 <code>生产者-消费者</code> 场景,有几种实现途径,比如:BlockingCollection 和 TPL Dataflow,但本篇介绍的 Channels 要比前面的两种性能更高,关于 Channels 更多的细节,我会在未来的文章中进行讨论,如果您现在想急于了解的话,可以参考MSDN: https://docs.microsoft.com/en-us/dotnet/api/system.threading.channels?view=netcore-3.0</p>
<blockquote>
<p>译文链接:https://www.infoworld.com/article/3445156/how-to-use-systemthreadingchannels-in-net-core.html</p>
</blockquote><br><br>
来源:https://www.cnblogs.com/ireadme/p/14502286.html
頁: [1]
查看完整版本: 如何在 C# 中使用 Channels