雲小雲 發表於 2019-7-12 06:44:00

C#8.0: 在 LINQ 中支持异步的 IAsyncEnumerable

<p>C# 8.0中,提供了一种新的<code>IAsyncEnumerable&lt;T&gt;</code>接口,在对集合进行迭代时,支持异步操作。比如在读取文本中的多行字符串时,如果读取每行字符串的时候使用同步方法,那么会导致线程堵塞。<code>IAsyncEnumerable&lt;T&gt;</code>可以解决这种情况,在迭代的时候支持使用异步方法。也就是说,之前我们使用<code>foreach</code>来对<code>IEnumerable</code>进行迭代,现在可以使用<code>await foreach</code>来对<code>IAsyncEnumerable&lt;T&gt;</code>进行迭代,每个项都是可等待的。这种新的接口称为<code>async-streams</code>,将会随<code>.NET Core 3</code>发布。我们来看一下如何在<code>LINQ</code>中实现异步的迭代。</p>
<h2><span>使用常规的<span><code>IEnumerable&lt;T&gt;</code></span></span></h2>
<p><span>首先我们创建一个新的<span><code>Console</code><span>项目,基于<span><code>.NET Core 3</code><span>:</span></span></span></span></span></p>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 255, 1)">namespace</span><span style="color: rgba(0, 0, 0, 1)"> AsyncLinqDemo
{
   </span><span style="color: rgba(0, 0, 255, 1)">class</span><span style="color: rgba(0, 0, 0, 1)"> Program
{
       </span><span style="color: rgba(0, 0, 255, 1)">static</span> <span style="color: rgba(0, 0, 255, 1)">void</span> Main(<span style="color: rgba(0, 0, 255, 1)">string</span><span style="color: rgba(0, 0, 0, 1)">[] args)
      {
         Console.WriteLine(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Input the file path:</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
         </span><span style="color: rgba(0, 0, 255, 1)">var</span> file =<span style="color: rgba(0, 0, 0, 1)"> Console.ReadLine();
         </span><span style="color: rgba(0, 0, 255, 1)">var</span> lines =<span style="color: rgba(0, 0, 0, 1)"> ReadAllLines(file);
         </span><span style="color: rgba(0, 0, 255, 1)">foreach</span> (<span style="color: rgba(0, 0, 255, 1)">var</span> line <span style="color: rgba(0, 0, 255, 1)">in</span><span style="color: rgba(0, 0, 0, 1)"> lines)
          {
               Console.WriteLine(line);
          }
      }

       </span><span style="color: rgba(0, 0, 255, 1)">static</span> IEnumerable&lt;<span style="color: rgba(0, 0, 255, 1)">string</span>&gt; ReadAllLines(<span style="color: rgba(0, 0, 255, 1)">string</span><span style="color: rgba(0, 0, 0, 1)"> file)
      {
         </span><span style="color: rgba(0, 0, 255, 1)">using</span> (<span style="color: rgba(0, 0, 255, 1)">var</span> fs =<span style="color: rgba(0, 0, 0, 1)"> File.OpenRead(file))
          {
               </span><span style="color: rgba(0, 0, 255, 1)">using</span> (<span style="color: rgba(0, 0, 255, 1)">var</span> sr = <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)"> StreamReader(fs))
            {
                   </span><span style="color: rgba(0, 0, 255, 1)">while</span> (<span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">)
                  {
                     </span><span style="color: rgba(0, 0, 255, 1)">string</span> line =<span style="color: rgba(0, 0, 0, 1)"> sr.ReadLine();
                     </span><span style="color: rgba(0, 0, 255, 1)">if</span>(line == <span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">)
                      {
                           </span><span style="color: rgba(0, 0, 255, 1)">break</span><span style="color: rgba(0, 0, 0, 1)">;
                      }
                     </span><span style="color: rgba(0, 0, 255, 1)">yield</span> <span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)"> line;
                  }
            }
          }
      }
}
}</span></pre>
</div>
<p>&nbsp;</p>
<p><span>这是一个很简单的Console程序,实现了一个简单的返回类型为<span><code>IEnumerable&lt;string&gt;</code><span>的<span><code>ReadAllLines(string file)</code><span>方法,从文本文件中逐行读取文本,并逐行输出。如果文本内容较少的话,没什么问题。但如果我们使用过<span><code>aync/await</code><span>,就会了解,在IO操作如读取或写入文件的时候,最好使用异步方法以避免线程阻塞。让我们来改进一下。</span></span></span></span></span></span></span></p>
<h2><span>使用异步的<span><code>IAsyncEnumerable&lt;T&gt;</code></span></span></h2>
<p><span>可以优化的是下面这句:</span></p>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 255, 1)">string</span> line = sr.ReadLine();</pre>
</div>
<p>&nbsp;</p>
<p><span>对于IO操作,最好使用异步方式。这里可使用相应的异步方法:</span></p>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 255, 1)">string</span> line = <span style="color: rgba(0, 0, 255, 1)">await</span> sr.ReadLineAsync();</pre>
</div>
<p>&nbsp;</p>
<p><span>我们说“异步是传染的”,如果这里使用异步,那么相应的该方法的返回值也要使用异步,所以需要将返回值改为<span><code>static async Task&lt;IEnumerable&lt;string&gt;&gt;</code><span>,但这样会得到一个错误:</span></span></span></p>
<div class="cnblogs_code">
<pre>ErrorCS1624The body of <span style="color: rgba(128, 0, 0, 1)">'</span><span style="color: rgba(128, 0, 0, 1)">Program.ReadAllLines(string)</span><span style="color: rgba(128, 0, 0, 1)">'</span> cannot be an iterator block because <span style="color: rgba(128, 0, 0, 1)">'</span><span style="color: rgba(128, 0, 0, 1)">Task&lt;IEnumerable&lt;string&gt;&gt;</span><span style="color: rgba(128, 0, 0, 1)">'</span> is not an iterator interface typeAsyncLinqDemoC:\Source\Workspaces\Console\AsyncLinqDemo\AsyncLinqDemo\Program.cs23Active</pre>
</div>
<p>&nbsp;</p>
<p><span>因为<span><code>Task&lt;IEnumerable&lt;string&gt;&gt;</code><span>并不是一个可以迭代的接口类型,所以我们无法在方法内部使用<span><code>yield</code><span>关键字。解决问题的办法是使用新的<span><code>IAsyncEnumerable</code><span>接口:</span></span></span></span></span></span></span></p>
<div class="cnblogs_code">
<pre>static async IAsyncEnumerable&lt;<span style="color: rgba(0, 0, 255, 1)">string</span>&gt; ReadAllLines(<span style="color: rgba(0, 0, 255, 1)">string</span> <span style="color: rgba(0, 0, 255, 1)">file</span><span style="color: rgba(0, 0, 0, 1)">)
{
   using (var fs </span>= File.OpenRead(<span style="color: rgba(0, 0, 255, 1)">file</span><span style="color: rgba(0, 0, 0, 1)">))
{
       using (var sr </span>=<span style="color: rgba(0, 0, 0, 1)"> new StreamReader(fs))
      {
         </span><span style="color: rgba(0, 0, 255, 1)">while</span> (<span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">)
          {
               </span><span style="color: rgba(0, 0, 255, 1)">string</span> line =<span style="color: rgba(0, 0, 0, 1)"> await sr.ReadLineAsync();
               </span><span style="color: rgba(0, 0, 255, 1)">if</span>(line == <span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">)
            {
                   break;
            }
               yield return line;
          }

      }
}
}</span></pre>
</div>
<p>&nbsp;</p>
<p><span>按<span><code>F12</code><span>查看该接口的定义:</span></span></span></p>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 0, 1)">namespace System.Collections.Generic
{
   public interface IAsyncEnumerable</span>&lt;out T&gt;<span style="color: rgba(0, 0, 0, 1)">
{
       IAsyncEnumerator</span>&lt;T&gt; GetAsyncEnumerator(CancellationTokencancellationToken =<span style="color: rgba(0, 0, 0, 1)"> default);
}
}</span></pre>
</div>
<p>&nbsp;</p>
<p><span>这是一个异步的迭代器,并提供了<span><code>CancellationToken</code><span>。再按<span><code>F12</code><span>查看<span><code>IAsyncEnumerator&lt;T&gt;</code><span>的定义,可发现里面是这样的:</span></span></span></span></span></span></span></p>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 0, 1)">namespace System.Collections.Generic
{
   public interface IAsyncEnumerator</span>&lt;out T&gt;<span style="color: rgba(0, 0, 0, 1)"> : IAsyncDisposable
{
       T Current { get; }
       ValueTask</span>&lt;<span style="color: rgba(0, 0, 255, 1)">bool</span>&gt;<span style="color: rgba(0, 0, 0, 1)"> MoveNextAsync();
}
}</span></pre>
</div>
<p>&nbsp;</p>
<p><span>这里<span><code>MoveNextAsync()</code><span>方法实际是返回了一个结果类型为<span><code>bool</code><span>的<span><code>Task</code><span>,每次迭代都是可等待的,从而实现了迭代器的异步。</span></span></span></span></span></span></span></p>
<h2><span>使用<span><code>await foreach</code><span>消费<span><code>IAsyncEnumerable&lt;T&gt;</code></span></span></span></span></h2>
<p><span>当我们做了以上改动之后,<span><code>ReadAllLines()</code><span>方法返回的是一个支持异步的<span><code>IAsyncEnumerable</code><span>,那么在使用的时候,也不能简单的使用<span><code>foreach</code><span>了。修改<span><code>Main</code><span>方法如下:</span></span></span></span></span></span></span></span></span></p>
<div class="cnblogs_code">
<pre>static async Task Main(<span style="color: rgba(0, 0, 255, 1)">string</span><span style="color: rgba(0, 0, 0, 1)">[] args)
{
   Console.WriteLine(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Input the file path:</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">);
   var </span><span style="color: rgba(0, 0, 255, 1)">file</span> =<span style="color: rgba(0, 0, 0, 1)"> Console.ReadLine();
   var lines </span>= ReadAllLines(<span style="color: rgba(0, 0, 255, 1)">file</span><span style="color: rgba(0, 0, 0, 1)">);
   await foreach (var line </span><span style="color: rgba(0, 0, 255, 1)">in</span><span style="color: rgba(0, 0, 0, 1)"> lines)
{
       Console.WriteLine(line);
}
}</span></pre>
</div>
<p>&nbsp;</p>
<p><span>首先在<span><code>foreach</code><span>之前添加<span><code>await</code><span>关键字,还要需要将<span><code>Main</code><span>方法由<span><code>void</code><span>改为<span><code>async Task</code><span>。这样整个程序都是异步执行了,不会再导致堵塞了。这个例子只是一个简单的demo,是否使用异步并不会感觉到明显的区别。如果在迭代内部需要比较重的操作,如从网络获取大量数据或读取大量磁盘文件,异步的优势还是会比较明显的。</span></span></span></span></span></span></span></span></span></span></span></p>
<h2><span>使用<span><code>LINQ</code><span>消费<span><code>IAsyncEnumerable&lt;T&gt;</code></span></span></span></span></h2>
<p><span>使用<span><code>LINQ</code><span>来操作集合是常用的功能。如果使用<span><code>IEnumberable</code><span>,在<span><code>Main</code><span>方法中可以做如下改动:</span></span></span></span></span></span></span></p>
<div class="cnblogs_code">
<pre>var lines = ReadAllLines(<span style="color: rgba(0, 0, 255, 1)">file</span><span style="color: rgba(0, 0, 0, 1)">);
var res </span>= from line <span style="color: rgba(0, 0, 255, 1)">in</span> lines where line.StartsWith(<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">ERROR: </span><span style="color: rgba(128, 0, 0, 1)">"</span>) selectline.Substring(<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">ERROR: </span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">.Length);
foreach (var line </span><span style="color: rgba(0, 0, 255, 1)">in</span><span style="color: rgba(0, 0, 0, 1)"> res)
{
   Console.WriteLine(line);
}</span></pre>
</div>
<p>&nbsp;</p>
<p><span>或:</span></p>
<div class="cnblogs_code">
<pre>var res = lines.Where(x =&gt; x.StartsWith(<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">ERROR: </span><span style="color: rgba(128, 0, 0, 1)">"</span>)).Select(x =&gt; x.Substring(<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">ERROR: </span><span style="color: rgba(128, 0, 0, 1)">"</span>.Length));</pre>
</div>
<p>&nbsp;</p>
<p><span>如果使用了新的<span><code>IAsyncEnumerable</code><span>,你会发现无法使用<span><code>Where</code><span>等操作符了:</span></span></span></span></span></p>
<div class="cnblogs_code">
<pre>ErrorCS1936Could not <span style="color: rgba(0, 0, 255, 1)">find</span> an implementation of the query pattern <span style="color: rgba(0, 0, 255, 1)">for</span> source type <span style="color: rgba(128, 0, 0, 1)">'</span><span style="color: rgba(128, 0, 0, 1)">IAsyncEnumerable&lt;string&gt;</span><span style="color: rgba(128, 0, 0, 1)">'</span>. <span style="color: rgba(128, 0, 0, 1)">'</span><span style="color: rgba(128, 0, 0, 1)">Where</span><span style="color: rgba(128, 0, 0, 1)">'</span> not found.AsyncLinqDemoC:\Source\Workspaces\Console\AsyncLinqDemo\AsyncLinqDemo\Program.cs16Active</pre>
</div>
<p>&nbsp;</p>
<p><span>目前<span><code>LINQ</code><span>还没有提供对<span><code>IAsyncEnumerable</code><span>的原生支持,不过微软提供了一个Nuget包来实现此功能。在项目中打开Nuget Package Manger搜索安装<span><code>System.Linq.Async</code><span>,注意该包目前还是预览版,所以要勾选<span><code>include prerelease</code><span>才能看到。安装该Nuget包后,Linq查询语句中的错误就消失了。</span></span></span></span></span></span></span></span></span></p>
<p><span>在<span><code>System.Linq.Async</code><span>这个包中,对每个同步的<span><code>LINQ</code><span>方法都做了相应的扩展。所以基本上代码无需什么改动即可正常编译。</span></span></span></span></span></p>
<p><span>对于<span><code>LINQ</code><span>中的条件语句,也可以使用<span><code>WhereAwait()</code><span>方法来支持<span><code>await</code><span>:</span></span></span></span></span></span></span></p>
<div class="cnblogs_code">
<pre>public static IAsyncEnumerable&lt;TSource&gt; WhereAwait&lt;TSource&gt;(thisIAsyncEnumerable&lt;TSource&gt; source, Func&lt;TSource, <span style="color: rgba(0, 0, 255, 1)">int</span>, ValueTask&lt;<span style="color: rgba(0, 0, 255, 1)">bool</span>&gt;&gt;predicate);</pre>
</div>
<p>&nbsp;</p>
<p><span>如需要在条件语句中进行IO或网络请求等异步操作,可以这样用:</span></p>
<div class="cnblogs_code">
<pre>var res = lines.WhereAwait(async x =&gt; await DoSomeHeavyOperationsAsync(x));</pre>
</div>
<p>&nbsp;</p>
<p><span><code>DoSomeHeavyOperationsAsync</code><span>方法的签名如下:</span></span></p>
<div class="cnblogs_code">
<pre>private static ValueTask&lt;<span style="color: rgba(0, 0, 255, 1)">bool</span>&gt; DoSomeHeavyOperationsAsync(<span style="color: rgba(0, 0, 255, 1)">string</span><span style="color: rgba(0, 0, 0, 1)"> x)
{
   </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">Do some works...</span>
}</pre>
</div>
<p>&nbsp;</p>
<h2><span>小结</span></h2>
<p><span>通过以上的示例,我们简要了解了如何使用<span><code>IAsyncEnumerable</code><span>接口以及如何在<span><code>LINQ</code><span>中实现异步查询。在使用该接口时,我们需要创建一个自定义方法返回<span><code>IAsyncEnumerable&lt;T&gt;</code><span>来代替<span><code>IEnumberable&lt;T&gt;</code><span>,这个方法可称为<span><code>async-iterator</code><span>方法,需要注意以下几点:</span></span></span></span></span></span></span></span></span></span></span></p>
<ul class="list-paddingleft-2" data-mark="*">
<li>
<p><span>该方法应该被声明为<span><code>async</code><span>。</span></span></span></p>
</li>
<li>
<p><span>返回<span><code>IAsyncEnumerable&lt;T&gt;</code><span>。</span></span></span></p>
</li>
<li>
<p><span>同时使用<span><code>await</code><span>及<span><code>yield</code><span>。如<span><code>await foreach</code><span>,<span><code>yield return</code><span>或<span><code>yield break</code><span>等。</span></span></span></span></span></span></span></span></span></span></span></p>
</li>
</ul>
<p><span>例如:</span></p>
<div class="cnblogs_code">
<pre>async IAsyncEnumerable&lt;<span style="color: rgba(0, 0, 255, 1)">int</span>&gt;<span style="color: rgba(0, 0, 0, 1)"> GetValuesFromServer()
{
   </span><span style="color: rgba(0, 0, 255, 1)">while</span> (<span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">)
{
       IEnumerable</span>&lt;<span style="color: rgba(0, 0, 255, 1)">int</span>&gt; batch =<span style="color: rgba(0, 0, 0, 1)"> await GetNextBatch();
       </span><span style="color: rgba(0, 0, 255, 1)">if</span> (batch == <span style="color: rgba(0, 0, 255, 1)">null</span><span style="color: rgba(0, 0, 0, 1)">) yield break;

       foreach (</span><span style="color: rgba(0, 0, 255, 1)">int</span> item <span style="color: rgba(0, 0, 255, 1)">in</span><span style="color: rgba(0, 0, 0, 1)"> batch)
      {
         yield return item;
      }
}
}</span></pre>
</div>
<p>&nbsp;</p>
<p><span>此外还有一些限制:</span></p>
<ul class="list-paddingleft-2" data-mark="*">
<li>
<p><span>无法在<span><code>try</code><span>的<span><code>finally</code><span>块中使用任何形式的<span><code>yield</code><span>语句。</span></span></span></span></span></span></span></p>
</li>
<li>
<p><span>无法在包含任何<span><code>catch</code><span>语句的<span><code>try</code><span>语句中使用<span><code>yield return</code><span>语句。</span></span></span></span></span></span></span></p>
<p>&nbsp;</p>
</li>
</ul>
<p>期待.NET Core 3的正式发布!</p>
<p>&nbsp;</p>
<p><img style="display: block; margin-left: auto; margin-right: auto" src="https://mmbiz.qpic.cn/mmbiz_jpg/ahAATVIdckRjwCicQictHicK9Ebav2YbXIiblyZ45Q7n50Jv20xzXsj94fueJ8SQcL1ZQibEZLKs1p55kzQiaBFyMDBQ/640?wx_fmt=jpeg&amp;tp=webp&amp;wxfrom=5&amp;wx_lazy=1&amp;wx_co=1" alt="" width="287" height="196" data-copyright="0" data-ratio="0.6822074215033301" data-s="300,640" data-type="jpeg" data-w="1051" data-src="https://mmbiz.qpic.cn/mmbiz_jpg/ahAATVIdckRjwCicQictHicK9Ebav2YbXIiblyZ45Q7n50Jv20xzXsj94fueJ8SQcL1ZQibEZLKs1p55kzQiaBFyMDBQ/640?wx_fmt=jpeg" data-fail="0"></p>
<p style="text-align: center">了解新西兰IT行业真实码农生活</p>
<p style="text-align: center">请长按上方二维码关注“程序员在新西兰”</p><br><br>
来源:https://www.cnblogs.com/yanxiaodi/p/Support-IAsyncEnumerable-with-LINQ.html
頁: [1]
查看完整版本: C#8.0: 在 LINQ 中支持异步的 IAsyncEnumerable