黄怀莹 發表於 2025-12-18 10:38:25

深度剖析golang中的sync包

<div id="navCategory"><h5 class="catalogue">目录</h5><ul class="first_class_ul"><li><a href="#_label0">简介</a></li><li><a href="#_label1">Mutex</a></li><li><a href="#_label2">RWMutex</a></li><li><a href="#_label3">WaitGroup</a></li><li><a href="#_label4">Once</a></li><li><a href="#_label5">Cond</a></li><li><a href="#_label6">Map</a></li><li><a href="#_label7">Pool</a></li></ul></div><p class="maodian"><a name="_label0"></a></p><h2>简介</h2>
<ul><li><code>golang</code>的sync包提供了一些并发控制的工具,在应用程序开发过程中是非常有用的,下面详细介绍下这些工具的原理和使用</li><li>在介绍工具之前,先讲解下内存模型中的<code>happens-before</code>关系</li></ul>
<div class="jb51code"><pre class="brush:go;">// 示例:Happens-Before关系
var a string
var done bool
func setup() {
    a = "hello, world"// 写操作A
    done = true         // 写操作B
}
func main() {
    go setup()
    for !done {      // 读操作C
      // 忙等待
    }
    print(a)         // 读操作D
}</pre></div>
<p><strong>关键点</strong>:</p>
<ul><li>如果B happens-before C,那么A happens-before D</li><li>sync包的作用就是建立这种happens-before关系</li></ul>
<p class="maodian"><a name="_label1"></a></p><h2>Mutex</h2>
<ul><li><code>golang</code>提供的锁,常用于保护共享资源的访问安全,使用如下</li></ul>
<div class="jb51code"><pre class="brush:go;">func main() {
        var mu sync.Mutex
        mu.Lock()
        defer mu.Unlock()
        mu.TryLock()
}
</pre></div>
<ul><li><code>mu.Lock()</code>:加锁</li><li><code>mu.Unlock()</code>:解锁</li><li><code>mu.TryLock()</code>:尝试加锁,如果成功会返回<code>true</code></li></ul>
<p>锁结构如下</p>
<div class="jb51code"><pre class="brush:go;">type Mutex struct {
    state int32// 锁状态:包含多个标志位
    semauint32 // 信号量:用于阻塞goroutine
}
</pre></div>
<ul><li>加锁的时候,会先使用<code>cas</code>原子操作获取锁(CPU实现),如果拿不到说明已经被其他<code>goroutine</code>占有,通过自旋阻塞当前<code>goroutine</code>(这过程会通过PAUSE指令减少CPU功耗)</li></ul>
<div class="jb51code"><pre class="brush:go;">func (m *Mutex) Lock() {
    // 快速路径:尝试原子操作获取锁
    if atomic.CompareAndSwapInt32(&amp;m.state, 0, mutexLocked) {
      return
    }
    // 慢速路径:锁已被持有,需要等待
    m.lockSlow()
}</pre></div>
<ul><li>读写均衡或者写者较多的场景,使用这个锁(防止写者竞争,内存占用也更小)</li></ul>
<p class="maodian"><a name="_label2"></a></p><h2>RWMutex</h2>
<ul><li><code>golang</code>提供的读写锁,和上面的锁区别是区分了读和写两种场景。读之间的冲突不阻塞,若写锁遇到了读锁,需要等待所有读者释放,若读者和写者同时到达,读者要等待写者完成(写优先)</li></ul>
<div class="jb51code"><pre class="brush:go;">func (rw *RWMutex) Lock() {
    // 1. 获取互斥锁
    rw.w.Lock()
    // 2. 设置readerCount为负值,阻止新读者
    r := atomic.AddInt32(&amp;rw.readerCount, -rwmutexMaxReaders)
    // 3. 等待现有读者
    if r != 0 {
      runtime_Semacquire(&amp;rw.writerSem)
    }
}
func (rw *RWMutex) RLock() {
    // 检查是否有写者(readerCount &lt; 0表示有写者)
    if atomic.AddInt32(&amp;rw.readerCount, 1) &lt; 0 {
      // 有写者在等待或正在写,读者必须阻塞
      runtime_Semacquire(&amp;rw.readerSem)
    }
}</pre></div>
<ul><li>读多写少的场景使用这个读写锁,性能会有比较大的提升</li></ul>
<p class="maodian"><a name="_label3"></a></p><h2>WaitGroup</h2>
<ul><li><code>waitGroup</code>提供了下面三个方法,用来控制多个<code>goroutine</code>是否都执行完成,主<code>goroutine</code>调用<code>Add</code>方法设置要等待的goroutine数量,每个goroutine结束的时候,调用<code>Done</code>方法标记这个任务结束,<code>Wait</code>方法会等待所有goroutine调用完<code>Done</code>方法</li></ul>
<div class="jb51code"><pre class="brush:go;">func main() {
        var wg sync.WaitGroup
        wg.Add(1) // 增加一个等待者
        go func() {
                defer wg.Done()
                // 执行逻辑, 逻辑执行完之后, 等待者数量-1
        }()
        wg.Wait() // 等待所有等待者执行完成
}</pre></div>
<ul><li>底层实现如下,它使用了一个state来存储等待者和要执行的工作数量</li></ul>
<div class="jb51code"><pre class="brush:go;">type WaitGroup struct {
    noCopy noCopy
    // 64位值的高32位是计数器,低32位是等待者数量
    // 64位原子操作需要64位对齐,但32位编译器不能确保这一点
    state atomic.Uint64 // high 32 bits are counter, low 32 bits are waiter count.
}</pre></div>
<div class="jb51code"><pre class="brush:go;">// WaitGroup状态布局(64位)
// 高32位:计数器 (counter)
// 低32位:等待者数量 (waiters)
func (wg *WaitGroup) Add(delta int) {
    statep, semap := wg.state()
    // 更新计数器
    state := atomic.AddUint64(statep, uint64(delta)&lt;&lt;32)
    // 检查状态变化
    v := int32(state &gt;&gt; 32)// 计数器
    w := uint32(state)       // 等待者数量
    if v &gt; 0 || w == 0 {
      return // 还有工作要做,或者没有等待者
    }
    // 所有工作完成,唤醒等待者
    if *statep != 0 {
      panic("sync: WaitGroup misuse")
    }
    // 唤醒所有等待的goroutine
    for ; w != 0; w-- {
      runtime_Semrelease(semap, false, 0)
    }
}</pre></div>
<p>具体</p>
<ol><li>高32位 = 工作计数器<br />wg.Add(5) // 高32位 += 5,表示还有5个工作要做<br />wg.Done() // 高32位 -= 1,表示完成了1个工作</li></ol>
<ul><li>当高32位变为0时,表示所有工作完成</li></ul>
<ol start="2"><li>低32位 = 等待者数量<br />wg.Wait() // 低32位 += 1,表示1个goroutine开始等待</li></ol>
<ul><li>当高32位变为0时,低32位表示需要唤醒多少个goroutine</li></ul>
<p>当调用<code>Wait</code>方法时,也是一个自旋等待的逻辑。配合cas,实现等待所有工作完成。具体细节参考源码,这里只简单介绍下原理</p>
<p class="maodian"><a name="_label4"></a></p><h2>Once</h2>
<ul><li><code>sync.Once{}</code>提供了一个多次调用,只执行一次的方法实现,如下所示</li></ul>
<div class="jb51code"><pre class="brush:go;">func main() {
        once := sync.Once{}
        once.Do(func() {
                // 这里的逻辑只会执行一次
        })
}</pre></div>
<ul><li>这个的底层实现比较简单,因为once内部维护了一个原子类,指向一个32位无符号数,当执行<code>func</code>的时候,如果有多个goroutine并发执行,两个goroutine会竞争锁,最终只有一个能够执行<code>func</code>的逻辑,执行完之后,这个数会设置为1,从而后面的所有goroutine都不会再走到执行逻辑。具体参考下面源码实现</li></ul>
<div class="jb51code"><pre class="brush:go;">func (o *Once) Do(f func()) {
        if o.done.Load() == 0 {
                o.doSlow(f)
        }
}
func (o *Once) doSlow(f func()) {
        o.m.Lock()
        defer o.m.Unlock()
        if o.done.Load() == 0 {
                defer o.done.Store(1)
                f()
        }
}</pre></div>
<p class="maodian"><a name="_label5"></a></p><h2>Cond</h2>
<ul><li>这个工具主要是协调访问共享资源的那些<code>goroutine</code>,<code>c.L</code>是工具内部的一个锁。当调用<code>c.L.Wait()</code>会解锁等待Signal或者Broadcast信号,Broadcast会唤醒所有等待的<code>Wait</code>,<code>Signal</code>会随机唤醒一个等待的<code>Wait</code>。具体使用方法如下面的例子所示</li></ul>
<div class="jb51code"><pre class="brush:go;">func main() {
        cond := sync.NewCond(new(sync.Mutex))
        done := false
        read := func(name string, c *sync.Cond) {
                c.L.Lock()
                if !done {
                        c.Wait()
                }
                fmt.Println("start reading: ", name)
                c.L.Unlock()
        }
        write := func(name string, c *sync.Cond) {
                fmt.Println("start writing: ", name)
                c.L.Lock()
                done = true
                c.L.Unlock()
                fmt.Println("wakes all:", name)
                c.Signal()
        }
        go read("read3", cond)
        go read("read1", cond)
        go read("read2", cond)
        write("write1", cond)
        time.Sleep(5 * time.Second)
}</pre></div>
<p class="maodian"><a name="_label6"></a></p><h2>Map</h2>
<ul><li>golang在sync包内提供了一个并发安全的Map,可以增删改查,遍历、compareAndSwap、LoadOrStore等操作。可以做一些无锁编程,提升性能</li><li>读多写少(读操作占比&gt;80%)、数据量大(键值对数量&gt;1000)、写入不频繁(写入频率&lt;100次/秒)、简单键值等场景,使用这个的性能要优于普通map+锁</li></ul>
<p class="maodian"><a name="_label7"></a></p><h2>Pool</h2>
<ul><li>对于频繁进行内存分配和回收的场景,可以使用<code>sync.Pool{}</code>,例如下面的场景</li></ul>
<div class="jb51code"><pre class="brush:go;">// 频繁分配和回收的典型特征
func frequentAllocation() {
    pool := sync.Pool{
      New: func() interface{} {
            return make([]byte, 1024)
      },
    }
    // 在循环中重复创建和销毁对象
    for i := 0; i &lt; 1000; i++ {
      // 分配:创建新对象
      // buf := make([]byte, 1024)// 直接分配内存
      buf := pool.Get().([]byte) // 使用对象池
      // 使用对象
      copy(buf, data)
      process(buf)
      // 回收:对象超出作用域,被GC回收
      pool.Put(buf[:0]) // 归还对象(清空)
    }
    // 每次循环都经历:分配→使用→回收
}</pre></div>
頁: [1]
查看完整版本: 深度剖析golang中的sync包