当前位置:网站首页>Golang source code analysis: time/rate
Golang source code analysis: time/rate
2022-08-02 23:05:00 【User 9710217】
这是golang The current limiter implemented in the source code,It is based on the token bucket algorithm:
官方地址: golang.org/x/time/rate
github地址:github.com/golang/time/rate
r := rate.Every(100 * time.Millisecond)
limit := rate.NewLimiter(r, 20)
for {
if limit.AllowN(time.Now(), 8) {
log.Info("log:event happen")
} else {
log.Info("log:event not allow")
}
}
Generated in one second10 个令牌,桶的容量是20,Take the current time8个token
The source code is very simple with only two files:
rate.go
rate_test.go
1,NewLimiter
// NewLimiter returns a new Limiter that allows events up to rate r and permits
// bursts of at most b tokens.
func NewLimiter(r Limit, b int) *Limiter {
return &Limiter{
limit: r,
burst: b,
}
}
Simply constructed onelimiter对象
type Limiter struct {
mu sync.Mutex
limit Limit
burst int
tokens float64
// last is the last time the limiter's tokens field was updated
last time.Time
// lastEvent is the latest time of a rate-limited event (past or future)
lastEvent time.Time
}
The last distribution was recordedtoken的时间,and the last requesttoken的时间
func Every(interval time.Duration) Limit {
if interval <= 0 {
return Inf
}
return 1 / Limit(interval.Seconds())
}
Only the conversion from time interval to frequency is done.
2,AllowN/Allow
// Allow is shorthand for AllowN(time.Now(), 1).
func (lim *Limiter) Allow() bool {
return lim.AllowN(time.Now(), 1)
}
// AllowN reports whether n events may happen at time now.
// Use this method if you intend to drop / skip events that exceed the rate limit.
// Otherwise use Reserve or Wait.
func (lim *Limiter) AllowN(now time.Time, n int) bool {
return lim.reserveN(now, n, 0).ok
}
底层都是调用了reserveN函数,maxFutureReserve参数传的是0
// reserveN is a helper method for AllowN, ReserveN, and WaitN.
// maxFutureReserve specifies the maximum reservation wait duration allowed.
// reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN.
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
lim.mu.Lock()
if lim.limit == Inf {
lim.mu.Unlock()
return Reservation{
ok: true,
lim: lim,
tokens: n,
timeToAct: now,
}
}
now, last, tokens := lim.advance(now)
// Calculate the remaining number of tokens resulting from the request.
tokens -= float64(n)
// Calculate the wait duration
var waitDuration time.Duration
if tokens < 0 {
waitDuration = lim.limit.durationFromTokens(-tokens)
}
// Decide result
ok := n <= lim.burst && waitDuration <= maxFutureReserve
// Prepare reservation
r := Reservation{
ok: ok,
lim: lim,
limit: lim.limit,
}
if ok {
r.tokens = n
r.timeToAct = now.Add(waitDuration)
}
// Update state
if ok {
lim.last = now
lim.tokens = tokens
lim.lastEvent = r.timeToAct
} else {
lim.last = last
}
lim.mu.Unlock()
return r
}
1,如果lim.limit == Inf,返回Reservation对象
// A Reservation holds information about events that are permitted by a Limiter to happen after a delay.
// A Reservation may be canceled, which may enable the Limiter to permit additional events.
type Reservation struct {
ok bool
lim *Limiter
tokens int
timeToAct time.Time
// This is the Limit at reservation time, it can change later.
limit Limit
}
2,获取当前时间,上一次产生token的时间和,产生的token
// advance calculates and returns an updated state for lim resulting from the passage of time.
// lim is not changed.
// advance requires that lim.mu is held.
func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {
last := lim.last
if now.Before(last) {
last = now
}
// Calculate the new number of tokens, due to time that passed.
elapsed := now.Sub(last)
delta := lim.limit.tokensFromDuration(elapsed)
tokens := lim.tokens + delta
if burst := float64(lim.burst); tokens > burst {
tokens = burst
}
return now, last, tokens
}
A,If the current time is older than the last timetoken时间早(Indicates that there is a request waiting to be obtainedtoken),Then update the current time to the last acquisitiontoken时间(Wait with the previous request)
B,Calculated from the last acquisitiontokentime interval up to now C,计算产生的token增量
delta := lim.limit.tokensFromDuration(elapsed)
type Limit float64
// tokensFromDuration is a unit conversion function from a time duration to the number of tokens
// which could be accumulated during that duration at a rate of limit tokens per second.
func (limit Limit) tokensFromDuration(d time.Duration) float64 {
return d.Seconds() * float64(limit)
}
That is, the number of seconds in the interval multiplied by each secondtoken数量.
D,计算总的token数量
E,如果桶已经满了,Discard excesstoken
3,Deductions are required for this requesttoken
4,如果token数不够,Calculate the time interval to wait
5,如果请求的tokenThe quantity is smaller than the capacity of the bucket,And the waiting time is greater than the waiting time, indicating that the request is legal.
ok := n <= lim.burst && waitDuration <= maxFutureReserve
6,构造Reservation对象,存储当前limiter对象到lim
7,如果请求合法,Stores what is needed for the current requesttokenQuantity and waiting time(当前时间+等待时间间隔)
8,如果合法,更新当前limiterof the last acquisitiontoken时间为当前时间,获取的tokenThe quantity is the remainder after deductiontoken数量,获取tokenTime for the future can really gettoken的时间点.
9,否则更新limiterof the last acquisitiontokenThe time is the last acquisition of this calculationtoken时间.
上面就是获取tokenAll code implementations.
Limiter提供了三类方法供用户消费Token,用户可以每次消费一个Token,也可以一次性消费多个Token.
1,AllowN 方法表示,截止到某一时刻,目前桶中数目是否至少为 n 个,满足则返回 true,同时从桶中消费 n 个 token.反之返回不消费 token,false.That is, the method described above.
func (lim *Limiter) Allow() bool
func (lim *Limiter) AllowN(now time.Time, n int) bool
2,当使用 Wait 方法消费 token 时,如果此时桶内 token 数组不足 (小于 N),那么 Wait 方法将会阻塞一段时间,直至 token 满足条件.如果充足则直接返回.
func (lim *Limiter) Wait(ctx context.Context) (err error)
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error)
// WaitN blocks until lim permits n events to happen.
// It returns an error if n exceeds the Limiter's burst size, the Context is
// canceled, or the expected wait time exceeds the Context's Deadline.
// The burst limit is ignored if the rate limit is Inf.
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) {
lim.mu.Lock()
burst := lim.burst
limit := lim.limit
lim.mu.Unlock()
if n > burst && limit != Inf {
return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, burst)
}
// Check if ctx is already cancelled
select {
case <-ctx.Done():
return ctx.Err()
default:
}
// Determine wait limit
now := time.Now()
waitLimit := InfDuration
if deadline, ok := ctx.Deadline(); ok {
waitLimit = deadline.Sub(now)
}
// Reserve
r := lim.reserveN(now, n, waitLimit)
if !r.ok {
return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n)
}
// Wait if necessary
delay := r.DelayFrom(now)
if delay == 0 {
return nil
}
t := time.NewTimer(delay)
defer t.Stop()
select {
case <-t.C:
// We can proceed.
return nil
case <-ctx.Done():
// Context was canceled before we could proceed. Cancel the
// reservation, which may permit other events to proceed sooner.
r.Cancel()
return ctx.Err()
}
}
A,If the number of requests exceeds the capacity of the bucket,直接报错
B,通过ctx.Deadline()Calculates the time interval allowed to wait
C,调用r := lim.reserveN(now, n, waitLimit) 获取Reserve对象
D,如果reserveObject representation could not succeed(Exceeded barrel capacity,超出时间限制),返回错误
E,计算需要等待的时间,timeToActIndicates that it can be obtainedtoken的时间.
// DelayFrom returns the duration for which the reservation holder must wait
// before taking the reserved action. Zero duration means act immediately.
// InfDuration means the limiter cannot grant the tokens requested in this
// Reservation within the maximum wait time.
func (r *Reservation) DelayFrom(now time.Time) time.Duration {
if !r.ok {
return InfDuration
}
delay := r.timeToAct.Sub(now)
if delay < 0 {
return 0
}
return delay
}
F,Start the timer to wait.
3,ReserveN 的用法就相对来说复杂一些,当调用完成后,无论 token 是否充足,都会返回一个 Reservation * 对象.
你可以调用该对象的 Delay() 方法,该方法返回了需要等待的时间.如果等待时间为 0,则说明不用等待.
必须等到等待时间之后,才能进行接下来的工作.
或者,如果不想等待,可以调用 Cancel() 方法,该方法会将 token 归还.
func (lim *Limiter) Reserve() *Reservation
func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation
This method is more primitive and returns directlyReserve对象,交给用户处理
func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation {
r := lim.reserveN(now, n, InfDuration)
return &r
}
边栏推荐
- MySQL安装配置教程(超级详细、保姆级)
- J9 Digital Currency Theory: Identifying Web3's New Scarcity: Open Source Developers
- Leetcode刷题——字符串相加相关题目(415. 字符串相加、面试题 02.05. 链表求和、2. 两数相加)
- Axure9的元件用法
- 实现fashion_minst服装图像分类
- Implement fashion_minst clothing image classification
- Wintun:一款惊艳的 WireGuard 虚拟网卡接口驱动
- PyTorch分布式backends
- Caldera(一)配置完成的虚拟机镜像及admin身份简单使用
- Leetcode刷题——23. 合并K个升序链表
猜你喜欢
LM小型可编程控制器软件(基于CoDeSys)笔记二十五:plc的数据存储区(数字量输入通道部分)
MySQL安装时一直卡在starting server
Office2021 安装MathType
【Psychology · Characters】Issue 1
4KMILES加入艾盛集团,以更强劲的数字商务能力,加速中国跨境电商的全域全效增长
腾讯云孟凡杰:我所经历的云原生降本增效最佳实践案例
MOSN 反向通道详解
Thread线程类基本使用(上)
ShardingSphere-proxy +PostgreSQL implements read-write separation (static strategy)
磁盘分区的知识
随机推荐
Silver circ: letter with material life insurance products should be by the insurance company is responsible for the management
实战:10 种实现延迟任务的方法,附代码!
即时通讯开发移动端网络短连接的优化手段
基于 flex 布局实现的三栏布局
VMware虚拟机无法上网
SCANIA SCANIA OTL tag is introduced
Wintun:一款惊艳的 WireGuard 虚拟网卡接口驱动
成为黑客不得不学的语言,看完觉得你们还可吗?
MOSN 反向通道详解
Lvm逻辑卷
SQL Server安装教程
Async的线程池使用的哪个?
Thread线程类基本使用(上)
A Review of Nature Microbiology: Focusing on the Algae--Ecological Interface of Phytoplankton-Bacteria Interactions
7月29-31 | APACHECON ASIA 2022
当TIME_WAIT状态的TCP正常挥手,收到SYN后…
软件测试分类
笑话:如果你在河边等待得足够久,你会看到你的敌人的尸体漂过,是怎么翻译出来的?
训练双塔检索模型,可以不用query-doc样本了?明星机构联合发文
姑姑:给小学生出点口算题