
Distributed rate limiting: a hand-written version and the Redisson implementation

2022-08-02 12:14:00 tar


Preface

Rate limiting is a necessary defense against traffic floods, and it matters most where important resources are handled. Its core purpose is to keep the site running normally by never accepting more traffic than the site can handle; otherwise one extra request becomes the last straw, and you know what happens next.

Common rate limiting algorithms are the counter, the leaky bucket, and the token bucket. Earlier articles analyzed them; see those for details if you are interested.

This article focuses on the token bucket algorithm. What are the characteristics of a token bucket?

  • Tokens are produced at a fixed (usually constant) rate.
  • Tokens are consumed at a non-constant rate: as long as the bucket holds tokens, they can be used immediately, which absorbs traffic bursts in certain scenarios.
  • A request that obtains a token proceeds; otherwise it is rejected.

These characteristics show that a token bucket resembles a queue: there is a producer, a consumer, and a fixed-size container. From an implementation perspective, you might think you have to write a producer as well and give it a production rate.

There is a simpler way, though. Treat time as a moving axis and use the moment a consumer request arrives as the trigger for the token production logic. This eliminates the producer role.

I drew a diagram for reference:
[image in the original post]
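To make the "no producer" idea concrete, here is a minimal in-memory sketch in Java. It is not the code from this article: the class name, the constructor parameters, and the choice to pass the current time in as an argument are all assumptions made for illustration. Tokens owed for the elapsed time are added only when a request arrives.

```java
// Sketch only: an in-memory token bucket that refills lazily on each request.
// Class, field, and method names are hypothetical, not taken from the article.
final class LazyTokenBucket {
    private final long capacity;        // maximum tokens the bucket can hold
    private final long refillTokens;    // tokens produced per interval
    private final long intervalMillis;  // length of one production interval
    private long available;             // tokens currently in the bucket
    private long lastRefillMillis;      // time up to which tokens were produced

    LazyTokenBucket(long capacity, long refillTokens, long intervalMillis, long nowMillis) {
        this.capacity = capacity;
        this.refillTokens = refillTokens;
        this.intervalMillis = intervalMillis;
        this.available = capacity;
        this.lastRefillMillis = nowMillis;
    }

    // The caller passes the current time so the behaviour is easy to test.
    synchronized boolean tryAcquire(long permits, long nowMillis) {
        long elapsedIntervals = (nowMillis - lastRefillMillis) / intervalMillis;
        if (elapsedIntervals > 0) {
            // No producer thread: tokens owed for the elapsed time are added now,
            // capped at the bucket capacity.
            available = Math.min(capacity, available + elapsedIntervals * refillTokens);
            lastRefillMillis += elapsedIntervals * intervalMillis;
        }
        if (available < permits) {
            return false;
        }
        available -= permits;
        return true;
    }
}
```

For example, a bucket created with new LazyTokenBucket(10, 10, 1000, 0) grants ten single-token requests at time 100, rejects one at time 900, and grants again at time 1000 once a full interval has passed.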


I. Hand-written token rate limiter

1. Idea

If you were asked to design a token-based rate limiting algorithm, how would you approach it?

First, the essence of the token algorithm is production at a specified (constant) rate. So there must be a time span and a quantity produced during it; in other words, within a given time interval (interval), permits tokens are produced.

Next, with constant-rate production, tokens accumulate whenever consumption is slower than production. Do they keep piling up forever? Where does it end?

Of course not. Keep in mind that within a given interval there are at most permits tokens. You can think of interval as a sliding window: the window size never changes, and the window slides forward along the time axis.
[image in the original post]

With this theory in hand, we can start designing the token limiter:

Suppose we allow 10 tokens per 1s, so the window size is 1s and the window holds at most 10 tokens. Written out: second 1 has 10 tokens, second 2 also has 10, ... second N also has 10.

Now you have an approach: use a redis key as the name of the token bucket, set the interval and the rate, and decrement by one for each token consumed. As the window moves with time, new tokens come in and stale ones should be evicted.

Then you write code like this:

  // cursor: start time of the window
  // now: current time
  // rateInterval: window size (time interval)

  if (now - cursor > rateInterval) {
      // The window has elapsed: reset it
      jedis.hset(name, CURSOR, String.valueOf(now));
      jedis.hset(name, CURRENT_RATE, String.valueOf(rate));
  } else {
      // There may still be tokens left; try to take them
      String current = jedis.hget(name, CURRENT_RATE);
      int currentRate = Integer.parseInt(Strings.isNullOrEmpty(current) ? "0" : current);
      if (currentRate < permits) {
          return false;
      }
  }
  // Reaching this point means the request succeeded; subtract the tokens
  jedis.hincrBy(name, CURRENT_RATE, -permits);

Done. Is it really that simple?

Not so fast. There is a serious problem here. Suppose 10 requests arrive at 0.9s and another 10 arrive at 1.1s. With the limiter above, all 20 requests are processed. Do you see the problem?

Between 0.9s and 1.1s, only 0.2s apart, 20 requests were let through. Doesn't that defeat the purpose? Why does it happen?

The reason is that the granularity is too coarse. With the default 1s tick, the first 10 requests belong to second 1 and the last 10 belong to second 2, yet all 20 arrive between 0.9s and 1.1s, straddling two windows.
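The boundary burst is easy to reproduce without redis. The sketch below is an in-memory Java stand-in for the reset-per-window logic, written for illustration only: the class name, the helper method, and the assumption that the first window starts at time 0 are mine, not the article's.

```java
// Sketch only: an in-memory stand-in for the redis hash used above.
// The class name and the initial window start (time 0) are assumptions.
final class FixedWindowLimiter {
    private final int rate;           // tokens per window
    private final long rateInterval;  // window size in milliseconds
    private long cursor = 0;          // start time of the current window
    private int currentRate;          // tokens left in the current window

    FixedWindowLimiter(int rate, long rateInterval) {
        this.rate = rate;
        this.rateInterval = rateInterval;
        this.currentRate = rate;
    }

    boolean tryAcquire(int permits, long now) {
        if (now - cursor > rateInterval) {
            // Window elapsed: reset the cursor and the balance
            cursor = now;
            currentRate = rate;
        } else if (currentRate < permits) {
            return false;
        }
        currentRate -= permits;
        return true;
    }

    // Sends `requests` single-token requests at time `now`; returns how many pass.
    static int granted(FixedWindowLimiter limiter, int requests, long now) {
        int count = 0;
        for (int i = 0; i < requests; i++) {
            if (limiter.tryAcquire(1, now)) {
                count++;
            }
        }
        return count;
    }
}
```

With new FixedWindowLimiter(10, 1000), calling granted(limiter, 10, 900) and then granted(limiter, 10, 1100) lets all 20 requests through, even though they are only 200 ms apart.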

[image in the original post]

How do we solve it? With a sliding window. In essence, we record the request details for the whole window, and each time a new request tries to get tokens, we subtract the tokens already consumed in the window.

Here we use a redis zset: the score holds the timestamp and the value records the number of tokens each request acquired. Data outside the window is then cleared:

 String windowLimitName = name + "_windows";
 // Members are stored as "<permits>:<unique id>". zset members must be unique,
 // otherwise two requests for the same number of tokens overwrite each other.
 List<String> permitsList = jedis.zrangeByScore(windowLimitName, now - rateInterval, now);
 // Total tokens already used in the current window
 int usedPermits = permitsList.stream().mapToInt(v -> Integer.parseInt(v.split(":")[0])).sum();

 // The window has moved forward one tick
 if (now - cursor > rateInterval) {
     // The balance in the current sliding window is not enough
     if (rate - usedPermits < permits) {
         return false;
     }

     jedis.hset(name, CURSOR, String.valueOf(now));
     // Reset to what is still unused in the window, not to the full rate,
     // so later requests cannot spend tokens the window has already used
     jedis.hset(name, CURRENT_RATE, String.valueOf(rate - usedPermits));
 } else {
     // There may still be tokens left; try to take them
     String current = jedis.hget(name, CURRENT_RATE);
     int currentRate = Integer.parseInt(Strings.isNullOrEmpty(current) ? "0" : current);
     if (currentRate < permits) {
         return false;
     }
 }
 // Subtract the requested number of tokens
 jedis.hincrBy(name, CURRENT_RATE, -permits);
 // Record this acquisition in the window details
 jedis.zadd(windowLimitName, now, permits + ":" + java.util.UUID.randomUUID());
 // Delete data outside the window
 jedis.zremrangeByScore(windowLimitName, 0, now - rateInterval);

That is about it. There is still room for small optimizations, which you can explore yourself.
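The sliding-window idea can also be checked in isolation. The following Java sketch keeps the acquisition log in memory instead of a zset; it is a simplified illustration, not the article's implementation. The class name is hypothetical, and it assumes timestamps are passed in non-decreasing order.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Sketch only: an in-memory sliding-window log; names are hypothetical.
// Assumes tryAcquire is called with non-decreasing timestamps.
final class SlidingWindowLimiter {
    private final int rate;           // tokens allowed per window
    private final long rateInterval;  // window size in milliseconds
    // Each entry is {timestamp, permits}; it plays the role of the zset.
    private final Deque<long[]> log = new ArrayDeque<>();
    private int usedPermits;          // tokens used inside the current window

    SlidingWindowLimiter(int rate, long rateInterval) {
        this.rate = rate;
        this.rateInterval = rateInterval;
    }

    synchronized boolean tryAcquire(int permits, long now) {
        // Drop entries that have slid out of the window (now - rateInterval, now]
        while (!log.isEmpty() && log.peekFirst()[0] <= now - rateInterval) {
            usedPermits -= (int) log.pollFirst()[1];
        }
        if (rate - usedPermits < permits) {
            return false;
        }
        log.addLast(new long[] {now, permits});
        usedPermits += permits;
        return true;
    }
}
```

With a rate of 10 per 1000 ms, ten single-token requests at time 900 pass, ten more at time 1100 are all rejected, and the next request passes only at time 1900, when the first batch has left the window.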

2. Implementation

With the two steps above, we can quickly write a redis version of the token rate limiting algorithm. Let's look at the complete implementation and verify its correctness.

Case 1:

[image in the original post]
Verifying it, you can see that more than 10 tokens were consumed within 200ms:

[image in the original post]

To address this problem, here is the improved version, Case 2:
[image in the original post]
Testing again, you can observe that with the window limit added, at most 10 tokens are consumed within any 1s window:
[image in the original post]

So far, a token bucket rate limiter built on redis is nearly complete. You may have noticed that the token acquisition above is not an atomic operation, so in production we can wrap all of these operations atomically in a lua script.

II. Redisson implementation

At this point you may ask: in real work I don't want to reinvent the wheel, so are there any out-of-the-box components?

For a single-machine rate limiter, I recommend Guava's RateLimiter. I wrote an earlier article analyzing its implementation in detail; see it if you are interested.

Then there is distributed rate limiting, which is the focus of the rest of this analysis. We use redisson's RedissonRateLimiter here, a component the author also uses often in production.

1. Design

This is the redisson 3.15.x version:
[image in the original post]

This is the latest version as of 2022/07/31, 3.17.5. As you can see, the code looks more complicated:
[image in the original post]
Don't be intimidated by the code above. At first this function was only a few lines; after long-term iteration, feature work, and bug fixes, it has grown a bit complicated.

In essence, it is similar to what we wrote ourselves:

  • First, fetch the fixed parameters such as rate and rateInterval.
  • Check that the request is valid; for example, requesting more permits than rate is not allowed.
  • Check how many tokens have been used within the current window and decide whether enough remain for the request.
  • As time passes and the window moves, some tokens expire; try to replenish them.
  • If the request can be granted, record the details of this acquisition and subtract the tokens from the total.

2. How it works

Next we take redisson 3.17.5 apart. First, look at the definition of the method being called:

<T, R> RFuture<R> evalWriteAsync(String key, Codec codec, RedisCommand<T> evalCommandType, String script, List<Object> keys, Object... params);

The keys passed when calling the method:

Arrays.asList(getName(), getValueName(), getClientValueName(), getPermitsName(), getClientPermitsName())

Then the params (args):

value, System.currentTimeMillis(), ThreadLocalRandom.current().nextLong()

Note that lua list indexes start at 1. With that, let's continue through this complex lua code:

1) Get the preset parameters and validate them

local rate = redis.call('hget', KEYS[1], 'rate');
local interval = redis.call('hget', KEYS[1], 'interval');
local type = redis.call('hget', KEYS[1], 'type');
assert(rate ~= false and interval ~= false and type ~= false, 'RateLimiter is not initialized')

2) Calculate expired tokens

local expiredValues = redis.call('zrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval); 
local released = 0; 
for i, v in ipairs(expiredValues) do 
     local random, permits = struct.unpack('Bc0I', v);
     released = released + permits;
end; 

Here ARGV[2] is the System.currentTimeMillis() argument we passed. The zrangebyscore range query finds the records that fall outside the valid window, i.e. the expired ones.

Once these expired records are removed, the window has room for new tokens, which are added in the next step.

The lua function tonumber converts its argument to the number type.

Also, struct.unpack() is the counterpart of struct.pack(). In essence, pack combines multiple values into one and unpack splits one back into several; the first argument specifies the format. This makes it convenient to store several fields in one value, and it also keeps the value unique.
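For readers more at home in Java than lua, the pack and unpack pairing can be imitated with a ByteBuffer. The sketch below is an analogue of the Bc0I layout as I read it (a one-byte length, that many bytes of text, then a four-byte integer), not a byte-exact reproduction of what redis stores. The class name and the little-endian byte order are assumptions.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

// Sketch only: a Java analogue of the 'Bc0I' layout (1-byte length, that many
// bytes of text, then a 4-byte integer). Little-endian order is an assumption.
final class PermitRecord {
    final String random;  // random part that keeps each record unique
    final int permits;    // number of tokens this request acquired

    PermitRecord(String random, int permits) {
        this.random = random;
        this.permits = permits;
    }

    // Combine both fields into one value
    byte[] pack() {
        byte[] text = random.getBytes(StandardCharsets.UTF_8);
        if (text.length > 255) {
            throw new IllegalArgumentException("random part does not fit a 1-byte length");
        }
        ByteBuffer buf = ByteBuffer.allocate(1 + text.length + 4).order(ByteOrder.LITTLE_ENDIAN);
        buf.put((byte) text.length).put(text).putInt(permits);
        return buf.array();
    }

    // Split one value back into its fields
    static PermitRecord unpack(byte[] packed) {
        ByteBuffer buf = ByteBuffer.wrap(packed).order(ByteOrder.LITTLE_ENDIAN);
        int length = buf.get() & 0xFF;  // read the length byte as unsigned
        byte[] text = new byte[length];
        buf.get(text);
        int permits = buf.getInt();
        return new PermitRecord(new String(text, StandardCharsets.UTF_8), permits);
    }
}
```

Two records with the same permits but different random parts pack to different byte arrays, which is what lets a zset hold several acquisitions of the same size.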

3) Replenish tokens:

if released > 0 then 
     redis.call('zremrangebyscore', permitsName, 0, tonumber(ARGV[2]) - interval); 
     if tonumber(currentValue) + released > tonumber(rate) then 
          currentValue = tonumber(rate) - redis.call('zcard', permitsName); 
     else 
          currentValue = tonumber(currentValue) + released; 
     end; 
     redis.call('set', valueName, currentValue);
end;

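First, zremrangebyscore deletes the expired records (those outside the window). The zset then holds only the records inside the valid window, and zcard counts them; the script uses that count as the number of tokens currently consumed, which matches the real consumption when each record took one token.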
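The replenishment branch is easier to follow as plain arithmetic. The sketch below restates the lua shown above as a pure Java function; it is a paraphrase for illustration, not Redisson code. The class, method, and parameter names are hypothetical, and recordsInWindow stands in for the zcard result.

```java
// Sketch only: the replenishment branch restated as a pure function.
// Names are hypothetical; recordsInWindow stands in for the zcard result.
final class Replenish {
    static long newValue(long currentValue, long released, long rate, long recordsInWindow) {
        if (released <= 0) {
            // Nothing expired, so the balance is unchanged
            return currentValue;
        }
        if (currentValue + released > rate) {
            // Would overflow the bucket: rebuild the balance from the rate,
            // counting one token per record still inside the window
            return rate - recordsInWindow;
        }
        return currentValue + released;
    }
}
```

For example, with rate 10, a balance of 0 and 10 released tokens gives 10; a balance of 5 with 10 released tokens and 2 records left in the window gives 8. The overflow branch subtracts the number of records, not the sum of their permits, so the two agree only when every record took a single token.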

4) Deduct tokens

-- Fewer tokens are available than requested, so the request fails
if tonumber(currentValue) < tonumber(ARGV[1]) then
    local firstValue = redis.call('zrange', permitsName, 0, 0, 'withscores');
    res = 3 + interval - (tonumber(ARGV[2]) - tonumber(firstValue[2]));
else -- tokens can be acquired
    redis.call('zadd', permitsName, ARGV[2], struct.pack('Bc0I', string.len(ARGV[3]), ARGV[3], ARGV[1]));
    redis.call('decrby', valueName, ARGV[1]);
    res = nil;
end;

Here ARGV[1] is our request parameter value, i.e. the number of tokens requested. zadd records the acquisition details and decrby deducts the tokens.
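The failure branch is not explained above, so here is my reading of it, offered as an assumption: res looks like a wait time in milliseconds, namely the time until the oldest record in the window expires plus a constant of 3. The Java sketch below only reproduces the arithmetic of that line; the class, method, and parameter names are hypothetical.

```java
// Sketch only: the arithmetic of the failure branch, with hypothetical names.
final class RetryDelay {
    // interval:    window size in ms (the 'interval' field)
    // now:         current time in ms (ARGV[2])
    // oldestScore: timestamp of the oldest record in the window (firstValue[2])
    static long millisUntilRetry(long interval, long now, long oldestScore) {
        return 3 + interval - (now - oldestScore);
    }
}
```

With a 1000 ms interval, a request failing at time 1500 while the oldest record has timestamp 900 gets 3 + 1000 - 600 = 403.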

The following lua call packs the three arguments string.len(ARGV[3]), ARGV[3], ARGV[1] into a single value in the format Bc0I. The operation is reversible (the value can be unpacked again):

struct.pack('Bc0I', string.len(ARGV[3]), ARGV[3], ARGV[1])

Here ARGV[3] is the random number random and ARGV[1] is the number of requested tokens value.

5) Set the expiration time

local ttl = redis.call('pttl', KEYS[1]);
if ttl > 0 then
    redis.call('pexpire', valueName, ttl);
    redis.call('pexpire', permitsName, ttl);
end;
return res;

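This step is optional. The logic was added later and is not present in earlier versions.

3. Try it out

First write a case to verify it:
[image in the original post]
As you can see, the result matches expectations. Now look at what redis stores:

1) Preset parameters:
[image in the original post]
2) Token acquisition details:
[image in the original post]
3) Tokens remaining:
[image in the original post]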


Summary

Rate limiting is one of the commonly used protective measures for websites. There are usually three approaches: counter, leaky bucket, and token bucket. This article analyzed the token bucket algorithm, the most widely used of the three.

The token bucket algorithm produces tokens at a specified rate and places them in a bucket of fixed capacity. Consumers then take tokens at a non-constant rate; if tokens are obtained, the subsequent operation proceeds, otherwise the request waits or is rejected.

In addition, implementations often have no dedicated producer. Instead, the bucket balance is maintained lazily as time passes: the token logic is triggered when a client request arrives.

In a token bucket design it is important to keep the time window correct, and atomicity should be guaranteed as far as possible; consider handling it in lua.

Finally, we walked through the token bucket implementation in the latest version of redisson. In production you can use it out of the box without reinventing the wheel.





Copyright notice
This article was written by [tar]. Please include the original link when reposting. Thanks.
https://yzsam.com/2022/214/202208021152244731.html