当前位置:网站首页>Golang source code analysis: time/rate

Golang source code analysis: time/rate

2022-08-02 23:05:00 User 9710217

这是golang The current limiter implemented in the source code,It is based on the token bucket algorithm:

官方地址: golang.org/x/time/rate

github地址:github.com/golang/time/rate

   r := rate.Every(100 * time.Millisecond)
   limit := rate.NewLimiter(r, 20)
   for {
       if limit.AllowN(time.Now(), 8) {
           log.Info("log:event happen")
       } else {
           log.Info("log:event not allow")
       }

   }

Generated in one second10 个令牌,桶的容量是20,Take the current time8个token

The source code is very simple with only two files:

rate.go
rate_test.go

1,NewLimiter

// NewLimiter returns a new Limiter that allows events up to rate r and permits
// bursts of at most b tokens.
func NewLimiter(r Limit, b int) *Limiter {
  return &Limiter{
    limit: r,
    burst: b,
  }
}

Simply constructed onelimiter对象

type Limiter struct {
  mu     sync.Mutex
  limit  Limit
  burst  int
  tokens float64
  // last is the last time the limiter's tokens field was updated
  last time.Time
  // lastEvent is the latest time of a rate-limited event (past or future)
  lastEvent time.Time
}

The last distribution was recordedtoken的时间,and the last requesttoken的时间

func Every(interval time.Duration) Limit {
  if interval <= 0 {
    return Inf
  }
  return 1 / Limit(interval.Seconds())
}

Only the conversion from time interval to frequency is done.

2,AllowN/Allow

// Allow is shorthand for AllowN(time.Now(), 1).
func (lim *Limiter) Allow() bool {
  return lim.AllowN(time.Now(), 1)
}

// AllowN reports whether n events may happen at time now.
// Use this method if you intend to drop / skip events that exceed the rate limit.
// Otherwise use Reserve or Wait.
func (lim *Limiter) AllowN(now time.Time, n int) bool {
  return lim.reserveN(now, n, 0).ok
}

底层都是调用了reserveN函数,maxFutureReserve参数传的是0

// reserveN is a helper method for AllowN, ReserveN, and WaitN.
// maxFutureReserve specifies the maximum reservation wait duration allowed.
// reserveN returns Reservation, not *Reservation, to avoid allocation in AllowN and WaitN.
func (lim *Limiter) reserveN(now time.Time, n int, maxFutureReserve time.Duration) Reservation {
  lim.mu.Lock()

  if lim.limit == Inf {
    lim.mu.Unlock()
    return Reservation{
      ok:        true,
      lim:       lim,
      tokens:    n,
      timeToAct: now,
    }
  }

  now, last, tokens := lim.advance(now)

  // Calculate the remaining number of tokens resulting from the request.
  tokens -= float64(n)

  // Calculate the wait duration
  var waitDuration time.Duration
  if tokens < 0 {
    waitDuration = lim.limit.durationFromTokens(-tokens)
  }

  // Decide result
  ok := n <= lim.burst && waitDuration <= maxFutureReserve

  // Prepare reservation
  r := Reservation{
    ok:    ok,
    lim:   lim,
    limit: lim.limit,
  }
  if ok {
    r.tokens = n
    r.timeToAct = now.Add(waitDuration)
  }

  // Update state
  if ok {
    lim.last = now
    lim.tokens = tokens
    lim.lastEvent = r.timeToAct
  } else {
    lim.last = last
  }

  lim.mu.Unlock()
  return r
}

1,如果lim.limit == Inf,返回Reservation对象

// A Reservation holds information about events that are permitted by a Limiter to happen after a delay.
// A Reservation may be canceled, which may enable the Limiter to permit additional events.
type Reservation struct {
  ok        bool
  lim       *Limiter
  tokens    int
  timeToAct time.Time
  // This is the Limit at reservation time, it can change later.
  limit Limit
}

2,获取当前时间,上一次产生token的时间和,产生的token

// advance calculates and returns an updated state for lim resulting from the passage of time.
// lim is not changed.
// advance requires that lim.mu is held.
func (lim *Limiter) advance(now time.Time) (newNow time.Time, newLast time.Time, newTokens float64) {
  last := lim.last
  if now.Before(last) {
    last = now
  }

  // Calculate the new number of tokens, due to time that passed.
  elapsed := now.Sub(last)
  delta := lim.limit.tokensFromDuration(elapsed)
  tokens := lim.tokens + delta
  if burst := float64(lim.burst); tokens > burst {
    tokens = burst
  }
  return now, last, tokens
}

A,If the current time is older than the last timetoken时间早(Indicates that there is a request waiting to be obtainedtoken),Then update the current time to the last acquisitiontoken时间(Wait with the previous request)

B,Calculated from the last acquisitiontokentime interval up to now C,计算产生的token增量

 delta := lim.limit.tokensFromDuration(elapsed)
type Limit float64
// tokensFromDuration is a unit conversion function from a time duration to the number of tokens
// which could be accumulated during that duration at a rate of limit tokens per second.
func (limit Limit) tokensFromDuration(d time.Duration) float64 {
  return d.Seconds() * float64(limit)
}

That is, the number of seconds in the interval multiplied by each secondtoken数量.

D,计算总的token数量

E,如果桶已经满了,Discard excesstoken

3,Deductions are required for this requesttoken

4,如果token数不够,Calculate the time interval to wait

5,如果请求的tokenThe quantity is smaller than the capacity of the bucket,And the waiting time is greater than the waiting time, indicating that the request is legal.

ok := n <= lim.burst && waitDuration <= maxFutureReserve

6,构造Reservation对象,存储当前limiter对象到lim

7,如果请求合法,Stores what is needed for the current requesttokenQuantity and waiting time(当前时间+等待时间间隔)

8,如果合法,更新当前limiterof the last acquisitiontoken时间为当前时间,获取的tokenThe quantity is the remainder after deductiontoken数量,获取tokenTime for the future can really gettoken的时间点.

9,否则更新limiterof the last acquisitiontokenThe time is the last acquisition of this calculationtoken时间.

上面就是获取tokenAll code implementations.

Limiter提供了三类方法供用户消费Token,用户可以每次消费一个Token,也可以一次性消费多个Token.

1,AllowN 方法表示,截止到某一时刻,目前桶中数目是否至少为 n 个,满足则返回 true,同时从桶中消费 n 个 token.反之返回不消费 token,false.That is, the method described above.

func (lim *Limiter) Allow() bool
func (lim *Limiter) AllowN(now time.Time, n int) bool

2,当使用 Wait 方法消费 token 时,如果此时桶内 token 数组不足 (小于 N),那么 Wait 方法将会阻塞一段时间,直至 token 满足条件.如果充足则直接返回.

func (lim *Limiter) Wait(ctx context.Context) (err error)
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error)
// WaitN blocks until lim permits n events to happen.
// It returns an error if n exceeds the Limiter's burst size, the Context is
// canceled, or the expected wait time exceeds the Context's Deadline.
// The burst limit is ignored if the rate limit is Inf.
func (lim *Limiter) WaitN(ctx context.Context, n int) (err error) {
  lim.mu.Lock()
  burst := lim.burst
  limit := lim.limit
  lim.mu.Unlock()

  if n > burst && limit != Inf {
    return fmt.Errorf("rate: Wait(n=%d) exceeds limiter's burst %d", n, burst)
  }
  // Check if ctx is already cancelled
  select {
  case <-ctx.Done():
    return ctx.Err()
  default:
  }
  // Determine wait limit
  now := time.Now()
  waitLimit := InfDuration
  if deadline, ok := ctx.Deadline(); ok {
    waitLimit = deadline.Sub(now)
  }
  // Reserve
  r := lim.reserveN(now, n, waitLimit)
  if !r.ok {
    return fmt.Errorf("rate: Wait(n=%d) would exceed context deadline", n)
  }
  // Wait if necessary
  delay := r.DelayFrom(now)
  if delay == 0 {
    return nil
  }
  t := time.NewTimer(delay)
  defer t.Stop()
  select {
  case <-t.C:
    // We can proceed.
    return nil
  case <-ctx.Done():
    // Context was canceled before we could proceed.  Cancel the
    // reservation, which may permit other events to proceed sooner.
    r.Cancel()
    return ctx.Err()
  }
}

A,If the number of requests exceeds the capacity of the bucket,直接报错

B,通过ctx.Deadline()Calculates the time interval allowed to wait

C,调用r := lim.reserveN(now, n, waitLimit) 获取Reserve对象

D,如果reserveObject representation could not succeed(Exceeded barrel capacity,超出时间限制),返回错误

E,计算需要等待的时间,timeToActIndicates that it can be obtainedtoken的时间.

// DelayFrom returns the duration for which the reservation holder must wait
// before taking the reserved action.  Zero duration means act immediately.
// InfDuration means the limiter cannot grant the tokens requested in this
// Reservation within the maximum wait time.
func (r *Reservation) DelayFrom(now time.Time) time.Duration {
  if !r.ok {
    return InfDuration
  }
  delay := r.timeToAct.Sub(now)
  if delay < 0 {
    return 0
  }
  return delay
}

F,Start the timer to wait.

3,ReserveN 的用法就相对来说复杂一些,当调用完成后,无论 token 是否充足,都会返回一个 Reservation * 对象.

你可以调用该对象的 Delay() 方法,该方法返回了需要等待的时间.如果等待时间为 0,则说明不用等待.

必须等到等待时间之后,才能进行接下来的工作.

或者,如果不想等待,可以调用 Cancel() 方法,该方法会将 token 归还.

func (lim *Limiter) Reserve() *Reservation
func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation

This method is more primitive and returns directlyReserve对象,交给用户处理

func (lim *Limiter) ReserveN(now time.Time, n int) *Reservation {
  r := lim.reserveN(now, n, InfDuration)
  return &r
}
原网站

版权声明
本文为[User 9710217]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/214/202208021945332575.html