竹竿划大船 發表於 2020-2-28 19:36:00

go 一步步实现Goroutine Pool

<p>&nbsp;</p>
<h1>&nbsp;Goroutine Pool架构</h1>
<p>超大规模并发的场景下,不加限制的大规模的goroutine可能造成内存暴涨,给机器带来极大的压力,吞吐量下降和处理速度变慢。</p>
<p>而实现一个Goroutine Pool,复用goroutine,减轻runtime的调度压力以及缓解内存压力,依托这些优化,在大规模goroutine并发的场景下可以极大地提高并发性能。</p>
<p><img src="https://img2018.cnblogs.com/i-beta/1477786/202002/1477786-20200228185941352-1984989989.jpg" alt=""></p>
<p>&nbsp;</p>
<p>&nbsp;</p>
<p>&nbsp;</p>
<p>&nbsp;</p>
<h1>Pool类型</h1>
<div class="cnblogs_Highlighter">
<pre class="brush:go;gutter:true;">type Pool struct {
        // capacity of the pool.
        //capacity是该Pool的容量,也就是开启worker数量的上限,每一个worker需要一个goroutine去执行;
        //worker类型为任务类。
        capacity int32
        // running is the number of the currently running goroutines.
        //running是当前正在执行任务的worker数量
        running int32
        // expiryDuration set the expired time (second) of every worker.
        //expiryDuration是worker的过期时长,在空闲队列中的worker的最新一次运行时间与当前时间之差如果大于这个值则表示已过期,定时清理任务会清理掉这个worker;
        expiryDuration time.Duration
        // workers is a slice that store the available workers.
        //任务队列
        workers []*Worker
        // release is used to notice the pool to closed itself.
        //当关闭该Pool支持通知所有worker退出运行以防goroutine泄露
        release chan sig
        // lock for synchronous operation
        //用以支持Pool的同步操作
        lock sync.Mutex
        //once用在确保Pool关闭操作只会执行一次
        once sync.Once
}</pre>
</div>
<h3>初始化Pool</h3>
<div class="cnblogs_Highlighter">
<pre class="brush:go;gutter:true;">// NewPool generates a instance of ants pool
func NewPool(size, expiry int) (*Pool, error) {
        if size &lt;= 0 {
                return nil, errors.New("Pool Size &lt;0,not Create")
        }
        p := &amp;Pool{
                capacity:       int32(size),
                release:      make(chan sig, 1),
                expiryDuration: time.Duration(expiry) * time.Second,
                running:                0,
        }
        // 启动定期清理过期worker任务,独立goroutine运行,
        // 进一步节省系统资源
        p.monitorAndClear()
        return p, nil
}</pre>
</div>
<h3>获取Worker</h3>
<div class="cnblogs_Highlighter">
<pre class="brush:go;gutter:true;">// getWorker returns a available worker to run the tasks.
func (p *Pool) getWorker() *Worker {
        var w *Worker
        // 标志,表示当前运行的worker数量是否已达容量上限
        waiting := false
        // 涉及从workers队列取可用worker,需要加锁
        p.lock.Lock()
        workers := p.workers
        n := len(workers) - 1
        fmt.Println("空闲worker数量:",n+1)
        fmt.Println("协程池现在运行的worker数量:",p.running)
        // 当前worker队列为空(无空闲worker)
        if n &lt; 0 {
                //没有空闲的worker有两种可能:
                //1.运行的worker超出了pool容量
                //2.当前是空pool,从未往pool添加任务或者一段时间内没有任务添加,被定期清除
                // 运行worker数目已达到该Pool的容量上限,置等待标志
                if p.running &gt;= p.capacity {
                        //print("超过上限")
                        waiting = true
                } else {
                        // 当前无空闲worker但是Pool还没有满,
                        // 则可以直接新开一个worker执行任务
                        p.running++
                        w = &amp;Worker{
                                pool: p,
                                task: make(chan functinType),
                                str:make(chan string),
                        }
                }
                // 有空闲worker,从队列尾部取出一个使用
        } else {
                //&lt;-p.freeSignal
                w = workers
                workers = nil
                p.workers = workers[:n]
                p.running++
        }
        // 判断是否有worker可用结束,解锁
        p.lock.Unlock()
        if waiting {
                //当一个任务执行完以后会添加到池中,有了空闲的任务就可以继续执行:
                // 阻塞等待直到有空闲worker
                for len(p.workers) == 0{
                        continue
                }
                p.lock.Lock()
                workers = p.workers
                l := len(workers) - 1
                w = workers
                workers = nil
                p.workers = workers[:l]
                p.running++
                p.lock.Unlock()
        }
        return w
}</pre>
</div>
<h3>定期清理过期Worker</h3>
<div class="cnblogs_Highlighter">
<pre class="brush:go;gutter:true;">func (p *Pool) monitorAndClear() {
        go func() {
                for {
                        // 周期性循环检查过期worker并清理
                        time.Sleep(p.expiryDuration)
                        currentTime := time.Now()
                        p.lock.Lock()
                        idleWorkers := p.workers
                        n := 0
                        for i, w := range idleWorkers {
                                // 计算当前时间减去该worker的最后运行时间之差是否符合过期时长
                                if currentTime.Sub(w.recycleTime) &lt;= p.expiryDuration {
                                        break
                                }
                                n = i
                                w.stop()
                                idleWorkers = nil
                        }
                        if n &gt; 0 {
                                n++
                                p.workers = idleWorkers
                        }
                        p.lock.Unlock()
                }
        }()
}</pre>
</div>
<h3>复用Worker</h3>
<div class="cnblogs_Highlighter">
<pre class="brush:go;gutter:true;">// putWorker puts a worker back into free pool, recycling the goroutines.
func (p *Pool) putWorker(worker *Worker) {
        // 写入回收时间,亦即该worker的最后运行时间
        worker.recycleTime = time.Now()
        p.lock.Lock()
        p.running --
        p.workers = append(p.workers, worker)
        p.lock.Unlock()

}</pre>
</div>
<h3>动态扩容或者缩小容量</h3>
<div class="cnblogs_Highlighter">
<pre class="brush:go;gutter:true;">// ReSize change the capacity of this pool
func (p *Pool) ReSize(size int) {
        cap := int(p.capacity)
        if size &lt;cap{
                diff := cap - size
                for i := 0; i &lt; diff; i++ {
                        p.getWorker().stop()
                }
        } else if size == cap {
                return
        }
        atomic.StoreInt32(&amp;p.capacity, int32(size))
} </pre>
</div>
<h3>提交Worker</h3>
<div class="cnblogs_Highlighter">
<pre class="brush:go;gutter:true;">// Submit submit a task to pool
func (p *Pool) Submit(task functinType,str string) error {
        if len(p.release) &gt; 0 {
                return errors.New("Pool is Close")
        }
        //创建或得到一个空闲的worker
        w := p.getWorker()
        w.run()
        //将任务参数通过信道传递给它
        w.sendarg(str)
        //将任务通过信道传递给它
        w.sendTask(task)
        return nil
}
</pre>
</div>
<p>  </p>
<h1>Worker类</h1>
<div class="cnblogs_Highlighter">
<pre class="brush:go;gutter:true;">package Poolpkg

import (
        "sync/atomic"
        "time"
)

type functinType func(string) error


// Worker is the actual executor who runs the tasks,
// it starts a goroutine that accepts tasks and
// performs function calls.
type Worker struct {
        // pool who owns this worker.
        pool *Pool
        // task is a job should be done.
        task chan functinType
        // recycleTime will be update when putting a worker back into queue.
        recycleTime time.Time

        str chan string
}

// run starts a goroutine to repeat the process
// that performs the function calls.
func (w *Worker) run() {

        go func() {
                //监听任务列表,一旦有任务立马取出运行
                count := 1
                var str string
                var f functinType
                for count &lt;=2{
                        select {
                        case str_temp, ok := &lt;- w.str:
                                if !ok {
                                        return
                                }
                                count ++
                                str = str_temp
                        case f_temp, ok := &lt;-w.task:
                                if !ok {
                                        //如果接收到关闭
                                        atomic.AddInt32(&amp;w.pool.running, -1)
                                        close(w.task)
                                        return
                                }
                                count++
                                f = f_temp
                        }
                }
                err := f(str)
                if err != nil{
                        //fmt.Println("执行任务失败")
                }
                //回收复用
                w.pool.putWorker(w)
                return
        }()
}

// stop this worker.
func (w *Worker) stop() {
        w.sendTask(nil)
        close(w.str)
}

// sendTask sends a task to this worker.
func (w *Worker) sendTask(task functinType) {
        w.task &lt;- task
}

func (w *Worker) sendarg(str string) {
        w.str &lt;- str
}
</pre>
</div>
<p>  </p>
<h1>总结和实践</h1>
<h3>怎么理解Woreker,task、Pool的关系</h3>
<p><strong>Woker类型其实就是task的载体,Worker类型有两个很重要的参数:</strong></p>
<pre>task chan functinType:用来是传递task。</pre>
<pre>str chan string:用来传递task所需的参数。<br><br></pre>
<p><strong>task是任务本身,它一般为一个函数,在程序中被定义为函数类型:</strong></p>
<pre class="brush:go;gutter:true;">type functinType func(string) error</pre>
<p><strong>Pool存储Worker,当用户要执行一个task时,首先要得到一个Worker,必须从池中获取,获取到一个Worker后,就开启一个协程去处理,在这个协程中接收任务task和参数。</strong></p>
<div class="cnblogs_Highlighter">
<pre class="brush:go;gutter:true;">//创建或得到一个空闲的worker
w := p.getWorker()<br>//开协程去处理
w.run()
//将任务参数通过信道传递给它
w.sendarg(str)
//将任务通过信道传递给它
w.sendTask(task)</pre>
</div>
<h3>Worker怎么接收task和参数</h3>
<pre class="brush:go;gutter:true;">count定义接收数据的个数,一个Woker必须接收到task和参数才能开始工作。<br>工作完后这个Worker被返回到Pool中,下次还可以复用这个Worker,也就是复用Worker这个实例。</pre>
<div class="cnblogs_Highlighter">
<pre class="brush:go;gutter:true;">go func() {
                //监听任务列表,一旦有任务立马取出运行
                count := 1
                var str string
                var f functinType
                for count &lt;=2{
                        select {
                        case str_temp, ok := &lt;- w.str:
                                if !ok {
                                        return
                                }
                                count ++
                                str = str_temp
                        case f_temp, ok := &lt;-w.task:
                                if !ok {
                                        //如果接收到关闭
                                        atomic.AddInt32(&amp;w.pool.running, -1)
                                        close(w.task)
                                        return
                                }
                                count++
                                f = f_temp
                        }
                }
                err := f(str)
                if err != nil{
                        //fmt.Println("执行任务失败")
                }
                //回收复用
                w.pool.putWorker(w)
                return
        }()</pre>
</div>
<h3>Pool怎么处理用户提交task获取Worker的请求</h3>
<p>1.先得到Pool池中空闲Worker的数量,然后判断</p>
<p>2.如果小于零,则表示池中没有空闲的Worker,这里有两种原因:</p>
<ul>
<li>1.运行的Worker数量超过了Pool容量,当用户获取Worker的请求数量激增,池中大多数Worker都是执行完任务的Worker重新添加到池中的,返回的Worker跟不上激增的需求。</li>
<li>2.当前是空pool,从未往pool添加任务或者一段时间内没有Worker任务运行,被定期清除。</li>
</ul>
<p>3.如果大于或者等于零,有空闲的Worker直接从池中获取最后一个Worker。</p>
<p>4.如果是第二种的第一种情况,则阻塞等待池中有空闲的Worker。</p>
<div class="cnblogs_Highlighter">
<pre class="brush:go;gutter:true;">if waiting {
                //当一个任务执行完以后会添加到池中,有了空闲的任务就可以继续执行:
                // 阻塞等待直到有空闲worker
                for len(p.workers) == 0{
                        continue
                }
                p.lock.Lock()
                workers = p.workers
                l := len(workers) - 1
                w = workers
                workers = nil
                p.workers = workers[:l]
                p.running++
                p.lock.Unlock()
        }
</pre>
</div>
<p>5.如果是第二种的第二种情况,直接创建一个Worker实例。</p>
<div class="cnblogs_Highlighter">
<pre class="brush:go;gutter:true;">// 当前无空闲worker但是Pool还没有满,
// 则可以直接新开一个worker执行任务
p.running++
w = &amp;Worker{
        pool: p,
        task: make(chan functinType),
        str:make(chan string),
}</pre>
</div>
<h3>测试</h3>
<div class="cnblogs_Highlighter">
<pre class="brush:go;gutter:true;">package main

import (
        "Pool/Poolpkg"
        "fmt"
)

func main(){<br>     //开20个大小的Pool池,过期清除时间5分钟
        Pool,err := Poolpkg.NewPool(20,5)
        i :=0
        for i &lt; 50 {
                err = Pool.Submit(Print_Test1,"并发测试!")
                if err != nil{
                        fmt.Println(err)
                }
                i++
        }
}</pre>
</div>
<p><img src="https://img2018.cnblogs.com/i-beta/1477786/202002/1477786-20200228193948158-1526108562.jpg" alt=""></p>
<p>&nbsp;</p>
<p><img src="https://img2018.cnblogs.com/i-beta/1477786/202002/1477786-20200228193955284-1026985043.jpg" alt=""></p>
<p>&nbsp;</p>
<p>&nbsp;</p>
<p>&nbsp;</p>
<h3>源码</h3>
<p><strong>Pool</strong></p>
<div class="cnblogs_Highlighter">
<pre class="brush:go;collapse:true;;gutter:true;">package Poolpkg

import (
        "errors"
        "fmt"
        "sync"
        "sync/atomic"
        "time"
)

type sig struct{}



// Pool accept the tasks from client,it limits the total
// of goroutines to a given number by recycling goroutines.
type Pool struct {
        // capacity of the pool.
        //capacity是该Pool的容量,也就是开启worker数量的上限,每一个worker需要一个goroutine去执行;
        //worker类型为任务类。
        capacity int32
        // running is the number of the currently running goroutines.
        //running是当前正在执行任务的worker数量
        running int32
        // expiryDuration set the expired time (second) of every worker.
        //expiryDuration是worker的过期时长,在空闲队列中的worker的最新一次运行时间与当前时间之差如果大于这个值则表示已过期,定时清理任务会清理掉这个worker;
        expiryDuration time.Duration
        // workers is a slice that store the available workers.
        //任务队列
        workers []*Worker
        // release is used to notice the pool to closed itself.
        //当关闭该Pool支持通知所有worker退出运行以防goroutine泄露
        release chan sig
        // lock for synchronous operation
        //用以支持Pool的同步操作
        lock sync.Mutex
        //once用在确保Pool关闭操作只会执行一次
        once sync.Once
}

// NewPool generates a instance of ants pool
func NewPool(size, expiry int) (*Pool, error) {
        if size &lt;= 0 {
                return nil, errors.New("Pool Size &lt;0,not Create")
        }
        p := &amp;Pool{
                capacity:       int32(size),
                release:      make(chan sig, 1),
                expiryDuration: time.Duration(expiry) * time.Second,
                running:                0,
        }
        // 启动定期清理过期worker任务,独立goroutine运行,
        // 进一步节省系统资源
        p.monitorAndClear()
        return p, nil
}

// Submit submit a task to pool
func (p *Pool) Submit(task functinType,str string) error {
        if len(p.release) &gt; 0 {
                return errors.New("Pool is Close")
        }
        //创建或得到一个空闲的worker
        w := p.getWorker()
        w.run()
        //将任务参数通过信道传递给它
        w.sendarg(str)
        //将任务通过信道传递给它
        w.sendTask(task)
        return nil
}

// getWorker returns a available worker to run the tasks.
func (p *Pool) getWorker() *Worker {
        var w *Worker
        // 标志,表示当前运行的worker数量是否已达容量上限
        waiting := false
        // 涉及从workers队列取可用worker,需要加锁
        p.lock.Lock()
        workers := p.workers
        n := len(workers) - 1
        fmt.Println("空闲worker数量:",n+1)
        fmt.Println("协程池现在运行的worker数量:",p.running)
        // 当前worker队列为空(无空闲worker)
        if n &lt; 0 {
                //没有空闲的worker有两种可能:
                //1.运行的worker超出了pool容量
                //2.当前是空pool,从未往pool添加任务或者一段时间内没有任务添加,被定期清除
                // 运行worker数目已达到该Pool的容量上限,置等待标志
                if p.running &gt;= p.capacity {
                        //print("超过上限")
                        waiting = true
                } else {
                        // 当前无空闲worker但是Pool还没有满,
                        // 则可以直接新开一个worker执行任务
                        p.running++
                        w = &amp;Worker{
                                pool: p,
                                task: make(chan functinType),
                                str:make(chan string),
                        }
                }
                // 有空闲worker,从队列尾部取出一个使用
        } else {
                //&lt;-p.freeSignal
                w = workers
                workers = nil
                p.workers = workers[:n]
                p.running++
        }
        // 判断是否有worker可用结束,解锁
        p.lock.Unlock()
        if waiting {
                //当一个任务执行完以后会添加到池中,有了空闲的任务就可以继续执行:
                // 阻塞等待直到有空闲worker
                for len(p.workers) == 0{
                        continue
                }
                p.lock.Lock()
                workers = p.workers
                l := len(workers) - 1
                w = workers
                workers = nil
                p.workers = workers[:l]
                p.running++
                p.lock.Unlock()
        }
        return w
}

//定期清理过期Worker
func (p *Pool) monitorAndClear() {
        go func() {
                for {
                        // 周期性循环检查过期worker并清理
                        time.Sleep(p.expiryDuration)
                        currentTime := time.Now()
                        p.lock.Lock()
                        idleWorkers := p.workers
                        n := 0
                        for i, w := range idleWorkers {
                                // 计算当前时间减去该worker的最后运行时间之差是否符合过期时长
                                if currentTime.Sub(w.recycleTime) &lt;= p.expiryDuration {
                                        break
                                }
                                n = i
                                w.stop()
                                idleWorkers = nil
                                p.running--
                        }
                        if n &gt; 0 {
                                n++
                                p.workers = idleWorkers
                        }
                        p.lock.Unlock()
                }
        }()
}

//Worker回收(goroutine复用)
// putWorker puts a worker back into free pool, recycling the goroutines.
func (p *Pool) putWorker(worker *Worker) {
        // 写入回收时间,亦即该worker的最后运行时间
        worker.recycleTime = time.Now()
        p.lock.Lock()
        p.running --
        p.workers = append(p.workers, worker)
        p.lock.Unlock()

}

//动态扩容或者缩小池容量
// ReSize change the capacity of this pool
func (p *Pool) ReSize(size int) {
        cap := int(p.capacity)
        if size &lt;cap{
                diff := cap - size
                for i := 0; i &lt; diff; i++ {
                        p.getWorker().stop()
                }
        } else if size == cap {
                return
        }
        atomic.StoreInt32(&amp;p.capacity, int32(size))
}
</pre>
</div>
<p><strong>Woker</strong></p>
<div class="cnblogs_Highlighter">
<pre class="brush:go;collapse:true;;gutter:true;">package Poolpkg

import (
        "sync/atomic"
        "time"
)

type functinType func(string) error


// Worker is the actual executor who runs the tasks,
// it starts a goroutine that accepts tasks and
// performs function calls.
type Worker struct {
        // pool who owns this worker.
        pool *Pool
        // task is a job should be done.
        task chan functinType
        // recycleTime will be update when putting a worker back into queue.
        recycleTime time.Time

        str chan string
}

// run starts a goroutine to repeat the process
// that performs the function calls.
func (w *Worker) run() {

        go func() {
                //监听任务列表,一旦有任务立马取出运行
                count := 1
                var str string
                var f functinType
                for count &lt;=2{
                        select {
                        case str_temp, ok := &lt;- w.str:
                                if !ok {
                                        return
                                }
                                count ++
                                str = str_temp
                        case f_temp, ok := &lt;-w.task:
                                if !ok {
                                        //如果接收到关闭
                                        atomic.AddInt32(&amp;w.pool.running, -1)
                                        close(w.task)
                                        return
                                }
                                count++
                                f = f_temp
                        }
                }
                err := f(str)
                if err != nil{
                        //fmt.Println("执行任务失败")
                }
                //回收复用
                w.pool.putWorker(w)
                return
        }()
}

// stop this worker.
func (w *Worker) stop() {
        w.sendTask(nil)
        close(w.str)
}

// sendTask sends a task to this worker.
func (w *Worker) sendTask(task functinType) {
        w.task &lt;- task
}

func (w *Worker) sendarg(str string) {
        w.str &lt;- str
}
</pre>
</div>
<p> </p>
<p>&nbsp;</p><br><br>
来源:https://www.cnblogs.com/-wenli/p/12378699.html
頁: [1]
查看完整版本: go 一步步实现Goroutine Pool