当前位置:网站首页>如何实现一个延时队列 ?

如何实现一个延时队列 ?

2022-07-04 15:05:00 InfoQ

一、什么是延时队列
延时队列,顾名思义,就是元素在入队列时,会指定一个延时时间,期望在经过指定时间后再处理该元素。
实现延时队列的方式有多种,可以采用我们熟悉的方式实现自己的延时队列。一般延时队列可以作为基础公共服务提供给全公司使用,这种方式需要独立维护一个项目,对qps稳定性,数据一致性,精度等要求更高,可以采用时间轮算法实现。 我在项目中遇到一个使用延时队列的场景,因为项目中使用了redis,所以我用redis 实现了一个延时队列来满足需求,本文介绍下如何用redis 实现一个延时队列。

作者可爱儿子
二、延时队列适用的场景?
延时队列适用的场景有很多:
2.1比如超时关单:
即用户在电商平台下单后没有立即支付,等超过指定时间后订单自动关闭。
2.2 比如回调重试:
对于异步接口来说,如果给调用方回调时,由于网络不通或其他原因导致回调失败时,我们可以采用延时策略对调用方的回调接口进行重试。为了避免因网络抖动或其他原因造成的回调失败,我们可以采用的延时策略为 1min  5 min  10 min  30 min  1hour 等间隔进行回调。
2.3 会议提醒:
比如我们用的lark会议,在会议开始前10分钟对参会人进行提醒,这个功能也可以采用延时队列来实现。
2.4 各种延时提醒:
比如用户下单未支付时,系统在关单前10分钟提醒用户去支付。比如我曾做过的二手车的一个需求就是提醒买手尽快出价等。
这些场景都用到了延时队列,其实上述场景采用定时任务也能实现,但是相比于定时任务,延时队的时间把控更精准,延时队列不用扫描库表,对系统消耗更少。
三、延时队列的实现方式?
•DelayQueue,
这个是jdk自带的一种延时队列,位于java.util.concurrent 包下,它是一个有界的阻塞队列,它内部封装了一个 PriorityQueue(优先队列)
PriorityQueue 内部使用完全二叉堆来实现队列元素排序,当向 DelayQueue 队列中添加元素时,会给元素一个 Delay(延迟时间)作为排序条件,队列中最小的元素会优先放在队首。队列中的元素只有到了 Delay 时间才允许从队列中取出。有兴趣的同学可以详细查看源码。
•Quartz 定时任务,
对时间精准度要求不高,数据量较小的任务,可以采用定时任务替代延时队列。
•redis 过期回调,
我们可以开启监听key 是否过期的事件,一旦key 过期会触发一个callback 事件。这样我们就能通过设置key的过期时间,来实现延时队列的效果。
•redis  sorted set,
主要是利用 zset 的score 属性,我们将延时时间转成(当前时间+延时时间)(时间单位毫秒)作为scroe 属性。然后开启一个消费线程轮训redis队列,当score属性的值小于当前时间时,证明延时消息到期,可以进行消费。
•Mq,
通过rabbitMq  或 rocketMq 可以实现延时队列,具体实现方式省略。
•时间轮,
基于kafka 或 netty 的时间轮算法来实现延时队列,实现方式省略,有兴趣的同学可以自己查看。

实现延时队列的方式有多种,可以采用我们熟悉的方式实现自己的延时队列。一般延时队列可以作为基础公共服务提供给全公司使用,这种方式需要独立维护一个项目,对qps稳定性,数据一致性,精度等要求更高,可以采用时间轮算法实现。
我在项目中遇到一个使用延时队列的场景,因为项目中使用了redis,所以我用redis 实现了一个延时队列来满足需求,下面介绍下如何用redis 实现一个延时队列。
四、如何用redis实现延时队列?
通过 redis 实现延时队列有两种方式,第一种是redis 的过期回调,但是这种方式需要修改redis 的配置文件并重启服务,这对于我们正在使用的 redis 服务来说比较困难。
所以我们用第二种方式来实现一个延时队列:
4.1我们使用redisson 来实现,首先引入redisson 包
 <dependency>
 <groupId>org.redisson</groupId>
 <artifactId>redisson-spring-boot-starter</artifactId>
 <version>3.16.3</version>
 </dependency>
4.2、配置redisson
redisson:
 redisModel: SINGLE
 singleConfig:
 address: redis://127.0.0.1:6379

4.3延时服务:添加延时任务,取消延时任务
public class RedisDelayQueueServiceImpl implements RedisDelayQueueService {

 @Autowired
 private RedissonClient redissonClient;

 /**
 * 新增一个延时任务,添加job元信息
 *
 * @param job 元信息,job中包含 taskId 主键,topicId 队列id,retryNum 重试次数,delaytime 延时时间 等,此处不展开。
 */
 @Override
 public void addJob(Job job) {
 //添加分布式锁,防止重复添加任务
 RLock lock = redissonClient.getLock(RedisQueueKey.ADD_JOB_LOCK + job.getJobId());
 try {
 boolean lockFlag = lock.tryLock(RedisQueueKey.LOCK_WAIT_TIME, RedisQueueKey.LOCK_RELEASE_TIME, TimeUnit.SECONDS);
 if (!lockFlag) {
 throw new BizException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL.getInfo());
 }
 String topicId = RedisQueueKey.getTopicId(job.getTopic(), job.getJobId());

 // 1. 将job添加到 JobPool中,jobpool 作为一个全局索引,所有未执行任务都存在jobPool 中。
 RMap<String, Job> jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY);
 if (jobPool.get(topicId) != null) {
 throw new BizException(ErrorMessageEnum.JOB_ALREADY_EXIST.getInfo());
 }

 jobPool.put(topicId, job);

 // 2. 将job添加到 DelayBucket中,按延时时间进行排序
 RScoredSortedSet<Object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
 delayBucket.add(job.getDelay(), topicId);
 } catch (InterruptedException e) {
 log.error(&quot;addJob error&quot;, e);
 throw new BizException(&quot;add delay job error,reason:&quot; + e.getMessage());
 } finally {
 if (lock != null) {
 lock.unlock();
 }
 }
 }


 /**
 * 删除job信息,为什么要删除job 信息?
 * 当我们确信上一个延时任务没有必要执行时,我们可以提前取消延时任务的执行。
 *
 * @param jobDie 元信息
 */
 @Override
 public void deleteJob(JobDie jobDie) {

 RLock lock = redissonClient.getLock(RedisQueueKey.DELETE_JOB_LOCK + jobDie.getJobId());
 try {
 boolean lockFlag = lock.tryLock(RedisQueueKey.LOCK_WAIT_TIME, RedisQueueKey.LOCK_RELEASE_TIME, TimeUnit.SECONDS);
 if (!lockFlag) {
 throw new BizException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL.getInfo());
 }
 String topicId = RedisQueueKey.getTopicId(jobDie.getTopic(), jobDie.getJobId());
 //从全局索引中删除任务。
 RMap<String, Job> jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY);
 jobPool.remove(topicId);
 //从zset 中删除任务
 RScoredSortedSet<Object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
 delayBucket.remove(topicId);
 } catch (InterruptedException e) {
 log.error(&quot;addJob error&quot;, e);
 throw new BizException(&quot;delete job error,reason:&quot; + e.getMessage());
 } finally {
 if (lock != null) {
 lock.unlock();
 }
 }
 }
}
4.4搬运线程:
搬运线程的目的是将 zset 中已经到期的任务搬运到消费队列中,消费队列中的任务会被消费线程消费。之所以会增加一个消费队列,是考虑到我们的消费能力和数据安全,
如果消费能力比较弱,可能会造成消费线程阻塞,或者数据丢失
。我们把到期任务放到一个阻塞队列中,可以让消费线程顺序消费。

这个地方还能继续优化,比如可以落库,可以建立多个阻塞队列,每个阻塞队列可以指定一个线程池进行消费等。
@Slf4j
@Component
public class CarryJobScheduled {

 @Autowired
 private RedissonClient redissonClient;

 /**
 * 启动定时开启搬运JOB信息
 */
 @Scheduled(cron = &quot;*/1 * * * * *&quot;)
 public void carryJobToQueue() {
 //System.out.println(&quot;carryJobToQueue --->&quot;);
 RLock lock = redissonClient.getLock(RedisQueueKey.CARRY_THREAD_LOCK);
 try {
 boolean lockFlag = lock.tryLock(RedisQueueKey.LOCK_WAIT_TIME, RedisQueueKey.LOCK_RELEASE_TIME, TimeUnit.SECONDS);
 if (!lockFlag) {
 throw new BizException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL.getInfo());
 }
 // 将到期的任务取出来
 RScoredSortedSet<Object> bucketSet = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
 long now = System.currentTimeMillis();
 Collection<Object> jobCollection = bucketSet.valueRange(0, false, now, true);
 if (CollectionUtils.isEmpty(jobCollection)) {
 return;
 }
 
 // 将到期的任务搬运到消费队列中,zset 中的任务删除
 List<String> jobList = jobCollection.stream().map(String::valueOf).collect(Collectors.toList());
 RList<String> readyQueue = redissonClient.getList(RedisQueueKey.RD_LIST_TOPIC_PRE);

 if (CollectionUtils.isEmpty(jobList)) {
 return;
 }
 if (readyQueue.addAll(jobList)) {
 bucketSet.removeAllAsync(jobList);
 }

 } catch (InterruptedException e) {
 log.error(&quot;carryJobToQueue error&quot;, e);
 } finally {
 if (lock != null) {
 lock.unlock();
 }
 }
 }
}
4.5消费线程:
开启一个消费线程,消费线程会消费阻塞队列中的到期任务,其中 ConsumerService 可以采用策略模式,根据不同的topic 进行不同的业务处理。
@Slf4j
@Component
public class ReadyQueueContext {

 @Autowired
 private RedissonClient redissonClient;

 @Autowired
 private ConsumerService consumerService;

 /**
 * TOPIC消费
 */
 @PostConstruct
 public void startTopicConsumer() {
 TaskManager.doTask(this::runTopicThreads, &quot;开启TOPIC消费线程&quot;);
 }

 /**
 * 开启TOPIC消费线程
 * 将所有可能出现的异常全部catch住,确保While(true)能够不中断
 */
 @SuppressWarnings(&quot;InfiniteLoopStatement&quot;)
 private void runTopicThreads() {
 while (true) {
 RLock lock = null;
 try {
 lock = redissonClient.getLock(RedisQueueKey.CONSUMER_TOPIC_LOCK);
 } catch (Exception e) {
 log.error(&quot;runTopicThreads getLock error&quot;, e);
 }
 try {
 if (lock == null) {
 continue;
 }
 // 分布式锁时间比Blpop阻塞时间多1S,避免出现释放锁的时候,锁已经超时释放,unlock报错
 boolean lockFlag = lock.tryLock(RedisQueueKey.LOCK_WAIT_TIME, RedisQueueKey.LOCK_RELEASE_TIME, TimeUnit.SECONDS);
 if (!lockFlag) {
 continue;
 }

 // 1. 获取ReadyQueue中待消费的数据
 RBlockingQueue<String> queue = redissonClient.getBlockingQueue(RedisQueueKey.RD_LIST_TOPIC_PRE);
 String topicId = queue.poll(60, TimeUnit.SECONDS);
 if (StringUtils.isEmpty(topicId)) {
 continue;
 }

 // 2. 获取job元信息内容
 RMap<String, Job> jobPoolMap = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY);
 Job job = jobPoolMap.get(topicId);

 // 3. 消费
 FutureTask<Boolean> taskResult = TaskManager.doFutureTask(() -> consumerService.consumerMessage(job.getJobId(),job.getTopic(), job.getBody()), job.getTopic() + &quot;-->消费JobId-->&quot; + job.getJobId());
 if (taskResult.get()) {
 // 3.1 消费成功,删除JobPool和DelayBucket的job信息
 jobPoolMap.remove(topicId);
 } else {
 /**
 * 重试次数为0就直接返回
 */
 if (job.getRetry() == 0) {
 return;
 }
 int retrySum = job.getRetry() + 1;
 // 3.2 消费失败,则根据策略重新加入Bucket

 // 如果重试次数大于5,则将jobPool中的数据删除,持久化到DB
 if (retrySum > RetryStrategyEnum.RETRY_FIVE.getRetry()) {
 jobPoolMap.remove(topicId);
 continue;
 }
 job.setRetry(retrySum);
 long nextTime = job.getDelay() + RetryStrategyEnum.getDelayTime(job.getRetry()) * 1000;
 // log.info(&quot;next retryTime is [{}]&quot;, DateUtil.long2Str(nextTime));
 RScoredSortedSet<Object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
 delayBucket.add(nextTime, topicId);
 // 3.3 更新元信息失败次数
 jobPoolMap.put(topicId, job);
 }
 } catch (Exception e) {
 log.error(&quot;runTopicThreads error&quot;, e);
 } finally {
 if (lock != null) {
 try {
 lock.unlock();
 } catch (Exception e) {
 log.error(&quot;runTopicThreads unlock error&quot;, e);
 }
 }
 }
 }
 }

关于领创集团(Advance Intelligence Group)
领创集团成立于 2016 年,致力于通过科技创新的本地化应用,改造和重塑金融和零售行业,以多元化的业务布局打造一个服务于消费者、企业和商户的生态圈。集团旗下包含企业业务和消费者业务两大板块,企业业务包含 ADVANCE.AI 和 Ginee,分别为银行、金融、金融科技、零售和电商行业客户提供基于 AI 技术的数字身份验证、风险管理产品和全渠道电商服务解决方案;消费者业务 Atome Financial 包括亚洲领先的先享后付平台 Atome 和数字金融服务。
2021 年 9 月,领创集团宣布完成超 4 亿美元 D 轮融资,融资完成后领创集团估值已超 20 亿美元,成为新加坡最大的独立科技创业公司之一。

往期回顾 BREAK AWAY
Spring data JPA 实践和原理浅析
如何解决海量数据更新场景下的 Mysql 死锁问题
企业级 APIs 安全实践指南 (建议初中级工程师收藏)
Cypress UI 自动化测试框架
serverless让我们的运维更轻松

▼ 如果觉得这篇内容对你有所帮助,有所启发,欢迎点赞收藏:
1、
点赞、关注
领创集团
,获取最新技术分享和公司动态。
2、关注我们的公众号 & 知乎号「领创集团 Advance Group」或访问
官方网站
,了解更多企业动态。

原网站

版权声明
本文为[InfoQ]所创,转载请带上原文链接,感谢
https://xie.infoq.cn/article/a7eb2a84e4e04adc0c098dec1