当前位置:网站首页>How to implement a delay queue?
How to implement a delay queue?
2022-07-04 17:02:00 【InfoQ】
One 、 What is delay queue

Two 、 Scenarios for delay queues ?
2.1 For example, overtime customs clearance :
2.2 For example, callback retry :
2.3 Meeting reminder :
2.4 Various delay reminders :
3、 ... and 、 Implementation of delay queue ?
Four 、 How to use redis Implement delay queue ?
4.1 We use redisson To achieve , First introduce redisson package
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.16.3</version>
</dependency>4.2、 To configure redisson
redisson:
redisModel: SINGLE
singleConfig:
address: redis://127.0.0.1:63794.3 Delay service : Add delay task , Cancel delay task
public class RedisDelayQueueServiceImpl implements RedisDelayQueueService {
@Autowired
private RedissonClient redissonClient;
/**
* Add a delay task , add to job Meta information
*
* @param job Meta information ,job Contained in the taskId Primary key ,topicId queue id,retryNum Retry count ,delaytime Delay Time etc. , Not expanded here .
*/
@Override
public void addJob(Job job) {
// Add distributed lock , Prevent repeated addition of tasks
RLock lock = redissonClient.getLock(RedisQueueKey.ADD_JOB_LOCK + job.getJobId());
try {
boolean lockFlag = lock.tryLock(RedisQueueKey.LOCK_WAIT_TIME, RedisQueueKey.LOCK_RELEASE_TIME, TimeUnit.SECONDS);
if (!lockFlag) {
throw new BizException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL.getInfo());
}
String topicId = RedisQueueKey.getTopicId(job.getTopic(), job.getJobId());
// 1. take job Add to JobPool in ,jobpool As a global index , All unexecuted tasks exist jobPool in .
RMap<String, Job> jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY);
if (jobPool.get(topicId) != null) {
throw new BizException(ErrorMessageEnum.JOB_ALREADY_EXIST.getInfo());
}
jobPool.put(topicId, job);
// 2. take job Add to DelayBucket in , Sort by delay time
RScoredSortedSet<Object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
delayBucket.add(job.getDelay(), topicId);
} catch (InterruptedException e) {
log.error("addJob error", e);
throw new BizException("add delay job error,reason:" + e.getMessage());
} finally {
if (lock != null) {
lock.unlock();
}
}
}
/**
* Delete job Information , Why delete job Information ?
* When we are sure that the last delayed task is unnecessary , We can cancel the execution of delayed tasks in advance .
*
* @param jobDie Meta information
*/
@Override
public void deleteJob(JobDie jobDie) {
RLock lock = redissonClient.getLock(RedisQueueKey.DELETE_JOB_LOCK + jobDie.getJobId());
try {
boolean lockFlag = lock.tryLock(RedisQueueKey.LOCK_WAIT_TIME, RedisQueueKey.LOCK_RELEASE_TIME, TimeUnit.SECONDS);
if (!lockFlag) {
throw new BizException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL.getInfo());
}
String topicId = RedisQueueKey.getTopicId(jobDie.getTopic(), jobDie.getJobId());
// Delete task from global index .
RMap<String, Job> jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY);
jobPool.remove(topicId);
// from zset Delete task in
RScoredSortedSet<Object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
delayBucket.remove(topicId);
} catch (InterruptedException e) {
log.error("addJob error", e);
throw new BizException("delete job error,reason:" + e.getMessage());
} finally {
if (lock != null) {
lock.unlock();
}
}
}
}4.4 Handling thread :
@Slf4j
@Component
public class CarryJobScheduled {
@Autowired
private RedissonClient redissonClient;
/**
* Start timing start handling JOB Information
*/
@Scheduled(cron = "*/1 * * * * *")
public void carryJobToQueue() {
//System.out.println("carryJobToQueue --->");
RLock lock = redissonClient.getLock(RedisQueueKey.CARRY_THREAD_LOCK);
try {
boolean lockFlag = lock.tryLock(RedisQueueKey.LOCK_WAIT_TIME, RedisQueueKey.LOCK_RELEASE_TIME, TimeUnit.SECONDS);
if (!lockFlag) {
throw new BizException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL.getInfo());
}
// Take out the expired tasks
RScoredSortedSet<Object> bucketSet = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
long now = System.currentTimeMillis();
Collection<Object> jobCollection = bucketSet.valueRange(0, false, now, true);
if (CollectionUtils.isEmpty(jobCollection)) {
return;
}
// Move the expired tasks to the consumption queue ,zset Delete the task in
List<String> jobList = jobCollection.stream().map(String::valueOf).collect(Collectors.toList());
RList<String> readyQueue = redissonClient.getList(RedisQueueKey.RD_LIST_TOPIC_PRE);
if (CollectionUtils.isEmpty(jobList)) {
return;
}
if (readyQueue.addAll(jobList)) {
bucketSet.removeAllAsync(jobList);
}
} catch (InterruptedException e) {
log.error("carryJobToQueue error", e);
} finally {
if (lock != null) {
lock.unlock();
}
}
}
}4.5 Consumer thread :
@Slf4j
@Component
public class ReadyQueueContext {
@Autowired
private RedissonClient redissonClient;
@Autowired
private ConsumerService consumerService;
/**
* TOPIC consumption
*/
@PostConstruct
public void startTopicConsumer() {
TaskManager.doTask(this::runTopicThreads, " Turn on TOPIC Consumer thread ");
}
/**
* Turn on TOPIC Consumer thread
* Put all possible exceptions catch live , Make sure While(true) Can be uninterrupted
*/
@SuppressWarnings("InfiniteLoopStatement")
private void runTopicThreads() {
while (true) {
RLock lock = null;
try {
lock = redissonClient.getLock(RedisQueueKey.CONSUMER_TOPIC_LOCK);
} catch (Exception e) {
log.error("runTopicThreads getLock error", e);
}
try {
if (lock == null) {
continue;
}
// Distributed lock time ratio Blpop More blocking time 1S, Avoid releasing the lock , The lock has been released overtime ,unlock Report errors
boolean lockFlag = lock.tryLock(RedisQueueKey.LOCK_WAIT_TIME, RedisQueueKey.LOCK_RELEASE_TIME, TimeUnit.SECONDS);
if (!lockFlag) {
continue;
}
// 1. obtain ReadyQueue Data to be consumed
RBlockingQueue<String> queue = redissonClient.getBlockingQueue(RedisQueueKey.RD_LIST_TOPIC_PRE);
String topicId = queue.poll(60, TimeUnit.SECONDS);
if (StringUtils.isEmpty(topicId)) {
continue;
}
// 2. obtain job Meta information content
RMap<String, Job> jobPoolMap = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY);
Job job = jobPoolMap.get(topicId);
// 3. consumption
FutureTask<Boolean> taskResult = TaskManager.doFutureTask(() -> consumerService.consumerMessage(job.getJobId(),job.getTopic(), job.getBody()), job.getTopic() + "--> consumption JobId-->" + job.getJobId());
if (taskResult.get()) {
// 3.1 Consumption success , Delete JobPool and DelayBucket Of job Information
jobPoolMap.remove(topicId);
} else {
/**
* Retries are 0 Go straight back
*/
if (job.getRetry() == 0) {
return;
}
int retrySum = job.getRetry() + 1;
// 3.2 Consumption failure , Then rejoin according to the strategy Bucket
// If the number of retries is greater than 5, Will jobPool Data deletion in , Persist to DB
if (retrySum > RetryStrategyEnum.RETRY_FIVE.getRetry()) {
jobPoolMap.remove(topicId);
continue;
}
job.setRetry(retrySum);
long nextTime = job.getDelay() + RetryStrategyEnum.getDelayTime(job.getRetry()) * 1000;
// log.info("next retryTime is [{}]", DateUtil.long2Str(nextTime));
RScoredSortedSet<Object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
delayBucket.add(nextTime, topicId);
// 3.3 Failed times to update meta information
jobPoolMap.put(topicId, job);
}
} catch (Exception e) {
log.error("runTopicThreads error", e);
} finally {
if (lock != null) {
try {
lock.unlock();
} catch (Exception e) {
log.error("runTopicThreads unlock error", e);
}
}
}
}
}About lingchuang group (Advance Intelligence Group)
Looking back BREAK AWAY
边栏推荐
- Use and principle of thread pool
- 【Go ~ 0到1 】 第六天 文件的读写与创建
- Linear time sequencing
- Years of training, towards Kata 3.0! Enter the safe container experience out of the box | dragon lizard Technology
- egg. JS learning notes
- Detailed process of DC-2 range construction and penetration practice (DC range Series)
- APOC custom functions and procedures
- 被PMP考试“折磨”出来的考试心得,值得你一览
- 智慧物流園區供應鏈管理系統解决方案:數智化供應鏈賦能物流運輸行業供應鏈新模式
- Cut! 39 year old Ali P9, saved 150million
猜你喜欢

Software Engineer vs Hardware Engineer

矿产行业商业供应链协同系统解决方案:构建数智化供应链平台,保障矿产资源安全供应

昆明三环闭合工程将经过这些地方,有在你家附近的吗?

VMware Tools和open-vm-tools的安装与使用:解决虚拟机不全屏和无法传输文件的问题

《吐血整理》保姆级系列教程-玩转Fiddler抓包教程(2)-初识Fiddler让你理性认识一下

从数数开始
Why do you say that the maximum single table of MySQL database is 20million? Based on what?

How to decrypt worksheet protection password in Excel file

C# 服务器日志模块

被PMP考试“折磨”出来的考试心得,值得你一览
随机推荐
I let the database lock the table! Almost fired!
Li Kou today's question -1200 Minimum absolute difference
Linear time sequencing
Oracle监听器Server端与Client端配置实例
Understand Alibaba cloud's secret weapon "dragon architecture" in the article "science popularization talent"
Implement graph data construction task based on check point
照明行业S2B2B解决方案:高效赋能产业供应链,提升企业经济效益
Integration of ongdb graph database and spark
The test experience "tortured" by the PMP test is worth your review
Median and order statistics
祝贺Artefact首席数据科学家张鹏飞先生荣获 Campaign Asia Tech MVP 2022
Firebird experience summary
安信证券排名 网上开户安全吗
[Chongqing Guangdong education] National Open University spring 2019 1248 public sector human resource management reference questions
Research Report of exoskeleton robot industry - market status analysis and development prospect prediction
Unity interview questions (continuously updated)
Task state rollback and data blocking tasks based on check point mechanism
Can you really use MySQL explain?
矿产行业商业供应链协同系统解决方案:构建数智化供应链平台,保障矿产资源安全供应
Market trend report, technical innovation and market forecast of taillight components in China