深度剖析golang中的sync包
<div id="navCategory"><h5 class="catalogue">目录</h5><ul class="first_class_ul"><li><a href="#_label0">简介</a></li><li><a href="#_label1">Mutex</a></li><li><a href="#_label2">RWMutex</a></li><li><a href="#_label3">WaitGroup</a></li><li><a href="#_label4">Once</a></li><li><a href="#_label5">Cond</a></li><li><a href="#_label6">Map</a></li><li><a href="#_label7">Pool</a></li></ul></div><p class="maodian"><a name="_label0"></a></p><h2>简介</h2><ul><li><code>golang</code>的sync包提供了一些并发控制的工具,在应用程序开发过程中是非常有用的,下面详细介绍下这些工具的原理和使用</li><li>在介绍工具之前,先讲解下内存模型中的<code>happens-before</code>关系</li></ul>
<div class="jb51code"><pre class="brush:go;">// 示例:Happens-Before关系
var a string
var done bool
func setup() {
a = "hello, world"// 写操作A
done = true // 写操作B
}
func main() {
go setup()
for !done { // 读操作C
// 忙等待
}
print(a) // 读操作D
}</pre></div>
<p><strong>关键点</strong>:</p>
<ul><li>如果B happens-before C,那么A happens-before D</li><li>sync包的作用就是建立这种happens-before关系</li></ul>
<p class="maodian"><a name="_label1"></a></p><h2>Mutex</h2>
<ul><li><code>golang</code>提供的锁,常用于保护共享资源的访问安全,使用如下</li></ul>
<div class="jb51code"><pre class="brush:go;">func main() {
var mu sync.Mutex
mu.Lock()
defer mu.Unlock()
mu.TryLock()
}
</pre></div>
<ul><li><code>mu.Lock()</code>:加锁</li><li><code>mu.Unlock()</code>:解锁</li><li><code>mu.TryLock()</code>:尝试加锁,如果成功会返回<code>true</code></li></ul>
<p>锁结构如下</p>
<div class="jb51code"><pre class="brush:go;">type Mutex struct {
state int32// 锁状态:包含多个标志位
semauint32 // 信号量:用于阻塞goroutine
}
</pre></div>
<ul><li>加锁的时候,会先使用<code>cas</code>原子操作获取锁(CPU实现),如果拿不到说明已经被其他<code>goroutine</code>占有,通过自旋阻塞当前<code>goroutine</code>(这过程会通过PAUSE指令减少CPU功耗)</li></ul>
<div class="jb51code"><pre class="brush:go;">func (m *Mutex) Lock() {
// 快速路径:尝试原子操作获取锁
if atomic.CompareAndSwapInt32(&m.state, 0, mutexLocked) {
return
}
// 慢速路径:锁已被持有,需要等待
m.lockSlow()
}</pre></div>
<ul><li>读写均衡或者写者较多的场景,使用这个锁(防止写者竞争,内存占用也更小)</li></ul>
<p class="maodian"><a name="_label2"></a></p><h2>RWMutex</h2>
<ul><li><code>golang</code>提供的读写锁,和上面的锁区别是区分了读和写两种场景。读之间的冲突不阻塞,若写锁遇到了读锁,需要等待所有读者释放,若读者和写者同时到达,读者要等待写者完成(写优先)</li></ul>
<div class="jb51code"><pre class="brush:go;">func (rw *RWMutex) Lock() {
// 1. 获取互斥锁
rw.w.Lock()
// 2. 设置readerCount为负值,阻止新读者
r := atomic.AddInt32(&rw.readerCount, -rwmutexMaxReaders)
// 3. 等待现有读者
if r != 0 {
runtime_Semacquire(&rw.writerSem)
}
}
func (rw *RWMutex) RLock() {
// 检查是否有写者(readerCount < 0表示有写者)
if atomic.AddInt32(&rw.readerCount, 1) < 0 {
// 有写者在等待或正在写,读者必须阻塞
runtime_Semacquire(&rw.readerSem)
}
}</pre></div>
<ul><li>读多写少的场景使用这个读写锁,性能会有比较大的提升</li></ul>
<p class="maodian"><a name="_label3"></a></p><h2>WaitGroup</h2>
<ul><li><code>waitGroup</code>提供了下面三个方法,用来控制多个<code>goroutine</code>是否都执行完成,主<code>goroutine</code>调用<code>Add</code>方法设置要等待的goroutine数量,每个goroutine结束的时候,调用<code>Done</code>方法标记这个任务结束,<code>Wait</code>方法会等待所有goroutine调用完<code>Done</code>方法</li></ul>
<div class="jb51code"><pre class="brush:go;">func main() {
var wg sync.WaitGroup
wg.Add(1) // 增加一个等待者
go func() {
defer wg.Done()
// 执行逻辑, 逻辑执行完之后, 等待者数量-1
}()
wg.Wait() // 等待所有等待者执行完成
}</pre></div>
<ul><li>底层实现如下,它使用了一个state来存储等待者和要执行的工作数量</li></ul>
<div class="jb51code"><pre class="brush:go;">type WaitGroup struct {
noCopy noCopy
// 64位值的高32位是计数器,低32位是等待者数量
// 64位原子操作需要64位对齐,但32位编译器不能确保这一点
state atomic.Uint64 // high 32 bits are counter, low 32 bits are waiter count.
}</pre></div>
<div class="jb51code"><pre class="brush:go;">// WaitGroup状态布局(64位)
// 高32位:计数器 (counter)
// 低32位:等待者数量 (waiters)
func (wg *WaitGroup) Add(delta int) {
statep, semap := wg.state()
// 更新计数器
state := atomic.AddUint64(statep, uint64(delta)<<32)
// 检查状态变化
v := int32(state >> 32)// 计数器
w := uint32(state) // 等待者数量
if v > 0 || w == 0 {
return // 还有工作要做,或者没有等待者
}
// 所有工作完成,唤醒等待者
if *statep != 0 {
panic("sync: WaitGroup misuse")
}
// 唤醒所有等待的goroutine
for ; w != 0; w-- {
runtime_Semrelease(semap, false, 0)
}
}</pre></div>
<p>具体</p>
<ol><li>高32位 = 工作计数器<br />wg.Add(5) // 高32位 += 5,表示还有5个工作要做<br />wg.Done() // 高32位 -= 1,表示完成了1个工作</li></ol>
<ul><li>当高32位变为0时,表示所有工作完成</li></ul>
<ol start="2"><li>低32位 = 等待者数量<br />wg.Wait() // 低32位 += 1,表示1个goroutine开始等待</li></ol>
<ul><li>当高32位变为0时,低32位表示需要唤醒多少个goroutine</li></ul>
<p>当调用<code>Wait</code>方法时,也是一个自旋等待的逻辑。配合cas,实现等待所有工作完成。具体细节参考源码,这里只简单介绍下原理</p>
<p class="maodian"><a name="_label4"></a></p><h2>Once</h2>
<ul><li><code>sync.Once{}</code>提供了一个多次调用,只执行一次的方法实现,如下所示</li></ul>
<div class="jb51code"><pre class="brush:go;">func main() {
once := sync.Once{}
once.Do(func() {
// 这里的逻辑只会执行一次
})
}</pre></div>
<ul><li>这个的底层实现比较简单,因为once内部维护了一个原子类,指向一个32位无符号数,当执行<code>func</code>的时候,如果有多个goroutine并发执行,两个goroutine会竞争锁,最终只有一个能够执行<code>func</code>的逻辑,执行完之后,这个数会设置为1,从而后面的所有goroutine都不会再走到执行逻辑。具体参考下面源码实现</li></ul>
<div class="jb51code"><pre class="brush:go;">func (o *Once) Do(f func()) {
if o.done.Load() == 0 {
o.doSlow(f)
}
}
func (o *Once) doSlow(f func()) {
o.m.Lock()
defer o.m.Unlock()
if o.done.Load() == 0 {
defer o.done.Store(1)
f()
}
}</pre></div>
<p class="maodian"><a name="_label5"></a></p><h2>Cond</h2>
<ul><li>这个工具主要是协调访问共享资源的那些<code>goroutine</code>,<code>c.L</code>是工具内部的一个锁。当调用<code>c.L.Wait()</code>会解锁等待Signal或者Broadcast信号,Broadcast会唤醒所有等待的<code>Wait</code>,<code>Signal</code>会随机唤醒一个等待的<code>Wait</code>。具体使用方法如下面的例子所示</li></ul>
<div class="jb51code"><pre class="brush:go;">func main() {
cond := sync.NewCond(new(sync.Mutex))
done := false
read := func(name string, c *sync.Cond) {
c.L.Lock()
if !done {
c.Wait()
}
fmt.Println("start reading: ", name)
c.L.Unlock()
}
write := func(name string, c *sync.Cond) {
fmt.Println("start writing: ", name)
c.L.Lock()
done = true
c.L.Unlock()
fmt.Println("wakes all:", name)
c.Signal()
}
go read("read3", cond)
go read("read1", cond)
go read("read2", cond)
write("write1", cond)
time.Sleep(5 * time.Second)
}</pre></div>
<p class="maodian"><a name="_label6"></a></p><h2>Map</h2>
<ul><li>golang在sync包内提供了一个并发安全的Map,可以增删改查,遍历、compareAndSwap、LoadOrStore等操作。可以做一些无锁编程,提升性能</li><li>读多写少(读操作占比>80%)、数据量大(键值对数量>1000)、写入不频繁(写入频率<100次/秒)、简单键值等场景,使用这个的性能要优于普通map+锁</li></ul>
<p class="maodian"><a name="_label7"></a></p><h2>Pool</h2>
<ul><li>对于频繁进行内存分配和回收的场景,可以使用<code>sync.Pool{}</code>,例如下面的场景</li></ul>
<div class="jb51code"><pre class="brush:go;">// 频繁分配和回收的典型特征
func frequentAllocation() {
pool := sync.Pool{
New: func() interface{} {
return make([]byte, 1024)
},
}
// 在循环中重复创建和销毁对象
for i := 0; i < 1000; i++ {
// 分配:创建新对象
// buf := make([]byte, 1024)// 直接分配内存
buf := pool.Get().([]byte) // 使用对象池
// 使用对象
copy(buf, data)
process(buf)
// 回收:对象超出作用域,被GC回收
pool.Put(buf[:0]) // 归还对象(清空)
}
// 每次循环都经历:分配→使用→回收
}</pre></div>
頁:
[1]