当前位置:网站首页>Golang source code analysis: time/rate
Golang source code analysis: time/rate
2022-08-02 23:05:00 【User 9710217】
这是golang The current limiter implemented in the source code,It is based on the token bucket algorithm:
官方地址: golang.org/x/time/rate
github地址:github.com/golang/time/rate
r := rate.Every(100 * time.Millisecond)
limit := rate.NewLimiter(r, 20)
for {
if limit.AllowN(time.Now(), 8) {
log.Info("log:event happen")
} else {
log.Info("log:event not allow")
}
}
Generated in one second10 个令牌,桶的容量是20,Take the current time8个token
The source code is very simple with only two files:
rate.go
rate_test.go
1,NewLimiter
// NewLimiter returns a new Limiter that allows events up to rate r and permits
// bursts of at most b tokens.
func NewLimiter(r Limit, b int) *Limiter {
return &Limiter{
limit: r,
burst: b,
}
}
Simply constructed onelimiter对象
type Limiter struct {
mu sync.Mutex
limit Limit
burst int
tokens float64
// last is the last time the limiter's tokens field was updated
last time.Time
// lastEvent is the latest time of a rate-limited event (past or future)
lastEvent time.Time
}
The last distribution was recordedtoken的时间,and the last requesttoken的时间
func Every(interval time.Duration) Limit {
if interval <= 0 {
return Inf
}
return 1 / Limit(interval.Seconds())
}
Only the conversion from time interval to frequency is done.
2,AllowN/Allow
// Allow is shorthand for AllowN(time.Now(), 1).
func (lim *Limiter) Allow() bool {
return lim.AllowN(time.Now(), 1)
}
// AllowN reports whether n events may happen at time now.
// Use this method if you intend to drop / skip events that exceed the rate limit.
// Otherwise use Reserve or Wait.
func (lim *Limiter) AllowN(now time.Time, n int) bool {
return lim.reserveN(now, n, 0).ok
}
底层都是调用了reserveN函数,maxFutureReserve参数传的是0
// reserveN is a helper method for AllowN, ReserveN, and WaitN.
// maxFutureReserve specifies the maximum reservation wait duration allowed.
// reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN.
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
lim.mu.Lock()
if lim.limit == Inf {
lim.mu.Unlock()
return Reservation{
ok: true,
lim: lim,
tokens: n,
timeToAct: now,
}
}
now, last, tokens := lim.advance(now)
// Calculate the remaining number of tokens resulting from the request.
tokens -= float64(n)
// Calculate the wait duration
var waitDuration time.Duration
if tokens < 0 {
waitDuration = lim.limit.durationFromTokens(-tokens)
}
// Decide result
ok := n <= lim.burst && waitDuration <= maxFutureReserve
// Prepare reservation
r := Reservation{
ok: ok,
lim: lim,
limit: lim.limit,
}
if ok {
r.tokens = n
r.timeToAct = now.Add(waitDuration)
}
// Update state
if ok {
lim.last = now
lim.tokens = tokens
lim.lastEvent = r.timeToAct
} else {
lim.last = last
}
lim.mu.Unlock()
return r
}
1,如果lim.limit == Inf,返回Reservation对象
// A Reservation holds information about events that are permitted by a Limiter to happen after a delay.
// A Reservation may be canceled, which may enable the Limiter to permit additional events.
type Reservation struct {
ok bool
lim *Limiter
tokens int
timeToAct time.Time
// This is the Limit at reservation time, it can change later.
limit Limit
}
2,获取当前时间,上一次产生token的时间和,产生的token
// advance calculates and returns an updated state for lim resulting from the passage of time.
// lim is not changed.
// advance requires that lim.mu is held.
func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {
last := lim.last
if now.Before(last) {
last = now
}
// Calculate the new number of tokens, due to time that passed.
elapsed := now.Sub(last)
delta := lim.limit.tokensFromDuration(elapsed)
tokens := lim.tokens + delta
if burst := float64(lim.burst); tokens > burst {
tokens = burst
}
return now, last, tokens
}
A,If the current time is older than the last timetoken时间早(Indicates that there is a request waiting to be obtainedtoken),Then update the current time to the last acquisitiontoken时间(Wait with the previous request)
B,Calculated from the last acquisitiontokentime interval up to now C,计算产生的token增量
delta := lim.limit.tokensFromDuration(elapsed)
type Limit float64
// tokensFromDuration is a unit conversion function from a time duration to the number of tokens
// which could be accumulated during that duration at a rate of limit tokens per second.
func (limit Limit) tokensFromDuration(d time.Duration) float64 {
return d.Seconds() * float64(limit)
}
That is, the number of seconds in the interval multiplied by each secondtoken数量.
D,计算总的token数量
E,如果桶已经满了,Discard excesstoken
3,Deductions are required for this requesttoken
4,如果token数不够,Calculate the time interval to wait
5,如果请求的tokenThe quantity is smaller than the capacity of the bucket,And the waiting time is greater than the waiting time, indicating that the request is legal.
ok := n <= lim.burst && waitDuration <= maxFutureReserve
6,构造Reservation对象,存储当前limiter对象到lim
7,如果请求合法,Stores what is needed for the current requesttokenQuantity and waiting time(当前时间+等待时间间隔)
8,如果合法,更新当前limiterof the last acquisitiontoken时间为当前时间,获取的tokenThe quantity is the remainder after deductiontoken数量,获取tokenTime for the future can really gettoken的时间点.
9,否则更新limiterof the last acquisitiontokenThe time is the last acquisition of this calculationtoken时间.
上面就是获取tokenAll code implementations.
Limiter提供了三类方法供用户消费Token,用户可以每次消费一个Token,也可以一次性消费多个Token.
1,AllowN 方法表示,截止到某一时刻,目前桶中数目是否至少为 n 个,满足则返回 true,同时从桶中消费 n 个 token.反之返回不消费 token,false.That is, the method described above.
func (lim *Limiter) Allow() bool
func (lim *Limiter) AllowN(now time.Time, n int) bool
2,当使用 Wait 方法消费 token 时,如果此时桶内 token 数组不足 (小于 N),那么 Wait 方法将会阻塞一段时间,直至 token 满足条件.如果充足则直接返回.
func (lim *Limiter) Wait(ctx context.Context) (err error)
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error)
// WaitN blocks until lim permits n events to happen.
// It returns an error if n exceeds the Limiter's burst size, the Context is
// canceled, or the expected wait time exceeds the Context's Deadline.
// The burst limit is ignored if the rate limit is Inf.
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) {
lim.mu.Lock()
burst := lim.burst
limit := lim.limit
lim.mu.Unlock()
if n > burst && limit != Inf {
return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, burst)
}
// Check if ctx is already cancelled
select {
case <-ctx.Done():
return ctx.Err()
default:
}
// Determine wait limit
now := time.Now()
waitLimit := InfDuration
if deadline, ok := ctx.Deadline(); ok {
waitLimit = deadline.Sub(now)
}
// Reserve
r := lim.reserveN(now, n, waitLimit)
if !r.ok {
return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n)
}
// Wait if necessary
delay := r.DelayFrom(now)
if delay == 0 {
return nil
}
t := time.NewTimer(delay)
defer t.Stop()
select {
case <-t.C:
// We can proceed.
return nil
case <-ctx.Done():
// Context was canceled before we could proceed. Cancel the
// reservation, which may permit other events to proceed sooner.
r.Cancel()
return ctx.Err()
}
}
A,If the number of requests exceeds the capacity of the bucket,直接报错
B,通过ctx.Deadline()Calculates the time interval allowed to wait
C,调用r := lim.reserveN(now, n, waitLimit) 获取Reserve对象
D,如果reserveObject representation could not succeed(Exceeded barrel capacity,超出时间限制),返回错误
E,计算需要等待的时间,timeToActIndicates that it can be obtainedtoken的时间.
// DelayFrom returns the duration for which the reservation holder must wait
// before taking the reserved action. Zero duration means act immediately.
// InfDuration means the limiter cannot grant the tokens requested in this
// Reservation within the maximum wait time.
func (r *Reservation) DelayFrom(now time.Time) time.Duration {
if !r.ok {
return InfDuration
}
delay := r.timeToAct.Sub(now)
if delay < 0 {
return 0
}
return delay
}
F,Start the timer to wait.
3,ReserveN 的用法就相对来说复杂一些,当调用完成后,无论 token 是否充足,都会返回一个 Reservation * 对象.
你可以调用该对象的 Delay() 方法,该方法返回了需要等待的时间.如果等待时间为 0,则说明不用等待.
必须等到等待时间之后,才能进行接下来的工作.
或者,如果不想等待,可以调用 Cancel() 方法,该方法会将 token 归还.
func (lim *Limiter) Reserve() *Reservation
func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation
This method is more primitive and returns directlyReserve对象,交给用户处理
func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation {
r := lim.reserveN(now, n, InfDuration)
return &r
}
边栏推荐
- 技术分享 | Apache Linkis 快速集成网页IDE工具 Scriptis
- The so-called fighting skill again gao also afraid of the chopper - partition, depots, table, and the merits of the distributed
- 【LeetCode】1374. 生成每种字符都是奇数个的字符串
- Shell: conditional statements
- J9 Digital Currency Theory: Identifying Web3's New Scarcity: Open Source Developers
- 即时通讯开发移动端网络短连接的优化手段
- golang源码分析之geoip2-golang
- [安洵杯 2019]easy_web
- MaxCompute 近期发布上线的版本的 SQL 引擎新功能参数化视图有什么优势?
- [AnXun cup 2019] easy_web
猜你喜欢
B站HR对面试者声称其核心用户都是生活中的Loser
Office2021 安装MathType
SCANIA SCANIA OTL tag is introduced
Redis cluster configuration
Leetcode刷题——单调栈问题(739每日温度问题、496下一个更大元素I、503下一个更大元素 II)
牛客题目——滑动窗口的最大值、矩阵最长递增路径、顺时针旋转矩阵、接雨水问题
J9数字论:互联网跨链桥有什么作用呢?
el-tree渲染大量数据的解决方案(不通过懒加载)
What is a Field Service Management System (FSM)?what is the benefit?
shell:条件语句
随机推荐
LeetCode:622. 设计循环队列【模拟循环队列】
网络协议介绍
分享一个 web 应用版本监测 (更新) 的工具库
训练双塔检索模型,可以不用query-doc样本了?明星机构联合发文
J9数字论:互联网跨链桥有什么作用呢?
【LeetCode】622. 设计循环队列
PyTorch分布式backends
SQL 入门之第一讲——MySQL 8.0.29安装教程(windows 64位)
谷歌竞价机器学习如何去理解?
溜不溜是个问题
对话亚洲高校首个博士论文奖-裘捷中丨KDD2022
golang源码分析:time/rate
即时通讯开发移动端网络短连接的优化手段
Geoserver+mysql+openlayers2
程序员也许都缺一个“二舅”精神
Fetch 请求不转换BLOB正常显示GBK编码的数据
Translate My Wonderful | July Moli Translation Program Winners Announced
setup syntax sugar defineProps defineEmits defineExpose
arm64麒麟安装paddlehub(国产化)
es 读流程源码解析