当前位置:网站首页>How to implement a delay queue?
How to implement a delay queue?
2022-07-04 17:02:00 【InfoQ】
1. What is a delay queue?

2. Scenarios for delay queues
2.1 For example, closing orders on timeout:
2.2 For example, callback retries:
2.3 Meeting reminders:
2.4 Various delayed reminders:
3. How can a delay queue be implemented?
4. How to implement a delay queue with Redis?
4.1 We use Redisson for the implementation. First, add the Redisson dependency:
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.16.3</version>
</dependency>
4.2 Configure Redisson
redisson:
redisModel: SINGLE
singleConfig:
address: redis://127.0.0.1:6379
4.3 Delay service: add a delay task, cancel a delay task
/**
 * Redisson-backed implementation of {@link RedisDelayQueueService} that registers and cancels
 * delayed jobs.
 *
 * <p>Every job is stored in two Redis structures:
 * <ul>
 *   <li>the job pool map ({@code RedisQueueKey.JOB_POOL_KEY}), keyed by topic id and holding the
 *       job metadata;</li>
 *   <li>the delay bucket sorted set ({@code RedisQueueKey.RD_ZSET_BUCKET_PRE}), holding the topic
 *       id scored by {@code job.getDelay()}.</li>
 * </ul>
 *
 * <p>NOTE(review): {@code log} is not declared in this snippet; presumably it is supplied by
 * Lombok's {@code @Slf4j} — confirm.
 */
public class RedisDelayQueueServiceImpl implements RedisDelayQueueService {

    @Autowired
    private RedissonClient redissonClient;

    /**
     * Adds a delayed job.
     *
     * @param job job metadata; this method reads its job id, topic and delay. The delay is used
     *            as the sorted-set score — presumably an absolute due time in epoch
     *            milliseconds; confirm against the code that scans the bucket.
     * @throws BizException if the per-job lock cannot be acquired, if a job with the same topic
     *                      id is already registered, or if the thread is interrupted while
     *                      waiting for the lock
     */
    @Override
    public void addJob(Job job) {
        // Per-job distributed lock so the same job cannot be added twice concurrently.
        RLock lock = redissonClient.getLock(RedisQueueKey.ADD_JOB_LOCK + job.getJobId());
        try {
            boolean lockFlag = lock.tryLock(
                    RedisQueueKey.LOCK_WAIT_TIME, RedisQueueKey.LOCK_RELEASE_TIME, TimeUnit.SECONDS);
            if (!lockFlag) {
                throw new BizException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL.getInfo());
            }
            String topicId = RedisQueueKey.getTopicId(job.getTopic(), job.getJobId());

            // 1. Register the job in the job pool, the global index of all unexecuted jobs.
            //    putIfAbsent performs the existence check and the insert in one call.
            RMap<String, Job> jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY);
            if (jobPool.putIfAbsent(topicId, job) != null) {
                throw new BizException(ErrorMessageEnum.JOB_ALREADY_EXIST.getInfo());
            }

            // 2. Put the topic id into the delay bucket, ordered by the job's delay value.
            RScoredSortedSet<Object> delayBucket =
                    redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
            delayBucket.add(job.getDelay(), topicId);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can still observe it.
            Thread.currentThread().interrupt();
            log.error("addJob error", e);
            throw new BizException("add delay job error,reason:" + e.getMessage());
        } finally {
            unlockIfHeld(lock);
        }
    }

    /**
     * Cancels a delayed job ahead of time by removing it from the job pool and the delay bucket.
     * Use this when a previously scheduled job is known to be unnecessary.
     *
     * @param jobDie identifies the job to cancel; this method reads its job id and topic
     * @throws BizException if the per-job lock cannot be acquired or the thread is interrupted
     *                      while waiting for the lock
     */
    @Override
    public void deleteJob(JobDie jobDie) {
        RLock lock = redissonClient.getLock(RedisQueueKey.DELETE_JOB_LOCK + jobDie.getJobId());
        try {
            boolean lockFlag = lock.tryLock(
                    RedisQueueKey.LOCK_WAIT_TIME, RedisQueueKey.LOCK_RELEASE_TIME, TimeUnit.SECONDS);
            if (!lockFlag) {
                throw new BizException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL.getInfo());
            }
            String topicId = RedisQueueKey.getTopicId(jobDie.getTopic(), jobDie.getJobId());

            // Remove the job from the global index.
            RMap<String, Job> jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY);
            jobPool.remove(topicId);

            // Remove the job from the delay bucket.
            RScoredSortedSet<Object> delayBucket =
                    redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
            delayBucket.remove(topicId);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            log.error("deleteJob error", e);
            throw new BizException("delete job error,reason:" + e.getMessage());
        } finally {
            unlockIfHeld(lock);
        }
    }

    /**
     * Releases {@code lock} only when the current thread still owns it. Calling {@code unlock()}
     * on a lock that was never acquired (tryLock returned false) or whose lease already expired
     * would throw and hide the exception that is actually propagating.
     */
    private void unlockIfHeld(RLock lock) {
        if (lock.isHeldByCurrentThread()) {
            lock.unlock();
        }
    }
}
4.4 Handling thread :
/**
 * Scheduled mover that transfers due jobs from the delay bucket (sorted set) to the ready queue
 * (list) once per second.
 */
@Slf4j
@Component
public class CarryJobScheduled {

    @Autowired
    private RedissonClient redissonClient;

    /**
     * Moves every topic id whose score lies in {@code (0, now]} from the delay bucket to the
     * ready queue and then removes those ids from the bucket.
     *
     * <p>The work runs under the {@code RedisQueueKey.CARRY_THREAD_LOCK} distributed lock. If the
     * lock cannot be obtained within the wait time, this tick is skipped; the next scheduled run
     * picks up whatever is due.
     */
    @Scheduled(cron = "*/1 * * * * *")
    public void carryJobToQueue() {
        RLock lock = redissonClient.getLock(RedisQueueKey.CARRY_THREAD_LOCK);
        try {
            boolean lockFlag = lock.tryLock(
                    RedisQueueKey.LOCK_WAIT_TIME, RedisQueueKey.LOCK_RELEASE_TIME, TimeUnit.SECONDS);
            if (!lockFlag) {
                // Contention is expected for a once-per-second task; skip instead of failing.
                log.debug("carryJobToQueue skipped, lock not acquired");
                return;
            }

            // Fetch the jobs that are due: score > 0 and score <= current time in millis.
            RScoredSortedSet<Object> bucketSet =
                    redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
            long now = System.currentTimeMillis();
            Collection<Object> jobCollection = bucketSet.valueRange(0, false, now, true);
            if (CollectionUtils.isEmpty(jobCollection)) {
                return;
            }

            // Append the due jobs to the ready queue, then drop them from the delay bucket.
            List<String> jobList = jobCollection.stream()
                    .map(String::valueOf)
                    .collect(Collectors.toList());
            RList<String> readyQueue = redissonClient.getList(RedisQueueKey.RD_LIST_TOPIC_PRE);
            if (readyQueue.addAll(jobList)) {
                // Synchronous on purpose: the removal must finish before the lock is released,
                // otherwise the next run could read the same jobs and enqueue them twice.
                bucketSet.removeAll(jobList);
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag for the scheduler thread.
            Thread.currentThread().interrupt();
            log.error("carryJobToQueue error", e);
        } finally {
            // unlock() throws when the lock is not held (never acquired or lease expired).
            if (lock.isHeldByCurrentThread()) {
                lock.unlock();
            }
        }
    }
}
4.5 Consumer thread :
@Slf4j
@Component
public class ReadyQueueContext {
@Autowired
private RedissonClient redissonClient;
@Autowired
private ConsumerService consumerService;
/**
* TOPIC consumption
*/
@PostConstruct
public void startTopicConsumer() {
TaskManager.doTask(this::runTopicThreads, " Turn on TOPIC Consumer thread ");
}
/**
* Turn on TOPIC Consumer thread
* Put all possible exceptions catch live , Make sure While(true) Can be uninterrupted
*/
@SuppressWarnings("InfiniteLoopStatement")
private void runTopicThreads() {
while (true) {
RLock lock = null;
try {
lock = redissonClient.getLock(RedisQueueKey.CONSUMER_TOPIC_LOCK);
} catch (Exception e) {
log.error("runTopicThreads getLock error", e);
}
try {
if (lock == null) {
continue;
}
// Distributed lock time ratio Blpop More blocking time 1S, Avoid releasing the lock , The lock has been released overtime ,unlock Report errors
boolean lockFlag = lock.tryLock(RedisQueueKey.LOCK_WAIT_TIME, RedisQueueKey.LOCK_RELEASE_TIME, TimeUnit.SECONDS);
if (!lockFlag) {
continue;
}
// 1. obtain ReadyQueue Data to be consumed
RBlockingQueue<String> queue = redissonClient.getBlockingQueue(RedisQueueKey.RD_LIST_TOPIC_PRE);
String topicId = queue.poll(60, TimeUnit.SECONDS);
if (StringUtils.isEmpty(topicId)) {
continue;
}
// 2. obtain job Meta information content
RMap<String, Job> jobPoolMap = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY);
Job job = jobPoolMap.get(topicId);
// 3. consumption
FutureTask<Boolean> taskResult = TaskManager.doFutureTask(() -> consumerService.consumerMessage(job.getJobId(),job.getTopic(), job.getBody()), job.getTopic() + "--> consumption JobId-->" + job.getJobId());
if (taskResult.get()) {
// 3.1 Consumption success , Delete JobPool and DelayBucket Of job Information
jobPoolMap.remove(topicId);
} else {
/**
* Retries are 0 Go straight back
*/
if (job.getRetry() == 0) {
return;
}
int retrySum = job.getRetry() + 1;
// 3.2 Consumption failure , Then rejoin according to the strategy Bucket
// If the number of retries is greater than 5, Will jobPool Data deletion in , Persist to DB
if (retrySum > RetryStrategyEnum.RETRY_FIVE.getRetry()) {
jobPoolMap.remove(topicId);
continue;
}
job.setRetry(retrySum);
long nextTime = job.getDelay() + RetryStrategyEnum.getDelayTime(job.getRetry()) * 1000;
// log.info("next retryTime is [{}]", DateUtil.long2Str(nextTime));
RScoredSortedSet<Object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
delayBucket.add(nextTime, topicId);
// 3.3 Failed times to update meta information
jobPoolMap.put(topicId, job);
}
} catch (Exception e) {
log.error("runTopicThreads error", e);
} finally {
if (lock != null) {
try {
lock.unlock();
} catch (Exception e) {
log.error("runTopicThreads unlock error", e);
}
}
}
}
}About lingchuang group (Advance Intelligence Group)
Looking back BREAK AWAY
边栏推荐
- APOC custom functions and procedures
- Readis configuration and optimization of NoSQL (final chapter)
- System. Currenttimemillis() and system Nanotime (), which is faster? Don't use it wrong!
- 建筑建材行业经销商协同系统解决方案:赋能企业构建核心竞争力
- Market trend report, technical innovation and market forecast of taillight components in China
- Oracle监听器Server端与Client端配置实例
- [glide] cache implementation - memory and disk cache
- I let the database lock the table! Almost fired!
- Smart Logistics Park supply chain management system solution: digital intelligent supply chain enables a new supply chain model for the logistics transportation industry
- How to decrypt worksheet protection password in Excel file
猜你喜欢

Solution du système de gestion de la chaîne d'approvisionnement du parc logistique intelligent

overflow:auto与felx结合的用法

新的职业已经出现,怎么能够停滞不前 ,人社部公布建筑新职业

"Cannot initialize Photoshop because the temporary storage disk is full" graphic solution

Years of training, towards Kata 3.0! Enter the safe container experience out of the box | dragon lizard Technology

Start by counting

容器环境minor gc异常频繁分析
Why do you say that the maximum single table of MySQL database is 20million? Based on what?

Opencv learning -- geometric transformation of image processing

多年锤炼,迈向Kata 3.0 !走进开箱即用的安全容器体验之旅| 龙蜥技术
随机推荐
go-micro教程 — 第二章 go-micro v3 使用Gin、Etcd
L1-072 scratch lottery
对人胜率84%,DeepMind AI首次在西洋陆军棋中达到人类专家水平
Statistical learning: logistic regression and cross entropy loss (pytoch Implementation)
Research Report on market supply and demand and strategy of China's four sided flat bag industry
ECCV 2022放榜了:1629篇论文中选,录用率不到20%
How can programmers improve the speed of code writing?
Sql实现Split
Lv166 turned over
Research Report on market supply and demand and strategy of China's well completion equipment industry
程序员怎么才能提高代码编写速度?
Blood spitting finishing nanny level series tutorial - play Fiddler bag grabbing tutorial (2) - first meet fiddler, let you have a rational understanding
Maximum subarray and matrix multiplication
VMware Tools和open-vm-tools的安装与使用:解决虚拟机不全屏和无法传输文件的问题
高度剩余法
Accounting regulations and professional ethics [10]
同构图与异构图CYPHER-TASK设计与TASK锁机制
NoSQL之readis配置与优化(终章)
Accounting regulations and professional ethics [9]
Accounting regulations and professional ethics [11]