当前位置:网站首页>How to implement a delay queue?
How to implement a delay queue?
2022-07-04 17:02:00 【InfoQ】
One 、 What is delay queue

Two 、 Scenarios for delay queues ?
2.1 For example, overtime customs clearance :
2.2 For example, callback retry :
2.3 Meeting reminder :
2.4 Various delay reminders :
3、 ... and 、 Implementation of delay queue ?
Four 、 How to use redis Implement delay queue ?
4.1 We use redisson To achieve , First introduce redisson package
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.16.3</version>
</dependency>
4.2、 To configure redisson
redisson:
redisModel: SINGLE
singleConfig:
address: redis://127.0.0.1:6379
4.3 Delay service : Add delay task , Cancel delay task
public class RedisDelayQueueServiceImpl implements RedisDelayQueueService {
@Autowired
private RedissonClient redissonClient;
/**
* Add a delay task , add to job Meta information
*
* @param job Meta information ,job Contained in the taskId Primary key ,topicId queue id,retryNum Retry count ,delaytime Delay Time etc. , Not expanded here .
*/
@Override
public void addJob(Job job) {
// Add distributed lock , Prevent repeated addition of tasks
RLock lock = redissonClient.getLock(RedisQueueKey.ADD_JOB_LOCK + job.getJobId());
try {
boolean lockFlag = lock.tryLock(RedisQueueKey.LOCK_WAIT_TIME, RedisQueueKey.LOCK_RELEASE_TIME, TimeUnit.SECONDS);
if (!lockFlag) {
throw new BizException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL.getInfo());
}
String topicId = RedisQueueKey.getTopicId(job.getTopic(), job.getJobId());
// 1. take job Add to JobPool in ,jobpool As a global index , All unexecuted tasks exist jobPool in .
RMap<String, Job> jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY);
if (jobPool.get(topicId) != null) {
throw new BizException(ErrorMessageEnum.JOB_ALREADY_EXIST.getInfo());
}
jobPool.put(topicId, job);
// 2. take job Add to DelayBucket in , Sort by delay time
RScoredSortedSet<Object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
delayBucket.add(job.getDelay(), topicId);
} catch (InterruptedException e) {
log.error("addJob error", e);
throw new BizException("add delay job error,reason:" + e.getMessage());
} finally {
if (lock != null) {
lock.unlock();
}
}
}
/**
* Delete job Information , Why delete job Information ?
* When we are sure that the last delayed task is unnecessary , We can cancel the execution of delayed tasks in advance .
*
* @param jobDie Meta information
*/
@Override
public void deleteJob(JobDie jobDie) {
RLock lock = redissonClient.getLock(RedisQueueKey.DELETE_JOB_LOCK + jobDie.getJobId());
try {
boolean lockFlag = lock.tryLock(RedisQueueKey.LOCK_WAIT_TIME, RedisQueueKey.LOCK_RELEASE_TIME, TimeUnit.SECONDS);
if (!lockFlag) {
throw new BizException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL.getInfo());
}
String topicId = RedisQueueKey.getTopicId(jobDie.getTopic(), jobDie.getJobId());
// Delete task from global index .
RMap<String, Job> jobPool = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY);
jobPool.remove(topicId);
// from zset Delete task in
RScoredSortedSet<Object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
delayBucket.remove(topicId);
} catch (InterruptedException e) {
log.error("addJob error", e);
throw new BizException("delete job error,reason:" + e.getMessage());
} finally {
if (lock != null) {
lock.unlock();
}
}
}
}
4.4 Handling thread :
@Slf4j
@Component
public class CarryJobScheduled {
@Autowired
private RedissonClient redissonClient;
/**
* Start timing start handling JOB Information
*/
@Scheduled(cron = "*/1 * * * * *")
public void carryJobToQueue() {
//System.out.println("carryJobToQueue --->");
RLock lock = redissonClient.getLock(RedisQueueKey.CARRY_THREAD_LOCK);
try {
boolean lockFlag = lock.tryLock(RedisQueueKey.LOCK_WAIT_TIME, RedisQueueKey.LOCK_RELEASE_TIME, TimeUnit.SECONDS);
if (!lockFlag) {
throw new BizException(ErrorMessageEnum.ACQUIRE_LOCK_FAIL.getInfo());
}
// Take out the expired tasks
RScoredSortedSet<Object> bucketSet = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
long now = System.currentTimeMillis();
Collection<Object> jobCollection = bucketSet.valueRange(0, false, now, true);
if (CollectionUtils.isEmpty(jobCollection)) {
return;
}
// Move the expired tasks to the consumption queue ,zset Delete the task in
List<String> jobList = jobCollection.stream().map(String::valueOf).collect(Collectors.toList());
RList<String> readyQueue = redissonClient.getList(RedisQueueKey.RD_LIST_TOPIC_PRE);
if (CollectionUtils.isEmpty(jobList)) {
return;
}
if (readyQueue.addAll(jobList)) {
bucketSet.removeAllAsync(jobList);
}
} catch (InterruptedException e) {
log.error("carryJobToQueue error", e);
} finally {
if (lock != null) {
lock.unlock();
}
}
}
}
4.5 Consumer thread :
@Slf4j
@Component
public class ReadyQueueContext {
@Autowired
private RedissonClient redissonClient;
@Autowired
private ConsumerService consumerService;
/**
* TOPIC consumption
*/
@PostConstruct
public void startTopicConsumer() {
TaskManager.doTask(this::runTopicThreads, " Turn on TOPIC Consumer thread ");
}
/**
* Turn on TOPIC Consumer thread
* Put all possible exceptions catch live , Make sure While(true) Can be uninterrupted
*/
@SuppressWarnings("InfiniteLoopStatement")
private void runTopicThreads() {
while (true) {
RLock lock = null;
try {
lock = redissonClient.getLock(RedisQueueKey.CONSUMER_TOPIC_LOCK);
} catch (Exception e) {
log.error("runTopicThreads getLock error", e);
}
try {
if (lock == null) {
continue;
}
// Distributed lock time ratio Blpop More blocking time 1S, Avoid releasing the lock , The lock has been released overtime ,unlock Report errors
boolean lockFlag = lock.tryLock(RedisQueueKey.LOCK_WAIT_TIME, RedisQueueKey.LOCK_RELEASE_TIME, TimeUnit.SECONDS);
if (!lockFlag) {
continue;
}
// 1. obtain ReadyQueue Data to be consumed
RBlockingQueue<String> queue = redissonClient.getBlockingQueue(RedisQueueKey.RD_LIST_TOPIC_PRE);
String topicId = queue.poll(60, TimeUnit.SECONDS);
if (StringUtils.isEmpty(topicId)) {
continue;
}
// 2. obtain job Meta information content
RMap<String, Job> jobPoolMap = redissonClient.getMap(RedisQueueKey.JOB_POOL_KEY);
Job job = jobPoolMap.get(topicId);
// 3. consumption
FutureTask<Boolean> taskResult = TaskManager.doFutureTask(() -> consumerService.consumerMessage(job.getJobId(),job.getTopic(), job.getBody()), job.getTopic() + "--> consumption JobId-->" + job.getJobId());
if (taskResult.get()) {
// 3.1 Consumption success , Delete JobPool and DelayBucket Of job Information
jobPoolMap.remove(topicId);
} else {
/**
* Retries are 0 Go straight back
*/
if (job.getRetry() == 0) {
return;
}
int retrySum = job.getRetry() + 1;
// 3.2 Consumption failure , Then rejoin according to the strategy Bucket
// If the number of retries is greater than 5, Will jobPool Data deletion in , Persist to DB
if (retrySum > RetryStrategyEnum.RETRY_FIVE.getRetry()) {
jobPoolMap.remove(topicId);
continue;
}
job.setRetry(retrySum);
long nextTime = job.getDelay() + RetryStrategyEnum.getDelayTime(job.getRetry()) * 1000;
// log.info("next retryTime is [{}]", DateUtil.long2Str(nextTime));
RScoredSortedSet<Object> delayBucket = redissonClient.getScoredSortedSet(RedisQueueKey.RD_ZSET_BUCKET_PRE);
delayBucket.add(nextTime, topicId);
// 3.3 Failed times to update meta information
jobPoolMap.put(topicId, job);
}
} catch (Exception e) {
log.error("runTopicThreads error", e);
} finally {
if (lock != null) {
try {
lock.unlock();
} catch (Exception e) {
log.error("runTopicThreads unlock error", e);
}
}
}
}
}
About lingchuang group (Advance Intelligence Group)
Looking back BREAK AWAY
边栏推荐
- 散列表
- Maximum subarray and matrix multiplication
- Yanwen logistics plans to be listed on Shenzhen Stock Exchange: it is mainly engaged in international express business, and its gross profit margin is far lower than the industry level
- Go language loop statement (under Lesson 10)
- Understand asp Net core - Authentication Based on jwtbearer
- "Cannot initialize Photoshop because the temporary storage disk is full" graphic solution
- Inside and outside: flow chart drawing elementary: six common mistakes
- Integration of ongdb graph database and spark
- Linear time sequencing
- Blood spitting finishing nanny level series tutorial - play Fiddler bag grabbing tutorial (2) - first meet fiddler, let you have a rational understanding
猜你喜欢
Start by counting
照明行业S2B2B解决方案:高效赋能产业供应链,提升企业经济效益
周大福践行「百周年承诺」,真诚服务推动绿色环保
DIY a low-cost multi-functional dot matrix clock!
【云原生】服务网格是什么“格”?
昆明三环闭合工程将经过这些地方,有在你家附近的吗?
"Cannot initialize Photoshop because the temporary storage disk is full" graphic solution
新的职业已经出现,怎么能够停滞不前 ,人社部公布建筑新职业
从数数开始
Visual Studio 2019 (LocalDB)MSSQLLocalDB SQL Server 2014 数据库版本为852无法打开,此服务器支持782
随机推荐
Object. Usage of keys()
表单传递时,如何隐式将值传过去
Maximum subarray and matrix multiplication
Research Report on plastic recycling machine industry - market status analysis and development prospect forecast
Jump table instance
~89 deformation translation
基于wifi控制的51单片机温度报警器
VMware Tools和open-vm-tools的安装与使用:解决虚拟机不全屏和无法传输文件的问题
Understand Alibaba cloud's secret weapon "dragon architecture" in the article "science popularization talent"
I let the database lock the table! Almost fired!
Market trend report, technical innovation and market forecast of taillight components in China
昆明三环闭合工程将经过这些地方,有在你家附近的吗?
Smart Logistics Park supply chain management system solution: digital intelligent supply chain enables a new supply chain model for the logistics transportation industry
祝贺Artefact首席数据科学家张鹏飞先生荣获 Campaign Asia Tech MVP 2022
Go语言循环语句(第10课下)
周大福践行「百周年承诺」,真诚服务推动绿色环保
Cypher task design and task locking mechanism of isomorphic and heterogeneous graphs
被PMP考试“折磨”出来的考试心得,值得你一览
C # realizes FFT forward and inverse transformation and frequency domain filtering
Accounting regulations and professional ethics [7]