当前位置:网站首页>golang 源码分析:juju/ratelimit
golang 源码分析:juju/ratelimit
2022-08-02 19:47:00 【用户9710217】
https://github.com/juju/ratelimit 是一个基于令牌桶算法的限流器:令牌桶就是想象有一个固定大小的桶,系统会以恒定速率向桶中放 Token,桶满则暂时不放。漏桶算法和令牌桶算法的主要区别在于,"漏桶算法"能够强行限制数据的传输速率(或请求频率),而"令牌桶算法"在能够限制数据的平均传输速率外,还允许某种程度的突发传输。
首先看下如何使用:
import "github.com/juju/ratelimit"
var tokenBucket ratelimit.Bucket = nil
func init() {
// func NewBucket(fillInterval time.Duration, capacity int64) *Bucket
// fillInterval令牌填充的时间间隔
// capacity令牌桶的最大容量
tokenBucket = ratelimit.NewBucket(200time.Millisecond, 20)
}
func Handler() {
available := tokenBucket.TakeAvailable(1)
if available <= 0 {
// 限流处理
}
// handling
}下面看下源码实现,juju/ratelimit实现很简单,一共只有两个源码文件和一个测试文件:
ratelimit.go
ratelimit_test.go
reader.go下面我们分析下常用的这两个接口的实现:
1,ratelimit.NewBucket
传入的两个参数分别是产生令牌的的间隔和桶的容量。
func NewBucket(fillInterval time.Duration, capacity int64) *Bucket {
return NewBucketWithClock(fillInterval, capacity, nil)
}func NewBucketWithClock(fillInterval time.Duration, capacity int64, clock Clock) *Bucket {
return NewBucketWithQuantumAndClock(fillInterval, capacity, 1, clock)
}默认一个间隔周期内就产生一个token,如果是高并发情况下,可以通过参数quantum控制产生多个。第三个参数是一个clock interface,主要是方便mock测试,如果传nil用的就是realClock{}
// Clock represents the passage of time in a way that
// can be faked out for tests.
type Clock interface {
// Now returns the current time.
Now() time.Time
// Sleep sleeps for at least the given duration.
Sleep(d time.Duration)
}realClock是实现了上述接口的结构体:
// realClock implements Clock in terms of standard time functions.
type realClock struct{}
// Now implements Clock.Now by calling time.Now.
func (realClock) Now() time.Time {
return time.Now()
}
// Now implements Clock.Sleep by calling time.Sleep.
func (realClock) Sleep(d time.Duration) {
time.Sleep(d)
}上面几个函数仅仅是对这个函数的一个简单包装,加上默认参数,方便一般场景的使用,最终都是调用了这个函数
func NewBucketWithQuantumAndClock(fillInterval time.Duration, capacity, quantum int64, clock Clock) *Bucket {
if clock == nil {
clock = realClock{}
}
if fillInterval <= 0 {
panic("token bucket fill interval is not > 0")
}
if capacity <= 0 {
panic("token bucket capacity is not > 0")
}
if quantum <= 0 {
panic("token bucket quantum is not > 0")
}
return &Bucket{
clock: clock,
startTime: clock.Now(),
latestTick: 0,
fillInterval: fillInterval,
capacity: capacity,
quantum: quantum,
availableTokens: capacity,
}
}出来参数检验外,最后生成了结构体Bucket的指针
type Bucket struct {
clock Clock
// startTime holds the moment when the bucket was
// first created and ticks began.
startTime time.Time
// capacity holds the overall capacity of the bucket.
capacity int64
// quantum holds how many tokens are added on
// each tick.
quantum int64
// fillInterval holds the interval between each tick.
fillInterval time.Duration
// mu guards the fields below it.
mu sync.Mutex
// availableTokens holds the number of available
// tokens as of the associated latestTick.
// It will be negative when there are consumers
// waiting for tokens.
availableTokens int64
// latestTick holds the latest tick for which
// we know the number of tokens in the bucket.
latestTick int64
}Bucket里面出了存储初始化必要的参数外,多了两个变量:
availableTokens:当前可用的令牌数量
latestTick:从程序运行到上一次访问的时候,一共产生了多少次计数(如果quantum等于1的话 ,就是一共产生的令牌数量)
2,TakeAvailable
有一个参数,每次取的token数量,一般是一个,为了并发安全,一般会加锁:
func (tb *Bucket) TakeAvailable(count int64) int64 {
tb.mu.Lock()
defer tb.mu.Unlock()
return tb.takeAvailable(tb.clock.Now(), count)
}调用了令牌桶计算的核心函数takeAvailable,第一个参数表示是当前时间,用于计算一共产生了多少个token:
func (tb *Bucket) takeAvailable(now time.Time, count int64) int64 {
if count <= 0 {
return 0
}
tb.adjustavailableTokens(tb.currentTick(now))
if tb.availableTokens <= 0 {
return 0
}
if count > tb.availableTokens {
count = tb.availableTokens
}
tb.availableTokens -= count
return count
}其中tb.adjustavailableTokens(tb.currentTick(now))用于计算修改可用token数量availableTokens,如果availableTokens<=0,说明限流了;如果输入的count比availableTokens,我么最多只能获取availableTokens个token,获取后,我们把availableTokens减去已经使用的token数量。
func (tb *Bucket) currentTick(now time.Time) int64 {
return int64(now.Sub(tb.startTime) / tb.fillInterval)
}计算出了从开始运行到,当前时间内时间一共跳变了多少次,也就是一共产生了多少次令牌。
func (tb *Bucket) adjustavailableTokens(tick int64) {
lastTick := tb.latestTick
tb.latestTick = tick
if tb.availableTokens >= tb.capacity {
return
}
tb.availableTokens += (tick - lastTick) * tb.quantum
if tb.availableTokens > tb.capacity {
tb.availableTokens = tb.capacity
}
return
}1,如果可用token数量大于等于令牌桶的容量,说明很长时间没有流量来获取token了,不用处理。
2,计算上一次获取token 到现时刻,产生的token数量,把它加到availableTokens上
3,如果availableTokens数量比capacity大,说明溢出了,修改availableTokens为capacity。
以上就是令牌桶算法的核心逻辑。当然,这个包还封装了一些其他的灵活的取令牌的接口,比如
func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool) {
tb.mu.Lock()
defer tb.mu.Unlock()
return tb.take(tb.clock.Now(), count, maxWait)
}这个函数就是获取,在maxWait time.Duration超时的前提下,产生count个token,需要等待的时间间隔。
func (tb *Bucket) take(now time.Time, count int64, maxWait time.Duration) (time.Duration, bool) {
if count <= 0 {
return 0, true
}
tick := tb.currentTick(now)
tb.adjustavailableTokens(tick)
avail := tb.availableTokens - count
if avail >= 0 {
tb.availableTokens = avail
return 0, true
}
// Round up the missing tokens to the nearest multiple
// of quantum - the tokens won't be available until
// that tick.
// endTick holds the tick when all the requested tokens will
// become available.
endTick := tick + (-avail+tb.quantum-1)/tb.quantum
endTime := tb.startTime.Add(time.Duration(endTick) * tb.fillInterval)
waitTime := endTime.Sub(now)
if waitTime > maxWait {
return 0, false
}
tb.availableTokens = avail
return waitTime, true
}函数的前半部分和takeAvailable一模一样,后面逻辑表示,如果令牌不够的情况下:
1,计算还缺多少个令牌
2,计算缺这么多令牌需要跳变多少次
3,计算跳变这些次数需要的时间
4,判断需要的时间是否超时
还有一个wait接口,用来计算,获取count个令牌需要的时间,然后sleep这么长时间。
func (tb *Bucket) Wait(count int64) {
if d := tb.Take(count); d > 0 {
tb.clock.Sleep(d)
}
}以上就是令牌桶算法的核心源码实现,
ratelimit/reader.go里面实现了基于上述限流器实现的读限速和写限速,原理是通过读写buff的长度来控制Wait函数的等待时间,实现读写限速的
func (r *reader) Read(buf []byte) (int, error) {
n, err := r.r.Read(buf)
if n <= 0 {
return n, err
}
r.bucket.Wait(int64(n))
return n, err
}边栏推荐
- OpenCV开发中的内存管理问题
- Therapy | How to Identify and Deal with Negative Thoughts
- Three.js入门
- LeetCode 622 设计循环队列[数组 队列] HERODING的LeetCode之路
- golang刷leetcode 经典(9)为运算表达式设计优先级
- 扫码预约 | 观看Apache Linkis数据处理实践以及计算治理能力
- TPAMI2022 | TransCL: based on the study the compression of the Transformer, more flexible and more powerful
- MaxCompute 的SQL 引擎参数化视图具体有哪些增强功能?
- Detailed explanation of common examples of dynamic programming
- Redis集群配置
猜你喜欢
随机推荐
golang刷leetcode 经典(12) 完全二叉树插入器
【LeetCode】1374. 生成每种字符都是奇数个的字符串
shell:条件语句
Kali命令ifconfig报错command not found
服务器Centos7 静默安装Oracle Database 12.2
Redis cluster configuration
Lvm逻辑卷
让你的应用完美适配平板
es 读流程源码解析
golang刷leetcode 经典(13) 最小高度树
In action: 10 ways to implement delayed tasks, with code!
4 kmiles join YiSheng group, with more strong ability of digital business, accelerate China's cross-border electricity full domain full growth
TPAMI2022 | TransCL:基于Transformer的压缩学习,更灵活更强大
Thread线程类基本使用(上)
Electron使用指南之初体验
Leetcode刷题——字符串相加相关题目(415. 字符串相加、面试题 02.05. 链表求和、2. 两数相加)
姑姑:给小学生出点口算题
You want the metagenomics - microbiome knowledge in all the (2022.8)
分布式事务
Soft Exam ----- UML Design and Analysis (Part 2)









