9. Delay queue
2022-07-29 04:15:00 【Machoul】
Delay queue
The concept of a delay queue
A delay queue is an ordered queue whose defining feature is delay: each element is intended to be taken out and processed after, or before, a specified time. Put simply, a delay queue stores elements that need to be processed at a specified time.
Implementing a delay queue with queue TTL
Create two queues, QA and QB, with TTLs of 10 s and 40 s respectively. Then create an exchange X and a dead-letter exchange Y, both of type direct, and a dead-letter queue QD. The bindings are as follows: X routes key XA to QA and key XB to QB; QA and QB both dead-letter to Y with routing key YD; Y routes key YD to QD.
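To make the flow concrete, here is a toy model in plain Java. It is not RabbitMQ client code, and it assumes that nothing consumes from QA or QB, so every message waits out the queue TTL and is then dead-lettered through Y into QD. The names TtlTopologyDemo and arrivalAtQd are made up for this illustration.

```java
import java.util.HashMap;
import java.util.Map;

class TtlTopologyDemo {
    // Bindings of the direct exchange X: routing key -> queue name.
    static final Map<String, String> X_BINDINGS = new HashMap<>();
    // Queue-level TTL in milliseconds (the x-message-ttl argument).
    static final Map<String, Long> QUEUE_TTL_MS = new HashMap<>();

    static {
        X_BINDINGS.put("XA", "QA");
        X_BINDINGS.put("XB", "QB");
        QUEUE_TTL_MS.put("QA", 10_000L);
        QUEUE_TTL_MS.put("QB", 40_000L);
    }

    /** Time (ms) at which a message published with this key would reach QD. */
    static long arrivalAtQd(String routingKey, long publishedAtMs) {
        String queue = X_BINDINGS.get(routingKey);
        if (queue == null) {
            // Simplification: this toy model just rejects unknown keys.
            throw new IllegalArgumentException("No binding for routing key " + routingKey);
        }
        return publishedAtMs + QUEUE_TTL_MS.get(queue);
    }

    public static void main(String[] args) {
        System.out.println("via XA: " + arrivalAtQd("XA", 0) + " ms"); // via XA: 10000 ms
        System.out.println("via XB: " + arrivalAtQd("XB", 0) + " ms"); // via XB: 40000 ms
    }
}
```

Because every message in a queue shares the same TTL, the message at the head of the queue is always the next one to expire, so FIFO order and expiry order coincide.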

Create a Spring Boot project and add the following dependencies to pom.xml
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <optional>true</optional>
    </dependency>
</dependencies>
Modify the configuration file application.properties
server.port=8080
spring.rabbitmq.host=172.16.140.133
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
Add the RabbitMQ configuration class TtlQueueConfig
import java.util.HashMap;
import java.util.Map;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TtlQueueConfig {
    public static final String X_EXCHANGE = "X";
    public static final String QUEUE_A = "QA";
    public static final String QUEUE_B = "QB";
    public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
    public static final String DEAD_LETTER_QUEUE = "QD";

    // Normal exchange that producers publish to
    @Bean("xExchange")
    public DirectExchange xExchange() {
        return new DirectExchange(X_EXCHANGE);
    }

    // Dead-letter exchange that receives expired messages
    @Bean("yExchange")
    public DirectExchange yExchange() {
        return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
    }

    // QA: messages expire after 10 s and are dead-lettered to Y with key YD
    @Bean("queueA")
    public Queue queueA() {
        Map<String, Object> params = new HashMap<>();
        params.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        params.put("x-dead-letter-routing-key", "YD");
        params.put("x-message-ttl", 10000);
        return QueueBuilder.durable(QUEUE_A).withArguments(params).build();
    }

    @Bean
    public Binding queueABindingX(@Qualifier("queueA") Queue queueA,
                                  @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queueA).to(xExchange).with("XA");
    }

    // QB: same as QA, but with a 40 s TTL
    @Bean("queueB")
    public Queue queueB() {
        Map<String, Object> params = new HashMap<>();
        params.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        params.put("x-dead-letter-routing-key", "YD");
        params.put("x-message-ttl", 40000);
        return QueueBuilder.durable(QUEUE_B).withArguments(params).build();
    }

    @Bean
    public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,
                                  @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queueB).to(xExchange).with("XB");
    }

    // Dead-letter queue that the consumer listens on
    @Bean("queueD")
    public Queue queueD() {
        return new Queue(DEAD_LETTER_QUEUE);
    }

    @Bean
    public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD,
                                        @Qualifier("yExchange") DirectExchange yExchange) {
        return BindingBuilder.bind(queueD).to(yExchange).with("YD");
    }
}
Add producer code
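import java.util.Date;
import javax.annotation.Resource; // jakarta.annotation.Resource on Spring Boot 3+
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@RequestMapping("ttl")
@RestController
public class SendMsgController {
    @Resource
    private RabbitTemplate rabbitTemplate;

    // Publishes the same text to both TTL queues through exchange X
    @GetMapping("/sendMsg/{message}")
    public void sendMsg(@PathVariable String message) {
        log.info("Current time: {}, sending a message to the two TTL queues: {}", new Date(), message);
        rabbitTemplate.convertAndSend("X", "XA", "Message from the queue with a 10 s TTL: " + message);
        rabbitTemplate.convertAndSend("X", "XB", "Message from the queue with a 40 s TTL: " + message);
    }
}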
Add consumer code
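import java.nio.charset.StandardCharsets;
import java.util.Date;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
public class DeadLetterQueueConsumer {
    @RabbitListener(queues = "QD")
    public void receiveD(Message message) {
        // Decode explicitly as UTF-8 instead of relying on the platform default charset
        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
        log.info("Current time: {}, received a message from the dead-letter queue: {}", new Date(), msg);
    }
}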
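Send a request to localhost:8080/ttl/sendMsg/hello-world. With no consumer on QA or QB, the first message should reach the dead-letter queue QD after about 10 s and the second after about 40 s.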

However, with this approach every new delay requirement means adding a new queue. Here there are only two options, 10 s and 40 s; a one-hour delay would need another queue with a one-hour TTL. In a scenario such as booking a meeting room and sending a reminder ahead of the meeting, where the delay differs for each booking, an unbounded number of queues would be needed.
Delay queue optimization
Add a new queue, QC, with no queue-level TTL. It is bound to exchange X with routing key XC and dead-letters to exchange Y with routing key YD, so expired messages still end up in QD.

Create configuration class MsgTtlQueueConfig
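import java.util.HashMap;
import java.util.Map;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class MsgTtlQueueConfig {
    public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
    public static final String QUEUE_C = "QC";
    // QC has no x-message-ttl; the TTL is set on each message by the producer
    @Bean("queueC")
    public Queue queueC() {
        Map<String, Object> params = new HashMap<>();
        params.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE);
        params.put("x-dead-letter-routing-key", "YD");
        return QueueBuilder.durable(QUEUE_C).withArguments(params).build();
    }
    @Bean
    public Binding queueCBindingX(@Qualifier("queueC") Queue queueC, @Qualifier("xExchange") DirectExchange xExchange) {
        return BindingBuilder.bind(queueC).to(xExchange).with("XC");
    }
}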
Add message producer code
// Add to SendMsgController. ttlTime is the per-message TTL in milliseconds.
@GetMapping("/sendExpirationMsg/{message}/{ttlTime}")
public void sendExpirationMsg(@PathVariable String message, @PathVariable String ttlTime) {
    log.info("Current time: {}, sending a message with a TTL of {} ms to queue QC: {}", new Date(), ttlTime, message);
    // The lambda is a MessagePostProcessor: it receives the converted Message before it is sent
    rabbitTemplate.convertAndSend("X", "XC", message, msg -> {
        msg.getMessageProperties().setExpiration(ttlTime);
        return msg;
    });
}
Send two requests, the first with a 20 s TTL and the second with a 2 s TTL:
localhost:8080/ttl/sendExpirationMsg/hello-world-1/20000
localhost:8080/ttl/sendExpirationMsg/hello-world-2/2000

This looks fine at first, but as mentioned earlier, messages whose TTL is set in the message properties may not "die" on time. RabbitMQ only checks the message at the head of the queue for expiry; if it has expired, it is moved to the dead-letter queue. If the first message has a long delay and the second a very short one, the second message is not dead-lettered first. Here hello-world-2 (2 s) sits behind hello-world-1 (20 s), so it only reaches QD after the first message expires at about 20 s.
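The head-of-queue behaviour described above can be reproduced with a small simulation in plain Java. It is a sketch of the observable effect only, assuming no consumer is attached to QC and that both messages are published at time 0. The class HeadOfLineTtlDemo and its members are hypothetical names, not part of any RabbitMQ API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

class HeadOfLineTtlDemo {
    /** A message with a publish time and a per-message TTL, both in milliseconds. */
    static final class Msg {
        final String body;
        final long publishedAtMs;
        final long ttlMs;

        Msg(String body, long publishedAtMs, long ttlMs) {
            this.body = body;
            this.publishedAtMs = publishedAtMs;
            this.ttlMs = ttlMs;
        }

        long expiresAtMs() {
            return publishedAtMs + ttlMs;
        }
    }

    /** Returns "body@time" entries in the order the messages leave the FIFO queue. */
    static List<String> deadLetterOrder(List<Msg> publishedInOrder) {
        Deque<Msg> queue = new ArrayDeque<>(publishedInOrder);
        List<String> result = new ArrayList<>();
        long nowMs = 0;
        while (!queue.isEmpty()) {
            // Only the head is examined: nothing behind it can leave earlier.
            Msg head = queue.poll();
            nowMs = Math.max(nowMs, head.expiresAtMs());
            result.add(head.body + "@" + nowMs);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Msg> msgs = Arrays.asList(
                new Msg("hello-world-1", 0, 20_000),
                new Msg("hello-world-2", 0, 2_000));
        // prints [hello-world-1@20000, hello-world-2@20000]
        System.out.println(deadLetterOrder(msgs));
    }
}
```

In this model the 2 s message leaves at 20 000 ms, immediately after the 20 s message in front of it, which matches the behaviour seen with the two requests.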
Implementing a delay queue with the delayed message plugin
If a per-message TTL cannot guarantee that each message is dead-lettered as soon as its own TTL elapses, this approach cannot serve as a general-purpose delay queue.
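Install the delay queue plugin
Download: the Releases page of rabbitmq/rabbitmq-delayed-message-exchange on GitHub (https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases). Place the downloaded .ez file in RabbitMQ's plugins directory.
Enable the plugin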
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Topology
A new queue, delayed.queue, and a custom exchange, delayed.exchange, are added. The queue is bound to the exchange with routing key delayed.routingKey.

Example
The custom exchange uses a new exchange type, x-delayed-message, which supports delayed delivery. After a message is published, it is not routed to the target queue immediately. It is stored in a Mnesia table (Mnesia is a distributed database system) and is only delivered to the target queue once its delivery time is reached.
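The difference from a FIFO queue is that delivery order depends on when each message is due, not on when it was published. The sketch below models only that observable ordering in plain Java; it says nothing about how the plugin works internally, and the names DueTimeDeliveryDemo and Pending are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

class DueTimeDeliveryDemo {
    /** A published message waiting for its delivery time (milliseconds). */
    static final class Pending {
        final String body;
        final long dueAtMs;

        Pending(String body, long publishedAtMs, long delayMs) {
            this.body = body;
            this.dueAtMs = publishedAtMs + delayMs;
        }
    }

    /** Returns "body@time" entries in the order the messages become deliverable. */
    static List<String> deliveryOrder(List<Pending> publishedInOrder) {
        // Ordered by due time, so publish order does not matter.
        PriorityQueue<Pending> store =
                new PriorityQueue<>(Comparator.comparingLong((Pending p) -> p.dueAtMs));
        store.addAll(publishedInOrder);
        List<String> result = new ArrayList<>();
        while (!store.isEmpty()) {
            Pending next = store.poll();
            result.add(next.body + "@" + next.dueAtMs);
        }
        return result;
    }

    public static void main(String[] args) {
        List<Pending> msgs = Arrays.asList(
                new Pending("hello-world-1", 0, 20_000),
                new Pending("hello-world-2", 0, 2_000));
        // prints [hello-world-2@2000, hello-world-1@20000]
        System.out.println(deliveryOrder(msgs));
    }
}
```

A message with a short delay is no longer held back by an earlier message with a long delay.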
Add the configuration class
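import java.util.HashMap;
import java.util.Map;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class DelayedQueueConfig {
    public static final String DELAYED_QUEUE_NAME = "delayed.queue";
    public static final String DELAYED_EXCHANGE_NAME = "delayed.exchange";
    public static final String DELAYED_ROUTING_KEY = "delayed.routingKey";

    @Bean
    public Queue delayedQueue() {
        return new Queue(DELAYED_QUEUE_NAME);
    }

    // Exchange type provided by the plugin; x-delayed-type selects the routing behaviour
    @Bean
    public CustomExchange delayedExchange() {
        Map<String, Object> params = new HashMap<>();
        params.put("x-delayed-type", "direct");
        // Arguments: name, type, durable, autoDelete, arguments
        return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, params);
    }

    @Bean
    public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue,
                                       @Qualifier("delayedExchange") CustomExchange delayedExchange) {
        return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
    }
}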
Add producer code
// Add to SendMsgController. delayTime is the delay in milliseconds.
@GetMapping("/sendDelayMsg/{message}/{delayTime}")
public void sendDelayMsg(@PathVariable String message, @PathVariable Integer delayTime) {
    // The lambda is a MessagePostProcessor: it sets the delay on the outgoing Message
    rabbitTemplate.convertAndSend("delayed.exchange", "delayed.routingKey", message, msg -> {
        msg.getMessageProperties().setDelay(delayTime);
        return msg;
    });
    log.info("Current time: {}, sending a message with a delay of {} ms to delayed.queue: {}", new Date(), delayTime, message);
}
Add consumer code
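// Requires: import java.nio.charset.StandardCharsets;
@RabbitListener(queues = "delayed.queue")
public void receiveDelayedQueue(Message message) {
    // Decode explicitly as UTF-8 instead of relying on the platform default charset
    String msg = new String(message.getBody(), StandardCharsets.UTF_8);
    log.info("Current time: {}, received a message from the delay queue: {}", new Date(), msg);
}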
Send two requests, the first with a 20 s delay and the second with a 2 s delay:
localhost:8080/ttl/sendDelayMsg/hello-world-1/20000
localhost:8080/ttl/sendDelayMsg/hello-world-2/2000

The second message is consumed first, as expected.
Summary
Delay queues are very useful wherever processing must be deferred. Implementing them on RabbitMQ lets you take advantage of RabbitMQ's features: reliable publishing, reliable delivery, and dead-letter queues, which ensure that a message is consumed at least once and that messages not processed correctly are not discarded. In addition, RabbitMQ clustering addresses the single point of failure, so the delay queue does not become unavailable, and messages are not lost, just because one node goes down.
Of course, there are many other ways to implement a delay queue, such as Java's DelayQueue, a Redis zset, Quartz, or Kafka's timing wheel. Each has its own characteristics; choose according to the scenario.
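Of these alternatives, Java's DelayQueue is the quickest to try because it ships with the JDK. The following is a minimal in-process sketch: it has no persistence or clustering, and the class JdkDelayQueueDemo with its Task type and drainInOrder helper are illustrative names rather than anything from the article's project.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

class JdkDelayQueueDemo {
    /** An element that becomes available delayMillis after it is created. */
    static final class Task implements Delayed {
        final String name;
        final long dueAtNanos;

        Task(String name, long delayMillis) {
            this.name = name;
            this.dueAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(dueAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
        }
    }

    /** Blocks until count tasks have become due and returns their names in that order. */
    static List<String> drainInOrder(DelayQueue<Task> queue, int count) {
        List<String> names = new ArrayList<>();
        try {
            for (int i = 0; i < count; i++) {
                names.add(queue.take().name); // take() waits until the head's delay has elapsed
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for delayed tasks", e);
        }
        return names;
    }

    public static void main(String[] args) {
        DelayQueue<Task> queue = new DelayQueue<>();
        queue.put(new Task("slow", 200));
        queue.put(new Task("fast", 50));
        System.out.println(drainInOrder(queue, 2)); // prints [fast, slow]
    }
}
```

The element added second is taken first because DelayQueue orders elements by remaining delay, not by insertion order.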