当前位置:网站首页>golang source code analysis: uber-go/ratelimit
golang source code analysis: uber-go/ratelimit
2022-08-02 23:05:00 【User 9710217】
https://github.com/uber-go/ratelimit Is a bucket of current limiter to realize,
rl := ratelimit.New(100) // per second
prev := time.Now()
for i := 0; i < 10; i++ {
now := rl.Take()
fmt.Println(i, now.Sub(prev))
prev = now
}在这个例子中,我们给定限流器每秒可以通过 100 个请求,也就是The average request interval 10ms.因此,最终会每 10ms 打印一行数据.输出结果如下:
// Output:
// 0 0
// 1 10ms
// 2 10msThe whole package in the source code is as follows:
example_test.go
limiter_atomic.go
limiter_mutexbased.go
ratelimit.go
ratelimit_bench_test.go
ratelimit_test.go1,ratelimit.New
Look at the first initialization process:
// New returns a Limiter that will limit to the given RPS.
func New(rate int, opts ...Option) Limiter {
return newAtomicBased(rate, opts...)
}传入的参数是1s内产生的token数量:
// newAtomicBased returns a new atomic based limiter.
func newAtomicBased(rate int, opts ...Option) *atomicLimiter {
// TODO consider moving config building to the implementation
// independent code.
config := buildConfig(opts)
perRequest := config.per / time.Duration(rate)
l := &atomicLimiter{
perRequest: perRequest,
maxSlack: -1 * time.Duration(config.slack) * perRequest,
clock: config.clock,
}
initialState := state{
last: time.Time{},
sleepFor: 0,
}
atomic.StorePointer(&l.state, unsafe.Pointer(&initialState))
return l
}1,通过options修改配置参数 config := buildConfig(opts)
func buildConfig(opts []Option) config {
c := config{
clock: clock.New(),
slack: 10,
per: time.Second,
}
for _, opt := range opts {
opt.apply(&c)
}
return c
}可以看到,默认情况下per是1s
2,Computed a token money(时间间隔)
perRequest := config.per / time.Duration(rate)
3,初始化atomicLimiter,Tokens generated time interval,时钟
type atomicLimiter struct {
state unsafe.Pointer
//lint:ignore U1000 Padding is unused but it is crucial to maintain performance
// of this rate limiter in case of collocation with other frequently accessed memory.
padding [56]byte // cache line size - state pointer size = 64 - 8; created to avoid false sharing.
perRequest time.Duration
maxSlack time.Duration
clock Clock
}4,Record the initialization state:The current time and sleep time
type state struct {
last time.Time
sleepFor time.Duration
}After complete the initialization process,We have entered the token of the process.
2,rl.Take
Take是一个接口,返回当前时间
// Limiter is used to rate-limit some process, possibly across goroutines.
// The process is expected to call Take() before every iteration, which
// may block to throttle the goroutine.
type Limiter interface {
// Take should block to make sure that the RPS is met.
Take() time.Time
}atomicLimiter 实现了这个接口
// Take blocks to ensure that the time spent between multiple
// Take calls is on average time.Second/rate.
func (t *atomicLimiter) Take() time.Time {
var (
newState state
taken bool
interval time.Duration
)
for !taken {
now := t.clock.Now()
previousStatePointer := atomic.LoadPointer(&t.state)
oldState := (*state)(previousStatePointer)
newState = state{
last: now,
sleepFor: oldState.sleepFor,
}
// If this is our first request, then we allow it.
if oldState.last.IsZero() {
taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
continue
}
// sleepFor calculates how much time we should sleep based on
// the perRequest budget and how long the last request took.
// Since the request may take longer than the budget, this number
// can get negative, and is summed across requests.
newState.sleepFor += t.perRequest - now.Sub(oldState.last)
// We shouldn't allow sleepFor to get too negative, since it would mean that
// a service that slowed down a lot for a short period of time would get
// a much higher RPS following that.
if newState.sleepFor < t.maxSlack {
newState.sleepFor = t.maxSlack
}
if newState.sleepFor > 0 {
newState.last = newState.last.Add(newState.sleepFor)
interval, newState.sleepFor = newState.sleepFor, 0
}
taken = atomic.CompareAndSwapPointer(&t.state, previousStatePointer, unsafe.Pointer(&newState))
}
t.clock.Sleep(interval)
return newState.last
}1,获取当前时间
2,If it is initialized state,That is the last access time is0,Then set the last access time is the time,直接返回.
3,Calculation of the sleep time,睡眠时间=The previous record of sleep+Each token of interval-(当前时间-上一次访问时间),Is the access time interval
4,If sleep time is less thanmaxSlack,Description the request volume is small,From the last access time for a long time,Amend the sleep timemaxSlack,Otherwise unable to cope with a large number of sudden traffic.
5,If sleep time is greater than0,That request quantity is big,Need to wait for a while to return,调用 t.clock.Sleep(t.sleepFor),进入睡眠状态,At the same time modify the last access time and sleep time
6,如果小于等于0,That request is,可以立即返回,并记录当前时间
mutexLimiter Also implements the interface:
type mutexLimiter struct {
sync.Mutex
last time.Time
sleepFor time.Duration
perRequest time.Duration
maxSlack time.Duration
clock Clock
}The difference is one is based on mutexes implement,One is based on atomic operation to realize
func (t *mutexLimiter) Take() time.Time {
t.Lock()
defer t.Unlock()
now := t.clock.Now()
// If this is our first request, then we allow it.
if t.last.IsZero() {
t.last = now
return t.last
}
// sleepFor calculates how much time we should sleep based on
// the perRequest budget and how long the last request took.
// Since the request may take longer than the budget, this number
// can get negative, and is summed across requests.
t.sleepFor += t.perRequest - now.Sub(t.last)
// We shouldn't allow sleepFor to get too negative, since it would mean that
// a service that slowed down a lot for a short period of time would get
// a much higher RPS following that.
if t.sleepFor < t.maxSlack {
t.sleepFor = t.maxSlack
}
// If sleepFor is positive, then we should sleep now.
if t.sleepFor > 0 {
t.clock.Sleep(t.sleepFor)
t.last = now.Add(t.sleepFor)
t.sleepFor = 0
} else {
t.last = now
}
return t.last
}Leaky Bucket,每个请求的间隔是固定的,然而,在实际上的互联网应用中,流量经常是突发性的.对于这种情况,uber-go 对 Leaky Bucket 做了一些改良,引入了最大松弛量 (maxSlack) 的概念.We understand the overall background: If we request per second limit 100 个请求,The average request interval 10ms.但是实际情况下,Some request interval is longer,Some request interval shorter.
(1)当 t.sleepFor > 0,On behalf of the previous request that extra time,Can't completely offset the required quantity,因此需要 sleep 相应时间, 同时将 t.sleepFor 置为 0.
(2)当 t.sleepFor < 0,Describe the request more than expected intervals,Will accumulate the extra time to t.sleepFor 即可.
但是,对于某种情况,请求 1 完成后,请求 2 After a long time to arrive (好几个小时都有可能),So for the request at this time 2 的请求间隔 now.Sub(t.last),会非常大.Behind that even if a large number of requests arrive instantaneous,Also can't offset all the time.It thus lost the meaning of the current limiting.
为了防止这种情况,ratelimit Introduced maximum relaxation (maxSlack) 的概念, 该值为负值,表示允许抵消的最长时间,防止以上情况的出现.
边栏推荐
猜你喜欢
随机推荐
ALV report learning summary
js Fetch返回数据res.json()报错问题
SQL Server安装教程
APP自动化uiautomator2获取toast
JWT学习
shell:条件语句
遇上Mysql亿级优化,怎么办
Electron User Guide Beginning Experience
ECCV 2022 | 通往数据高效的Transformer目标检测器
Five data structures of Redis and their corresponding usage scenarios
unittest自动化测试框架总结
golang刷leetcode 经典(10) tire树与ac自动机
Parse common methods in the Collection interface that are overridden by subclasses
Kali命令ifconfig报错command not found
Redis 5 种数据结构及对应使用场景
KDD 2022 | 深度图神经网络中的特征过相关:一个新视角
【StoneDB性能相关工具】内存监控
LM小型可编程控制器软件(基于CoDeSys)笔记二十五:plc的数据存储区(数字量输入通道部分)
PG 之 SQL执行计划
云平台简介









