Go处理每分钟100万个请求
<h3 id="原文">引用原文</h3><blockquote>
<p>原文链接 http://marcio.io/2015/07/handling-1-million-requests-per-minute-with-golang/</p>
</blockquote>
<h3 id="问题描述">问题描述</h3>
<p>直入本文要描述的问题:网站流量上来了,高并发负载是不可避免滴问题了,当服务端需要处理大量耗时的任务时,我们一般都会考虑将耗时任务异步处理。那么如果使用Go如何实现?</p>
<p><span>传统上,我们会考虑使用以下方法创建工作者层架构:</span></p>
<ul>
<li>Resque(队列,比如redis resque)</li>
<li>DelayedJob(延迟任务,比如go defer)</li>
<li>Elasticbeanstalk Worker Tier</li>
<li>RabbitMQ(消息队列)</li>
</ul>
<h3 id="简单粗暴法">简单惯用法</h3>
<p>golang的异步处理之携程:go func()可以带来了很大的方便,虽然协程相对于线程占用的系统资源更少,但这并不代表我们可以无休止的创建协程。</p>
<p>不停创建协程也有压垮系统的风险。然而绝大多数的时候,我们不能简单粗暴的创建协程来处理异步任务,原因是不可控。下面我们引用原作者的demo,一个执行耗时任务的handler。</p>
<p>代码我们只用看大致的实现流程原理,实现细节暂且不论。</p>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 0, 1)">package main
import (
</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">bytes</span><span style="color: rgba(128, 0, 0, 1)">"</span>
<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">encoding/json</span><span style="color: rgba(128, 0, 0, 1)">"</span>
<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">fmt</span><span style="color: rgba(128, 0, 0, 1)">"</span>
<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">io</span><span style="color: rgba(128, 0, 0, 1)">"</span>
<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">net/http</span><span style="color: rgba(128, 0, 0, 1)">"</span>
<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">time</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">
)
type PayloadCollection </span><span style="color: rgba(0, 0, 255, 1)">struct</span><span style="color: rgba(0, 0, 0, 1)"> {
WindowsVersion</span><span style="color: rgba(0, 0, 255, 1)">string</span> `json:<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">version</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">`
Token </span><span style="color: rgba(0, 0, 255, 1)">string</span> `json:<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">token</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">`
Payloads []Payload `json:</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">data</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">`
}
type Payload </span><span style="color: rgba(0, 0, 255, 1)">struct</span><span style="color: rgba(0, 0, 0, 1)"> {
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> </span>
<span style="color: rgba(0, 0, 0, 1)">}
func (p </span>*<span style="color: rgba(0, 0, 0, 1)">Payload) UploadToS3() error {
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> the storageFolder method ensures that there are no name collision in
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> case we get same timestamp in the key name</span>
storage_path := fmt.Sprintf(<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">%v/%v</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">, p.storageFolder, time.Now().UnixNano())
bucket :</span>=<span style="color: rgba(0, 0, 0, 1)"> S3Bucket
b :</span>= <span style="color: rgba(0, 0, 255, 1)">new</span><span style="color: rgba(0, 0, 0, 1)">(bytes.Buffer)
encodeErr :</span>=<span style="color: rgba(0, 0, 0, 1)"> json.NewEncoder(b).Encode(payload)
</span><span style="color: rgba(0, 0, 255, 1)">if</span> encodeErr !=<span style="color: rgba(0, 0, 0, 1)"> nil {
</span><span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)"> encodeErr
}
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> Everything we post to the S3 bucket should be marked 'private'</span>
<span style="color: rgba(0, 0, 255, 1)">var</span> acl =<span style="color: rgba(0, 0, 0, 1)"> s3.Private
</span><span style="color: rgba(0, 0, 255, 1)">var</span> contentType = <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">application/octet-stream</span><span style="color: rgba(128, 0, 0, 1)">"</span>
<span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)"> bucket.PutReader(storage_path, b, int64(b.Len()), contentType, acl, s3.Options{})
}
func payloadHandler(w http.ResponseWriter, r </span>*<span style="color: rgba(0, 0, 0, 1)">http.Request) {
</span><span style="color: rgba(0, 0, 255, 1)">if</span> r.Method != <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">POST</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)"> {
w.WriteHeader(http.StatusMethodNotAllowed)
</span><span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)">
}
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> Read the body into a string for json decoding</span>
<span style="color: rgba(0, 0, 255, 1)">var</span> content = &<span style="color: rgba(0, 0, 0, 1)">PayloadCollection{}
err :</span>= json.NewDecoder(io.LimitReader(r.Body, MaxLength)).Decode(&<span style="color: rgba(0, 0, 0, 1)">content)
</span><span style="color: rgba(0, 0, 255, 1)">if</span> err !=<span style="color: rgba(0, 0, 0, 1)"> nil {
w.Header().Set(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">Content-Type</span><span style="color: rgba(128, 0, 0, 1)">"</span>, <span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">application/json; charset=UTF-8</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">)
w.WriteHeader(http.StatusBadRequest)
</span><span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)">
}
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> Go through each payload and queue items individually to be posted to S3</span>
<span style="color: rgba(0, 0, 255, 1)">for</span> _, payload :=<span style="color: rgba(0, 0, 0, 1)"> range content.Payloads {
go payload.UploadToS3() </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> <----- DON'T DO THIS</span>
<span style="color: rgba(0, 0, 0, 1)"> }
w.WriteHeader(http.StatusOK)
}</span></pre>
</div>
<div>
<div>
<p>对于适量的负载,这个方案应该没有问题。但是负载增加以后这个方法就不能很好地工作。当我们把这个版本部署到生产环境中后,如果我们遇到了比预期大一个数量级的请求量。</p>
<p>那么这个方法就有些不尽如人意了。它无法控制创建goroutine的数量。因为我们每分钟收到了一百万个POST请求,上面的代码很快就奔溃了。</p>
<p>这就是我们遇到的第一个问题,简单粗暴起协程处理耗时任务导致的系统<strong>不可控性</strong>。我们自然而然就会想,怎么能让系统更可控呢?</p>
<h3 id="优雅的方法">优雅的方法</h3>
<p>创建带缓冲的channel。这样我们可以把工作任务放到队列里然后再上传到S3。因为可以控制队列的长度并且有充足的内存,我觉得把工作任务缓存在channel队列里应该没有问题。</p>
<p>所以一个很自然的思路那就是:建立<strong>任务队列</strong>。golang提供了线程安全的任务队列实现方式:带缓冲的channal。但是这样只是延后了请求的爆发。</p>
<p>作者意识到,要解决这一问题,必须控制协程的数量。如何控制协程的数量?Job/Worker模式!这里我将作者的代码修改了一下,单文件可执行,以记录并理解这一模式。</p>
</div>
</div>
<p> </p>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 0, 1)">package main
import (
</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">fmt</span><span style="color: rgba(128, 0, 0, 1)">"</span>
<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">reflect</span><span style="color: rgba(128, 0, 0, 1)">"</span>
<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">time</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">
)
</span><span style="color: rgba(0, 0, 255, 1)">var</span><span style="color: rgba(0, 0, 0, 1)"> (
MaxWorker </span>= <span style="color: rgba(128, 0, 128, 1)">10</span><span style="color: rgba(0, 0, 0, 1)">
)
type Payload </span><span style="color: rgba(0, 0, 255, 1)">struct</span><span style="color: rgba(0, 0, 0, 1)"> {
Num </span><span style="color: rgba(0, 0, 255, 1)">int</span><span style="color: rgba(0, 0, 0, 1)">
}
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">待执行的工作</span>
type Job <span style="color: rgba(0, 0, 255, 1)">struct</span><span style="color: rgba(0, 0, 0, 1)"> {
Payload Payload
}
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">任务channal</span>
<span style="color: rgba(0, 0, 255, 1)">var</span><span style="color: rgba(0, 0, 0, 1)"> JobQueue chan Job
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">执行任务的工作者单元</span>
type Worker <span style="color: rgba(0, 0, 255, 1)">struct</span><span style="color: rgba(0, 0, 0, 1)"> {
WorkerPool chan chan Job </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">工作者池--每个元素是一个工作者的私有任务channal</span>
JobChannel chan Job <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">每个工作者单元包含一个任务管道 用于获取任务</span>
quit chan <span style="color: rgba(0, 0, 255, 1)">bool</span> <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">退出信号</span>
no <span style="color: rgba(0, 0, 255, 1)">int</span> <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">编号</span>
<span style="color: rgba(0, 0, 0, 1)">}
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">创建一个新工作者单元</span>
func NewWorker(workerPool chan chan Job, no <span style="color: rgba(0, 0, 255, 1)">int</span><span style="color: rgba(0, 0, 0, 1)">) Worker {
fmt.Println(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">创建一个新工作者单元</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">)
</span><span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)"> Worker{
WorkerPool: workerPool,
JobChannel: make(chan Job),
quit: make(chan </span><span style="color: rgba(0, 0, 255, 1)">bool</span><span style="color: rgba(0, 0, 0, 1)">),
no: no,
}
}
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">循环监听任务和结束信号</span>
<span style="color: rgba(0, 0, 0, 1)">func (w Worker) Start() {
go func() {
</span><span style="color: rgba(0, 0, 255, 1)">for</span><span style="color: rgba(0, 0, 0, 1)"> {
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> register the current worker into the worker queue.</span>
w.WorkerPool <-<span style="color: rgba(0, 0, 0, 1)"> w.JobChannel
fmt.Println(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">w.WorkerPool <- w.JobChannel</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">, w)
</span><span style="color: rgba(0, 0, 255, 1)">select</span><span style="color: rgba(0, 0, 0, 1)"> {
</span><span style="color: rgba(0, 0, 255, 1)">case</span> job := <-<span style="color: rgba(0, 0, 0, 1)">w.JobChannel:
fmt.Println(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">job := <-w.JobChannel</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">)
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 收到任务</span>
<span style="color: rgba(0, 0, 0, 1)"> fmt.Println(job)
time.Sleep(</span><span style="color: rgba(128, 0, 128, 1)">100</span> *<span style="color: rgba(0, 0, 0, 1)"> time.Second)
</span><span style="color: rgba(0, 0, 255, 1)">case</span> <-<span style="color: rgba(0, 0, 0, 1)">w.quit:
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 收到退出信号</span>
<span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)">
}
}
}()
}
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 停止信号</span>
<span style="color: rgba(0, 0, 0, 1)">func (w Worker) Stop() {
go func() {
w.quit </span><- <span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">
}()
}
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">调度中心</span>
type Dispatcher <span style="color: rgba(0, 0, 255, 1)">struct</span><span style="color: rgba(0, 0, 0, 1)"> {
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">工作者池</span>
<span style="color: rgba(0, 0, 0, 1)"> WorkerPool chan chan Job
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">工作者数量</span>
MaxWorkers <span style="color: rgba(0, 0, 255, 1)">int</span><span style="color: rgba(0, 0, 0, 1)">
}
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">创建调度中心</span>
func NewDispatcher(maxWorkers <span style="color: rgba(0, 0, 255, 1)">int</span>) *<span style="color: rgba(0, 0, 0, 1)">Dispatcher {
pool :</span>=<span style="color: rgba(0, 0, 0, 1)"> make(chan chan Job, maxWorkers)
</span><span style="color: rgba(0, 0, 255, 1)">return</span> &<span style="color: rgba(0, 0, 0, 1)">Dispatcher{WorkerPool: pool, MaxWorkers: maxWorkers}
}
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">工作者池的初始化</span>
func (d *<span style="color: rgba(0, 0, 0, 1)">Dispatcher) Run() {
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> starting n number of workers</span>
<span style="color: rgba(0, 0, 255, 1)">for</span> i := <span style="color: rgba(128, 0, 128, 1)">1</span>; i < d.MaxWorkers+<span style="color: rgba(128, 0, 128, 1)">1</span>; i++<span style="color: rgba(0, 0, 0, 1)"> {
worker :</span>=<span style="color: rgba(0, 0, 0, 1)"> NewWorker(d.WorkerPool, i)
worker.Start()
}
go d.dispatch()
}
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">调度</span>
func (d *<span style="color: rgba(0, 0, 0, 1)">Dispatcher) dispatch() {
</span><span style="color: rgba(0, 0, 255, 1)">for</span><span style="color: rgba(0, 0, 0, 1)"> {
</span><span style="color: rgba(0, 0, 255, 1)">select</span><span style="color: rgba(0, 0, 0, 1)"> {
</span><span style="color: rgba(0, 0, 255, 1)">case</span> job := <-<span style="color: rgba(0, 0, 0, 1)">JobQueue:
fmt.Println(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">job := <-JobQueue:</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">)
go func(job Job) {
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">等待空闲worker (任务多的时候会阻塞这里)</span>
jobChannel := <-<span style="color: rgba(0, 0, 0, 1)">d.WorkerPool
fmt.Println(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">jobChannel := <-d.WorkerPool</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">, reflect.TypeOf(jobChannel))
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 将任务放到上述woker的私有任务channal中</span>
jobChannel <-<span style="color: rgba(0, 0, 0, 1)"> job
fmt.Println(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">jobChannel <- job</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">)
}(job)
}
}
}
func main() {
JobQueue </span>= make(chan Job, <span style="color: rgba(128, 0, 128, 1)">10</span><span style="color: rgba(0, 0, 0, 1)">)
dispatcher :</span>=<span style="color: rgba(0, 0, 0, 1)"> NewDispatcher(MaxWorker)
dispatcher.Run()
time.Sleep(</span><span style="color: rgba(128, 0, 128, 1)">1</span> *<span style="color: rgba(0, 0, 0, 1)"> time.Second)
go addQueue()
time.Sleep(</span><span style="color: rgba(128, 0, 128, 1)">1000</span> *<span style="color: rgba(0, 0, 0, 1)"> time.Second)
}
func addQueue() {
</span><span style="color: rgba(0, 0, 255, 1)">for</span> i := <span style="color: rgba(128, 0, 128, 1)">0</span>; i < <span style="color: rgba(128, 0, 128, 1)">20</span>; i++<span style="color: rgba(0, 0, 0, 1)"> {
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 新建一个任务</span>
payLoad := Payload{Num: <span style="color: rgba(128, 0, 128, 1)">1</span><span style="color: rgba(0, 0, 0, 1)">}
work :</span>=<span style="color: rgba(0, 0, 0, 1)"> Job{Payload: payLoad}
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 任务放入任务队列channal</span>
JobQueue <-<span style="color: rgba(0, 0, 0, 1)"> work
fmt.Println(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">JobQueue <- work</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">)
time.Sleep(</span><span style="color: rgba(128, 0, 128, 1)">1</span> *<span style="color: rgba(0, 0, 0, 1)"> time.Second)
}
}
</span><span style="color: rgba(0, 128, 0, 1)">/*</span><span style="color: rgba(0, 128, 0, 1)">
一个任务的执行过程如下
JobQueue <- work新任务入队
job := <-JobQueue: 调度中心收到任务
jobChannel := <-d.WorkerPool 从工作者池取到一个工作者
jobChannel <- job 任务给到工作者
job := <-w.JobChannel 工作者取出任务
{{1}} 执行任务
w.WorkerPool <- w.JobChannel 工作者在放回工作者池
</span><span style="color: rgba(0, 128, 0, 1)">*/</span></pre>
</div>
<p> 这样,我们已经能够主动的控制worker的数量。这时候,我问哈大家,我们完全解决问题了么?如果有大量的任务同时涌入,会发生什么样的结果。程序会阻塞等待可用的worker</p>
<div class="cnblogs_code">
<pre>jobChannel := <-d.WorkerPool</pre>
</div>
<p>下面是我们的dispatcher实现代码:</p>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">调度</span>
func (d *<span style="color: rgba(0, 0, 0, 1)">Dispatcher) dispatch() {
</span><span style="color: rgba(0, 0, 255, 1)">for</span><span style="color: rgba(0, 0, 0, 1)"> {
</span><span style="color: rgba(0, 0, 255, 1)">select</span><span style="color: rgba(0, 0, 0, 1)"> {
</span><span style="color: rgba(0, 0, 255, 1)">case</span> job := <-<span style="color: rgba(0, 0, 0, 1)">JobQueue:
fmt.Println(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">job := <-JobQueue:</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">)
go func(job Job) {
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">等待空闲worker (任务多的时候会阻塞这里)</span>
jobChannel := <-<span style="color: rgba(0, 0, 0, 1)">d.WorkerPool
fmt.Println(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">jobChannel := <-d.WorkerPool</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">, reflect.TypeOf(jobChannel))
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 将任务放到上述woker的私有任务channal中</span>
jobChannel <-<span style="color: rgba(0, 0, 0, 1)"> job
fmt.Println(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">jobChannel <- job</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">)
}(job)
}
}
}</span></pre>
</div>
<p>这里我们提供了创建worker的最大数目作为参数,并把这些worker加入到worker池里。不要忘记,这个调度方法也是在不断的创建协程等待空闲的worker。我们再改一下代码如下:</p>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 0, 0, 1)">package main
import (
</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">fmt</span><span style="color: rgba(128, 0, 0, 1)">"</span>
<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">reflect</span><span style="color: rgba(128, 0, 0, 1)">"</span>
<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">runtime</span><span style="color: rgba(128, 0, 0, 1)">"</span>
<span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">time</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">
)
</span><span style="color: rgba(0, 0, 255, 1)">var</span><span style="color: rgba(0, 0, 0, 1)"> (
MaxWorker </span>= <span style="color: rgba(128, 0, 128, 1)">10</span><span style="color: rgba(0, 0, 0, 1)">
)
type Payload </span><span style="color: rgba(0, 0, 255, 1)">struct</span><span style="color: rgba(0, 0, 0, 1)"> {
Num </span><span style="color: rgba(0, 0, 255, 1)">int</span><span style="color: rgba(0, 0, 0, 1)">
}
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">待执行的工作</span>
type Job <span style="color: rgba(0, 0, 255, 1)">struct</span><span style="color: rgba(0, 0, 0, 1)"> {
Payload Payload
}
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">任务channal</span>
<span style="color: rgba(0, 0, 255, 1)">var</span><span style="color: rgba(0, 0, 0, 1)"> JobQueue chan Job
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">执行任务的工作者单元</span>
type Worker <span style="color: rgba(0, 0, 255, 1)">struct</span><span style="color: rgba(0, 0, 0, 1)"> {
WorkerPool chan chan Job </span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">工作者池--每个元素是一个工作者的私有任务channal</span>
JobChannel chan Job <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">每个工作者单元包含一个任务管道 用于获取任务</span>
quit chan <span style="color: rgba(0, 0, 255, 1)">bool</span> <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">退出信号</span>
no <span style="color: rgba(0, 0, 255, 1)">int</span> <span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">编号</span>
<span style="color: rgba(0, 0, 0, 1)">}
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">创建一个新工作者单元</span>
func NewWorker(workerPool chan chan Job, no <span style="color: rgba(0, 0, 255, 1)">int</span><span style="color: rgba(0, 0, 0, 1)">) Worker {
fmt.Println(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">创建一个新工作者单元</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">)
</span><span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)"> Worker{
WorkerPool: workerPool,
JobChannel: make(chan Job),
quit: make(chan </span><span style="color: rgba(0, 0, 255, 1)">bool</span><span style="color: rgba(0, 0, 0, 1)">),
no: no,
}
}
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">循环监听任务和结束信号</span>
<span style="color: rgba(0, 0, 0, 1)">func (w Worker) Start() {
go func() {
</span><span style="color: rgba(0, 0, 255, 1)">for</span><span style="color: rgba(0, 0, 0, 1)"> {
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> register the current worker into the worker queue.</span>
w.WorkerPool <-<span style="color: rgba(0, 0, 0, 1)"> w.JobChannel
fmt.Println(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">w.WorkerPool <- w.JobChannel</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">, w)
</span><span style="color: rgba(0, 0, 255, 1)">select</span><span style="color: rgba(0, 0, 0, 1)"> {
</span><span style="color: rgba(0, 0, 255, 1)">case</span> job := <-<span style="color: rgba(0, 0, 0, 1)">w.JobChannel:
fmt.Println(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">job := <-w.JobChannel</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">)
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 收到任务</span>
<span style="color: rgba(0, 0, 0, 1)"> fmt.Println(job)
time.Sleep(</span><span style="color: rgba(128, 0, 128, 1)">100</span> *<span style="color: rgba(0, 0, 0, 1)"> time.Second)
</span><span style="color: rgba(0, 0, 255, 1)">case</span> <-<span style="color: rgba(0, 0, 0, 1)">w.quit:
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 收到退出信号</span>
<span style="color: rgba(0, 0, 255, 1)">return</span><span style="color: rgba(0, 0, 0, 1)">
}
}
}()
}
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 停止信号</span>
<span style="color: rgba(0, 0, 0, 1)">func (w Worker) Stop() {
go func() {
w.quit </span><- <span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">
}()
}
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">调度中心</span>
type Dispatcher <span style="color: rgba(0, 0, 255, 1)">struct</span><span style="color: rgba(0, 0, 0, 1)"> {
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">工作者池</span>
<span style="color: rgba(0, 0, 0, 1)"> WorkerPool chan chan Job
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">工作者数量</span>
MaxWorkers <span style="color: rgba(0, 0, 255, 1)">int</span><span style="color: rgba(0, 0, 0, 1)">
}
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">创建调度中心</span>
func NewDispatcher(maxWorkers <span style="color: rgba(0, 0, 255, 1)">int</span>) *<span style="color: rgba(0, 0, 0, 1)">Dispatcher {
pool :</span>=<span style="color: rgba(0, 0, 0, 1)"> make(chan chan Job, maxWorkers)
</span><span style="color: rgba(0, 0, 255, 1)">return</span> &<span style="color: rgba(0, 0, 0, 1)">Dispatcher{WorkerPool: pool, MaxWorkers: maxWorkers}
}
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">工作者池的初始化</span>
func (d *<span style="color: rgba(0, 0, 0, 1)">Dispatcher) Run() {
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> starting n number of workers</span>
<span style="color: rgba(0, 0, 255, 1)">for</span> i := <span style="color: rgba(128, 0, 128, 1)">1</span>; i < d.MaxWorkers+<span style="color: rgba(128, 0, 128, 1)">1</span>; i++<span style="color: rgba(0, 0, 0, 1)"> {
worker :</span>=<span style="color: rgba(0, 0, 0, 1)"> NewWorker(d.WorkerPool, i)
worker.Start()
}
go d.dispatch()
}
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">调度</span>
func (d *<span style="color: rgba(0, 0, 0, 1)">Dispatcher) dispatch() {
</span><span style="color: rgba(0, 0, 255, 1)">for</span><span style="color: rgba(0, 0, 0, 1)"> {
</span><span style="color: rgba(0, 0, 255, 1)">select</span><span style="color: rgba(0, 0, 0, 1)"> {
</span><span style="color: rgba(0, 0, 255, 1)">case</span> job := <-<span style="color: rgba(0, 0, 0, 1)">JobQueue:
fmt.Println(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">job := <-JobQueue:</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">)
go func(job Job) {
fmt.Println(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">等待空闲worker (任务多的时候会阻塞这里</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">)
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">等待空闲worker (任务多的时候会阻塞这里)</span>
jobChannel := <-<span style="color: rgba(0, 0, 0, 1)">d.WorkerPool
fmt.Println(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">jobChannel := <-d.WorkerPool</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">, reflect.TypeOf(jobChannel))
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 将任务放到上述woker的私有任务channal中</span>
jobChannel <-<span style="color: rgba(0, 0, 0, 1)"> job
fmt.Println(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">jobChannel <- job</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">)
}(job)
}
}
}
func main() {
JobQueue </span>= make(chan Job, <span style="color: rgba(128, 0, 128, 1)">10</span><span style="color: rgba(0, 0, 0, 1)">)
dispatcher :</span>=<span style="color: rgba(0, 0, 0, 1)"> NewDispatcher(MaxWorker)
dispatcher.Run()
time.Sleep(</span><span style="color: rgba(128, 0, 128, 1)">1</span> *<span style="color: rgba(0, 0, 0, 1)"> time.Second)
go addQueue()
time.Sleep(</span><span style="color: rgba(128, 0, 128, 1)">1000</span> *<span style="color: rgba(0, 0, 0, 1)"> time.Second)
}
func addQueue() {
</span><span style="color: rgba(0, 0, 255, 1)">for</span> i := <span style="color: rgba(128, 0, 128, 1)">0</span>; i < <span style="color: rgba(128, 0, 128, 1)">100</span>; i++<span style="color: rgba(0, 0, 0, 1)"> {
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 新建一个任务</span>
payLoad :=<span style="color: rgba(0, 0, 0, 1)"> Payload{Num: i}
work :</span>=<span style="color: rgba(0, 0, 0, 1)"> Job{Payload: payLoad}
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 任务放入任务队列channal</span>
JobQueue <-<span style="color: rgba(0, 0, 0, 1)"> work
fmt.Println(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">JobQueue <- work</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">, i)
fmt.Println(</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(128, 0, 0, 1)">当前协程数:</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">, runtime.NumGoroutine())
time.Sleep(</span><span style="color: rgba(128, 0, 128, 1)">100</span> *<span style="color: rgba(0, 0, 0, 1)"> time.Millisecond)
}
}</span></pre>
</div>
<p>执行结果如下:</p>
<p> <img src="https://img2020.cnblogs.com/blog/1102222/202006/1102222-20200630012652749-1581739008.png"></p>
<p> </p>
<p> <img src="https://img2020.cnblogs.com/blog/1102222/202006/1102222-20200630012732271-424337045.png"></p>
<p>这里我们发现,我们依然没能控制住协程数量,我们只是控制住了worker的数量。这种情况下,如果worker数量设置的合理且异步任务耗时不是特别长的情况下一般没有问题。但是出于安全的考虑,我要把这个阻塞的协程数做一个控制,如果达到限制时候拒绝服务以保护系统该怎么处理?</p>
<h3 id="真正控制协程数量(并发执行的任务数)">真正控制协程数量(并发执行的任务数)</h3>
<p>我们可以控制并发执行(包括等待执行)的任务数。我们加入任务使用如下判断。用一个带缓冲的Channel控制并发执行的任务数。</p>
<p>当任务异步处理完成的时候执行<code><- DispatchNumControl</code>释放控制即可。用这种方法,</p>
<p>我们可以根据压测结果设置合适的并发数从而保证系统能够尽可能的发挥自己的能力,同时保证不会因为任务量太大而崩溃(因为达到极限的时候,系统会告诉调用方:牛仔我很忙)。</p>
<p><span style="color: rgba(51, 51, 51, 1); font-family: "Microsoft YaHei", Arial, sans-serif; font-size: 13px"> 比如定义一个limit函数读取是否存在发送的任务队列:</span></p>
<div class="cnblogs_code">
<pre><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)">用于控制并发处理的协程数</span>
<span style="color: rgba(0, 0, 255, 1)">var</span> DispatchNumControl = make(chan <span style="color: rgba(0, 0, 255, 1)">bool</span>, <span style="color: rgba(128, 0, 128, 1)">10000</span><span style="color: rgba(0, 0, 0, 1)">)
func Limit(work Job) </span><span style="color: rgba(0, 0, 255, 1)">bool</span><span style="color: rgba(0, 0, 0, 1)"> {
</span><span style="color: rgba(0, 0, 255, 1)">select</span><span style="color: rgba(0, 0, 0, 1)"> {
</span><span style="color: rgba(0, 0, 255, 1)">case</span> <-time.After(time.Millisecond * <span style="color: rgba(128, 0, 128, 1)">100</span><span style="color: rgba(0, 0, 0, 1)">):
fmt.println(</span><span style="color: rgba(128, 0, 0, 1)">"牛仔</span><span style="color: rgba(128, 0, 0, 1)">我很忙</span><span style="color: rgba(128, 0, 0, 1)">"</span><span style="color: rgba(0, 0, 0, 1)">)
</span><span style="color: rgba(0, 0, 255, 1)">return</span> <span style="color: rgba(0, 0, 255, 1)">false</span>
<span style="color: rgba(0, 0, 255, 1)">case</span> DispatchNumControl <- <span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">:
</span><span style="color: rgba(0, 128, 0, 1)">//</span><span style="color: rgba(0, 128, 0, 1)"> 任务放入任务队列channal</span>
jobChannel <-<span style="color: rgba(0, 0, 0, 1)"> work
</span><span style="color: rgba(0, 0, 255, 1)">return</span> <span style="color: rgba(0, 0, 255, 1)">true</span><span style="color: rgba(0, 0, 0, 1)">
}
}</span></pre>
</div>
<h3 id="总结">结束语</h3>
<p>我们本可以通过大量的队列,后台workers,复杂的调度来设计一套复杂的系统,协程是个好的设计,但任何东西都不能过度使用。</p>
<p>我们做系统设计的时候,一定也要时刻想着<strong>控制:</strong>要对自己设计的系统有着足够的控制力。<br>另外综合上面的实现。为什么 dispatch 这里要用 协程 呢?阻塞完全没问题? 欢迎广大博友拍砖留言。。。。</p>
<p> </p>
</div>
<div id="MySignature" role="contentinfo">
无论从事什么行业,只要做好两件事就够了,一个是你的专业、一个是你的人品,专业决定了你的存在,人品决定了你的人脉,剩下的就是坚持,用善良專業和真诚赢取更多的信任。<br><br>
来源:https://www.cnblogs.com/phpper/p/13211343.html
頁:
[1]