Golang source code analysis: juju/ratelimit
2022-08-02 23:02:00 【The user 9710217】
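https://github.com/juju/ratelimit is a rate limiter based on the token bucket algorithm. Picture a bucket of fixed size: the system puts tokens into it at a constant rate and stops adding them once the bucket is full. The main difference between the leaky bucket and token bucket algorithms is that the leaky bucket strictly enforces the data transfer rate (or request frequency), whereas the token bucket limits the average transfer rate while still allowing bursts of a certain size.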
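To make the burst behaviour concrete, here is a minimal sketch of a token bucket driven by an explicit tick instead of wall-clock time. The `bucket` type and its `tick` and `take` methods are hypothetical names for this illustration only and are not part of juju/ratelimit.

```go
package main

import "fmt"

// bucket is a deliberately tiny token bucket (hypothetical, for illustration).
// It is driven by explicit ticks rather than real time, so it is deterministic.
type bucket struct {
	capacity  int64 // maximum number of tokens the bucket can hold
	available int64 // tokens currently in the bucket
}

// tick adds one token, discarding it if the bucket is already full.
func (b *bucket) tick() {
	if b.available < b.capacity {
		b.available++
	}
}

// take removes up to n tokens and reports how many were actually granted.
func (b *bucket) take(n int64) int64 {
	if n > b.available {
		n = b.available
	}
	b.available -= n
	return n
}

func main() {
	b := &bucket{capacity: 3, available: 3}
	fmt.Println(b.take(5)) // burst: 3 of the 5 requests pass at once
	b.tick()
	fmt.Println(b.take(5)) // after the burst only the refill rate remains: 1
}
```

A full bucket lets a burst of up to `capacity` requests through immediately; once it is drained, throughput falls back to the refill rate.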
First, let's look at how to use it:
import (
	"time"

	"github.com/juju/ratelimit"
)

var tokenBucket *ratelimit.Bucket

func init() {
	// func NewBucket(fillInterval time.Duration, capacity int64) *Bucket
	// fillInterval: the time interval between token refills
	// capacity: the maximum capacity of the token bucket
	tokenBucket = ratelimit.NewBucket(200*time.Millisecond, 20)
}

func Handler() {
	available := tokenBucket.TakeAvailable(1)
	if available <= 0 {
		// rate limited: reject the request here
		return
	}
	// handle the request
}
Now let's look at the source. The implementation of juju/ratelimit is simple: there are only two source files and one test file in total:
ratelimit.go
ratelimit_test.go
reader.go
Below we analyze the implementation of its two most commonly used interfaces, NewBucket and TakeAvailable:
1. ratelimit.NewBucket
The two parameters passed in are the interval at which tokens are generated and the capacity of the bucket.
func NewBucket(fillInterval time.Duration, capacity int64) *Bucket {
	return NewBucketWithClock(fillInterval, capacity, nil)
}

func NewBucketWithClock(fillInterval time.Duration, capacity int64, clock Clock) *Bucket {
	return NewBucketWithQuantumAndClock(fillInterval, capacity, 1, clock)
}
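By default, one token is generated per interval. Under high concurrency, the quantum parameter can be used to generate several tokens per interval instead. The third parameter of NewBucketWithClock is a Clock interface, which mainly exists to make mocking in tests easier; if nil is passed, realClock{} is used.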
// Clock represents the passage of time in a way that
// can be faked out for tests.
type Clock interface {
	// Now returns the current time.
	Now() time.Time
	// Sleep sleeps for at least the given duration.
	Sleep(d time.Duration)
}
realClock is a struct that implements the above interface:
// realClock implements Clock in terms of standard time functions.
type realClock struct{}

// Now implements Clock.Now by calling time.Now.
func (realClock) Now() time.Time {
	return time.Now()
}

// Now implements Clock.Sleep by calling time.Sleep.
func (realClock) Sleep(d time.Duration) {
	time.Sleep(d)
}
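For tests, a fake clock might look like the sketch below. The `Clock` interface is redeclared locally so that the example compiles on its own; `fakeClock` and `elapsedAfterSleep` are hypothetical names, not part of the package, and the fake is not safe for concurrent use.

```go
package main

import (
	"fmt"
	"time"
)

// Clock mirrors the interface quoted above, redeclared so the sketch is
// self-contained.
type Clock interface {
	Now() time.Time
	Sleep(d time.Duration)
}

// fakeClock is a hypothetical test double: Sleep advances the stored time
// instead of blocking, which keeps tests fast and deterministic.
type fakeClock struct {
	now time.Time
}

func (c *fakeClock) Now() time.Time        { return c.now }
func (c *fakeClock) Sleep(d time.Duration) { c.now = c.now.Add(d) }

// elapsedAfterSleep reports how far the clock moves when asked to sleep for d.
func elapsedAfterSleep(c Clock, d time.Duration) time.Duration {
	start := c.Now()
	c.Sleep(d)
	return c.Now().Sub(start)
}

func main() {
	clock := &fakeClock{now: time.Unix(0, 0)}
	fmt.Println(elapsedAfterSleep(clock, 200*time.Millisecond)) // 200ms
}
```

A value like this could be passed as the clock argument of NewBucketWithClock, so a test can move time forward without actually sleeping.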
NewBucket and NewBucketWithClock are just thin wrappers that fill in default arguments for convenience in common scenarios. Both end up calling NewBucketWithQuantumAndClock:
func NewBucketWithQuantumAndClock(fillInterval time.Duration, capacity, quantum int64, clock Clock) *Bucket {
	if clock == nil {
		clock = realClock{}
	}
	if fillInterval <= 0 {
		panic("token bucket fill interval is not > 0")
	}
	if capacity <= 0 {
		panic("token bucket capacity is not > 0")
	}
	if quantum <= 0 {
		panic("token bucket quantum is not > 0")
	}
	return &Bucket{
		clock:           clock,
		startTime:       clock.Now(),
		latestTick:      0,
		fillInterval:    fillInterval,
		capacity:        capacity,
		quantum:         quantum,
		availableTokens: capacity,
	}
}
Apart from validating its parameters, the function simply builds a Bucket and returns a pointer to it:
type Bucket struct {
	clock Clock
	// startTime holds the moment when the bucket was
	// first created and ticks began.
	startTime time.Time
	// capacity holds the overall capacity of the bucket.
	capacity int64
	// quantum holds how many tokens are added on
	// each tick.
	quantum int64
	// fillInterval holds the interval between each tick.
	fillInterval time.Duration
	// mu guards the fields below it.
	mu sync.Mutex
	// availableTokens holds the number of available
	// tokens as of the associated latestTick.
	// It will be negative when there are consumers
	// waiting for tokens.
	availableTokens int64
	// latestTick holds the latest tick for which
	// we know the number of tokens in the bucket.
	latestTick int64
}
Bucket stores the parameters supplied at initialization, plus two more variables:
availableTokens: the number of tokens currently available
latestTick: the number of ticks elapsed between the bucket's creation and the most recent access (if quantum equals 1, this is the total number of tokens generated)
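2. TakeAvailable
It takes one parameter, the number of tokens to take per call, which is usually one. For concurrency safety, it holds the mutex for the duration of the call: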
func (tb *Bucket) TakeAvailable(count int64) int64 {
	tb.mu.Lock()
	defer tb.mu.Unlock()
	return tb.takeAvailable(tb.clock.Now(), count)
}
The core token bucket calculation lives in takeAvailable. Its first parameter is the current time, which is used to work out how many tokens have been generated in total:
func (tb *Bucket) takeAvailable(now time.Time, count int64) int64 {
	if count <= 0 {
		return 0
	}
	tb.adjustavailableTokens(tb.currentTick(now))
	if tb.availableTokens <= 0 {
		return 0
	}
	if count > tb.availableTokens {
		count = tb.availableTokens
	}
	tb.availableTokens -= count
	return count
}
Here tb.adjustavailableTokens(tb.currentTick(now)) recomputes the number of available tokens, availableTokens. If availableTokens <= 0, the caller is rate limited. If the requested count is greater than availableTokens, at most availableTokens tokens can be taken. Once the tokens are taken, the number used is subtracted from availableTokens.
func (tb *Bucket) currentTick(now time.Time) int64 {
	return int64(now.Sub(tb.startTime) / tb.fillInterval)
}
currentTick calculates how many fill intervals (ticks) have elapsed between the bucket's start time and the current time. With quantum equal to 1, that is also the total number of tokens generated so far.
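As a small worked example of that division, the sketch below uses a hypothetical helper `ticksElapsed` that takes the elapsed duration directly instead of a start time and a current time. Because the division is on integers, a partially elapsed interval does not count as a tick.

```go
package main

import (
	"fmt"
	"time"
)

// ticksElapsed mirrors the integer division in currentTick: partial
// intervals are truncated, so a tick only counts once it has fully elapsed.
func ticksElapsed(elapsed, fillInterval time.Duration) int64 {
	return int64(elapsed / fillInterval)
}

func main() {
	interval := 200 * time.Millisecond
	fmt.Println(ticksElapsed(999*time.Millisecond, interval))  // 4
	fmt.Println(ticksElapsed(1000*time.Millisecond, interval)) // 5
}
```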
func (tb *Bucket) adjustavailableTokens(tick int64) {
	lastTick := tb.latestTick
	tb.latestTick = tick
	if tb.availableTokens >= tb.capacity {
		return
	}
	tb.availableTokens += (tick - lastTick) * tb.quantum
	if tb.availableTokens > tb.capacity {
		tb.availableTokens = tb.capacity
	}
	return
}
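1. If the number of available tokens is greater than or equal to the bucket's capacity, the bucket is already full (for example, because no traffic has taken tokens for a long time), so there is nothing to do.
2. Otherwise, calculate how many tokens were generated between the last access and now, and add them to availableTokens.
3. If availableTokens is now larger than capacity, the bucket has overflowed, so availableTokens is set to capacity.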
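The three steps above can be restated as a pure function, which makes the edge cases easy to try out. `refill` is a hypothetical helper written for this illustration; it takes the bucket fields as plain arguments rather than reading them from a Bucket.

```go
package main

import "fmt"

// refill returns the new token count after the tick counter moves from
// lastTick to tick. It follows the same three steps as adjustavailableTokens.
func refill(available, capacity, quantum, lastTick, tick int64) int64 {
	if available >= capacity {
		return available // step 1: bucket already full, nothing to add
	}
	available += (tick - lastTick) * quantum // step 2: add newly generated tokens
	if available > capacity {
		available = capacity // step 3: discard the overflow
	}
	return available
}

func main() {
	fmt.Println(refill(18, 20, 1, 10, 15)) // 18+5 overflows and is capped: 20
	fmt.Println(refill(-2, 20, 1, 10, 13)) // a negative balance is repaid first: 1
	fmt.Println(refill(0, 20, 5, 10, 12))  // quantum 5 over two ticks: 10
}
```

The second call shows the negative balance mentioned in the Bucket comments: when consumers are already waiting for tokens, new tokens first pay off that debt.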
The above is the core logic of the token bucket algorithm. The package also wraps this logic in several more flexible interfaces for taking tokens, for example:
func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool) {
	tb.mu.Lock()
	defer tb.mu.Unlock()
	return tb.take(tb.clock.Now(), count, maxWait)
}
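This function takes count tokens and returns how long the caller must wait for them to become available, provided that the wait does not exceed the maxWait timeout. If it would, no tokens are taken and the function returns false.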
func (tb *Bucket) take(now time.Time, count int64, maxWait time.Duration) (time.Duration, bool) {
	if count <= 0 {
		return 0, true
	}
	tick := tb.currentTick(now)
	tb.adjustavailableTokens(tick)
	avail := tb.availableTokens - count
	if avail >= 0 {
		tb.availableTokens = avail
		return 0, true
	}
	// Round up the missing tokens to the nearest multiple
	// of quantum - the tokens won't be available until
	// that tick.
	// endTick holds the tick when all the requested tokens will
	// become available.
	endTick := tick + (-avail+tb.quantum-1)/tb.quantum
	endTime := tb.startTime.Add(time.Duration(endTick) * tb.fillInterval)
	waitTime := endTime.Sub(now)
	if waitTime > maxWait {
		return 0, false
	}
	tb.availableTokens = avail
	return waitTime, true
}
The first half of the function follows the same pattern as takeAvailable. The second half handles the case where there are not enough tokens:
1. Calculate how many tokens are still missing.
2. Calculate how many ticks it takes to generate that many tokens.
3. Calculate how long those ticks take.
4. Check whether that wait exceeds maxWait.
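The shortfall arithmetic in these steps can be checked with a worked example. `waitFor` is a hypothetical helper for this sketch; it takes the time elapsed since the bucket was created (`sinceStart`) instead of a start time and a current time, and leaves out the maxWait check.

```go
package main

import (
	"fmt"
	"time"
)

// waitFor restates the wait-time calculation from take. avail is the balance
// after subtracting the request (negative means tokens are missing), tick is
// the current tick, and sinceStart is the time elapsed since bucket creation.
func waitFor(avail, quantum, tick int64, fillInterval, sinceStart time.Duration) time.Duration {
	if avail >= 0 {
		return 0 // enough tokens, no waiting
	}
	// Ceiling division: the number of ticks needed to produce -avail tokens.
	endTick := tick + (-avail+quantum-1)/quantum
	return time.Duration(endTick)*fillInterval - sinceStart
}

func main() {
	// 250ms after creation with a 200ms interval, the current tick is 1.
	// Missing 3 tokens at 2 tokens per tick needs 2 more ticks, so the
	// tokens arrive at tick 3 (600ms), which is 350ms from now.
	fmt.Println(waitFor(-3, 2, 1, 200*time.Millisecond, 250*time.Millisecond)) // 350ms
}
```

Rounding up matters here: with quantum 2, three missing tokens cannot arrive after one and a half ticks, so the caller waits until the end of the second tick.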
There is also a Wait interface, which computes the time required to obtain count tokens and then sleeps for that long.
func (tb *Bucket) Wait(count int64) {
	if d := tb.Take(count); d > 0 {
		tb.clock.Sleep(d)
	}
}
The above is the core source code of the token bucket implementation.
ratelimit/reader.go
Built on the rate limiter above, reader.go implements rate-limited reading and writing. The idea is to pass the number of bytes read or written to Wait, which determines how long to wait and thereby limits throughput.
func (r *reader) Read(buf []byte) (int, error) {
	n, err := r.r.Read(buf)
	if n <= 0 {
		return n, err
	}
	r.bucket.Wait(int64(n))
	return n, err
}
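To see the one-token-per-byte accounting without pulling in the package, the sketch below swaps the bucket for a hypothetical `waiter` interface and a `countingWaiter` that records requests instead of sleeping. `limitedReader` and `tokensCharged` are likewise illustrative names, not the library's own types.

```go
package main

import (
	"fmt"
	"io"
	"strings"
)

// waiter is a hypothetical stand-in for the single Bucket method the reader needs.
type waiter interface {
	Wait(count int64)
}

// countingWaiter records how many tokens were requested instead of sleeping.
type countingWaiter struct{ total int64 }

func (w *countingWaiter) Wait(count int64) { w.total += count }

// limitedReader charges one token per byte read, as in the Read method above.
type limitedReader struct {
	r      io.Reader
	bucket waiter
}

func (l *limitedReader) Read(buf []byte) (int, error) {
	n, err := l.r.Read(buf)
	if n <= 0 {
		return n, err
	}
	l.bucket.Wait(int64(n))
	return n, err
}

// tokensCharged drains src through a limitedReader and returns the number
// of tokens that were requested along the way.
func tokensCharged(src string) int64 {
	w := &countingWaiter{}
	io.Copy(io.Discard, &limitedReader{r: strings.NewReader(src), bucket: w})
	return w.total
}

func main() {
	fmt.Println(tokensCharged("hello, token bucket")) // 19
}
```

With a real bucket in place of the counting stand-in, each Read would block until the bytes it just returned have been paid for, which is what caps the average read rate.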