刀古 發表於 2022-11-15 09:09:00

C#多线程(三)线程高级篇

<h3 id="前言">前言</h3>
<p>抛开死锁不谈,只聊性能问题,尽管锁总能粗暴的满足同步需求,但一旦存在竞争关系,意味着一定会有线程被阻塞,竞争越激烈,被阻塞的线程越多,上下文切换次数越多,调度成本越大,显然在高并发的场景下会损害性能。在高并发高性能且要求线程安全的述求下,无锁构造(非阻塞构造)闪亮登场。</p>
<p>如果你对同步&amp;阻塞不是很清晰,请跳转到上一篇:同步基础传送门</p>
<p>参考文档:</p>
<p>C# - 理论与实践中的 C# 内存模型</p>
<p>volatile 关键字</p>
<h2 id="一非阻塞同步">一、非阻塞同步</h2>
<h3 id="重排序与缓存">重排序与缓存</h3>
<p>我们观察下面这个例子:</p>
<pre><code class="language-c#">public class Foo
{
    private int _answer;
    private bool _complete;

    void A() //A 1
    {
      _answer = 10;
      _complete = true;
    }

    void B() //B 2
    {
      if (_complete) Console.WriteLine(_answer);
    }
}
</code></pre>
<p>如果方法<code>A</code>和<code>B</code>在不同的线程上并发运行,<code>B</code>可能会打印 “ 0 “ 吗?答案是会的,原因如下:</p>
<ul>
<li>编译器、CLR 或 CPU 可能会对代码/指令进行<strong>重排序(reorder)</strong>以提高效率。</li>
<li>编译器、CLR 或 CPU 可能会进行<strong>缓存</strong>优化,导致其它线程不能马上看到变量的新值。</li>
</ul>
<p><strong>请务必重视它们,它们将是幽灵般的存在</strong></p>
<pre><code class="language-c#">int x = 0, y = 0, a = 0, b = 0;

var task1 = Task.Run(() =&gt; // A 1
{
    a = 1; // 1
    x = b; // 2
});
var task2 = Task.Run(() =&gt; // B 2
{
    b = 2; // 3
    y = a; // 4
});
Task.WaitAll(task1, task2);
Console.WriteLine("x:" + x + " y:" + y);
</code></pre>
<p>直觉和经验告诉我们,程序至顶向下执行:代码1一定发生在代码2之前,代码3一定发生在代码4之前,然鹅</p>
<p>在一个独立的线程中,每一个语句的执行顺序是可以被保证的,但在不使用lock,waithandle这样的显式同步操作时,我们就没法保证事件在不同的线程中看到的执行顺序是一致的了。尽管线程A中一定需要观察到a=1执行成功之后才会去执行x=b,但它没法确保自己观察得到线程B中对b的写入,所以A还可能会打印出y的一个旧版的值。这就叫指令重排序。</p>
<pre><code class="language-shell">x:0 y:1 #1-2-3-4
x:2 y:0 #3-4-1-2
x:2 y:1 #1-3-2-4
</code></pre>
<p>可实际运行时还是有些让我们惊讶的情况:</p>
<pre><code class="language-shell">x:0 y:0 #??
</code></pre>
<p>这就是缓存问题,如果两个线程在不同的CPU上执行,每一个核心有自己的缓存,这样一个线程的写入对于其它线程,在主存同步之前就是不可见的了。</p>
<blockquote>
<p>C#编译器和CLR运行时会非常小心的保证上述优化不会破坏普通的单线程代码,和正确使用锁的多线程代码。但有时,你仍然需要通过显示的创建<strong>内存屏障(memory barrier,也称作内存栅栏 (memory fence))</strong>来对抗这些优化,限制指令重排序和读写缓存产生的影响。</p>
</blockquote>
<h3 id="内存屏障">内存屏障</h3>
<p>参考博客小林野夫</p>
<p>处理器支持哪种内存重排序(LoadLoad重排序、LoadStore重排序、StoreStore重排序、StoreLoad重排序),就会提供相对应能够禁止重排序的指令,而这些指令就被称之为<strong>内存屏障</strong>(LoadLoad屏障、LoadStore屏障、StoreStore屏障、StoreLoad屏障)</p>
<table>
<thead>
<tr>
<th style="text-align: center">屏障名称</th>
<th>示例</th>
<th>具体作用</th>
</tr>
</thead>
<tbody>
<tr>
<td style="text-align: center">StoreLoad</td>
<td>Store1;Store2;Store3;<strong>StoreLoad</strong>;Load1;Load2;Load3</td>
<td>禁止StoreLoad重排序,确保屏障之前任何一个写(如Store2)的结果都会在屏障后任意一个读操作(如Load1)加载之前被写入</td>
</tr>
<tr>
<td style="text-align: center">StoreStore</td>
<td>Store1;Store2;Store3;<strong>StoreStore</strong>;Store4;Store5;Store6</td>
<td>禁止StoreStore重排序,确保屏障之前任何一个写(如Store1)的结果都会在屏障后任意一个写操作(如Store4)之前被写入</td>
</tr>
<tr>
<td style="text-align: center">LoadLoad</td>
<td>Load1;Load2;Load3;<strong>LoadLoad</strong>;Load4;Load5;Load6</td>
<td>禁止LoadLoad重排序,确保屏障之前任何一个读(如Load1)的数据都会在屏障后任意一个读操作(如Load4)之前被加载</td>
</tr>
<tr>
<td style="text-align: center">LoadStore</td>
<td>Load1;Load2;Load3;<strong>LoadStore</strong>;Store1;Store2;Store3</td>
<td>禁止LoadStore重排序,确保屏障之前任何一个读(如Load1)的数据都会在屏障后任意一个写操作(如Store1)的结果被写入高速缓存(或主内存)前被加载</td>
</tr>
</tbody>
</table>
<p>读屏障告诉处理器在执行任何的加载前,执行所有已经在失效队列(Invalidte Queues)中的失效(I)指令。即:所有load barrier之前的store指令对之后(本核心和其他核心)的指令都是可见的。</p>
<p>Store Memory Barrier:写屏障,等同于前文的StoreStore Barriers 将store buffer都写入缓存。</p>
<p>写屏障告诉处理器在执行这之后的指令之前,执行所有已经在存储缓存(store buffer)中的修改(M)指令。即:所有store barrier之前的修改(M)指令都是对之后的指令可见。</p>
<p>最简单的内存屏障是<strong>完全内存屏障(full memory barrier,或全栅栏(full fence))</strong>,它可以阻止所有跨越栅栏的指令进行重排并<code>提交修改和刷新缓存</code>。内存屏障之前的所有写操作都要写入内存,并将内存中的新值刷到缓存,使得其它CPU核心能够读取到最新值,完全保证了数据的强一致性,进而解决CPU缓存带来的可见性问题。</p>
<p>我们简单修改一下前面的案例</p>
<pre><code class="language-c#">void A()
{
    _answer = 10;
    Thread.MemoryBarrier(); // 1
    _complete = true;
    Thread.MemoryBarrier(); // 3
}
void B()
{
    Thread.MemoryBarrier(); // 2
    if (_complete)
    {
      _testOutputHelper.WriteLine(_answer.ToString());
    }
}
</code></pre>
<p>屏障1,3使得这个例子不可能打印出0,屏障2保证如果B在A之后执行,_complete一定读到的是true</p>
<p><strong>内存屏障离我们并不遥远</strong>,以下方式都会隐式的使用全栅栏:</p>
<ul>
<li>
<p>lock语法糖或<code>Monitor.Enter</code> / <code>Monitor.Exit</code></p>
</li>
<li>
<p><code>Interlocked</code>类中的所有方法</p>
</li>
<li>
<p>使用线程池的异步回调,包括异步委托,APM回调,以及任务延续(task continuations)</p>
</li>
<li>
<p>信号构造的等待/复位</p>
</li>
<li>
<p>任何依赖信号同步的情况,比如启动或等待Task,因此下面的代码也是线程安全的</p>
<pre><code class="language-c#">int x = 0;
Task t = Task.Factory.StartNew (() =&gt; x++);
t.Wait();
Console.WriteLine (x);    // 1
</code></pre>
</li>
</ul>
<h3 id="volatile">volatile</h3>
<p>另一个(更高级的)解决这个问题的方法是对<code>_complete</code>字段使用<code>volatile</code>关键字。</p>
<pre><code class="language-c#">volatile bool _complete;
</code></pre>
<p><code>volatile</code>关键字通知编译器在每个读这个字段的地方使用一个读栅栏(acquire-fence),并且在每个写这个字段的地方使用一个写栅栏(release-fence)。</p>
<p>这种“半栅栏(half-fences)”比全栅栏更快,因为它给了运行时和硬件更大的优化空间。</p>
<p>读栅栏:也就是读屏障(Store Memory Barrier),等同于前文的LoadLoad Barriers 将Invalidate的 都执行完成。告诉处理器在执行任何的加载前,执行所有已经在失效队列(Invalidte Queues)中的失效(I)指令。即:所有load barrier之前的store指令对之后(本核心和其他核心)的指令都是可见的。</p>
<p>写栅栏:也就是写屏障(Store Memory Barrier),等同于前文的StoreStore Barriers 将store buffer都写入主存。<br>
告诉处理器在执行这之后的指令之前,执行所有已经在存储缓存(store buffer)中的修改(M)指令。即:所有store barrier之前的修改(M)指令都是对之后的指令可见。</p>
<blockquote>
<p>巧的是,Intel 的 X86 和 X64 处理器总是在读时使用读栅栏,写时使用写栅栏,无论是否使用<code>volatile</code>关键字。所以在使用这些处理器的情况下,这个关键字对硬件来说是无效的。然而,<code>volatile</code>关键字对编译器和 CLR 进行的优化是有作用的,以及在 64 位 AMD 和 Itanium 处理器上也是有作用的。这意味着不能因为你的客户端运行在特定类型的 CPU 上而放松警惕。</p>
</blockquote>
<p>注意:使用<code>volatile</code>不能阻止写-读被交换</p>
<table>
<thead>
<tr>
<th style="text-align: left">第一条指令</th>
<th style="text-align: left">第二条指令</th>
<th style="text-align: left">是否会被交换</th>
</tr>
</thead>
<tbody>
<tr>
<td style="text-align: left">读</td>
<td style="text-align: left">读</td>
<td style="text-align: left">不会</td>
</tr>
<tr>
<td style="text-align: left">读</td>
<td style="text-align: left">写</td>
<td style="text-align: left">不会</td>
</tr>
<tr>
<td style="text-align: left">写</td>
<td style="text-align: left">写</td>
<td style="text-align: left">不会(CLR 确保写-写操作永远不会被交换,就算是没有<code>volatile</code>关键字)</td>
</tr>
<tr>
<td style="text-align: left">写</td>
<td style="text-align: left">读</td>
<td style="text-align: left"><strong>会!</strong></td>
</tr>
</tbody>
</table>
<p>在下面案例中仍然有可能会打印00的情况(对a的读取可能发生在写入前--重排序)</p>
<pre><code class="language-c#">int a = 0, b = 0;
int x = 0, y = 0;
var task1 = Task.Run(() =&gt;
{
    Thread.VolatileWrite(ref a, 1);
    x = Thread.VolatileRead(ref b);
});
var task2 = Task.Run(() =&gt;
{
    Thread.VolatileWrite(ref b, 2);
    y = Thread.VolatileRead(ref a);
});
Task.WaitAll(task1, task2);

Console.WriteLine("x:" + x + " y:" + y);
</code></pre>
<p><code>volatile</code>关键字不能应用于数组元素,不能用在捕获的局部变量:这些情况下你必须使用<code>VolatileRead</code>和<code>VolatileWrite</code>方法</p>
<p>从上面的例子我们可以看出,写-读操作可能被重新排序,官方的解释是:</p>
<blockquote>
<p>在多处理器系统上,易失性读取操作不保证获取由任何处理器写入该内存位置的最新值。 同样,易失性写入操作不保证写入的值会立即对其他处理器可见。</p>
<p>(我的理解是:<code>volatile</code>关键字只能解决重排序问题,解决不了多处理器的缓存一致性问题)</p>
</blockquote>
<p>注意<code>double</code> 和 <code>long</code>无法标记为 <code>volatile</code>,因为对这些类型的字段的读取和写入不能保证是原子的。 若要保护对这些类型字段的多线程访问,请使用 Interlocked 类成员或使用 <code>lock</code> 语句保护访问权限。</p>
<h3 id="interlocked">Interlocked</h3>
<p>位于<code>System.Threading</code>,为多个线程共享的变量提供原子操作,这也是DOTNET为数不多的线程安全类型之一。</p>
<p><code>Interlocked</code>通过将原子性的需求传达给操作系统和CLR来进行实现其功能,此类的成员不会引发异常。</p>
<p>可以防止 1.线程上下文切换,2.线程更新可由其他线程访问的变量时,或者当两个线程同时在不同的处理器上执行时可能会出现的错误。</p>
<p>场景:</p>
<pre><code class="language-c#">int i = 0;
i ++;
</code></pre>
<p>在大多数计算机上,自增并不是原子操作,需要以下步骤:</p>
<ol>
<li>将变量<code>i</code>的值加载到寄存器中。</li>
<li>计算<code>i + 1</code>。</li>
<li>将上面的计算结果存储在变量<code>i</code>中。</li>
</ol>
<p>假设A线程执行完1-2时被抢占,B线程执行1-2-3,当A线程恢复时继续执行3,此时B线程的值就被覆盖掉了。</p>
<p>使用<code>Increment</code>即可解决,123会被打包成一个操作,以原子的方式实现自增</p>
<h3 id="cas">CAS</h3>
<p>定义(摘自百度百科):</p>
<p>CAS 操作包含三个操作数 —— 内存位置(V)、预期原值(A)和新值(B)。如果内存位置的值与预期原值相匹配,那么处理器会自动将该位置值更新为新值。否则,处理器不做任何操作。无论哪种情况,它都会在 CAS 指令之前返回该位置的值。(在 CAS 的一些特殊情况下将仅返回 CAS 是否成功,而不提取当前值。)CAS 有效地说明了“我认为位置 V 应该包含值 A;如果包含该值,则将 B 放到这个位置;否则,不要更改该位置,只告诉我这个位置的值即可。”</p>
<p>Interlocked.CompareExchange,实现了CAS:比较两个值是否相等,如果相等,则替换第一个值,否则什么都不做,最终返回这个位置的原始值。</p>
<pre><code>Interlocked.CompareExchange(ref _num, 1000, 500);
</code></pre>
<p>CAS在保证原子性读写的同时,没有加锁,保障了程序并发度,但也存在缺陷:</p>
<ul>
<li>ABA问题</li>
<li>只能保证一个地址的读写原子性</li>
<li>自旋CAS时间过长,容易给CPU带来大开销</li>
</ul>
<h2 id="二延迟初始化">二、延迟初始化</h2>
<p>面试时候经常问:<strong>单例模式中的懒汉模式线程安全问题</strong></p>
<p>场景:某个字段构造开销非常大,使得在初始化<code>A</code>时需要承担初始化<code>Expensive</code>的开销,即使Expensive字段不会被用到。</p>
<pre><code class="language-c#">public class A
{
    public readonly Expensive Expensive = new Expensive();
    // ..
}

public class Expensive
{
    // 构造开销非常昂贵
}
</code></pre>
<p>自然会想到懒汉模式:按需加载</p>
<pre><code class="language-c#">public class B
{
    private Expensive _expensive;

    public Expensive GetExpensiveInstance()
    {
      if (_expensive == null) _expensive = new Expensive();

      return _expensive;
    }
}
</code></pre>
<p>新的问题产生:<code>GetExpensiveInstance</code>是线程安全的吗?我们可以通过加锁解决</p>
<pre><code class="language-c#">public class C
{
    private readonly object _locker = new object();
    private Expensive _expensive;

    public Expensive GetExpensiveInstance()
    {
      lock (_locker)
      {
            if (_expensive == null) _expensive = new Expensive();
            return _expensive;
      }
    }
}
</code></pre>
<p>现在面试官继续问:还有性能更好的版本吗?..</p>
<h3 id="lazy">Lazy<t></t></h3>
<p>net standard1.0 提供<code>System.Lazy&lt;T&gt;</code>来帮助你以线程安全且高效的方式(DCL)解决延迟初始化问题,只需</p>
<pre><code class="language-c#">public class D
{
    private Lazy&lt;Expensive&gt; _expensive = new Lazy&lt;Expensive&gt;(() =&gt; new Expensive(), true);

    public Expensive GetExpensiveInstance() =&gt; _expensive.Value;
}
</code></pre>
<p>第一个参数是一个委托,告知如何构建,第二个参数是boolean类型,传<code>false</code>实现的就是上面提到的<code>plain B</code>非线程安全迟初始化</p>
<p>双检锁 double checked locking会进行一次额外的<strong>易失读(volatile read)</strong>,在对象已经完成初始化时,能够避免获取锁产生的开销。</p>
<pre><code class="language-c#">public class E
{
    private readonly object _locker = new object();
    private volatile Expensive _expensive;

    public Expensive GetExpensiveInstance()
    {
      // 额外的易失读(volatile read)
      if (_expensive == null)
      {
            lock (_locker)
            {
                if (_expensive == null) _expensive = new Expensive();
            }
      }
      
      return _expensive;
    }
}
</code></pre>
<h3 id="lazyinitializer">LazyInitializer</h3>
<p><code>LazyInitializer</code>是一个静态类,提供<code>EnsureInitialized</code>方法,第一个参数是需要构造的变量地址,第二个参数是一个委托,告知如何构造</p>
<pre><code class="language-c#">public class F
{
    private Expensive _expensive;

    public Expensive GetExpensiveInstance()
    {
      LazyInitializer.EnsureInitialized(ref _expensive,
            () =&gt; new Expensive());
      return _expensive;
    }
}
</code></pre>
<p>它使用竞争初始化模式的实现,比双检锁更快(在多核心情况下),因为它的实现完全不使用锁。这是一个很少需要用到的极端优化,并且会带来以下代价:</p>
<ul>
<li>当参与初始化的线程数大于核心数时,它会更慢。</li>
<li>可能会因为进行了多余的初始化而浪费 CPU 资源。</li>
<li>初始化逻辑必须是线程安全的(例如,<code>Expensive</code>的构造器对静态字段进行写,就不是线程安全的)。</li>
<li>如果初始化的对象是需要进行销毁的,多余的对象需要额外的逻辑才能被销毁。</li>
</ul>
<p>竞争初始化(race-to-initialize)模式,通过易失性和CAS,实现无锁构造</p>
<pre><code class="language-c#">public class G
{
    private volatile Expensive _expensive;
    public Expensive Expensive
    {
      get
      {
            if (_expensive == null)
            {
                var instance = new Expensive();
                Interlocked.CompareExchange (ref _expensive, instance, null);
            }
            return _expensive;
      }
    }
}
</code></pre>
<h2 id="三线程局部存储">三、线程局部存储</h2>
<p>我们花费了大量篇幅来讲并发访问公共数据问题,前文提到的锁构造,信号构造,无锁构造本质上都是使用同步构造,使得多线程在访问公共数据时能安全的进行,然而有时我们会希望数据在线程间是隔离的,局部变量就能实现这个目的,但他们的生命周期总是那么短暂(随代码块而释放),我们期待更大作用域的隔离数据,<strong>线程局部变量(thread-local storage,TLS)</strong>就可以实现这个目的。</p>
<h3 id="threadstatic">ThreadStatic</h3>
<p>被ThreadStatic标记的static字段不会在线程间共享,每个执行线程都有一个单独的字段实例</p>
<p>Note:</p>
<ul>
<li>被标记的必须是static字段,不能在实例字段上使用(添加了也无效)</li>
<li>请不要给被标记的字段指定初始值,因为这种初始化只会在类被构造时执行一次,影响一个线程,因此他依赖零值</li>
</ul>
<p>如果你需要使用实例字段,或者非零值,请使用<code>ThreadLocal&lt;T&gt;</code></p>
<pre><code class="language-c#">public class ThreadStatic测试
{
    private readonly ITestOutputHelper _testOutputHelper;
    private static int _num;

    public ThreadStatic测试(ITestOutputHelper testOutputHelper)
    {
      _testOutputHelper = testOutputHelper;
    }

   
    void Show()
    {
      void Work()
      {
            for (int i = 0; i &lt; 100000; i++)
            {
                _num++;
                _testOutputHelper.WriteLine(_num.ToString());
            }
      }

      var t1 = new Thread(Work);
      var t2 = new Thread(Work);

      t1.Start();
      t2.Start();
      t1.Join();
      t2.Join();

      _testOutputHelper.WriteLine(_num.ToString());
    }
}
</code></pre>
<p>输出:</p>
<pre><code class="language-shell">100000
100000
0
</code></pre>
<h3 id="localdatastoreslot">LocalDataStoreSlot</h3>
<p>封装内存槽以存储本地数据。 此类不能被继承。.NET Framework 1.1加入,但在standard2.0+才有。</p>
<pre><code class="language-c#">public sealed class LocalDataStoreSlot
</code></pre>
<p>.NET Framework 提供了两种机制,用于使用线程本地存储 (TLS) :<code>LocalDataStoreSlot</code>和<code>ThreadStaticAttribute</code></p>
<p><code>LocalDataStoreSlot</code>比<code>ThreadStaticAttribute</code>更慢,更尴尬。此外,数据存储为类型 <code>Object</code>,因此必须先将其强制转换为正确的类型,然后再使用它。</p>
<p>有关使用 TLS 的详细信息,请参阅 线程本地存储。</p>
<p>同样,.NET Framework 提供了两种使用上下文本地存储的机制:<code>LocalDataStoreSlot</code>和<code>ContextStaticAttribute</code>。 上下文相对静态字段是用属性标记的 ContextStaticAttribute 静态字段。 请参考注解</p>
<pre><code class="language-c#">// 同一个 LocalDataStoreSlot 对象可以跨线程使用。
LocalDataStoreSlot _slot = Thread.AllocateNamedDataSlot("mySlot");
void Work()
{
    for (int i = 0; i &lt; 100000; i++)
    {
      int num = (int)(Thread.GetData(_slot)??0);
      Thread.SetData(_slot, num + 1);
    }
    _testOutputHelper.WriteLine(((int)(Thread.GetData(_slot)??0)).ToString());
}
var t1 = new Thread(Work);
var t2 = new Thread(Work);
t1.Start();
t2.Start();
t1.Join();
t2.Join();
_testOutputHelper.WriteLine(((int)(Thread.GetData(_slot)??0)).ToString());
</code></pre>
<p>输出效果和<code>ThreadStaticAttribute</code>一样:</p>
<pre><code class="language-shell">100000
100000
0
</code></pre>
<p>使用<code>Thread.FreeNamedDataSlot("mySlot");</code>可以释放所有线程上的指定槽,但是只有在所有对该槽的引用都出了其作用域,并且被垃圾回收后才会真正释放。这确保了只要保持对槽的引用,就能继续使用槽。</p>
<p>你也可以通过<code>Thread.AllocateDataSlot()</code>来创建一个无名槽位,与命名槽的区别是无名槽需要自行控制作用域</p>
<p>当然我们也可以对上面复杂的᠍᠍᠍᠍᠍<code>Thread.GetData</code>,<code>Thread.SetData</code>进行封装</p>
<pre><code class="language-c#">LocalDataStoreSlot _secSlot = Thread.GetNamedDataSlot ("securityLevel");
int Num
{
    get
    {
      object data = Thread.GetData(_secSlot);
      return data == null ? 0 : (int) data;    // null 相当于未初始化。
    }
    set { Thread.SetData (_secSlot, value); }
}
</code></pre>
<h3 id="threadlocal">ThreadLocal<t></t></h3>
<p><code>ThreadLocal&lt;T&gt;</code>是 Framework 4.0 加入的,涵盖在netstandard1.0。它提供了可用于静态字段和实例字段的线程局部存储,并且允许设置默认值。</p>
<pre><code class="language-c#">public class ThreadLocal测试
{
    ThreadLocal&lt;int&gt; _num = new ThreadLocal&lt;int&gt; (() =&gt; 3);
    private readonly ITestOutputHelper _testOutputHelper;


    public ThreadLocal测试(ITestOutputHelper testOutputHelper)
    {
      _testOutputHelper = testOutputHelper;
    }

   
    void Show()
    {
      void Work()
      {
            for (int i = 0; i &lt; 100000; i++)
            {
                _num.Value++;
            }
            _testOutputHelper.WriteLine(_num.ToString());
      }

      var t1 = new Thread(Work);
      var t2 = new Thread(Work);

      t1.Start();
      t2.Start();
      t1.Join();
      t2.Join();

      _testOutputHelper.WriteLine(_num.ToString());
    }
}
</code></pre>
<p>输出</p>
<pre><code class="language-shell">100003
100003
3
</code></pre>
<p>下面这个测试非常有意思</p>
<pre><code class="language-c#">
void Show()
{
    var threadName = new ThreadLocal&lt;string&gt;(() =&gt; "Thread" + Thread.CurrentThread.ManagedThreadId);
    Parallel.For(0, 13, x =&gt;
    {
      bool repeat = threadName.IsValueCreated;
      _testOutputHelper.WriteLine($"ThreadName = {threadName.Value} {(repeat ? "(repeat)" : "")}");
    });
   
    threadName.Dispose();// 释放资源
}
</code></pre>
<p>你会发现当Parallel.For第二个参数超过你的逻辑内核后,repeat出现了!</p>
<pre><code>ThreadName = Thread5
ThreadName = Thread8
ThreadName = Thread31
ThreadName = Thread29
ThreadName = Thread31 (repeat)
ThreadName = Thread30
ThreadName = Thread18
ThreadName = Thread12
ThreadName = Thread32
ThreadName = Thread28
ThreadName = Thread33
ThreadName = Thread35
ThreadName = Thread34
</code></pre>
<p><code>Random</code>类不是线程安全的,所以我们要不然在使用<code>Random</code>时加锁(这样限制了并发),如今我们有了ThreadLocal:</p>
<pre><code class="language-c#">var localRandom = new ThreadLocal&lt;Random&gt;(() =&gt; new Random());
</code></pre>
<p>很轻易的就解决了线程安全问题,但是上面的版本使用的<code>Random</code>的无参构造方法,会依赖系统时间作为生成随机数的种子,在大概 10ms 时间内创建的两个<code>Random</code>对象可能会使用相同的种子,下边是解决这个问题的一个办法:</p>
<pre><code class="language-c#">var localRandom = new ThreadLocal&lt;Random&gt;(() =&gt; new Random (Guid.NewGuid().GetHashCode()) );
</code></pre>
<blockquote>
<p>特别注意,不要以为GUID全局唯一,GUID的HashCode也全局唯一,上面的随机数仍然不是真随机</p>
</blockquote>
<h3 id="asynclocal">AsyncLocal</h3>
<p>MSDN:表示给定异步控制流(如异步方法)的本地环境数据。</p>
<p>说人话就是:类似<code>ThreadLocal</code>,但是<code>ThreadLocal</code>不支持<code>await</code>导致线程切换,而<code>AsyncLocal</code>支持</p>
<p>由于基于<code>Task</code>的异步编程模型倾向于抽象线程的使用,因此可以使用<code>AsyncLocal&lt;T&gt;</code>实例跨线程持久化数据。</p>
<pre><code>public sealed class AsyncLocal&lt;T&gt;
</code></pre>
<p>当与当前线程关联的值发生更改时会调用<code>valueChangedHandler</code></p>
<pre><code>public AsyncLocal(Action&lt;AsyncLocalValueChangedArgs&lt;T&gt;&gt;? valueChangedHandler)
</code></pre>
<p>对比<code>ThreadLocal</code></p>
<pre><code>AsyncLocal&lt;int&gt; _num = new AsyncLocal&lt;int&gt;();
ThreadLocal&lt;int&gt; _num2 = new ThreadLocal&lt;int&gt;();
async Task Work()
{
    for (int i = 0; i &lt; 100000; i++)
    {
      _num.Value++;
      _num2.Value++;
    }
    _testOutputHelper.WriteLine(_num.Value.ToString());
    _testOutputHelper.WriteLine(_num2.Value.ToString());
    await Task.Delay(100);
    for (int i = 0; i &lt; 100000; i++)
    {
      _num.Value++;
      _num2.Value++;
    }
    _testOutputHelper.WriteLine(_num.Value.ToString());
    _testOutputHelper.WriteLine(_num2.Value.ToString());
}
await Work();
</code></pre>
<p>输出:</p>
<pre><code>100000
100000
200000
100000
</code></pre>
<h2 id="四monitor之信号构造">四、Monitor之信号构造</h2>
<p>回顾一下信号构造</p>
<p><strong>信号构造的本质:一个线程阻塞直到收到另一个线程发来的通知。</strong></p>
<p>在同步基础中我们已经讲过了dotnet常见的信号构造api,今天,让我们再次看看<code>Monitor</code>,它是如何完成信号构造的。</p>
<p><code>Wait</code>和<code>Pulse</code>几乎是万能的,通过一个<code>bool</code>标识我们就能实现<code>AutoResetEvent/ManualResetEvent</code>的功能,同理使用一个整形字段,就可以实现<code>CountdownEvent/Semaphore</code>,几乎任何信号构造api无法满足的功能,都能由<code>Monitor</code>完成,<code>Monitor</code>是信号构造的终极,如果你的功力足够,它可以是整个阻塞构造的终极</p>
<h3 id="wait和pulse">Wait和Pulse</h3>
<p><img src="https://img2022.cnblogs.com/blog/1510705/202211/1510705-20221116093207322-1527097438.png" alt="" loading="lazy"></p>
<p>当多线程<code>Wait</code>同一对象时,就形成了一个“<strong>等待队列(waiting queue)</strong>”,和用于等待获得锁的“<strong>就绪队列(ready queue)</strong>”不同,每次调用<code>Pulse</code>时会释放队头线程,它会进入就绪队列,然后重新获取锁。可以把它想象成一个自动停车场,首先你在收费站(等待队列)排队验票,然后在栅栏前(就绪队列)排队等待放行。</p>
<p>这个队列结构天然有序,但是,对于<code>Wait/Pulse</code>应用通常不重要,在这种场景下把它想象成一个等待线程的“<strong>池(pool)</strong>”更好理解,每次调用<code>Pulse</code>都会从池中释放一个等待线程。</p>
<p><code>PulseAll</code>释放整个等待队列或者说等待池。收到<code>Pulse</code>的线程不会完全同时开始执行,而是有序的执行,因为每个<code>Wait</code>语句都要试图重新获取同一把锁。他们的效果就是,<code>PulseAll</code>将线程从等待队列移到就绪队列中,让它们可以继续有序执行。</p>
<ol>
<li>
<p>定义一个字段,作为同步对象</p>
<pre><code class="language-c#">private readonly object _locker = new object();
</code></pre>
</li>
<li>
<p>定义一个或多个字段,作为阻塞条件</p>
<pre><code class="language-c#">private bool _ok;
</code></pre>
</li>
<li>
<p>当你希望阻塞的时候</p>
<p><code>Monitor.Wait</code>在等待脉冲时,同步对象上的锁会被释放,并且进入阻塞状态,直到收到 _locker上的脉冲,收到脉冲后重新获取_locker,如果此时 _locker 已经被别的线程占有,则继续阻塞,直至_获取 _locker</p>
<pre><code class="language-c#">lock (_locker)
{
    while (!_ok)
    {
      Monitor.Wait (_locker);
    }
}
</code></pre>
</li>
<li>
<p>当你希望改变阻塞条件时</p>
<pre><code class="language-c#">lock (_locker)
{
    _ok = true;
    Monitor.Pulse(_locker);// Monitor.PulseAll(_locker);
}
</code></pre>
</li>
</ol>
<p>使用<code>Wait/Pulse</code>需要注意:</p>
<ul>
<li><code>Wait / Pulse</code>不能lock块之外使用,否则会抛异常。</li>
<li><code>Pulse</code>最多释放一个线程,而<code>PulseAll</code>释放所有线程。</li>
<li><code>Wait</code>会立即释放当前持有的锁,然后进入阻塞,等待脉冲</li>
<li>收到脉冲会立即尝试重新获取锁,如果在指定时间内重新获取,则返回<code>true</code>,如果在超过指定时间获取,则返回<code>false</code>,如果没有获取锁,则一直阻塞不会返回</li>
</ul>
<p>性能方面,调用<code>Pulse</code>花费大概约是在等待句柄上调用<code>Set</code>三分之一的时间。但是,使用<code>Wait</code>和<code>Pulse</code>进行信号同步,对比事件等待句柄有以下缺点:</p>
<ul>
<li>
<p><code>Wait / Pulse</code>不能跨越应用程序域和进程使用。</p>
</li>
<li>
<p>必须通过锁保护所有信号同步逻辑涉及的变量。</p>
</li>
</ul>
<h3 id="等待超时">等待超时</h3>
<p>调用<code>Wait</code>方法时,你可以设定一个超时时间,可以是毫秒或<code>TimeSpan</code>的形式。如果因为超时而放弃了等待,那么<code>Wait</code>方法就会返回<code>false</code>。</p>
<pre><code class="language-c#">public static bool Wait(object obj, TimeSpan timeout)
</code></pre>
<p>如果在超时到达时仍然没有获得一个脉冲,CLR会主动给它发送一个<strong>虚拟的脉冲(virtual pulse)</strong>,使其能够重新获得锁,然后继续执行,就像收到一个真实脉冲一样。</p>
<p>下面这个例子非常有用,它可以定期的检查阻塞条件。即使其它线程无法按照预期发送脉冲,例如程序之后被其他人修改,但没能正确使用<code>Pulse</code>,这样也可以在一定程度上免疫 bug。因此在复杂的同步设计中可以给所有<code>Wait</code>指定超时时间。</p>
<pre><code class="language-c#">lock (_locker)
while (/* &lt;blocking-condition&gt; */)
    Monitor.Wait (_locker, /* &lt;timeout&gt; */);
</code></pre>
<blockquote>
<p><code>Monitor.Wait</code>的boolean类型返回值其实还可以这么理解:其返回值意味着是否获得了一个“真实的脉冲“。</p>
<p>如果”虚拟的脉冲“并不是期待的行为,可以记录日志或抛出异常。</p>
</blockquote>
<p><code>Wait</code>等待一个变量上的脉冲,<code>Pulse</code>对一个变量发送脉冲。脉冲也是一种信号形式,相对于事件等待句柄那种<strong>锁存(latching)</strong>信号,脉冲顾名思义是一种非锁存或者说<strong>易失</strong>的信号</p>
<h3 id="双向信号与竞争状态">双向信号与竞争状态</h3>
<p><code>Monitor.Pulse</code>是一种单向通信机制:发送脉冲的线程不关心发出的脉冲被谁收到了,他没有返回值,不会阻塞,内部也没有确认机制。</p>
<p>当一个线程发起一次脉冲:</p>
<ul>
<li>如果等待队列中没有任何线程,那么这次发起的脉冲不会有任何效果。</li>
<li>如果等待队列中有线程,线程发送完脉冲并释放锁后,并不能保证接到脉冲信号的等待线程能立即开始工作。</li>
</ul>
<p>然后我们有一些场景依赖等待线程能够在收到脉冲后及时的响应,此时,双向信号出现了,这是一种自定义的确认机制。</p>
<p>在上文的信号构造基础上改造一个竞争状态的案例:</p>
<pre><code class="language-c#">public class 竞争状态测试
{
    private readonly ITestOutputHelper _testOutputHelper;
    private readonly object _locker = new object();
    private bool _ok;

    public 竞争状态测试(ITestOutputHelper testOutputHelper)
    {
      _testOutputHelper = testOutputHelper;
    }

   
    void Show()
    {
      new Thread(() =&gt;// Worker
      {
            for (int i = 0; i &lt; 5; i++)
                lock (_locker)
                {
                  while (!_ok) Monitor.Wait(_locker);
                  _ok = false;
                  _testOutputHelper.WriteLine("Wassup?");
                }
      }).Start();

      for (int i = 0; i &lt; 5; i++)
      {
            lock (_locker)
            {
                _ok = true;
                Monitor.Pulse(_locker);
            }
      }
    }
}
</code></pre>
<p>我们期待的结果:</p>
<pre><code class="language-shell">Wassup?
Wassup?
Wassup?
Wassup?
Wassup?
</code></pre>
<p>实际上这个这个程序可能一次”Wassup?“都不会输出:主线程可能在工作线程启动之前完成,这五次<code>Pulse</code>啥事都没干</p>
<p>还记得我们讲事件等待句柄时,使用<code>AutoResetEvent</code>来模拟的双向信号吗?现在使用Monitor来实现一个扩展性更好的版本</p>
<pre><code class="language-c#">public class 双向信号测试
{
    private readonly ITestOutputHelper _testOutputHelper;
    private readonly object _locker = new();
    private bool _entry; // 我是否可以工作了
    private bool _ready; // 我是否可以继续投递了

    public 双向信号测试(ITestOutputHelper testOutputHelper)
    {
      _testOutputHelper = testOutputHelper;
    }

   
    void Show()
    {
      new Thread(() =&gt;
      {
            Thread.Sleep(100);
            for (int i = 0; i &lt; 5; i++)
            {
                lock (_locker)
                {
                  _ready = true;
                  Monitor.PulseAll(_locker);
                  while (!_entry) Monitor.Wait(_locker);
                  _entry = false;
                  _testOutputHelper.WriteLine("Wassup?");
                }
            }
      }).Start();

      for (int i = 0; i &lt; 5; i++)
      {
            lock (_locker)
            {
                while (!_ready) Monitor.Wait(_locker);
                _ready = false;
                _entry = true;
                Monitor.PulseAll(_locker);
            }
      }
    }
}
</code></pre>
<p>我们仍然使用<code>_ready</code>来作为上游脉冲线程的自旋条件,使用<code>_entry</code>作为下游等待线程的自旋条件。由于我们的逻辑都在lock语句中,即使之后引入了第三个线程,我们的逻辑仍然不会出问题,<code>_ready</code>和<code>_entry</code>的读写总是原子的。</p>
<h3 id="升级生产消费队列">升级生产消费队列</h3>
<ol>
<li>
<p>这次,我们将允许多个消费者,各自拥有独立的消费线程。使用一个数组来存放这些线程,并且他们接收的不再是string,而是更加灵活的委托:</p>
<pre><code class="language-c#">private Thread[] _workers;
private Queue&lt;Action&gt; _queue = new Queue&lt;Action&gt;();
</code></pre>
</li>
<li>
<p>和上次一样,我们传递null来告知消费者线程退出:</p>
<pre><code class="language-c#">foreach (var worker in _workers)
{
    AddTask(null);
}
</code></pre>
</li>
<li>
<p>在告知消费线程退出后<code>Join</code>这些线程,等待未完成的任务被消费:</p>
<pre><code class="language-c#">foreach (var worker in _workers)
{
    worker.Join();
}
</code></pre>
</li>
<li>
<p>每个工作线程会执行一个名为<code>Consume</code>的方法。我们在构造队列时循环创建和启动这些线程:</p>
<pre><code class="language-c#">_workers = new Thread;
for (int i = 0; i &lt; workerCount; i++)
{
    _workers = new Thread(Consume);
    _workers.Start();
}
</code></pre>
</li>
<li>
<p>消费<code>Comsume</code>方法,一个工作线程从队列中取出并执行一个项目。我们希望工作线程没什么事情做的时候,或者说当队列中没有任何项目时,它们应该被阻塞。因此,我们的阻塞条件是<code>_queue.Count == 0</code>:</p>
<pre><code class="language-c#">private void Consume()
{
    while (true)
    {
      Action task;
      lock (_locker)
      {
            while (_queue.Count == 0)
            {
                Monitor.Wait(_locker);// 队列里没任务,释放锁,进入等待
            }
            // 获取新任务,重新持有锁
            task = _queue.Dequeue();
      }
      
      if (task == null) return;// 空任务代表退出
      task();// 执行任务
    }
}
</code></pre>
</li>
<li>
<p>添加一个任务。出于效率考虑,加入一个任务时,我们调用<code>Pulse</code>而不是<code>PulseAll</code>。这是因为每个项目只需要唤醒(至多)一个消费者。如果你只有一个冰激凌,你不会把一个班 30 个正在睡觉的孩子都叫起来排队获取它。</p>
<pre><code class="language-c#">public void AddTask(Action task)
{
    lock (_locker)
    {
      _queue.Enqueue(task);
      Monitor.Pulse(_locker);
    }
}
</code></pre>
</li>
</ol>
<h3 id="模拟等待句柄"><strong>模拟等待句柄</strong></h3>
<p>在双向信号中,你可能注意到了一个模式:<code>_flag</code>在当前线程被作为自旋阻塞条件,在另一线程中被设置为<code>true</code>,跳出自旋</p>
<pre><code class="language-c#">lock(_locker)
{
    while (!_flag) Monitor.Wait(_locker);
        _flag = false;
}
</code></pre>
<h4 id="manualresetevent">ManualResetEvent</h4>
<p>事实上它的工作原理就是模仿<code>AutoResetEvent</code>。如果去掉<code>_flag=false</code>,就得到了<code>ManualResetEvent</code>的基础版本。</p>
<pre><code class="language-c#">private readonly object _locker = new object();
private bool _signal;
void WaitOne()
{
    lock (_locker)
    {
      while (!_signal) Monitor.Wait(_locker);
    }
}
void Set()
{
    lock (_locker)
    {
      _signal = true;
      Monitor.PulseAll(_locker);
    }
}
void Reset()
{
    lock (_locker) _signal = false;
}
</code></pre>
<p>使用<code>PulseAll</code>,是因为可能存在多个被阻塞的等待线程。而<code>EventWaitHandle.WaitOne()</code>的通行条件就是:<code>门</code>是开着的,<code>ManualResetEvent</code>被放行通过后不会自己关门,只能通过<code>Reset</code>将门关上,再次期间其它所有阻塞线程都能通行。</p>
<h4 id="autoresetevent">AutoResetEvent</h4>
<p>实现<code>AutoResetEvent</code>非常简单,只需要将<code>WaitOne</code>方法改为:</p>
<pre><code class="language-c#">lock (_locker)
{
    while (!_signal) Monitor.Wait(_locker);
    _signal = false;// 添加一条,自己关门
}
</code></pre>
<p>然后将<code>Set</code>方法改为:</p>
<pre><code class="language-c#">lock (_locker)
{
    _signal = true;
    Monitor.Pulse(_locker);// PulseAll替换成Pulse:
}
</code></pre>
<h4 id="semaphore">Semaphore</h4>
<p>把<code>_signal</code>替换为一个整型字段可以得到<code>Semaphore</code>的基础版本</p>
<pre><code class="language-c#">public class 模拟信号量
{
    private readonly object _locker = new object();
    private int _count, _initialCount;
    public 模拟信号量(int initialCount)
    {
      _initialCount = initialCount;
    }
   
    void WaitOne()// +1
    {
      lock (_locker)
      {
            _count++;
            while (_count &gt;= _initialCount)
            {
                Monitor.Wait(_locker);
            }
      }
    }

    void Release()// -1
    {
      lock (_locker)
      {
            _count --;
            Monitor.Pulse(_locker);
      }
    }
}
</code></pre>
<h3 id="模拟countdownevent">模拟CountdownEvent</h3>
<p>是不是非常类似信号量?</p>
<pre><code class="language-c#">public class 模拟CountdownEvent
{
    private object _locker = new object();
    private int _initialCount;

    public 模拟CountdownEvent(int initialCount)
    {
      _initialCount = initialCount;
    }

    public void Signal()// +1
    {
      AddCount(-1);
    }

    public void AddCount(int amount)// +amount
    {
      lock (_locker)
      {
            _initialCount -= amount;
            if (_initialCount &lt;= 0) Monitor.PulseAll(_locker);
      }
    }

    public void Wait()
    {
      lock (_locker)
      {
            while (_initialCount &gt; 0)
                Monitor.Wait(_locker);
      }
    }
}
</code></pre>
<h3 id="线程会合">线程会合</h3>
<h4 id="countdownevent">CountdownEvent</h4>
<p>利用我们刚刚实现的<code>模拟CountdownEvent</code>,来实现两个线程的会和,和同步基础中提到的<code>WaitHandle.SignalAndWait</code>一样。</p>
<p>并且我们也可以通过<code>initialCount</code>将会和的线程扩展到更多个,显而易见的强大。</p>
<pre><code class="language-c#">public class 线程会和测试
{
    private readonly ITestOutputHelper _testOutputHelper;
    private 模拟CountdownEvent _countdown = new 模拟CountdownEvent(2);

    public 线程会和测试(ITestOutputHelper testOutputHelper)
    {
      _testOutputHelper = testOutputHelper;
    }

   
    public void Show()
    {
      // 每个线程都睡眠一段随机时间
      Random r = new Random();
      new Thread(Mate).Start(r.Next(10000));
      Thread.Sleep(r.Next(10000));

      _countdown.Signal();
      _countdown.Wait();

      _testOutputHelper.WriteLine("Mate! ");
    }

    void Mate(object delay)
    {
      Thread.Sleep((int)delay);

      _countdown.Signal(); //+1
      _countdown.Wait();

      _testOutputHelper.WriteLine("Mate! ");
    }
}
</code></pre>
<p>上面例子,每个线程随机休眠一段时间,然后等待对方,他们几乎在同时打印”Mate!“,这被称为<strong>线程执行屏障(thread execution barrier)</strong></p>
<p>当你想让多个线程执行一个系列任务,希望它们步调一致时,可以用到线程执行屏障。然而,我们现在的解决方案有一定限制:我们不能重用同一个<code>Countdown</code>对象来第二次会合线程,至少在没有额外信号构造的情况下不能。为解决这个问题,Framework 4.0 提供了一个新的类<code>Barrier</code>。</p>
<h4 id="barrier">Barrier</h4>
<p>Framework 4.0 加入的一个信号构造。它实现了线程执行屏障(thread execution barrier),允许多个线程在一个时间点会合。这个类非常快速和高效,它是建立在<code>Wait / Pulse</code>和自旋锁基础上的。</p>
<ol>
<li>
<p>实例化它,指定有多少个线程参与会合(可以调用<code>AddParticipants / RemoveParticipants</code>来进行更改)。</p>
<pre><code class="language-c#">public Barrier(int participantCount)
</code></pre>
</li>
<li>
<p>当希望会合时,调用<code>SignalAndWait</code>。表示参与者已到达障碍,并等待所有其他参与者到达障碍</p>
<pre><code class="language-c#">public void SignalAndWait()
</code></pre>
<p>他还实现了<strong>协作取消模式</strong></p>
<pre><code class="language-c#">public void SignalAndWait(CancellationToken cancellationToken)
</code></pre>
<p>并提供了超时时间的重载,返回一个<code>bool</code>类型,true标识在规定的时间,其他参与者到达障碍,false标识没有全部到达</p>
<pre><code class="language-c#">public bool SignalAndWait(TimeSpan timeout)
</code></pre>
</li>
</ol>
<p>实例化<code>Barrier</code>,参数为 3 ,意思是调用<code>SignalAndWait</code>会被<strong>阻塞</strong>直到该方法被调用 3 次。但与<code>CountdownEvent</code>不同,它会自动复位:再调用<code>SignalAndWait</code>仍会阻塞直到被调用 3 次。这允许你保持多个线程“步调一致”,让它们执行一个系列任务。</p>
<p><img src="https://img2022.cnblogs.com/blog/1510705/202211/1510705-20221116093324615-1155582214.png" alt="" loading="lazy"></p>
<p>下边的例子中,三个线程步调一致地打印数字 0 到 4:</p>
<pre><code class="language-c#">private readonly ITestOutputHelper _testOutputHelper;
private Barrier _barrier = new Barrier(3);
public Barrier测试(ITestOutputHelper testOutputHelper)
{
    _testOutputHelper = testOutputHelper;
}

void Show()
{
    new Thread(Speak).Start();
    new Thread(Speak).Start();
    new Thread(Speak).Start();
}
void Speak()
{
    for (int i = 0; i &lt; 5; i++)
    {
      _testOutputHelper.WriteLine(i.ToString());
      _barrier.SignalAndWait();
    }
}
</code></pre>
<p><code>Barrier</code>还提供一个非常用有的构造参数,他是一个委托,会在每个会和处执行。不用担心抢占,因为当它被执行时,所有的参与者都是被阻塞的。</p>
<pre><code class="language-c#">public Barrier(int participantCount, Action&lt;Barrier&gt;? postPhaseAction)
</code></pre>
<h2 id="五拓展">五、拓展</h2>
<p>前景回顾:</p>
<p>还记得我们在讲同步的时候提到的<strong>最小化共享数据</strong>和<strong>无状态</strong>设计吗?经过前面的学习,稍加思考,其实引发线程安全的本质是多线程并发下的数据交互问题。如果我们的数据在线程之间没有交互,或者说我们的数据都是只读的,那不就天然的线程安全了吗?</p>
<p>现在你能理解为什么只读字段是天然线程安全的了吗?</p>
<p>然而有的场景下又需要对公共数据进行读写,同步篇中我们通过很简单的排它锁来保证线程安全,在这里,我们不在满足这种粗暴的粒度(事实上多数时候读总是多于写),这时,读写锁出现了。</p>
<h3 id="readerwriterlockslim">ReaderWriterLockSlim</h3>
<p><code>ReaderWriterLockSlim</code>在 Framework 3.5 加入的,被加入了standard 1.0,此类型是线程安全的,用于保护由多个线程读取的资源。</p>
<blockquote>
<p><code>ReaderWriterLockSlim</code>出现的目的是为了取缔<code>ReaderWriterLock</code>,他简化了递归规则以及锁状态的升级和降级规则。避免了许多潜在的死锁情况。 另外,他的性能显著优于<code>ReaderWriterLock</code>。 建议对所有新开发的项目使用<code>ReaderWriterLockSlim</code></p>
<p>然而如果与普通的<code>lock</code>(<code>Monitor.Enter / Exit</code>)对比,他还是要慢一倍。</p>
</blockquote>
<p><code>ReaderWriterLockSlim</code>有三种模式:</p>
<ul>
<li>
<p>读取模式:允许任意多的线程处于读取模式</p>
</li>
<li>
<p>可升级模式:只允许一个线程处于可升级模式,与读锁兼容</p>
</li>
<li>
<p>写入模式:完全互斥,不允许任何模式下的线程获取任何锁</p>
</li>
</ul>
<p><code>ReaderWriterLockSlim</code>定义了如下的方法来获取和释放读 / 写锁:</p>
<pre><code class="language-c#">public void EnterReadLock();
public void ExitReadLock();
public void EnterWriteLock();
public void ExitWriteLock();
</code></pre>
<p>另外,对应所有<code>EnterXXX</code>的方法,都有相应的<code>TryXXX</code>版本,可以接受一个超时参数,与<code>Monitor.TryEnter</code>类似。</p>
<p>让我们来看一个案例:</p>
<p>模拟三个读线程,两个写线程,并行执行</p>
<pre><code class="language-c#">new Thread(Read).Start();
new Thread(Read).Start();
new Thread(Read).Start();
new Thread(Write).Start();
new Thread(Write).Start();
</code></pre>
<p>读方法是这样的</p>
<pre><code class="language-c#">while (true)
{
    _rw.EnterReadLock();
    foreach (int number in _items)
    {
      Console.WriteLine("Thread " + Thread.CurrentThread.ManagedThreadId + " added " + number);
      Thread.Sleep(100);
    }
    _rw.ExitReadLock();
}
</code></pre>
<p>写方法是这样的</p>
<pre><code class="language-c#">while (true)
{
    int number = _rand.Value.Next(100);
    _rw.EnterWriteLock();
    _items.Add(number);
    _rw.ExitWriteLock();
    Console.WriteLine("Thread " + Thread.CurrentThread.ManagedThreadId + " added " + number);
    Thread.Sleep(100);
}
</code></pre>
<p>随机数生成方法就是用的TLS讲过的</p>
<pre><code class="language-c#">new ThreadLocal&lt;Random&gt;(() =&gt; new Random(Guid.NewGuid().GetHashCode()));
</code></pre>
<p>需要注意<code>ReaderWriterLockSlim</code>实现了<code>IDisposable</code>,用完了请记得释放</p>
<pre><code class="language-c#">public class ReaderWriterLockSlim : IDisposable
</code></pre>
<p>运行结果:</p>
<pre><code class="language-c#">Thread 11 added 42
Thread 8 reading 42
Thread 6 reading 42
Thread 7 reading 42
Thread 10 added 98
Thread 8 reading 42
...
</code></pre>
<p>显而易见的,并发度变高了</p>
<h4 id="锁递归">锁递归</h4>
<p><code>ReaderWriterLockSlim</code>提供一个构造参数<code>LockRecursionPolicy</code>用于配置锁递归策略</p>
<pre><code class="language-c#">public ReaderWriterLockSlim(LockRecursionPolicy recursionPolicy)
</code></pre>
<pre><code class="language-c#">public enum LockRecursionPolicy
{
/// &lt;summary&gt;If a thread tries to enter a lock recursively, an exception is thrown. Some classes may allow certain recursions when this setting is in effect.&lt;/summary&gt;
NoRecursion,
/// &lt;summary&gt;A thread can enter a lock recursively. Some classes may restrict this capability.&lt;/summary&gt;
SupportsRecursion,
}
</code></pre>
<p>默认情况下是使用<code>NoRecursion</code>策略:不允许递归或重入,这与GO的读写锁设计不谋而合,建议使用此默认策略,因为递归引入了不必要的复杂性,并使代码更易于死锁。</p>
<pre><code class="language-c#">public ReaderWriterLockSlim() : this(LockRecursionPolicy.NoRecursion)
</code></pre>
<p>开启支持递归策略后,以下代码不会抛出<code>LockRecursionException</code>异常</p>
<pre><code class="language-c#">var rw = new ReaderWriterLockSlim (LockRecursionPolicy.SupportsRecursion);
rw.EnterReadLock();
rw.EnterReadLock();
rw.ExitReadLock();
rw.ExitReadLock();
</code></pre>
<p>递归锁定级别只能越来越小,级别顺序如下:<code>读锁,可升级锁,写锁</code>。下面代码会抛出<code>LockRecursionException</code>异常</p>
<pre><code class="language-c#">void F()
{
    var rw = new ReaderWriterLockSlim (LockRecursionPolicy.SupportsRecursion);
    rw.EnterReadLock();
    rw.EnterWriteLock();
    rw.EnterWriteLock();
    rw.ExitReadLock();
}
Assert.Throws&lt;LockRecursionException&gt;(F);
</code></pre>
<p>可升级锁例外,把可升级锁升级为写锁是合法的。</p>
<pre><code class="language-c#">var rw = new ReaderWriterLockSlim(LockRecursionPolicy.SupportsRecursion);
rw.EnterUpgradeableReadLock();
rw.EnterWriteLock();
rw.ExitWriteLock();
rw.ExitUpgradeableReadLock();
</code></pre>
<p>思考一个问题:为什么只允许一个线程处于可升级模式?</p>
<table>
<thead>
<tr>
<th style="text-align: left">SQL Server</th>
<th style="text-align: left">ReaderWriterLockSlim</th>
</tr>
</thead>
<tbody>
<tr>
<td style="text-align: left">共享锁(Share lock)</td>
<td style="text-align: left">读锁(Read lock)</td>
</tr>
<tr>
<td style="text-align: left">排它锁(Exclusive lock)</td>
<td style="text-align: left">写锁(Write lock)</td>
</tr>
<tr>
<td style="text-align: left">更新锁(Update lock)</td>
<td style="text-align: left">可升级锁(Upgradeable lock)</td>
</tr>
</tbody>
</table>
<h3 id="_"></h3>
<h3 id="timer">Timer</h3>
<p>如果你需要使用规律的时间间隔重复执行一些方法,这个例子会使得一个线程永远被占用</p>
<pre><code class="language-c#">while (true)
{
    // do something
    Thread.Sleep(1000);
}
</code></pre>
<p>这时候你会需要<code>Timer</code></p>
<p>创建计时器时,可以指定在方法首次执行之前等待的时间 <code>dueTime</code> ,以及后续执行之间等待的时间<code>period</code>。 类 Timer 的分辨率与系统时钟相同。 这意味着,如果<code>period</code>小于系统时钟的分辨率,委托将以系统时钟分辨率定义的时间间隔执行,在Windows 7 和Windows 8系统上大约为 15 毫秒。</p>
<pre><code class="language-c#">public Timer(TimerCallback callback, object? state, int dueTime, int period)
</code></pre>
<p>下面这个例子首次间隔1s,之后间隔500ms打印tick...</p>
<pre><code class="language-c#">Timer timer = new Timer ((data) =&gt;
{
    _testOutputHelper.WriteLine(data.ToString());
}, "tick...", 1000, 500);
Thread.Sleep(3000);
timer.Dispose();
</code></pre>
<p>计时器委托是在构造计时器时指定的,不能更改。 该方法不会在创建计时器的线程上执行;而是在<strong>线程池(thread pool)</strong>执行。</p>
<blockquote>
<p>如果计时器间隔<code>period</code>小于执行回调所需的时间,或者如果所有线程池线程都在使用,并且回调被多次排队,则可以在两个线程池线程上同时执行回调。</p>
<p>只要使用 Timer,就必须保留对它的引用。 与任何托管对象一样,当没有对其引用时,会受到垃圾回收的约束。 即使 Timer 仍然处于活动状态也不会阻止它被收集。</p>
<p>不再需要计时器时,请调用 Dispose 释放计时器持有的资源。请注意,调用 Dispose() 后仍然可能会发生回调,因为计时器将回调排队供线程池线程执行。可以使用<code>public bool Dispose(WaitHandle notifyObject)</code>重载等待所有回调完成。</p>
</blockquote>
<blockquote>
<p><code>System.Threading.Timer</code>是一个普通计时器。 它会回调一个线程池线程(来自工作池)。</p>
<p><code>System.Timers.Timer</code>是一个<code>System.ComponentModel.Component</code> ,它包装<code>System.Threading.Timer</code> ,并提供一些用于在特定线程上调度的附加功能。</p>
</blockquote><br><br>
来源:https://www.cnblogs.com/xiaolipro/p/16891311.html
頁: [1]
查看完整版本: C#多线程(三)线程高级篇