伺机待动 發表於 2025-11-14 09:29:31

Go 语言中控制协程数量的常用方法小结

<div id="navCategory"><h5 class="catalogue">目录</h5><ul class="first_class_ul"><li><a href="#_label0">一、使用带缓冲的通道控制</a></li><li><a href="#_label1">二、使用 sync.WaitGroup 配合通道</a></li><li><a href="#_label2">三、使用工作池(Worker Pool)模式</a></li><li><a href="#_label3">四、使用第三方库</a></li></ul></div><p>在 Go 语言中,协程(goroutine)是轻量级的执行单元,虽然开销小,但无限制地创建协程仍然会消耗大量系统资源,甚至导致程序崩溃。因此,合理控制协程数量是编写高效 Go 程序的关键。本文将介绍几种常用的协程数量控制方法,并结合具体案例说明其用法。</p>
<p class="maodian"><a name="_label0"></a></p><h2>一、使用带缓冲的通道控制</h2>
<p>带缓冲的通道可以作为一个简易的信号量(Semaphore),通过控制通道的容量来限制同时运行的协程数量。</p>
<p>基本原理:</p>
<ul><li>创建一个指定容量的通道</li><li>启动协程前先向通道发送信号(获取令牌)</li><li>协程结束后从通道接收信号(释放令牌)</li><li>当通道已满时,新的协程需要等待直到有令牌释放</li></ul>
<p>案例代码:</p>
<div class="jb51code"><pre class="brush:go;">package main

import (
        "fmt"
        "time"
)

func worker(id int, sem chan struct{}) {
        defer func() { &lt;-sem }() // 释放令牌
        fmt.Printf("Worker %d 开始工作\n", id)
        time.Sleep(time.Second) // 模拟工作
        fmt.Printf("Worker %d 完成工作\n", id)
}

func main() {
        const maxGoroutines = 3 // 最大协程数量
        sem := make(chan struct{}, maxGoroutines)
        totalTasks := 10 // 总任务数

        for i := 0; i &lt; totalTasks; i++ {
                sem &lt;- struct{}{} // 获取令牌,若满则等待
                go worker(i, sem)
        }

        // 等待所有令牌被释放(所有协程完成)
        for i := 0; i &lt; cap(sem); i++ {
                sem &lt;- struct{}{}
        }
        fmt.Println("所有任务完成")
}
</pre></div>
<p class="maodian"><a name="_label1"></a></p><h2>二、使用 sync.WaitGroup 配合通道</h2>
<p><code>sync.WaitGroup</code> 用于等待一组协程完成,结合通道可以更灵活地控制协程数量。</p>
<p>案例代码:</p>
<div class="jb51code"><pre class="brush:go;">package main

import (
        "fmt"
        "sync"
        "time"
)

func worker(id int, wg *sync.WaitGroup) {
        defer wg.Done()
        fmt.Printf("Worker %d 开始工作\n", id)
        time.Sleep(time.Second)
        fmt.Printf("Worker %d 完成工作\n", id)
}

func main() {
        const maxGoroutines = 3
        sem := make(chan struct{}, maxGoroutines)
        var wg sync.WaitGroup
        totalTasks := 10

        for i := 0; i &lt; totalTasks; i++ {
                sem &lt;- struct{}{}
                wg.Add(1)
                go func(id int) {
                        defer func() { &lt;-sem }()
                        worker(id, &amp;wg)
                }(i)
        }

        wg.Wait() // 等待所有任务完成
        fmt.Println("所有任务完成")
}
</pre></div>
<p class="maodian"><a name="_label2"></a></p><h2>三、使用工作池(Worker Pool)模式</h2>
<p>工作池模式创建固定数量的工作协程,从任务队列中获取任务执行,适用于任务数量多且可批量处理的场景。</p>
<p>案例代码:</p>
<div class="jb51code"><pre class="brush:go;">package main

import (
        "fmt"
        "sync"
        "time"
)

func worker(id int, jobs &lt;-chan int, results chan&lt;- int, wg *sync.WaitGroup) {
        defer wg.Done()
        for job := range jobs {
                fmt.Printf("Worker %d 处理任务 %d\n", id, job)
                time.Sleep(time.Second) // 模拟处理时间
                results &lt;- job * 2      // 模拟处理结果
        }
}

func main() {
        const (
                numWorkers = 3    // 工作协程数量
                numJobs    = 10   // 任务数量
        )

        jobs := make(chan int, numJobs)
        results := make(chan int, numJobs)
        var wg sync.WaitGroup

        // 启动工作协程
        wg.Add(numWorkers)
        for w := 1; w &lt;= numWorkers; w++ {
                go worker(w, jobs, results, &amp;wg)
        }

        // 发送任务
        go func() {
                for j := 1; j &lt;= numJobs; j++ {
                        jobs &lt;- j
                }
                close(jobs) // 所有任务发送完毕,关闭通道
        }()

        // 等待所有工作协程完成
        go func() {
                wg.Wait()
                close(results) // 所有结果处理完毕,关闭通道
        }()

        // 收集结果
        for result := range results {
                fmt.Printf("收到结果: %d\n", result)
        }

        fmt.Println("所有任务完成")
}
</pre></div>
<p class="maodian"><a name="_label3"></a></p><h2>四、使用第三方库</h2>
<p>对于复杂场景,可以使用成熟的第三方库,如 <code>golang.org/x/sync/errgroup</code> 或 <code>github.com/panjf2000/ants</code>(高性能协程池)。</p>
<p>使用 errgroup 的案例:</p>
<div class="jb51code"><pre class="brush:go;">package main

import (
        "context"
        "fmt"
        "golang.org/x/sync/errgroup"
        "time"
)

func worker(id int) error {
        fmt.Printf("Worker %d 开始工作\n", id)
        time.Sleep(time.Second)
        fmt.Printf("Worker %d 完成工作\n", id)
        return nil
}

func main() {
        const maxGoroutines = 3
        g, ctx := errgroup.WithContext(context.Background())
        g.SetLimit(maxGoroutines) // 设置最大并发数
        totalTasks := 10

        for i := 0; i &lt; totalTasks; i++ {
                id := i
                g.Go(func() error {
                        select {
                        case &lt;-ctx.Done():
                                return ctx.Err()
                        default:
                                return worker(id)
                        }
                })
        }

        if err := g.Wait(); err != nil {
                fmt.Printf("发生错误: %v\n", err)
        } else {
                fmt.Println("所有任务完成")
        }
}
</pre></div>
頁: [1]
查看完整版本: Go 语言中控制协程数量的常用方法小结