当前位置:网站首页>How to implement a delay queue?
How to implement a delay queue?
2022-07-04 17:02:00 【InfoQ】
One 、 What is delay queue
Two 、 Scenarios for delay queues ?
2.1 For example, overtime customs clearance :
2.2 For example, callback retry :
2.3 Meeting reminder :
2.4 Various delay reminders :
3、 ... and 、 Implementation of delay queue ?
Four 、 How to use redis Implement delay queue ?
4.1 We use redisson To achieve , First introduce redisson package
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.16.3</version>
</dependency>
4.2、 To configure redisson
redisson:
redisModel: SINGLE
singleConfig:
address: redis://127.0.0.1:6379
4.3 Delay service : Add delay task , Cancel delay task
public class RedisDelayQueueServiceImpl implements RedisDelayQueueService {
@Autowired
private RedissonClient redissonClient;
/**
* Add a delay task , add to job Meta information
*
* @param job Meta information ,job Contained in the taskId Primary key ,topicId queue id,retryNum Retry count ,delaytime Delay Time etc. , Not expanded here .
*/
@Override
public void addJob(Job job) {
// Add distributed lock , Prevent repeated addition of tasks
RLock lock = redissonClient.getLock(RedisQueueKey.ADD_JOB_LOCK + job.getJobId());
try {
boolean lockFlag = lock.tryLock(RedisQueueKey.LOCK_WAIT_TIME, RedisQueueKey.LOCK_RELEASE_TIME, TimeUnit.SECONDS);
if (!lockFlag) {
throw new BizException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL.getInfo());
}
String topicId = RedisQueueKey.getTopicId(job.getTopic(), job.getJobId());
// 1. take job Add to JobPool in ,jobpool As a global index , All unexecuted tasks exist jobPool in .
RMap<String, Job> jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY);
if (jobPool.get(topicId) != null) {
throw new BizException(ErrorMessageEnum.JOB_ALREADY_EXIST.getInfo());
}
jobPool.put(topicId, job);
// 2. take job Add to DelayBucket in , Sort by delay time
RScoredSortedSet<Object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
delayBucket.add(job.getDelay(), topicId);
} catch (InterruptedException e) {
log.error("addJob error", e);
throw new BizException("add delay job error,reason:" + e.getMessage());
} finally {
if (lock != null) {
lock.unlock();
}
}
}
/**
* Delete job Information , Why delete job Information ?
* When we are sure that the last delayed task is unnecessary , We can cancel the execution of delayed tasks in advance .
*
* @param jobDie Meta information
*/
@Override
public void deleteJob(JobDie jobDie) {
RLock lock = redissonClient.getLock(RedisQueueKey.DELETE_JOB_LOCK + jobDie.getJobId());
try {
boolean lockFlag = lock.tryLock(RedisQueueKey.LOCK_WAIT_TIME, RedisQueueKey.LOCK_RELEASE_TIME, TimeUnit.SECONDS);
if (!lockFlag) {
throw new BizException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL.getInfo());
}
String topicId = RedisQueueKey.getTopicId(jobDie.getTopic(), jobDie.getJobId());
// Delete task from global index .
RMap<String, Job> jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY);
jobPool.remove(topicId);
// from zset Delete task in
RScoredSortedSet<Object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
delayBucket.remove(topicId);
} catch (InterruptedException e) {
log.error("addJob error", e);
throw new BizException("delete job error,reason:" + e.getMessage());
} finally {
if (lock != null) {
lock.unlock();
}
}
}
}
4.4 Handling thread :
@Slf4j
@Component
public class CarryJobScheduled {
@Autowired
private RedissonClient redissonClient;
/**
* Start timing start handling JOB Information
*/
@Scheduled(cron = "*/1 * * * * *")
public void carryJobToQueue() {
//System.out.println("carryJobToQueue --->");
RLock lock = redissonClient.getLock(RedisQueueKey.CARRY_THREAD_LOCK);
try {
boolean lockFlag = lock.tryLock(RedisQueueKey.LOCK_WAIT_TIME, RedisQueueKey.LOCK_RELEASE_TIME, TimeUnit.SECONDS);
if (!lockFlag) {
throw new BizException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL.getInfo());
}
// Take out the expired tasks
RScoredSortedSet<Object> bucketSet = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
long now = System.currentTimeMillis();
Collection<Object> jobCollection = bucketSet.valueRange(0, false, now, true);
if (CollectionUtils.isEmpty(jobCollection)) {
return;
}
// Move the expired tasks to the consumption queue ,zset Delete the task in
List<String> jobList = jobCollection.stream().map(String::valueOf).collect(Collectors.toList());
RList<String> readyQueue = redissonClient.getList(RedisQueueKey.RD_LIST_TOPIC_PRE);
if (CollectionUtils.isEmpty(jobList)) {
return;
}
if (readyQueue.addAll(jobList)) {
bucketSet.removeAllAsync(jobList);
}
} catch (InterruptedException e) {
log.error("carryJobToQueue error", e);
} finally {
if (lock != null) {
lock.unlock();
}
}
}
}
4.5 Consumer thread :
@Slf4j
@Component
public class ReadyQueueContext {
@Autowired
private RedissonClient redissonClient;
@Autowired
private ConsumerService consumerService;
/**
* TOPIC consumption
*/
@PostConstruct
public void startTopicConsumer() {
TaskManager.doTask(this::runTopicThreads, " Turn on TOPIC Consumer thread ");
}
/**
* Turn on TOPIC Consumer thread
* Put all possible exceptions catch live , Make sure While(true) Can be uninterrupted
*/
@SuppressWarnings("InfiniteLoopStatement")
private void runTopicThreads() {
while (true) {
RLock lock = null;
try {
lock = redissonClient.getLock(RedisQueueKey.CONSUMER_TOPIC_LOCK);
} catch (Exception e) {
log.error("runTopicThreads getLock error", e);
}
try {
if (lock == null) {
continue;
}
// Distributed lock time ratio Blpop More blocking time 1S, Avoid releasing the lock , The lock has been released overtime ,unlock Report errors
boolean lockFlag = lock.tryLock(RedisQueueKey.LOCK_WAIT_TIME, RedisQueueKey.LOCK_RELEASE_TIME, TimeUnit.SECONDS);
if (!lockFlag) {
continue;
}
// 1. obtain ReadyQueue Data to be consumed
RBlockingQueue<String> queue = redissonClient.getBlockingQueue(RedisQueueKey.RD_LIST_TOPIC_PRE);
String topicId = queue.poll(60, TimeUnit.SECONDS);
if (StringUtils.isEmpty(topicId)) {
continue;
}
// 2. obtain job Meta information content
RMap<String, Job> jobPoolMap = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY);
Job job = jobPoolMap.get(topicId);
// 3. consumption
FutureTask<Boolean> taskResult = TaskManager.doFutureTask(() -> consumerService.consumerMessage(job.getJobId(),job.getTopic(), job.getBody()), job.getTopic() + "--> consumption JobId-->" + job.getJobId());
if (taskResult.get()) {
// 3.1 Consumption success , Delete JobPool and DelayBucket Of job Information
jobPoolMap.remove(topicId);
} else {
/**
* Retries are 0 Go straight back
*/
if (job.getRetry() == 0) {
return;
}
int retrySum = job.getRetry() + 1;
// 3.2 Consumption failure , Then rejoin according to the strategy Bucket
// If the number of retries is greater than 5, Will jobPool Data deletion in , Persist to DB
if (retrySum > RetryStrategyEnum.RETRY_FIVE.getRetry()) {
jobPoolMap.remove(topicId);
continue;
}
job.setRetry(retrySum);
long nextTime = job.getDelay() + RetryStrategyEnum.getDelayTime(job.getRetry()) * 1000;
// log.info("next retryTime is [{}]", DateUtil.long2Str(nextTime));
RScoredSortedSet<Object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
delayBucket.add(nextTime, topicId);
// 3.3 Failed times to update meta information
jobPoolMap.put(topicId, job);
}
} catch (Exception e) {
log.error("runTopicThreads error", e);
} finally {
if (lock != null) {
try {
lock.unlock();
} catch (Exception e) {
log.error("runTopicThreads unlock error", e);
}
}
}
}
}
About lingchuang group (Advance Intelligence Group)
Looking back BREAK AWAY
边栏推荐
- 如何实现一个延时队列 ?
- Principle and general steps of SQL injection
- Implement graph data construction task based on check point
- 矿产行业商业供应链协同系统解决方案:构建数智化供应链平台,保障矿产资源安全供应
- Inside and outside: flow chart drawing elementary: six common mistakes
- Li Kou today's question -1200 Minimum absolute difference
- Opencv learning -- geometric transformation of image processing
- "Cannot initialize Photoshop because the temporary storage disk is full" graphic solution
- 科普达人丨一文看懂阿里云的秘密武器“神龙架构”
- 安信证券手机版下载 网上开户安全吗
猜你喜欢
S2b2b solution for lighting industry: efficiently enable the industrial supply chain and improve the economic benefits of enterprises
基于wifi控制的51单片机温度报警器
一图看懂ThreadLocal
Application of clock wheel in RPC
Embedded software architecture design - function call
周大福践行「百周年承诺」,真诚服务推动绿色环保
Visual studio 2019 (localdb) mssqllocaldb SQL Server 2014 database version is 852 and cannot be opened. This server supports 782
电子元器件B2B商城系统开发:赋能企业构建进销存标准化流程实例
L1-072 scratch lottery
Statistical learning: logistic regression and cross entropy loss (pytoch Implementation)
随机推荐
S2b2b solution for lighting industry: efficiently enable the industrial supply chain and improve the economic benefits of enterprises
Go语言循环语句(第10课下)
The test experience "tortured" by the PMP test is worth your review
Research Report on market supply and demand and strategy of China's plastics and polymer industry
Capvision Rongying's prospectus in Hong Kong was "invalid": it was strictly questioned by the CSRC and required supplementary disclosure
Object.keys()的用法
【云原生】服务网格是什么“格”?
Cypher task design and task locking mechanism of isomorphic and heterogeneous graphs
PyTorch深度学习快速入门教程
被PMP考试“折磨”出来的考试心得,值得你一览
Go development: how to use go singleton mode to ensure the security of high concurrency of streaming media?
World Environment Day | Chow Tai Fook serves wholeheartedly to promote carbon reduction and environmental protection
Hash table
Height residual method
C # realizes FFT forward and inverse transformation and frequency domain filtering
Market trend report, technical innovation and market forecast of tetrabromophthalate (pht4 diol) in China
Start by counting
线程池的使用和原理
APOC自定义函数和过程
tp配置多数据库