得力 發表於 2019-8-17 16:10:00

理解Go协程与并发

<h2 id="协程">协程</h2>
<p>Go语言里创建一个协程很简单,使用<code>go</code>关键字就可以让一个普通方法协程化:</p>
<pre><code class="language-go">package main

import (
        "fmt"
        "time"
)

func main(){
        fmt.Println("run in main coroutine.")

        for i:=0; i&lt;10; i++ {
                go func(i int) {
                        fmt.Printf("run in child coroutine %d.\n", i)
                }(i)
        }

        //防止子协程还没有结束主协程就退出了
        time.Sleep(time.Second * 1)
}
</code></pre>
<p>下面这些概念可能不太好理解,需要慢慢理解。可以先跳过,回头再来看。</p>
<p>概念:</p>
<ol>
<li><code>协程</code>可以理解为纯用户态的线程,其通过协作而不是抢占来进行切换。相对于进程或者线程,协程所有的操作都可以在用户态完成,创建和切换的消耗更低。</li>
<li>一个进程内部可以运行多个线程,而每个线程又可以运行很多协程。线程要负责对协程进行调度,保证每个协程都有机会得到执行。当一个协程睡眠时,它要将线程的运行权让给其它的协程来运行,而不能持续霸占这个线程。同一个线程内部最多只会有一个协程正在运行。</li>
<li>协程可以简化为三个状态:<code>运行态</code>、<code>就绪态</code>和<code>休眠态</code>。同一个线程中最多只会存在一个处于运行态的协程。<code>就绪态协程</code>是指那些具备了运行能力但是还没有得到运行机会的协程,它们随时会被调度到运行态;<code>休眠态的协程</code>还不具备运行能力,它们是在等待某些条件的发生,比如 IO 操作的完成、睡眠时间的结束等。</li>
<li>子协程的异常退出会将异常传播到主协程,直接会导致主协程也跟着挂掉。</li>
</ol>
<blockquote>
<p>协程一般用 TCP/HTTP/RPC服务、消息推送系统、聊天系统等。使用协程,我们可以很方便的搭建一个支持高并发的TCP或HTTP服务端。</p>
</blockquote>
<h2 id="通道">通道</h2>
<p>通道的英文是Channels,简称<code>chan</code>。什么时候要用到通道呢?可以先简单的理解为:<code>协程</code>在需要协作通信的时候就需要用通道。</p>
<p>在GO里,不同的并行协程之间交流的方式有两种,一种是通过共享变量,另一种是通过通道。Go 语言鼓励使用通道的形式来交流。</p>
<p>举个简单的例子,我们使用协程实现并发调用远程接口,最终我们需要把每个协程请求回来的数据进行汇总一起返回,这个时候就用到通道了。</p>
<h3 id="创建通道">创建通道</h3>
<p>创建<code>通道</code>(channel)只能使用<code>make</code>函数:</p>
<pre><code class="language-go">c := make(chan int)
</code></pre>
<p><code>通道</code>是区分类型的,如这里的<code>int</code>。</p>
<p>Go 语言为通道的读写设计了特殊的箭头语法糖 <code>&lt;-</code>,让我们使用通道时非常方便。把箭头写在通道变量的右边就是写通道,把箭头写在通道的左边就是读通道。一次只能读写一个元素。</p>
<pre><code class="language-go">c := make(chan bool)
c &lt;- true //写入
&lt;- c //读取
</code></pre>
<h3 id="缓冲通道">缓冲通道</h3>
<p>上面我们介绍了默认的非缓存类型的channel,不过Go也允许指定channel的缓冲大小,很简单,就是channel可以存储多少元素:</p>
<pre><code class="language-go">c := make(chan int, value)
</code></pre>
<p>当 <code>value = 0</code> 时,<code>通道</code>是无缓冲阻塞读写的,等价于<code>make(chan int)</code>;当<code>value &gt; 0</code> 时,<code>通道</code>有缓冲、是非阻塞的,直到写满 <code>value</code> 个元素才阻塞写入。具体说明下:</p>
<p><strong>非缓冲通道</strong><br>
无论是发送操作还是接收操作,一开始执行就会被阻塞,直到配对的操作也开始执行才会继续传递。由此可见,非缓冲通道是在用同步的方式传递数据。也就是说,只有收发双方对接上了,数据才会被传递。数据是直接从发送方复制到接收方的,中间并不会用非缓冲通道做中转。</p>
<p><strong>缓冲通道</strong><br>
缓冲通道可以理解为消息队列,在有容量的时候,发送和接收是不会互相依赖的。用异步的方式传递数据。</p>
<p>下面我们用一个例子来理解一下:</p>
<pre><code class="language-go">package main

import "fmt"

func main() {
        var c = make(chan int, 0)
        var a string

        go func() {
                a = "hello world"
                &lt;-c
        }()

        c &lt;- 0
        fmt.Println(a)
}
</code></pre>
<p>这个例子输出的一定是<code>hello world</code>。但是如果你把通道的容量由0改为大于0的数字,输出结果就不一定是<code>hello world</code>了,很可能是空。为什么?</p>
<p>当通道是无缓冲通道时,执行到<code>c &lt;- 0</code>,通道满了,写操作会被阻塞住,直到执行<code>&lt;-c</code>解除阻塞,后面的语句接着执行。</p>
<p>要是改成非阻塞通道,执行到<code>c &lt;- 0</code>,发现还能写入,主协程就不会阻塞了,但这时候输出的是空字符串还是<code>hello world</code>,取决于是子协程和主协程哪个运行的速度快。</p>
<blockquote>
<p>通道作为容器,它可以像切片一样,使用 <code>cap()</code> 和 <code>len()</code> 全局函数获得通道的容量和当前内部的元素个数。</p>
</blockquote>
<h3 id="模拟消息队列">模拟消息队列</h3>
<p>上一节"协程"的例子里,我们在主协程里加了个<code>time.Sleep()</code>,目的是防止子协程还没有结束主协程就退出了。但是对于实际生活的大多数场景来说,1秒是不够的,并且大部分时候我们都无法预知for循环内代码运行时间的长短。这时候就不能使用<code>time.Sleep()</code> 来完成等待操作了。下面我们用通道来改写:</p>
<pre><code class="language-go">package main

import (
        "fmt"
)

func main() {
        fmt.Println("run in main coroutine.")

        count := 10
        c := make(chan bool, count)

        for i := 0; i &lt; count; i++ {
                go func(i int) {
                        fmt.Printf("run in child coroutine %d.\n", i)
                        c &lt;- true
                }(i)
        }

        for i := 0; i &lt; count; i++ {
                &lt;-c
        }
}
</code></pre>
<h3 id="单向通道">单向通道</h3>
<p>默认的通道是支持读写的,我们可以定义单向通道:</p>
<pre><code class="language-go">//只读
var readOnlyChannel = make(&lt;-chan int)

//只写
var writeOnlyChannel = make(chan&lt;- int)
</code></pre>
<p>下面是一个示例,我们模拟消息队列的消费者、生产者:</p>
<pre><code class="language-go">package main

import (
        "fmt"
        "time"
)

func Producer(c chan&lt;- int) {
        for i := 0; i &lt; 10; i++ {
                c &lt;- i
        }
}

func Consumer1(c &lt;-chan int) {
        for m := range c {
                fmt.Printf("oh, I get luckly num: %v\n", m)
        }
}

func Consumer2(c &lt;-chan int) {
        for m := range c {
                fmt.Printf("oh, I get luckly num too: %v\n", m)
        }
}

func main() {
        c := make(chan int, 2)

        go Consumer1(c)
        go Consumer2(c)

        Producer(c)

        time.Sleep(time.Second)
}
</code></pre>
<p>对于生产者,我们希望通道是只写属性,而对于消费者则是只读属性,这样避免对通道进行错误的操作。当然,如果你将本例里消费者、生产者的通道单向属性去掉也是可以的,没什么问题:</p>
<pre><code class="language-go">func Producer(c chan int) {}
func Consumer1(c chan int) {}
func Consumer2(c chan int) {}
</code></pre>
<blockquote>
<p>事实上 <code>channel</code> 只读或只写都没有意义,所谓的单向 <code>channel</code> 其实只是方法里声明时用,如果后续代码里,向本来用于读<code>channel</code>里写入了数据,编译器会提示错误。</p>
</blockquote>
<h3 id="关闭通道">关闭通道</h3>
<p>读取一个已经关闭的通道会立即返回通道类型的<code>零值</code>,而写一个已经关闭的通道会抛异常。如果通道里的元素是整型的,读操作是不能通过返回值来确定通道是否关闭的。</p>
<p>1、如何安全的读通道,确保不是读取的已关闭通道的<code>零值</code>?<br>
答案是使用<code>for...range</code>语法。当通道为空时,循环会阻塞;当通道关闭,循环会停止。通过循环停止,我们可以认为通道已经关闭。示例:</p>
<pre><code class="language-go">package main

import "fmt"

func main() {
        var c = make(chan int, 3)

        //子协程写
        go func() {
                c &lt;- 1
                close(c)
        }()

        //直接读取通道,存在不知道子协程是否已关闭的情况
        //fmt.Println(&lt;-c)
        //fmt.Println(&lt;-c)

        //主协程读取:使用for...range安全的读取
        for value := range c {
                fmt.Println(value)
        }
}
</code></pre>
<p>输出:</p>
<pre><code>1
</code></pre>
<p>2、如何安全的写通道,确保不会写入已关闭的通道?<br>
Go 语言并不存在一个内置函数可以判断出通道是否已经被关闭。确保通道写安全的最好方式是由负责写通道的协程自己来关闭通道,读通道的协程不要去关闭通道。</p>
<p>但是这个方法只能解决单写多读的场景。如果遇到多写单读的情况就有问题了:无法知道其它写协程什么时候写完,那么也就不能确定什么时候关闭通道。这个时候就得额外使用一个通道专门做这个事情。</p>
<p>我们可以使用内置的 <code>sync.WaitGroup</code>,它使用计数来等待指定事件完成:</p>
<pre><code class="language-go">package main

import (
        "fmt"
        "sync"
        "time"
)

func main() {

        var ch = make(chan int, 8)

        //写协程
        var wg = new(sync.WaitGroup)

        for i := 1; i &lt;= 4; i++ {
                wg.Add(1)
                go func(num int, ch chan int, wg *sync.WaitGroup) {
                        defer wg.Done()
                        ch &lt;- num
                        ch &lt;- num * 10
                }(i, ch, wg)
        }

        //读
        go func(ch chan int) {
                for num := range ch {
                        fmt.Println(num)
                }
        }(ch)

        //Wait阻塞等待所有的写通道协程结束,待计数值变成零,Wait才会返回
        wg.Wait()

        //安全的关闭通道
        close(ch)

        //防止读取通道的协程还没有完毕
        time.Sleep(time.Second)

        fmt.Println("finish")
}
</code></pre>
<p>输出:</p>
<pre><code>
3
30
2
20
1
10
4
40
finish
</code></pre>
<h3 id="多路通道">多路通道</h3>
<p>有时候还会遇到多个生产者,只要有一个生产者就绪,消费者就可以进行消费的情况。这个时候可以使用go语言提供的<code>select</code> 语句,它可以同时管理多个通道读写,<strong>如果所有通道都不能读写,它就整体阻塞,只要有一个通道可以读写,它就会继续</strong>。示例:</p>
<pre><code class="language-go">package main

import (
        "fmt"
        "time"
)

func main() {

        var ch1 = make(chan int)
        var ch2 = make(chan int)

        fmt.Println(time.Now().Format("15:04:05"))

        go func(ch chan int) {
                time.Sleep(time.Second)
                ch &lt;- 1
        }(ch1)

        go func(ch chan int) {
                time.Sleep(time.Second * 2)
                ch &lt;- 2
        }(ch2)

        for {
                select {
                        case v := &lt;-ch1:
                                fmt.Println(time.Now().Format("15:04:05") + ":来自ch1:", v)
                        case v := &lt;-ch2:
                                fmt.Println(time.Now().Format("15:04:05") + ":来自ch2:", v)
                        //default:
                                //fmt.Println("channel is empty !")
                }
        }
}
</code></pre>
<p>输出:</p>
<pre><code>13:39:56
13:39:57:来自ch1: 1
13:39:58:来自ch2: 2
fatal error: all goroutines are asleep - deadlock!
</code></pre>
<p>默认<code>select</code>处于阻塞状态,1s后,子协程1完成写入,主协程读出了数据;接着子协程2完成写入,主协程读出了数据;接着主协程挂掉了,原因是主协程发现在等一个永远不会来的数据,这显然是没有结果的,干脆就直接退出了。</p>
<p>如果把注释的部分打开,那么程序在打印出来自ch1、ch2的数据后,就会一直执行<code>default</code>里面的程序。这个时候程序不会退出。原因是当 <code>select</code> 语句所有通道都不可读写时,如果定义了 <code>default</code> 分支,那就会执行 <code>default</code> 分支逻辑。</p>
<blockquote>
<p>注:<code>select{}</code>代码块是一个没有任何<code>case</code>的<code>select</code>,它会一直阻塞。</p>
</blockquote>
<h3 id="chan的应用场景">Chan的应用场景</h3>
<p>golang中chan的应用场景总结<br>
https://github.com/nange/blog/issues/9</p>
<p>Go语言之Channels实际应用<br>
https://www.s0nnet.com/archives/go-channels-practice</p>
<ul>
<li>消息队列</li>
<li>并发请求</li>
<li>模拟锁的功能</li>
<li>模拟sync.WaitGroup</li>
<li>并行计算</li>
</ul>
<blockquote>
<p>通道原理部分可以根据文末给出的参考链接<code>《快学 Go 语言》第 12 课 —— 通道</code>去查看。</p>
</blockquote>
<h2 id="并发锁">并发锁</h2>
<h3 id="互斥所">互斥所</h3>
<p>go语言里的<code>map</code>是线程不安全的:</p>
<pre><code class="language-go">package main

import "fmt"

func write(d mapstring) {
        d["name"] = "yujc"
}

func read(d mapstring) {
        fmt.Println(d["name"])
}

func main() {
        d := mapstring{}
        go read(d)
        write(d)
}
</code></pre>
<p>Go 语言内置了数据结构<code>竞态检查</code>工具来帮我们检查程序中是否存在线程不安全的代码,只要在运行的时候加上<code>-race</code>参数即可:</p>
<pre><code class="language-bash">$ go run -race main.go
==================
WARNING: DATA RACE
Read at 0x00c0000a8180 by goroutine 6:

...

yujc
Found 2 data race(s)
exit status 66
</code></pre>
<p>可以看出,上面的代码存在安全隐患。</p>
<p>我们可以使用<code>sync.Mutex</code>来保护<code>map</code>,原理是在每次读写操作之前使用互斥锁进行保护,防止其他线程同时操作:</p>
<pre><code class="language-go">package main

import (
        "fmt"
        "sync"
)

type SafeDict struct {
        data mapstring
        mux*sync.Mutex
}

func NewSafeDict(data mapstring) *SafeDict {
        return &amp;SafeDict{
                data: data,
                mux:&amp;sync.Mutex{},
        }
}

func (d *SafeDict) Get(key string) string {
        d.mux.Lock()
        defer d.mux.Unlock()
        return d.data
}

func (d *SafeDict) Set(key string, value string) {
        d.mux.Lock()
        defer d.mux.Unlock()
        d.data = value
}

func main(){
        dict := NewSafeDict(mapstring{})

        go func(dict *SafeDict) {
                fmt.Println(dict.Get("name"))
        }(dict)

        dict.Set("name", "yujc")
}
</code></pre>
<p>运行检测:</p>
<pre><code class="language-bash">$ go run -race main.go
yujc
</code></pre>
<p>上面的代码如果不使用<code>-race</code>运行,不一定会有结果,取决于主协程、子协程哪个先运行。</p>
<blockquote>
<p>注意:<code>sync.Mutex</code> 是一个结构体对象,这个对象在使用的过程中要避免被浅拷贝,否则起不到保护作用。应尽量使用它的指针类型。</p>
</blockquote>
<p>上面的代码里我们多处使用了<code>d.mux.Lock()</code>,能否简化成<code>d.Lock()</code>呢?答案是可以的。我们知道,结构体可以自动继承匿名内部结构体的所有方法:</p>
<pre><code class="language-go">type SafeDict struct {
        data mapstring
        *sync.Mutex
}

func NewSafeDict(data mapstring) *SafeDict {
        return &amp;SafeDict{data, &amp;sync.Mutex{}}
}

func (d *SafeDict) Get(key string) string {
        d.Lock()
        defer d.Unlock()
        return d.data
}
</code></pre>
<p>这样就完成了简化。</p>
<h3 id="读写锁">读写锁</h3>
<p>对于读多写少的场景,可以使用<code>读写锁</code>代替<code>互斥锁</code>,可以提高性能。</p>
<p>读写锁提供了下面4个方法:</p>
<ul>
<li><code>Lock()</code> 写加锁</li>
<li><code>Unlock()</code> 写释放锁</li>
<li><code>RLock()</code> 读加锁</li>
<li><code>RUnlock()</code> 读释放锁</li>
</ul>
<p><code>写锁</code>是<code>排它锁</code>,加<code>写锁</code>时会阻塞其它协程再加<code>读锁</code>和<code>写锁</code>;<code>读锁</code>是<code>共享锁</code>,加读锁还可以允许其它协程再加<code>读锁</code>,但是会阻塞加<code>写锁</code>。<code>读写锁</code>在写并发高的情况下性能退化为普通的<code>互斥锁</code>。</p>
<p>我们把上节中的互斥锁换成读写锁:</p>
<pre><code class="language-go">package main

import (
        "fmt"
        "sync"
)

type SafeDict struct {
        data mapstring
        *sync.RWMutex
}

func NewSafeDict(data mapstring) *SafeDict {
        return &amp;SafeDict{data, &amp;sync.RWMutex{}}
}

func (d *SafeDict) Get(key string) string {
        d.RLock()
        defer d.RUnlock()
        return d.data
}

func (d *SafeDict) Set(key string, value string) {
        d.Lock()
        defer d.Unlock()
        d.data = value
}

func main(){
        dict := NewSafeDict(mapstring{})

        go func(dict *SafeDict) {
                fmt.Println(dict.Get("name"))
        }(dict)

        dict.Set("name", "yujc")
}
</code></pre>
<p>改完后,使用竞态检测工具检测还是能通过的。</p>
<h2 id="参考">参考</h2>
<p>1、make(chan int) 和 make(chan int, 1) 的区别<br>
https://www.jianshu.com/p/f12e1766c19f<br>
2、channel<br>
https://www.jianshu.com/p/4d97dc032730<br>
3、《快学 Go 语言》第 12 课 —— 通道<br>
https://mp.weixin.qq.com/s?__biz=MzI0MzQyMTYzOQ<mark>&amp;mid=2247484601&amp;idx=1&amp;sn=97c0de2acc3127c9e913b6338fa65737<br>
4、《快学 Go 语言》第 13 课 —— 并发与安全<br>
https://mp.weixin.qq.com/s?__biz=MzI0MzQyMTYzOQ</mark>&amp;mid=2247484683&amp;idx=1&amp;sn=966cb818f034ffd4538eae7a61cd0c58</p>


</div>
<div id="MySignature" role="contentinfo">
    (本文完)
<br><br>


欢迎关注公众号"飞鸿影记(fhyblog)",探寻物件背后的逻辑,记录生活真实的影子。
<div style="text-align: center"><span style="color: #808000"><img src="https://images2015.cnblogs.com/blog/663847/201703/663847-20170331215930477-1406562582.jpg" alt="" width="250" height="250"></span></div>


<div>
<p><strong>作者:飞鸿影</strong></p>
<p><strong>出处:http://52fhy.cnblogs.com/</strong></p>
</div>

<hr>

<p style="border: silver 1px dashed; padding: 8px 5px; font-weight: bold">版权申明:没有标明转载或特殊申明均为作者原创。本文采用以下协议进行授权,自由转载 - 非商用 - 非衍生 - 保持署名 | Creative Commons BY-NC-ND 3.0,转载请注明作者及出处。</p>

<hr>


<div style="text-align: center;display:none"><span style="color: #808000"><img src="https://images2018.cnblogs.com/blog/663847/201805/663847-20180515232241445-706573186.png" alt="" width="250" height="250"></span></div><br><br>
来源:https://www.cnblogs.com/52fhy/p/11369028.html
頁: [1]
查看完整版本: 理解Go协程与并发