卑微如尘土 發表於 2023-8-28 09:18:00

细聊C# AsyncLocal如何在异步间进行数据流转

<h3 id="前言">前言</h3>
<p>&nbsp;&nbsp;&nbsp;&nbsp;在异步编程中,处理异步操作之间的数据流转是一个比较常用的操作。<code>C#</code>异步编程提供了一个强大的工具来解决这个问题,那就是<code>AsyncLocal</code>。它是一个线程本地存储的机制,可以在异步操作之间传递数据。它为我们提供了一种简单而可靠的方式来共享数据,而不必担心线程切换或异步上下文的变化。本文我们将探究<code>AsyncLocal</code>的原理和用法,并进行相关源码解析。探讨它如何在异步操作之间实现数据的流转,以及它是如何在底层工作的。</p>
<h3 id="使用方式">使用方式</h3>
<p>上面我们提到了<code>AsyncLocal</code>可以在异步操作间传递数据,我们在之前的文章&lt;研究c#异步操作async await状态机的总结&gt;一文中提到过异步操作会涉及到线程切换的问题,接下来通过Task来模拟一个简单异步示例,来看一下它的工作方式是什么样的,以便加深对它的理解,先看一下示例</p>
<pre><code class="language-csharp">AsyncLocal&lt;Person&gt; context = new AsyncLocal&lt;Person&gt;();
context.Value = new Person { Id = 1, Name = "张三" };
Console.WriteLine($"Main之前:{context.Value.Name},ThreadId={Thread.CurrentThread.ManagedThreadId}");
await Task.Run(() =&gt;
{
    Console.WriteLine($"Task1之前:{context.Value.Name},ThreadId={Thread.CurrentThread.ManagedThreadId}");
    context.Value.Name = "李四";
    Console.WriteLine($"Task1之后:{context.Value.Name},ThreadId={Thread.CurrentThread.ManagedThreadId}");
});

await Task.Run(() =&gt;
{
    Console.WriteLine($"Task2之前:{context.Value.Name},ThreadId={Thread.CurrentThread.ManagedThreadId}");
    context.Value.Name = "王五";
    Console.WriteLine($"Task2之后:{context.Value.Name},ThreadId={Thread.CurrentThread.ManagedThreadId}");
});
Console.WriteLine($"Main之后:{context.Value.Name},ThreadId={Thread.CurrentThread.ManagedThreadId}");
</code></pre>
<p>在上面的示例中,我们创建了一个<code>AsyncLocal</code>实例,并赋值了一个<code>Person</code>对象,然后我们创建了两个<code>Task</code>,分别执行了两个异步操作,并分别修改了<code>AsyncLocal</code>中的<code>Person</code>对象的值,分别在执行异步之前执行异步过程中和执行异步之后打印值来观察变化,执行程序输出结果如下</p>
<pre><code>Main之前:张三,ThreadId=1
Task1之前:张三,ThreadId=4
Task1之后:李四,ThreadId=4
Task2之前:李四,ThreadId=6
Task2之后:王五,ThreadId=6
Main之后:王五,ThreadId=6
</code></pre>
<p>从输出结果来看,虽然我们在异步中修改了<code>AsyncLocal</code>里<code>Person</code>对象的值,并且也发生了线程切换。但是它可以在异步操作之间的数据共享和传递,使得我们在异步间进行的数据就和在一个线程里操作数据一样,让我们可以忽略掉其实已经发生了多次线程切换。</p>
<h3 id="探究本质">探究本质</h3>
<p>通过上面的示例,我们发现<code>AsyncLocal</code>确实可以实现异步之间的数据共享和传递,那么它是如何实现的呢?接下来,我们通过先查看<code>AsyncLocal</code>涉及到的相关源码来探究一下。想弄明白它的流转问题,需要研究两个大方向,一个是<code>AsyncLocal</code>的本身实现,一个是<code>AsyncLocal</code>的流转涉及到的异步或者多线程相关这里涉及到的主要是<code>Task</code>和<code>线程池</code>里的相关实现。由于异步相关涉及到了一整个体系,所以但看某一点的时候可能不太容易理解,我们先从<code>AsyncLocal</code>本身入手,然后从<code>Task</code>入手,最后从<code>线程池</code>入手,逐步探究<code>AsyncLocal</code>如何进行数据流转的。但是仍然希望能在阅读本文之前先了解一下设计到该话题的相关文章,先对整体有一个整体的把握</p>
<ul>
<li>AsyncLocal细节的文章可以看一下黑洞大佬的&lt;浅析 .NET 中 AsyncLocal 的实现原理&gt;</li>
<li>异步状态机整体介绍可以看一下我之前的文章&lt;研究c#异步操作async await状态机的总结&gt;</li>
</ul>
<h4 id="asynclocal">AsyncLocal</h4>
<p>虽然强烈建议先看一下上面推荐的文章,但是在这里我们还是简单介绍一下<code>AsyncLocal</code>的实现,所以这里我们简单介绍一下,方便大家能直观的看到。其实涉及到的比较简单,就是看一下<code>AsyncLocal</code>里涉及到关于Value的操作即可[点击查看AsyncLocal.Value源码👈]</p>
<pre><code class="language-csharp">public sealed class AsyncLocal&lt;T&gt; : IAsyncLocal
{
   
    public T Value
    {
      get
      {
            object? value = ExecutionContext.GetLocalValue(this);
            if (typeof(T).IsValueType &amp;&amp; value is null)
            {
                return default;
            }

            return (T)value!;
      }
      set
      {
            ExecutionContext.SetLocalValue(this, value, _valueChangedHandler is not null);
      }
    }
}
</code></pre>
<p>通过上面的源码可以看到<code>AsyncLocal</code>的Value属性的能力来自于<code>ExecutionContext</code>,也可理解为<code>AsyncLocal</code>是对<code>ExecutionContext</code>能力的包装。</p>
<blockquote>
<p>在C#中,ExecutionContext是用于多线程和异步编程的类,用于保存和还原线程的执行状态。它的主要功能是确保在线程切换时,状态得以保留和恢复,以便线程能够在正确的上下文中继续执行。这有助于管理线程的数据、状态以及异步任务的正确执行。</p>
</blockquote>
<p>所以我们可以继续简单的看一下<code>ExecutionContext</code>中关于<code>GetLocalValue</code>方法和<code>SetLocalValue</code>方法的大致实现,这里我们不在进行全部代码展示,只展示核心实现[点击查看ExecutionContext.LocalValue源码👈]</p>
<pre><code class="language-csharp">public sealed class ExecutionContext : IDisposable, ISerializable
{
    private readonly IAsyncLocalValueMap? m_localValues;

    private ExecutionContext(
      IAsyncLocalValueMap localValues,)
    {
      m_localValues = localValues;
    }

    //获取值的方法
    internal static object? GetLocalValue(IAsyncLocal local)
    {
      //捕获当前线程的执行上下文
      ExecutionContext? current = Thread.CurrentThread._executionContext;
      if (current == null)
      {
            return null;
      }
      //在执行上下文中获取值
      current.m_localValues.TryGetValue(local, out object? value);
      return value;
    }

    //设置值的方法
    internal static void SetLocalValue(IAsyncLocal local, object? newValue, bool needChangeNotifications)
    {
      ExecutionContext? current = Thread.CurrentThread._executionContext;
      //判断设置的心值和旧值是否相同
      object? previousValue = null;
      bool hadPreviousValue = false;
      if (current != null)
      {
            hadPreviousValue = current.m_localValues.TryGetValue(local, out previousValue);
      }
      //相同的话不在进行设置直接返回
      if (previousValue == newValue)
      {
            return;
      }

      if (current != null)
      {
            //设置新值
            newValues = current.m_localValues.Set(local, newValue, treatNullValueAsNonexistent: !needChangeNotifications);
      }
      else
      {
            //如果没有使用过先初始化在存储
            newValues = AsyncLocalValueMap.Create(local, newValue, treatNullValueAsNonexistent: !needChangeNotifications);
      }
      //给当前线程执行上下文赋值新值
      Thread.CurrentThread._executionContext = (!isFlowSuppressed &amp;&amp; AsyncLocalValueMap.IsEmpty(newValues)) ?
            null : new ExecutionContext(newValues, newChangeNotifications, isFlowSuppressed);

    }
}
</code></pre>
<p>通过上面的代码我们可以知道GetLocalValue函数用于从当前线程的执行上下文中获取异步本地对象的值。通过检索执行状态并查找本地值字典,该函数能够获取正确的值,实现了上下文数据的提取。SetLocalValue函数用于设置异步本地对象的值。它通过比较新旧值、操作本地值字典,并根据情况创建新的执行上下文,确保了数据正确地传递和存储。而异步操作过程中无非也正是不同线程上下文之间切换的问题。</p>
<blockquote>
<p>有关<code>ExecutionContext</code>更详细的源码可以仔细阅读一下,上面开头提到的黑洞大佬文章地址。</p>
</blockquote>
<h4 id="在异步中流转">在异步中流转</h4>
<p>上面我们展示了<code>AsyncLocal</code>相关的代码实现,知道了<code>AsyncLocal</code>本质是对<code>ExecutionContext</code>能力的封装。每个线程<code>Thread</code>对象都包含了<code>_executionContext</code>类存储<code>ExecutionContext</code>执行上下问信息。接下来我们就来研究一下<code>AsyncLocal</code>中的数据是如何在异步过程中流转的。首先我们来大致回顾一下异步编译之后形成状态机的执行过程。</p>
<pre><code>IAsyncStateMachine状态机实例
-&gt;AsyncTaskMethodBuilder属性类型AwaitUnsafeOnCompleted方法-&gt;
    -&gt;AsyncTaskMethodBuilder&lt;VoidTaskResult&gt;.AwaitUnsafeOnCompleted方法
      判断是否是以下类型
      ITaskAwaiter
      IConfiguredTaskAwaiter
      IStateMachineBoxAwareAwaiter
            -&gt;Task是类型ITaskAwaiter类型所以调用UnsafeOnCompletedInternal方法
                -&gt;Task.UnsafeSetContinuationForAwait
                  -&gt;判断交由哪种执行策略执行比如TaskScheduler或ThreadPool
</code></pre>
<p>到了<code>Task.UnsafeSetContinuationForAwait</code>方法这一步会涉及到异步代码如何被调度的问题也就是会被自定义<code>调度策略</code>调度还是被<code>线程池</code>调度等等。我们来看一下这个方法的实现,这个方法的实现代码,在上面的&lt;研究c#异步操作async await状态机的总结&gt;一文中也有介绍,咱们简单看一下这里面的代码[点击查看Task.UnsafeSetContinuationForAwait源码👈]</p>
<pre><code class="language-csharp">internal void UnsafeSetContinuationForAwait(IAsyncStateMachineBox stateMachineBox, bool continueOnCapturedContext)
{
    //是否捕获同步上下文
    if (continueOnCapturedContext)
    {
      //在异步执行完成后通过同步上下文执行后续结果
      SynchronizationContext? syncCtx = SynchronizationContext.Current;
      if (syncCtx != null &amp;&amp; syncCtx.GetType() != typeof(SynchronizationContext))
      {
            var tc = new SynchronizationContextAwaitTaskContinuation(syncCtx, stateMachineBox.MoveNextAction, flowExecutionContext: false);
            if (!AddTaskContinuation(tc, addBeforeOthers: false))
            {
                tc.Run(this, canInlineContinuationTask: false);
            }
            return;
      }
      else
      {
            //选择执行默认的TaskScheduler还是自定义的Scheduler
            TaskScheduler? scheduler = TaskScheduler.InternalCurrent;
            if (scheduler != null &amp;&amp; scheduler != TaskScheduler.Default)
            {
                var tc = new TaskSchedulerAwaitTaskContinuation(scheduler, stateMachineBox.MoveNextAction, flowExecutionContext: false);
                if (!AddTaskContinuation(tc, addBeforeOthers: false))
                {
                  tc.Run(this, canInlineContinuationTask: false);
                }
                return;
            }
      }
    }

    if (!AddTaskContinuation(stateMachineBox, addBeforeOthers: false))
    {
      //兜底的线程池策略
      ThreadPool.UnsafeQueueUserWorkItemInternal(stateMachineBox, preferLocal: true);
    }
}
</code></pre>
<blockquote>
<p>在许多情况下,特定的代码需要在特定的线程上执行,例如UI操作需要在UI线程上执行,以避免UI冲突和渲染问题。SynchronizationContext就是为了解决这样的问题而引入的。它允许您捕获和存储特定线程的上下文,并在需要时将任务切换到正确的线程。</p>
</blockquote>
<p>上面的这段源码是<code>Task</code>执行操作的核心策略,咱们简单的分析一下这段代码涉及到的几个核心的逻辑</p>
<ul>
<li>首先是<code>continueOnCapturedContext</code>判断,我们使用<code>task.ConfigureAwait(false)</code>这里设置的<code>true或false</code>设置的就是<code>continueOnCapturedContext</code>的值,如果为<code>true</code>则表示当前<code>Task</code>的执行需要切换到当前<code>SynchronizationContext</code>的线程,如果用一段代码描述默认情况下一步执行的原理可以大致理解为下面的代码。<pre><code class="language-csharp">SynchronizationContext sc = SynchronizationContext.Current;
ThreadPool.QueueUserWorkItem(_ =&gt;
{
    try
    {
      DoWorker();
    }
    finally
    {
      sc.Post(_ =&gt; callback(), null);
    }
});
</code></pre>
</li>
<li>其次是<code>scheduler != TaskScheduler.Default</code>判断,如果自定义了<code>TaskScheduler</code>则使用自定义的<code>TaskScheduler</code>执行,否则使用<code>ThreadPool</code>的线程池执行。比如经典问题<code>Task.Factory.StartNew()</code>方法中<code>await</code>前后如果不想切换线程可以只用自定义<code>TaskScheduler</code>的方式只用一个<code>Thread</code>执行所有任务,示例代码如下所示。<pre><code class="language-csharp">await Task.Factory.StartNew(async () =&gt;
{
    while (true)
    {
      Console.WriteLine($"Task之前Current Thread:{Thread.CurrentThread.ManagedThreadId}");
      await Task.Delay(2000);
      Console.WriteLine($"Task之后Current Thread:{Thread.CurrentThread.ManagedThreadId}");
    }
}, CancellationToken.None, TaskCreationOptions.None, new SingleThreadScheduler());

public class SingleThreadScheduler : TaskScheduler
{
    private readonly BlockingCollection&lt;Task&gt; _tasks = new BlockingCollection&lt;Task&gt;();

    public SingleThreadScheduler()
    {
      var thread = new Thread(() =&gt;
      {
            foreach (var task in _tasks.GetConsumingEnumerable())
            {
                if (!TryExecuteTask(task))
                {
                  _tasks.Add(task);
                }
            }
      })
      {
            IsBackground = true
      };
      thread.Start();
    }

    protected override IEnumerable&lt;Task&gt;? GetScheduledTasks()
    {
      return _tasks;
    }

    protected override void QueueTask(Task task)
    {
      _tasks.Add(task);
    }

    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
    {
      return false;
    }
}
</code></pre>
</li>
<li>最后兜底的策略就是使用<code>ThreadPool</code>线程池去执行异步任务。</li>
</ul>
<p>好了接下来我们把探索的重心就在线程池的里,我们知道自从有了<code>Task</code>之后<code>ThreadPool</code>就是和<code>Task</code>关联起来的,关联的核心逻辑就是在<code>ThreadPoolWorkQueue的DispatchWorkItem</code>方法中[点击查看ThreadPoolWorkQueue.DispatchWorkItem源码👈]</p>
<pre><code class="language-csharp">private static void DispatchWorkItem(object workItem, Thread currentThread)
{
    //判断如果线程池执行的任务是Task任务则执行Task里的ExecuteFromThreadPool方法
    if (workItem is Task task)
    {
      //传递当前的线程池里的线程
      task.ExecuteFromThreadPool(currentThread);
    }
    else
    {
      Debug.Assert(workItem is IThreadPoolWorkItem);
      Unsafe.As&lt;IThreadPoolWorkItem&gt;(workItem).Execute();
    }
}
</code></pre>
<p>通过上面的源码我们可以看到如果线程池执行的任务是Task任务则执行<code>Task</code>里的<code>ExecuteFromThreadPool</code>方法里,从这里我们也可以看到<code>Task</code>和<code>ThreadPool</code>的关联性。需要注意的是这里虽然关联的<code>Task</code>类型但是并非是<code>Task</code>类的实例本身,而是实现了<code>Task</code>类的状态机类型<code>AsyncStateMachineBox&lt;TStateMachine&gt;</code>,通过跟踪生成的状态机代码我们可以看到,实际添加到线程池的是<code>IAsyncStateMachineBox</code>实例,而<code>AsyncStateMachineBox&lt;TStateMachine&gt;</code>即继承了<code>Task</code>也实现了<code>IAsyncStateMachineBox</code>接口,由于逻辑较多只粘贴咱们关注的部分[点击查看AsyncTaskMethodBuilderT.GetStateMachineBox源码👈]</p>
<pre><code class="language-csharp">private static IAsyncStateMachineBox GetStateMachineBox&lt;TStateMachine&gt;(
            ref TStateMachine stateMachine,
             ref Task&lt;TResult&gt;? taskField)
            where TStateMachine : IAsyncStateMachine
{
    //捕获当前线程上下文
    ExecutionContext? currentContext = ExecutionContext.Capture();

    //创建AsyncStateMachineBox实例
    AsyncStateMachineBox&lt;TStateMachine&gt; box = AsyncMethodBuilderCore.TrackAsyncMethodCompletion ?
    CreateDebugFinalizableAsyncStateMachineBox&lt;TStateMachine&gt;() : new AsyncStateMachineBox&lt;TStateMachine&gt;();
    taskField = box;
    box.StateMachine = stateMachine;
    //传递当前捕获的ExecutionContext执行上下文
    box.Context = currentContext;

    return box;
}
</code></pre>
<p>在上面的方法中我们看到在初始化<code>AsyncStateMachineBox&lt;TStateMachine&gt;</code>实例之前先使用<code>ExecutionContext.Capture()</code>方法捕获执行上下文传递进来,这个时候还不存在被线程池执行一说,所以捕获的肯定是初始化<code>Task</code>的线程,注意这个时候还没有执行Task里的任何逻辑。所以我们关注一下<code>ExecutionContext.Capture()</code>方法的实现[点击查看EExecutionContext.Capture源码👈]</p>
<pre><code class="language-csharp">public static ExecutionContext? Capture()
{
    //捕获当前线程的执行上下文
    ExecutionContext? executionContext = Thread.CurrentThread._executionContext;
    if (executionContext == null)
    {
      executionContext = Default;
    }
    //如果设置ExecutionContext.RestoreFlow()则不进行捕获
    else if (executionContext.m_isFlowSuppressed)
    {
      executionContext = null;
    }

    return executionContext;
}
</code></pre>
<p>通过上面的代码我们看到了<code>ExecutionContext.Capture()</code>就是捕获当前线程的执行上下文,如果设置了<code>ExecutionContext.RestoreFlow()</code>上面逻辑里的<code>m_isFlowSuppressed</code>值则为<code>true</code>这个时候则不进行上下文捕获。好了我们继续往下看,上面的<code>GetStateMachineBox</code>方法返回的正是<code>AsyncStateMachineBox&lt;TStateMachine&gt;</code>类实例,它是线程池线程中真正执行的<code>Task</code>实例,我们看一下的定义[点击查看AsyncStateMachineBox源码👈]</p>
<pre><code class="language-csharp">private class AsyncStateMachineBox&lt;TStateMachine&gt; :
            Task&lt;TResult&gt;, IAsyncStateMachineBox
            where TStateMachine : IAsyncStateMachine
{
}
</code></pre>
<p>这里我们可以看到<code>AsyncStateMachineBox&lt;TStateMachine&gt;</code>类是继承自<code>Task</code>类也实现了<code>IAsyncStateMachine </code>,所以上面的<code>ThreadPoolWorkQueue.DispatchWorkItem</code>方法中调用的<code>ExecuteFromThreadPool</code>方法,本质是调用的<code>AsyncStateMachineBox&lt;TStateMachine&gt;.ExecuteFromThreadPool</code>方法,我们看一下它的实现方式[点击查看AsyncStateMachineBox.ExecuteFromThreadPool源码👈]</p>
<pre><code class="language-csharp">internal sealed override void ExecuteFromThreadPool(Thread threadPoolThread) =&gt; MoveNext(threadPoolThread);
public void MoveNext() =&gt; MoveNext(threadPoolThread: null);

private void MoveNext(Thread? threadPoolThread)
{
    //获取之前捕获的ExecutionContext执行上下文
    ExecutionContext? context = Context;
    if (context == null)
    {
      Debug.Assert(StateMachine != null);
      StateMachine.MoveNext();
    }
    else
    {
      //判断是否是线程池代码
      if (threadPoolThread is null)
      {
            ExecutionContext.RunInternal(context, s_callback, this);
      }
      else
      {
            //默认是线程池线程,会走到这里的逻辑
            ExecutionContext.RunFromThreadPoolDispatchLoop(threadPoolThread, context, s_callback, this);
      }
    }
}
</code></pre>
<p>源码中的<code>s_callback</code>本质是调用状态机生成的<code>MoveNext</code>方法,也就是在线程池线程里需要被执行的逻辑,我们看一下它的定义</p>
<pre><code class="language-csharp">private static readonly ContextCallback s_callback = ExecutionContextCallback;
private static void ExecutionContextCallback(object? s)
{
    //本质调用的状态机生成的MoveNext方法
    Unsafe.As&lt;AsyncStateMachineBox&lt;TStateMachine&gt;&gt;(s).StateMachine!.MoveNext();
}
</code></pre>
<blockquote>
<p>上面的这段代码可以清楚的看到线程池线程里执行的逻辑是<code>async await</code>生成的状态机里的代码,完成了多线程执行状态机逻辑的关联。</p>
</blockquote>
<p>咱们再继续看<code>AsyncStateMachineBox.MoveNext</code>方法里的执行逻辑。由于咱们是默认机制所以这段逻辑肯定是在线程池里的线程执行,所以会执行到<code>ExecutionContext.RunFromThreadPoolDispatchLoop()</code>方法里,我们看一下它的逻辑[点击查看ExecutionContext.RunFromThreadPoolDispatchLoop源码👈]</p>
<pre><code class="language-csharp">internal static void RunFromThreadPoolDispatchLoop(Thread threadPoolThread, ExecutionContext executionContext, ContextCallback callback, object state)
{
    //threadPoolThread是线程池线程,executionContext是Task.CapturedContext捕获的执行上下文
    if (executionContext != null &amp;&amp; !executionContext.m_isDefault)
    {
      //如果线程存在ExecutionContext则把捕获到的执行上下文赋值给当前线程池线程的执行上下文ExecutionContext
      RestoreChangedContextToThread(threadPoolThread, contextToRestore: executionContext, currentContext: null);
    }

    ExceptionDispatchInfo? edi = null;
    try
    {
      //执行Task里的逻辑
      callback.Invoke(state);
    }
    catch (Exception ex)
    {
      edi = ExceptionDispatchInfo.Capture(ex);
    }

    //捕获当前线程池线程
    Thread currentThread = threadPoolThread;
    //获取当前线程池里的执行上下文
    ExecutionContext? currentExecutionCtx = currentThread._executionContext;
    currentThread._synchronizationContext = null;
    if (currentExecutionCtx != null)
    {
      //将当前线程池里的执行上下文清空,方便下次在线程池里获取到当前线程处于初始化状态
      RestoreChangedContextToThread(currentThread, contextToRestore: null, currentExecutionCtx);
    }
    edi?.Throw();
}

internal static void RestoreChangedContextToThread(Thread currentThread, ExecutionContext? contextToRestore, ExecutionContext? currentContext)
{
    //把捕获到的执行上下文赋值给当前线程池线程的执行上下文ExecutionContext
    currentThread._executionContext = contextToRestore;
    if ((currentContext != null &amp;&amp; currentContext.HasChangeNotifications) ||
      (contextToRestore != null &amp;&amp; contextToRestore.HasChangeNotifications))
    {
      OnValuesChanged(currentContext, contextToRestore);
    }
}
</code></pre>
<p>从上面的<code>ExecutionContext.ExecuteFromThreadPool</code>里的逻辑我们可以清楚的看到我们想要的结果,由于上面提供了大片的源码,看起来容易混乱,老规矩我们在这里总结一下核心逻辑的执行流程</p>
<ul>
<li>在线程池线程执行当前Task里的任务之前即<code>AsyncStateMachineBox</code>实例,因为它就是<code>Task子类</code>。先通过<code>ExecutionContext.Capture()</code>捕获当前线程的<code>ExecutionContext</code>执行上下文,方便给接下来线程池里的线程使用。</li>
<li>把上一步里捕获到的<code>ExecutionContext</code>执行上下文,填充到在<code>ThreadPool</code>里得到的线程的执行上下文<code>_executionContext</code>里,这样就完成了不同线程之间的执行上下文流转。</li>
<li>执行完当前<code>Task</code>之后,把当前线程池中捕获的线程执行上下文给还原掉,也就是上面的<code>RestoreChangedContextToThread(currentThread, contextToRestore: null, currentExecutionCtx)</code>使用<code>null</code>赋值。</li>
</ul>
<p>通过上面的总结相信大家对执行上下文数据流转有个很好的理解了。先捕获当前线程执行上下文,然后把捕获的执行上下文填充到要执行任务的线程池的线程里,这样就完成了不同线程中执行上下文的流转,执行完Task任务之后把线程池里线程的执行上下文还原掉方便下次执行的时候是初始化状态。</p>
<h3 id="一个常见的坑">一个常见的坑</h3>
<p>通过上面的源码解析我们清楚的了解到了<code>AsyncLocal</code>在异步中是如何传递的,其实本质也就是在不同的线程里传递。那么接下来我们看一个大家在使用的过程中容易出错的地方,还是刚开始的例子,我们改造一下示例代码,如下所示</p>
<pre><code class="language-csharp">AsyncLocal&lt;Person&gt; context = new AsyncLocal&lt;Person&gt;();
context.Value = new Person { Id = 1, Name = "张三" };
Console.WriteLine($"Main之前:{context.Value.Name},ThreadId={Thread.CurrentThread.ManagedThreadId}");
await Task.Run(() =&gt;
{
    Console.WriteLine($"Task1之前:{context.Value.Name},ThreadId={Thread.CurrentThread.ManagedThreadId}");
    context.Value = new Person { Id = 2, Name = "李四" };
    Console.WriteLine($"Task1之后:{context.Value.Name},ThreadId={Thread.CurrentThread.ManagedThreadId}");
});

await Task.Run(() =&gt;
{
    Console.WriteLine($"Task2之前:{context.Value.Name},ThreadId={Thread.CurrentThread.ManagedThreadId}");
    context.Value = new Person { Id = 3, Name = "王五" };;
    Console.WriteLine($"Task2之后:{context.Value.Name},ThreadId={Thread.CurrentThread.ManagedThreadId}");
});
Console.WriteLine($"Main之后:{context.Value.Name},ThreadId={Thread.CurrentThread.ManagedThreadId}");
</code></pre>
<p>这段代码的执行结果大家猜到了吗?不卖关子了,上面的示例代码执行结果如下所示</p>
<pre><code>Main之前:张三,ThreadId=1
Task1之前:张三,ThreadId=6
Task1之后:李四,ThreadId=6
Task2之前:张三,ThreadId=8
Task2之后:王五,ThreadId=8
Main之后:张三,ThreadId=8
</code></pre>
<p>这里我们可以看到,虽然我们在不同的Task里改变了<code>AsyncLocal里的Value值</code>比如改成了李四王五这种,但是执行完Task之后仿佛值又被还原成最初初始化时候的样子也就是上面说的张三,为什么会这个样子呢?我们来分析一下</p>
<ul>
<li>1.初始化线程我们叫<code>线程A</code>,<code>线程A.ExecutionContext</code>存储的是<code>Person { Id = 1, Name = "张三" }</code>内存区域的引用。</li>
<li>2.第一个Task中执行逻辑之前捕获了<code>线程A.ExecutionContext</code>赋值给再线程池中线程<code>线程B</code>,现在<code>线程A.ExecutionContext</code>和<code>线程B.ExecutionContext</code>都指向内存区域<code>Person { Id = 1, Name = "张三" }</code>,因为数据是直接流转过来的,上面的逻辑里我们提到过。</li>
<li>3.在接下来的Task里我们得到线程池线程<code>线程B</code>在这里我们实例化了一个新的<code>Person { Id = 2, Name = "李四" }</code>实例,此时<code>线程B.ExecutionContext</code>的引用让指向<code>Person { Id = 2, Name = "李四" }</code>内存区域,<code>线程A.ExecutionContext</code>指向的依然的是<code>Person { Id = 1, Name = "张三" }</code>内存区域。</li>
<li>4.<code>线程B</code>执行完成之后要还原掉执行上下文赋值<code>null</code>,这个时候<code>线程B.ExecutionContext</code>的引用让指向<code>null</code>,<code>线程A.ExecutionContext</code>指向的依然的是<code>Person { Id = 1, Name = "张三" }</code>内存区域。</li>
<li>5.进入另一个Task之后我们得到线程池线程<code>线程C</code>,接下来<code>线程C</code>重复执行上面的<code>2、3、4</code>步骤。</li>
</ul>
<p>画个图简单的演示一下,首先是初始化的时候这个时候<code>线程A.ExecutionContext</code>和<code>线程B.ExecutionContext</code>都指向内存区域<code>Person { Id = 1, Name = "张三" }</code>如下所示<img src="https://img2023.cnblogs.com/blog/2042116/202308/2042116-20230824164503792-253035228.png" alt="" loading="lazy"></p>
<p>在<code>线程B</code>里重新实例化了一个新的<code>Person</code>实例,此时的引用指向发生了变化,如下所示<img src="https://img2023.cnblogs.com/blog/2042116/202308/2042116-20230824164731290-1344614928.png" alt="" loading="lazy"></p>
<p>这个时候<code>线程A.ExecutionContext</code>和<code>线程B.ExecutionContext</code>已经没啥关系了,所以你无论怎么操作<code>线程B.ExecutionContext</code>也和<code>线程A.ExecutionContext</code>没有任何关系了。</p>
<h3 id="总结">总结</h3>
<p>&nbsp;&nbsp;&nbsp;&nbsp;通过本文我们探究了<code>AsyncLocal</code>中的数据如何在异步之间如何流转数据的,本质还是在多个线程之间流转数据。接下来我们大致的总结一下本文的核心内容</p>
<ul>
<li>首先我们探究了<code>AsyncLocal</code>知道了它是对<code>ExecutionContext</code>执行上下文能力的包装,每个线程都会包含一个执行上下文,即<code>Thread._executionContext</code>属性。</li>
<li>当使用异步或者线程池线程执行Task里的任务之前,即<code>AsyncStateMachineBox</code>实例,因为它就是<code>Task子类</code>。先通过<code>ExecutionContext.Capture()</code>捕获当前线程的<code>ExecutionContext</code>执行上下文,方便给接下来线程池里的线程使用。</li>
<li>然后把上一步里捕获到的<code>ExecutionContext</code>执行上下文,填充到在<code>ThreadPool</code>里得到的线程的执行上下文<code>_executionContext</code>里,这样就完成了不同线程之间的执行上下文流转。</li>
<li>执行完当前<code>Task</code>之后,把当前线程池中捕获的线程执行上下文给还原掉,也就是上面的<code>RestoreChangedContextToThread(currentThread, contextToRestore: null, currentExecutionCtx)</code>使用<code>null</code>赋值。</li>
</ul>
<p>也就是先捕获当前线程执行上下文,然后把捕获的执行上下文填充到要执行任务的线程池的线程里,这样就完成了不同线程中执行上下文的流转,执行完Task任务之后把线程池里线程的执行上下文还原掉方便下次执行的时候是初始化状态。</p>
<p>&nbsp;&nbsp;&nbsp;&nbsp;分享一下近期通过雷总的演讲得到的一点感悟,周边知识不足,认知体系不全,有时单看某个点确实很难理解。跳出来看全局,补齐体系,可以不深,但一定得有整体认识,回头再看由浅入深,再由点带面,有时候更容易贯通。<br>
<br></p>
<div align="center">
<span style="font-size: 15px">👇欢迎扫码关注我的公众号👇</span>
<img src="https://img2020.cnblogs.com/blog/2042116/202006/2042116-20200622133425514-1420050576.png">
</div><br><br>
来源:https://www.cnblogs.com/wucy/p/17654645.html
頁: [1]
查看完整版本: 细聊C# AsyncLocal如何在异步间进行数据流转