当前位置:网站首页>golang 源码分析(39)hystrix-go
golang 源码分析(39)hystrix-go
2022-08-02 17:38:00 【用户9710217】
较出名的是Netflix
的开源库hystrix。集流量控制
、熔断
、容错
等于一身的java
语言的库。今天分析的源码库是 hystrix-go,他是hystrix的的go
语言版,应该是说简化版本,用很少的代码量实现了主要功能。很推荐朋友们有时间读一读。
使用简单
hystrix
的使用是非常简单的,同步执行,直接调用Do
方法。
err := hystrix.Do("my_command", func() error {
// talk to other services
return nil
}, func(err error) error {
// do this when services are down
return nil
})
异步执行Go
方法,内部实现是启动了一个gorouting
,如果想得到自定义方法的数据,需要你传channel
来处理数据,或者输出。返回的error
也是一个channel
output := make(chan bool, 1)
errors := hystrix.Go("my_command", func() error {
// talk to other services
output <- true
return nil
}, nil)
select {
case out := <-output:
// success
case err := <-errors:
// failure
大概的执行流程图
其实方法Do
和Go
方法内部都是调用了hystrix.GoC
方法,只是Do
方法处理了异步的过程
func DoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) error {
done := make(chan struct{}, 1)
r := func(ctx context.Context) error {
err := run(ctx)
if err != nil {
return err
}
done <- struct{}{}
return nil
}
f := func(ctx context.Context, e error) error {
err := fallback(ctx, e)
if err != nil {
return err
}
done <- struct{}{}
return nil
}
var errChan chan error
if fallback == nil {
errChan = GoC(ctx, name, r, nil)
} else {
errChan = GoC(ctx, name, r, f)
}
select {
case <-done:
return nil
case err := <-errChan:
return err
}
}
自定义Command配置
在调用Do
Go
等方法之前我们可以先自定义一些配置
hystrix.ConfigureCommand("mycommand", hystrix.CommandConfig{
Timeout: int(time.Second * 3),
MaxConcurrentRequests: 100,
SleepWindow: int(time.Second * 5),
RequestVolumeThreshold: 30,
ErrorPercentThreshold: 50,
})
err := hystrix.DoC(context.Background(), "mycommand", func(ctx context.Context) error {
// ...
return nil
}, func(i context.Context, e error) error {
// ...
return e
})
我大要说了一下CommandConfig
第个字段的意义:
- Timeout: 执行command的超时时间。
默认时间是1000毫秒
- MaxConcurrentRequests:command的最大并发量
默认值是10
- SleepWindow:当熔断器被打开后,SleepWindow的时间就是控制过多久后去尝试服务是否可用了。
默认值是5000毫秒
- RequestVolumeThreshold:一个统计窗口10秒内请求数量。达到这个请求数量后才去判断是否要开启熔断。
默认值是20
- ErrorPercentThreshold:错误百分比,请求数量大于等于
RequestVolumeThreshold
并且错误率到达这个百分比后就会启动熔断
默认值是50
当然如果不配置他们,会使用默认值
讲完了怎么用,接下来就是分析源码了。我是从下层到上层的顺序分析代码和执行流程
统计控制器
每一个Command都会有一个默认统计控制器,当然也可以添加多个自定义的控制器。 默认的统计控制器DefaultMetricCollector
保存着熔断器
的所有状态,调用次数
,失败次数
,被拒绝次数
等等
type DefaultMetricCollector struct {
mutex *sync.RWMutex
numRequests *rolling.Number
errors *rolling.Number
successes *rolling.Number
failures *rolling.Number
rejects *rolling.Number
shortCircuits *rolling.Number
timeouts *rolling.Number
contextCanceled *rolling.Number
contextDeadlineExceeded *rolling.Number
fallbackSuccesses *rolling.Number
fallbackFailures *rolling.Number
totalDuration *rolling.Timing
runDuration *rolling.Timing
}
最主要的还是要看一下rolling.Number
,rolling.Number
才是状态最终保存的地方 Number
保存了10秒内的Buckets
数据信息,每一个Bucket
的统计时长为1秒
type Number struct {
Buckets map[int64]*numberBucket
Mutex *sync.RWMutex
}
type numberBucket struct {
Value float64
}
字典字段Buckets map[int64]*numberBucket
中的Key
保存的是当前时间 可能你会好奇Number
是如何保证只保存10秒内的数据的。每一次对熔断器
的状态进行修改时,Number
都要先得到当前的时间(秒级)的Bucket
不存在则创建。
func (r *Number) getCurrentBucket() *numberBucket {
now := time.Now().Unix()
var bucket *numberBucket
var ok bool
if bucket, ok = r.Buckets[now]; !ok {
bucket = &numberBucket{}
r.Buckets[now] = bucket
}
return bucket
}
修改完后去掉10秒外的数据
func (r *Number) removeOldBuckets() {
now := time.Now().Unix() - 10
for timestamp := range r.Buckets {
// TODO: configurable rolling window
if timestamp <= now {
delete(r.Buckets, timestamp)
}
}
}
比如Increment
方法,先得到Bucket
再删除旧的数据
func (r *Number) Increment(i float64) {
if i == 0 {
return
}
r.Mutex.Lock()
defer r.Mutex.Unlock()
b := r.getCurrentBucket()
b.Value += i
r.removeOldBuckets()
}
统计控制器是最基层和最重要的一个实现,上层所有的执行判断都是基于他的数据进行逻辑处理的
上报执行状态信息
断路器-->执行-->上报执行状态信息-->保存到相应的Buckets
每一次断路器逻辑的执行都会上报执行过程中的状态,
// ReportEvent records command metrics for tracking recent error rates and exposing data to the dashboard.
func (circuit *CircuitBreaker) ReportEvent(eventTypes []string, start time.Time, runDuration time.Duration) error {
// ...
circuit.mutex.RLock()
o := circuit.open
circuit.mutex.RUnlock()
if eventTypes[0] == "success" && o {
circuit.setClose()
}
var concurrencyInUse float64
if circuit.executorPool.Max > 0 {
concurrencyInUse = float64(circuit.executorPool.ActiveCount()) / float64(circuit.executorPool.Max)
}
select {
case circuit.metrics.Updates <- &commandExecution{
Types: eventTypes,
Start: start,
RunDuration: runDuration,
ConcurrencyInUse: concurrencyInUse,
}:
default:
return CircuitError{Message: fmt.Sprintf("metrics channel (%v) is at capacity", circuit.Name)}
}
return nil
}
circuit.metrics.Updates
这个信道就是处理上报信息的,上报执行状态自信的结构是metricExchange
,结构体很简单只有4个字段。要的就是
channel
字段Updates
他是一个有buffer
的channel
默认的数量是2000
个,所有的状态信息都在他里面metricCollectors
字段,就是保存的具体的这个command
执行过程中的各种信息
type metricExchange struct {
Name string
Updates chan *commandExecution
Mutex *sync.RWMutex
metricCollectors []metricCollector.MetricCollector
}
type commandExecution struct {
Types []string `json:"types"`
Start time.Time `json:"start_time"`
RunDuration time.Duration `json:"run_duration"`
ConcurrencyInUse float64 `json:"concurrency_inuse"`
}
func newMetricExchange(name string) *metricExchange {
m := &metricExchange{}
m.Name = name
m.Updates = make(chan *commandExecution, 2000)
m.Mutex = &sync.RWMutex{}
m.metricCollectors = metricCollector.Registry.InitializeMetricCollectors(name)
m.Reset()
go m.Monitor()
return m
}
在执行newMetricExchange
的时候会启动一个协程 go m.Monitor()
去监控Updates
的数据,然后上报给metricCollectors
保存执行的信息数据比如前面提到的调用次数
,失败次数
,被拒绝次数
等等
func (m *metricExchange) Monitor() {
for update := range m.Updates {
// we only grab a read lock to make sure Reset() isn't changing the numbers.
m.Mutex.RLock()
totalDuration := time.Since(update.Start)
wg := &sync.WaitGroup{}
for _, collector := range m.metricCollectors {
wg.Add(1)
go m.IncrementMetrics(wg, collector, update, totalDuration)
}
wg.Wait()
m.Mutex.RUnlock()
}
}
更新调用的是go m.IncrementMetrics(wg, collector, update, totalDuration)
,里面判断了他的状态
func (m *metricExchange) IncrementMetrics(wg *sync.WaitGroup, collector metricCollector.MetricCollector, update *commandExecution, totalDuration time.Duration) {
// granular metrics
r := metricCollector.MetricResult{
Attempts: 1,
TotalDuration: totalDuration,
RunDuration: update.RunDuration,
ConcurrencyInUse: update.ConcurrencyInUse,
}
switch update.Types[0] {
case "success":
r.Successes = 1
case "failure":
r.Failures = 1
r.Errors = 1
case "rejected":
r.Rejects = 1
r.Errors = 1
// ...
}
// ...
collector.Update(r)
wg.Done()
}
流量控制
hystrix-go
对流量控制的代码是很简单的。用了一个简单的令牌算法,能得到令牌的就可以执行后继的工作,执行完后要返还令牌。得不到令牌就拒绝,拒绝后调用用户设置的callback
方法,如果没有设置就不执行。 结构体executorPool
就是hystrix-go
流量控制
的具体实现。字段Max
就是每秒最大的并发值。
type executorPool struct {
Name string
Metrics *poolMetrics
Max int
Tickets chan *struct{}
}
在创建executorPool
的时候,会根据Max
值来创建令牌
。Max值如果没有设置会使用默认值10
func newExecutorPool(name string) *executorPool {
p := &executorPool{}
p.Name = name
p.Metrics = newPoolMetrics(name)
p.Max = getSettings(name).MaxConcurrentRequests
p.Tickets = make(chan *struct{}, p.Max)
for i := 0; i < p.Max; i++ {
p.Tickets <- &struct{}{}
}
return p
}
流量控制上报状态
注意一下字段 Metrics
他用于统计执行数量,比如:执行的总数量
,最大的并发数
具体的代码就不贴上来了。这个数量也可以显露出,供可视化程序直观的表现出来。
令牌使用完后是需要返还的,返回的时候才会做上面所说的统计工作。
func (p *executorPool) Return(ticket *struct{}) {
if ticket == nil {
return
}
p.Metrics.Updates <- poolMetricsUpdate{
activeCount: p.ActiveCount(),
}
p.Tickets <- ticket
}
func (p *executorPool) ActiveCount() int {
return p.Max - len(p.Tickets)
}
一次Command的执行的流程
上面把 统计控制器
、流量控制
、上报执行状态
讲完了,主要的实现也就讲的差不多了。最后就是串一次command的执行都经历了啥:
err := hystrix.Do("my_command", func() error {
// talk to other services
return nil
}, func(err error) error {
// do this when services are down
return nil
})
hystrix
在执行一次command的前面也有提到过会调用GoC
方法,下面我把代码贴出来来,篇幅问题去掉了一些代码
,主要逻辑都在。就是在判断断路器是否已打开
,得到Ticket
得不到就限流,执行我们自己的的方法
,判断context是否Done或者执行是否超时
当然,每次执行结果都要上报执行状态
,最后要返还Ticket
func GoC(ctx context.Context, name string, run runFuncC, fallback fallbackFuncC) chan error {
cmd := &command{
run: run,
fallback: fallback,
start: time.Now(),
errChan: make(chan error, 1),
finished: make(chan bool, 1),
}
//得到断路器,不存在则创建
circuit, _, err := GetCircuit(name)
if err != nil {
cmd.errChan <- err
return cmd.errChan
}
//...
// 返还ticket
returnTicket := func() {
// ...
cmd.circuit.executorPool.Return(cmd.ticket)
}
// 上报执行状态
reportAllEvent := func() {
err := cmd.circuit.ReportEvent(cmd.events, cmd.start, cmd.runDuration)
// ...
}
go func() {
defer func() { cmd.finished <- true }()
// 查看断路器是否已打开
if !cmd.circuit.AllowRequest() {
// ...
returnOnce.Do(func() {
returnTicket()
cmd.errorWithFallback(ctx, ErrCircuitOpen)
reportAllEvent()
})
return
}
// ...
// 获取ticket 如果得不到就限流
select {
case cmd.ticket = <-circuit.executorPool.Tickets:
ticketChecked = true
ticketCond.Signal()
cmd.Unlock()
default:
// ...
returnOnce.Do(func() {
returnTicket()
cmd.errorWithFallback(ctx, ErrMaxConcurrency)
reportAllEvent()
})
return
}
// 执行我们自已的方法,并上报执行信息
returnOnce.Do(func() {
defer reportAllEvent()
cmd.runDuration = time.Since(runStart)
returnTicket()
if runErr != nil {
cmd.errorWithFallback(ctx, runErr)
return
}
cmd.reportEvent("success")
})
}()
// 等待context是否被结束,或执行者超时,并上报
go func() {
timer := time.NewTimer(getSettings(name).Timeout)
defer timer.Stop()
select {
case <-cmd.finished:
// returnOnce has been executed in another goroutine
case <-ctx.Done():
// ...
return
case <-timer.C:
// ...
}
}()
return cmd.errChan
}
dashboard 可视化hystrix的上报信息
代码中StreamHandler
就是把所有断路器
的状态以流的方式不断的推送到dashboard. 这部分代码我就不用说了,很简单。 需要在你的服务端加3行代码,启动我们的流服务
hystrixStreamHandler := hystrix.NewStreamHandler()
hystrixStreamHandler.Start()
go http.ListenAndServe(net.JoinHostPort("", "81"), hystrixStreamHandler)
dashboard
我使用的是docker
版。
docker run -d -p 8888:9002 --name hystrix-dashboard mlabouardy/hystrix-dashboard:latest
在下面输入你服务的地址,我是 http://192.168.1.67:81/hystrix.stream
如果是集群可以使用Turbine进行监控,有时间大家自己来看吧
边栏推荐
猜你喜欢
解决多版本jar包冲突问题
新特性解读 | MySQL 8.0 GIPK 不可见主键
NeRF:火爆科研圈的三维重建技术大揭秘
二舅“反转”了?
研发运营一体化(DevOps)能力成熟度模型
MySQL索引
redis总结_分布式缓存
Security First: Tools You Need to Know to Implement DevSecOps Best Practices
Wechat Gymnasium Appointment Mini Program Graduation Design Finished Works (7) Mid-term Inspection Report
租房小程序自动定位城市
随机推荐
golang源码分析(19)简单编译器-计算器
Remember the stuck analysis of an industrial automation control system in .NET
Mini Program Graduation Works WeChat Gymnasium Reservation Mini Program Graduation Design Finished Product (8) Graduation Design Thesis Template
多聚体/壳聚糖修饰白蛋白纳米球/mPEG-HSA聚乙二醇人血清白蛋白纳米球的制备与研究
创新云集技术咖,工赋汇聚实战派:2022工赋开发者峰会
【案例】2D变换-旋转动画
golang源码分析(3):thrift
cpolar应用实例之多设备数据采集
Redis总结_实战篇
IDEA相关配置(特别完整)看完此篇就将所有的IDEA的相关配置都配置好了、设置鼠标滚轮修改字体大小、设置鼠标悬浮提示、设置主题、设置窗体及菜单的字体及字体大小、设置编辑区主题、通过插件更换主题
Go编译原理系列6(类型检查)
golang源码分析(6):sync.Mutex sync.RWMutex
ffmpeg cannot find libx264 after compilation
解决多版本jar包冲突问题
executeScript异步执行的时候没有返回值的原因
MySQL表的约束
Several common cross-domain solutions
Simulink脚本自动创建Autosar Parameter Port及Mapping
golang刷leetcode 经典(5)设计哈希集合
用函数递归的方法解决汉诺塔问题