当前位置:网站首页>golang 源码分析:juju/ratelimit
golang 源码分析:juju/ratelimit
2022-08-02 19:47:00 【用户9710217】
https://github.com/juju/ratelimit 是一个基于令牌桶算法的限流器:令牌桶就是想象有一个固定大小的桶,系统会以恒定速率向桶中放 Token,桶满则暂时不放。漏桶算法和令牌桶算法的主要区别在于,"漏桶算法"能够强行限制数据的传输速率(或请求频率),而"令牌桶算法"在能够限制数据的平均传输速率外,还允许某种程度的突发传输。
首先看下如何使用:
import "github.com/juju/ratelimit"
var tokenBucket ratelimit.Bucket = nil
func init() {
// func NewBucket(fillInterval time.Duration, capacity int64) *Bucket
// fillInterval令牌填充的时间间隔
// capacity令牌桶的最大容量
tokenBucket = ratelimit.NewBucket(200time.Millisecond, 20)
}
func Handler() {
available := tokenBucket.TakeAvailable(1)
if available <= 0 {
// 限流处理
}
// handling
}下面看下源码实现,juju/ratelimit实现很简单,一共只有两个源码文件和一个测试文件:
ratelimit.go
ratelimit_test.go
reader.go下面我们分析下常用的这两个接口的实现:
1,ratelimit.NewBucket
传入的两个参数分别是产生令牌的的间隔和桶的容量。
func NewBucket(fillInterval time.Duration, capacity int64) *Bucket {
return NewBucketWithClock(fillInterval, capacity, nil)
}func NewBucketWithClock(fillInterval time.Duration, capacity int64, clock Clock) *Bucket {
return NewBucketWithQuantumAndClock(fillInterval, capacity, 1, clock)
}默认一个间隔周期内就产生一个token,如果是高并发情况下,可以通过参数quantum控制产生多个。第三个参数是一个clock interface,主要是方便mock测试,如果传nil用的就是realClock{}
// Clock represents the passage of time in a way that
// can be faked out for tests.
type Clock interface {
// Now returns the current time.
Now() time.Time
// Sleep sleeps for at least the given duration.
Sleep(d time.Duration)
}realClock是实现了上述接口的结构体:
// realClock implements Clock in terms of standard time functions.
type realClock struct{}
// Now implements Clock.Now by calling time.Now.
func (realClock) Now() time.Time {
return time.Now()
}
// Now implements Clock.Sleep by calling time.Sleep.
func (realClock) Sleep(d time.Duration) {
time.Sleep(d)
}上面几个函数仅仅是对这个函数的一个简单包装,加上默认参数,方便一般场景的使用,最终都是调用了这个函数
func NewBucketWithQuantumAndClock(fillInterval time.Duration, capacity, quantum int64, clock Clock) *Bucket {
if clock == nil {
clock = realClock{}
}
if fillInterval <= 0 {
panic("token bucket fill interval is not > 0")
}
if capacity <= 0 {
panic("token bucket capacity is not > 0")
}
if quantum <= 0 {
panic("token bucket quantum is not > 0")
}
return &Bucket{
clock: clock,
startTime: clock.Now(),
latestTick: 0,
fillInterval: fillInterval,
capacity: capacity,
quantum: quantum,
availableTokens: capacity,
}
}出来参数检验外,最后生成了结构体Bucket的指针
type Bucket struct {
clock Clock
// startTime holds the moment when the bucket was
// first created and ticks began.
startTime time.Time
// capacity holds the overall capacity of the bucket.
capacity int64
// quantum holds how many tokens are added on
// each tick.
quantum int64
// fillInterval holds the interval between each tick.
fillInterval time.Duration
// mu guards the fields below it.
mu sync.Mutex
// availableTokens holds the number of available
// tokens as of the associated latestTick.
// It will be negative when there are consumers
// waiting for tokens.
availableTokens int64
// latestTick holds the latest tick for which
// we know the number of tokens in the bucket.
latestTick int64
}Bucket里面出了存储初始化必要的参数外,多了两个变量:
availableTokens:当前可用的令牌数量
latestTick:从程序运行到上一次访问的时候,一共产生了多少次计数(如果quantum等于1的话 ,就是一共产生的令牌数量)
2,TakeAvailable
有一个参数,每次取的token数量,一般是一个,为了并发安全,一般会加锁:
func (tb *Bucket) TakeAvailable(count int64) int64 {
tb.mu.Lock()
defer tb.mu.Unlock()
return tb.takeAvailable(tb.clock.Now(), count)
}调用了令牌桶计算的核心函数takeAvailable,第一个参数表示是当前时间,用于计算一共产生了多少个token:
func (tb *Bucket) takeAvailable(now time.Time, count int64) int64 {
if count <= 0 {
return 0
}
tb.adjustavailableTokens(tb.currentTick(now))
if tb.availableTokens <= 0 {
return 0
}
if count > tb.availableTokens {
count = tb.availableTokens
}
tb.availableTokens -= count
return count
}其中tb.adjustavailableTokens(tb.currentTick(now))用于计算修改可用token数量availableTokens,如果availableTokens<=0,说明限流了;如果输入的count比availableTokens,我么最多只能获取availableTokens个token,获取后,我们把availableTokens减去已经使用的token数量。
func (tb *Bucket) currentTick(now time.Time) int64 {
return int64(now.Sub(tb.startTime) / tb.fillInterval)
}计算出了从开始运行到,当前时间内时间一共跳变了多少次,也就是一共产生了多少次令牌。
func (tb *Bucket) adjustavailableTokens(tick int64) {
lastTick := tb.latestTick
tb.latestTick = tick
if tb.availableTokens >= tb.capacity {
return
}
tb.availableTokens += (tick - lastTick) * tb.quantum
if tb.availableTokens > tb.capacity {
tb.availableTokens = tb.capacity
}
return
}1,如果可用token数量大于等于令牌桶的容量,说明很长时间没有流量来获取token了,不用处理。
2,计算上一次获取token 到现时刻,产生的token数量,把它加到availableTokens上
3,如果availableTokens数量比capacity大,说明溢出了,修改availableTokens为capacity。
以上就是令牌桶算法的核心逻辑。当然,这个包还封装了一些其他的灵活的取令牌的接口,比如
func (tb *Bucket) TakeMaxDuration(count int64, maxWait time.Duration) (time.Duration, bool) {
tb.mu.Lock()
defer tb.mu.Unlock()
return tb.take(tb.clock.Now(), count, maxWait)
}这个函数就是获取,在maxWait time.Duration超时的前提下,产生count个token,需要等待的时间间隔。
func (tb *Bucket) take(now time.Time, count int64, maxWait time.Duration) (time.Duration, bool) {
if count <= 0 {
return 0, true
}
tick := tb.currentTick(now)
tb.adjustavailableTokens(tick)
avail := tb.availableTokens - count
if avail >= 0 {
tb.availableTokens = avail
return 0, true
}
// Round up the missing tokens to the nearest multiple
// of quantum - the tokens won't be available until
// that tick.
// endTick holds the tick when all the requested tokens will
// become available.
endTick := tick + (-avail+tb.quantum-1)/tb.quantum
endTime := tb.startTime.Add(time.Duration(endTick) * tb.fillInterval)
waitTime := endTime.Sub(now)
if waitTime > maxWait {
return 0, false
}
tb.availableTokens = avail
return waitTime, true
}函数的前半部分和takeAvailable一模一样,后面逻辑表示,如果令牌不够的情况下:
1,计算还缺多少个令牌
2,计算缺这么多令牌需要跳变多少次
3,计算跳变这些次数需要的时间
4,判断需要的时间是否超时
还有一个wait接口,用来计算,获取count个令牌需要的时间,然后sleep这么长时间。
func (tb *Bucket) Wait(count int64) {
if d := tb.Take(count); d > 0 {
tb.clock.Sleep(d)
}
}以上就是令牌桶算法的核心源码实现,
ratelimit/reader.go里面实现了基于上述限流器实现的读限速和写限速,原理是通过读写buff的长度来控制Wait函数的等待时间,实现读写限速的
func (r *reader) Read(buf []byte) (int, error) {
n, err := r.r.Read(buf)
if n <= 0 {
return n, err
}
r.bucket.Wait(int64(n))
return n, err
}边栏推荐
- Five data structures of Redis and their corresponding usage scenarios
- LeetCode 622 设计循环队列[数组 队列] HERODING的LeetCode之路
- 【LeetCode】1374. 生成每种字符都是奇数个的字符串
- Wintun:一款惊艳的 WireGuard 虚拟网卡接口驱动
- 2022-08-01
- LM小型可编程控制器软件(基于CoDeSys)笔记二十五:plc的数据存储区(数字量输入通道部分)
- LeetCode:622. 设计循环队列【模拟循环队列】
- golang刷leetcode动态规划(12)最小路径和
- golang刷leetcode 数学(1) 丑数系列
- 当TIME_WAIT状态的TCP正常挥手,收到SYN后…
猜你喜欢

基于 outline 实现头像剪裁以及预览

J9 digital theory: the Internet across chain bridge has what effect?

Redis 5 种数据结构及对应使用场景

unittest自动化测试框架总结

MySQL安装(详细,适合小白)

AI科学家:自动发现物理系统的隐藏状态变量

AI Scientist: Automatically discover hidden state variables of physical systems

Flutter with internationalized adapter automatically generated
分享一个 web 应用版本监测 (更新) 的工具库

服务器Centos7 静默安装Oracle Database 12.2
随机推荐
【LeetCode】1374. 生成每种字符都是奇数个的字符串
golang刷leetcode 经典(12) 完全二叉树插入器
第一次进入前20名
所谓武功再高也怕菜刀-分区、分库、分表和分布式的优劣
2022-07-27
es DELETE index 源码分析
Five data structures of Redis and their corresponding usage scenarios
Mysql安装流程 【压缩版】
es 官方诊断工具
ECCV 2022 | 通往数据高效的Transformer目标检测器
Wintun:一款惊艳的 WireGuard 虚拟网卡接口驱动
腾讯云孟凡杰:我所经历的云原生降本增效最佳实践案例
Metaverse 001 | Can't control your emotions?The Metaverse is here to help you
What is a Field Service Management System (FSM)?what is the benefit?
Cannot find declaration to go to
ALV概念讲解
实战:10 种实现延迟任务的方法,附代码!
软考 ----- UML设计与分析(下)
云平台简介
Flutter with internationalized adapter automatically generated