史洁 發表於 2019-6-19 12:28:00

雪崩利器 hystrix-go 源码分析

<p>阅读源码的过程,就像是在像武侠小说里阅读武功秘籍一样,分析高手的一招一式,提炼出精髓,来增强自己的内力。<br>
之前的帖子说了一下微服务的雪崩效应和常见的解决方案,太水,没有上代码怎么叫解决方案。<code>github</code>上有很多开源的库来解决<code>雪崩问题</code>,比较出名的是<code>Netflix</code>的开源库hystrix。集<code>流量控制</code>、<code>熔断</code>、<code>容错</code>等于一身的<code>java</code>语言的库。今天分析的源码库是 hystrix-go,他是hystrix的的<code>go</code>语言版,应该是说简化版本,用很少的代码量实现了主要功能。很推荐朋友们有时间读一读。</p>
<h2 id="使用简单">使用简单</h2>
<p><code>hystrix</code>的使用是非常简单的,同步执行,直接调用<code>Do</code>方法。</p>
<pre><code>err := hystrix.Do("my_command", func() error {
   // talk to other services
   return nil
}, func(err error) error {
   // do this when services are down
   return nil
})
</code></pre>
<p>异步执行<code>Go</code>方法,内部实现是启动了一个<code>gorouting</code>,如果想得到自定义方法的数据,需要你传<code>channel</code>来处理数据,或者输出。返回的<code>error</code>也是一个<code>channel</code></p>
<pre><code> output := make(chan bool, 1)
errors := hystrix.Go("my_command", func() error {
        // talk to other services
        output &lt;- true
        return nil
}, nil)

select {
case out := &lt;-output:
        // success
case err := &lt;-errors:
        // failure
</code></pre>
<p>大概的执行流程图<br>
<img src="https://img2018.cnblogs.com/blog/342595/201906/342595-20190619121443948-1229671530.png" alt="" loading="lazy"></p>
<p>其实方法<code>Do</code>和<code>Go</code>方法内部都是调用了<code>hystrix.GoC</code>方法,只是<code>Do</code>方法处理了异步的过程</p>
<pre><code>func DoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) error {
        done := make(chan struct{}, 1)
        r := func(ctx context.Context) error {
                err := run(ctx)
                if err != nil {
                        return err
                }
                done &lt;- struct{}{}
                return nil
        }
        f := func(ctx context.Context, e error) error {
                err := fallback(ctx, e)
                if err != nil {
                        return err
                }
                done &lt;- struct{}{}
                return nil
        }
        var errChan chan error
        if fallback == nil {
                errChan = GoC(ctx, name, r, nil)
        } else {
                errChan = GoC(ctx, name, r, f)
        }

        select {
        case &lt;-done:
                return nil
        case err := &lt;-errChan:
                return err
        }
}
</code></pre>
<h3 id="自定义command配置">自定义Command配置</h3>
<p>在调用<code>Do</code> <code>Go</code>等方法之前我们可以先自定义一些配置</p>
<pre><code>        hystrix.ConfigureCommand("mycommand", hystrix.CommandConfig{
                Timeout:                int(time.Second * 3),
                MaxConcurrentRequests:100,
                SleepWindow:            int(time.Second * 5),
                RequestVolumeThreshold: 30,
                ErrorPercentThreshold: 50,
        })

        err := hystrix.DoC(context.Background(), "mycommand", func(ctx context.Context) error {
                // ...
                return nil
        }, func(i context.Context, e error) error {
                // ...
                return e
        })
</code></pre>
<p>我大要说了一下<code>CommandConfig</code>第个字段的意义:</p>
<ul>
<li>Timeout: 执行command的超时时间。<code>默认时间是1000毫秒</code></li>
<li>MaxConcurrentRequests:command的最大并发量 <code>默认值是10</code></li>
<li>SleepWindow:当熔断器被打开后,SleepWindow的时间就是控制过多久后去尝试服务是否可用了。<code>默认值是5000毫秒</code></li>
<li>RequestVolumeThreshold: 一个统计窗口10秒内请求数量。达到这个请求数量后才去判断是否要开启熔断。<code>默认值是20</code></li>
<li>ErrorPercentThreshold:错误百分比,请求数量大于等于<code>RequestVolumeThreshold</code>并且错误率到达这个百分比后就会启动<code>熔断</code> <code>默认值是50</code></li>
</ul>
<p>当然如果不配置他们,会使用<code>默认值</code></p>
<p>讲完了怎么用,接下来就是分析源码了。我是从下层到上层的顺序分析代码和执行流程</p>
<h2 id="统计控制器">统计控制器</h2>
<p>每一个Command都会有一个默认统计控制器,当然也可以添加多个自定义的控制器。<br>
默认的统计控制器<code>DefaultMetricCollector</code>保存着<code>熔断器</code>的所有状态,<code>调用次数</code>,<code>失败次数</code>,<code>被拒绝次数</code>等等</p>
<pre><code>type DefaultMetricCollector struct {
        mutex *sync.RWMutex

        numRequests *rolling.Number
        errors      *rolling.Number

        successes               *rolling.Number
        failures                *rolling.Number
        rejects               *rolling.Number
        shortCircuits         *rolling.Number
        timeouts                *rolling.Number
        contextCanceled         *rolling.Number
        contextDeadlineExceeded *rolling.Number

        fallbackSuccesses *rolling.Number
        fallbackFailures*rolling.Number
        totalDuration   *rolling.Timing
        runDuration       *rolling.Timing
}
</code></pre>
<p>最主要的还是要看一下<code>rolling.Number</code>,<code>rolling.Number</code>才是状态最终保存的地方<br>
<code>Number</code>保存了10秒内的<code>Buckets</code>数据信息,每一个<code>Bucket</code>的统计时长为1秒</p>
<p><img src="https://img2018.cnblogs.com/blog/342595/201906/342595-20190619121504664-2138088134.png" alt="" loading="lazy"></p>
<pre><code>type Number struct {
        Buckets map*numberBucket
        Mutex   *sync.RWMutex
}

type numberBucket struct {
        Value float64
}
</code></pre>
<p>字典字段<code>Buckets map*numberBucket</code> 中的<code>Key</code>保存的是当前时间<br>
可能你会好奇<code>Number</code>是如何保证只保存10秒内的数据的。每一次对<code>熔断器</code>的状态进行修改时,<code>Number</code>都要先得到当前的时间(秒级)的<code>Bucket</code>不存在则创建。</p>
<pre><code>func (r *Number) getCurrentBucket() *numberBucket {
        now := time.Now().Unix()
        var bucket *numberBucket
        var ok bool

        if bucket, ok = r.Buckets; !ok {
                bucket = &amp;numberBucket{}
                r.Buckets = bucket
        }

        return bucket
}
</code></pre>
<p>修改完后去掉10秒外的数据</p>
<pre><code>func (r *Number) removeOldBuckets() {
        now := time.Now().Unix() - 10

        for timestamp := range r.Buckets {
                // TODO: configurable rolling window
                if timestamp &lt;= now {
                        delete(r.Buckets, timestamp)
                }
        }
}
</code></pre>
<p>比如<code>Increment</code>方法,先得到<code>Bucket</code>再删除旧的数据</p>
<pre><code>func (r *Number) Increment(i float64) {
        if i == 0 {
                return
        }

        r.Mutex.Lock()
        defer r.Mutex.Unlock()

        b := r.getCurrentBucket()
        b.Value += i
        r.removeOldBuckets()
}
</code></pre>
<p>统计控制器是最基层和最重要的一个实现,上层所有的执行判断都是基于他的数据进行逻辑处理的</p>
<h3 id="上报执行状态信息">上报执行状态信息</h3>
<pre><code>断路器--&gt;执行--&gt;上报执行状态信息--&gt;保存到相应的Buckets
</code></pre>
<p><img src="https://img2018.cnblogs.com/blog/342595/201906/342595-20190619121523619-305535128.png" alt="" loading="lazy"></p>
<p>每一次断路器逻辑的执行都会上报执行过程中的状态,</p>
<pre><code>// ReportEvent records command metrics for tracking recent error rates and exposing data to the dashboard.
func (circuit *CircuitBreaker) ReportEvent(eventTypes []string, start time.Time, runDuration time.Duration) error {
        // ...
        circuit.mutex.RLock()
        o := circuit.open
        circuit.mutex.RUnlock()
        if eventTypes == "success" &amp;&amp; o {
                circuit.setClose()
        }
        var concurrencyInUse float64
        if circuit.executorPool.Max &gt; 0 {
                concurrencyInUse = float64(circuit.executorPool.ActiveCount()) / float64(circuit.executorPool.Max)
        }
        select {
        case circuit.metrics.Updates &lt;- &amp;commandExecution{
                Types:            eventTypes,
                Start:            start,
                RunDuration:      runDuration,
                ConcurrencyInUse: concurrencyInUse,
        }:
        default:
                return CircuitError{Message: fmt.Sprintf("metrics channel (%v) is at capacity", circuit.Name)}
        }

        return nil
}
</code></pre>
<p><code>circuit.metrics.Updates</code> 这个信道就是处理上报信息的,上报执行状态自信的结构是<code>metricExchange</code>,结构体很简单只有4个字段。要的就是</p>
<ul>
<li><code>channel</code>字段<code>Updates</code> 他是一个有<code>buffer</code>的<code>channel</code>默认的数量是<code>2000</code>个,所有的状态信息都在他里面</li>
<li><code>metricCollectors</code>字段,就是保存的具体的这个<code>command</code>执行过程中的各种信息</li>
</ul>
<pre><code>type metricExchange struct {
        Name    string
        Updates chan *commandExecution
        Mutex   *sync.RWMutex

        metricCollectors []metricCollector.MetricCollector
}

type commandExecution struct {
        Types            []string      `json:"types"`
        Start            time.Time   `json:"start_time"`
        RunDuration      time.Duration `json:"run_duration"`
        ConcurrencyInUse float64       `json:"concurrency_inuse"`
}

func newMetricExchange(name string) *metricExchange {
        m := &amp;metricExchange{}
        m.Name = name

        m.Updates = make(chan *commandExecution, 2000)
        m.Mutex = &amp;sync.RWMutex{}
        m.metricCollectors = metricCollector.Registry.InitializeMetricCollectors(name)
        m.Reset()

        go m.Monitor()

        return m
}
</code></pre>
<p>在执行<code>newMetricExchange</code>的时候会启动一个协程 <code>go m.Monitor()</code>去监控<code>Updates</code>的数据,然后上报给<code>metricCollectors</code> 保存执行的信息数据比如前面提到的<code>调用次数</code>,<code>失败次数</code>,<code>被拒绝次数</code>等等</p>
<pre><code>func (m *metricExchange) Monitor() {
        for update := range m.Updates {
                // we only grab a read lock to make sure Reset() isn't changing the numbers.
                m.Mutex.RLock()

                totalDuration := time.Since(update.Start)
                wg := &amp;sync.WaitGroup{}
                for _, collector := range m.metricCollectors {
                        wg.Add(1)
                        go m.IncrementMetrics(wg, collector, update, totalDuration)
                }
                wg.Wait()

                m.Mutex.RUnlock()
        }
}
</code></pre>
<p>更新调用的是<code>go m.IncrementMetrics(wg, collector, update, totalDuration)</code>,里面判断了他的状态</p>
<pre><code>func (m *metricExchange) IncrementMetrics(wg *sync.WaitGroup, collector metricCollector.MetricCollector, update *commandExecution, totalDuration time.Duration) {
        // granular metrics
        r := metricCollector.MetricResult{
                Attempts:         1,
                TotalDuration:    totalDuration,
                RunDuration:      update.RunDuration,
                ConcurrencyInUse: update.ConcurrencyInUse,
        }
        switch update.Types {
        case "success":
                r.Successes = 1
        case "failure":
                r.Failures = 1
                r.Errors = 1
        case "rejected":
                r.Rejects = 1
                r.Errors = 1
        // ...
        }
        // ...
        collector.Update(r)
        wg.Done()
}
</code></pre>
<h2 id="流量控制">流量控制</h2>
<p><code>hystrix-go</code>对流量控制的代码是很简单的。用了一个简单的令牌算法,能得到令牌的就可以执行后继的工作,执行完后要返还令牌。得不到令牌就拒绝,拒绝后调用用户设置的<code>callback</code>方法,如果没有设置就不执行。<br>
结构体<code>executorPool</code>就是<code>hystrix-go</code> <code>流量控制</code>的具体实现。字段<code>Max</code>就是每秒最大的并发值。</p>
<pre><code>type executorPool struct {
        Name    string
        Metrics *poolMetrics
        Max   int
        Tickets chan *struct{}
}
</code></pre>
<p>在创建<code>executorPool</code>的时候,会根据<code>Max</code>值来创建<code>令牌</code>。Max值如果没有设置会使用默认值<code>10</code></p>
<pre><code>func newExecutorPool(name string) *executorPool {
        p := &amp;executorPool{}
        p.Name = name
        p.Metrics = newPoolMetrics(name)
        p.Max = getSettings(name).MaxConcurrentRequests

        p.Tickets = make(chan *struct{}, p.Max)
        for i := 0; i &lt; p.Max; i++ {
                p.Tickets &lt;- &amp;struct{}{}
        }

        return p
}
</code></pre>
<h3 id="流量控制上报状态">流量控制上报状态</h3>
<p>注意一下字段 <code>Metrics</code> 他用于统计执行数量,比如:<code>执行的总数量</code>,<code>最大的并发数</code> 具体的代码就不贴上来了。这个数量也可以显露出,供可视化程序直观的表现出来。</p>
<p>令牌使用完后是需要返还的,返回的时候才会做上面所说的统计工作。</p>
<pre><code>func (p *executorPool) Return(ticket *struct{}) {
        if ticket == nil {
                return
        }

        p.Metrics.Updates &lt;- poolMetricsUpdate{
                activeCount: p.ActiveCount(),
        }
        p.Tickets &lt;- ticket
}

func (p *executorPool) ActiveCount() int {
        return p.Max - len(p.Tickets)
}
</code></pre>
<h2 id="一次command的执行的流程">一次Command的执行的流程</h2>
<p>上面把 <code>统计控制器</code>、<code>流量控制</code>、<code>上报执行状态</code>讲完了,主要的实现也就讲的差不多了。最后就是串一次command的执行都经历了啥:</p>
<pre><code> err := hystrix.Do("my_command", func() error {
        // talk to other services
        return nil
}, func(err error) error {
        // do this when services are down
        return nil
})
</code></pre>
<p><code>hystrix</code>在执行一次command的前面也有提到过会调用<code>GoC</code>方法,下面我把代码贴出来来,<code>篇幅问题去掉了一些代码</code>,主要逻辑都在。就是在<code>判断断路器是否已打开</code>,<code>得到Ticket</code>得不到就限流,<code>执行我们自己的的方法</code>,<code>判断context是否Done或者执行是否超时</code><br>
当然,每次执行结果都要<code>上报执行状态</code>,最后要<code>返还Ticket</code></p>
<pre><code>func GoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) chan error {
        cmd := &amp;command{
                run:      run,
                fallback: fallback,
                start:    time.Now(),
                errChan:make(chan error, 1),
                finished: make(chan bool, 1),
        }
        //得到断路器,不存在则创建
        circuit, _, err := GetCircuit(name)
        if err != nil {
                cmd.errChan &lt;- err
                return cmd.errChan
        }
        //...
        // 返还ticket
        returnTicket := func() {
                // ...
                cmd.circuit.executorPool.Return(cmd.ticket)
        }
        // 上报执行状态
        reportAllEvent := func() {
                err := cmd.circuit.ReportEvent(cmd.events, cmd.start, cmd.runDuration)
                // ...
        }
        go func() {
                defer func() { cmd.finished &lt;- true }()
                // 查看断路器是否已打开
                if !cmd.circuit.AllowRequest() {
                        // ...
                        returnOnce.Do(func() {
                                returnTicket()
                                cmd.errorWithFallback(ctx, ErrCircuitOpen)
                                reportAllEvent()
                        })
                        return
                }
                // ...
                // 获取ticket 如果得不到就限流
                select {
                case cmd.ticket = &lt;-circuit.executorPool.Tickets:
                        ticketChecked = true
                        ticketCond.Signal()
                        cmd.Unlock()
                default:
                        // ...
                        returnOnce.Do(func() {
                                returnTicket()
                                cmd.errorWithFallback(ctx, ErrMaxConcurrency)
                                reportAllEvent()
                        })
                        return
                }
                // 执行我们自已的方法,并上报执行信息
                returnOnce.Do(func() {
                        defer reportAllEvent()
                        cmd.runDuration = time.Since(runStart)
                        returnTicket()
                        if runErr != nil {
                                cmd.errorWithFallback(ctx, runErr)
                                return
                        }
                        cmd.reportEvent("success")
                })
        }()
        // 等待context是否被结束,或执行者超时,并上报
        go func() {
                timer := time.NewTimer(getSettings(name).Timeout)
                defer timer.Stop()

                select {
                case &lt;-cmd.finished:
                        // returnOnce has been executed in another goroutine
                case &lt;-ctx.Done():
                        // ...
                        return
                case &lt;-timer.C:
                        // ...
                }
        }()

        return cmd.errChan
}
</code></pre>
<h2 id="dashboard-可视化hystrix的上报信息">dashboard 可视化hystrix的上报信息</h2>
<p>代码中<code>StreamHandler</code>就是把所有<code>断路器</code>的状态以流的方式不断的推送到dashboard. 这部分代码我就不用说了,很简单。<br>
需要在你的服务端加3行代码,启动我们的流服务</p>
<pre><code>        hystrixStreamHandler := hystrix.NewStreamHandler()
        hystrixStreamHandler.Start()
        go http.ListenAndServe(net.JoinHostPort("", "81"), hystrixStreamHandler)
</code></pre>
<p><code>dashboard</code>我使用的是<code>docker</code>版。</p>
<pre><code>docker run -d -p 8888:9002 --name hystrix-dashboard mlabouardy/hystrix-dashboard:latest
</code></pre>
<p><img src="https://img2018.cnblogs.com/blog/342595/201906/342595-20190619121558413-476710354.png" alt="" loading="lazy"></p>
<p>在下面输入你服务的地址,我是<br>
<code>http://192.168.1.67:81/hystrix.stream</code></p>
<p><img src="https://img2018.cnblogs.com/blog/342595/201906/342595-20190619121607579-678713551.png" alt="" loading="lazy"></p>
<p>如果是集群可以使用Turbine进行监控,有时间大家自己来看吧</p>
<p><img src="https://img2018.cnblogs.com/blog/342595/201906/342595-20190619121425618-179856202.png" alt="" loading="lazy"></p>


</div>
<div id="MySignature" role="contentinfo">
    <div id="AllanboltSignature">
<div>作者:李鹏</div>
<div>出处:http://www.cnblogs.com/li-peng/</div>
<div>本文版权归作者和博客园共有,欢迎转载,但未经作者同意必须保留此段声明,且在文章页面明显位置给出原文连接,否则保留追究法律责任的权利。 </div>
</div><br><br>
来源:https://www.cnblogs.com/li-peng/p/11050563.html
頁: [1]
查看完整版本: 雪崩利器 hystrix-go 源码分析