当前位置:网站首页>9.延迟队列
9.延迟队列
2022-07-29 04:14:00 【Machoul】
延迟队列
延迟队列的概念
延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望 在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的 元素的队列。
队列TTL实现延迟队列
创建两个队列 QA 和 QB,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个交换机 X 和死信交 换机 Y,它们的类型都是 direct,创建一个死信队列 QD,它们的绑定关系如下:

创建springboot项目,修改pom.xml文件
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
修改配置文件application.properties
server.port=8080
spring.rabbitmq.host=172.16.140.133
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
添加rabbitmq的配置类TtlQueueConfig
@Configuration
public class TtlQueueConfig {
public static final String X_EXCHANGE = "X";
public static final String QUEUE_A = "QA";
public static final String QUEUE_B = "QB";
public static final String Y_DEAD_LETTER_EXCHANGE ="Y";
public static final String DEAD_LETTER_QUEUE ="QD";
@Bean("xExchange")
public DirectExchange xExchange(){
return new DirectExchange(X_EXCHANGE);
}
@Bean("yExchange")
public DirectExchange yExchange(){
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
}
@Bean("queueA")
public Queue queueA(){
Map<String,Object> params = new HashMap<>();
params.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
params.put("x-dead-letter-routing-key","YD");
params.put("x-message-ttl",10000);
return QueueBuilder.durable(QUEUE_A).withArguments(params).build();
}
@Bean
public Binding queueABindingX(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}
@Bean("queueB")
public Queue queueB(){
Map<String,Object> params = new HashMap<>();
params.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
params.put("x-dead-letter-routing-key","YD");
params.put("x-message-ttl",40000);
return QueueBuilder.durable(QUEUE_B).withArguments(params).build();
}
@Bean
public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueB).to(xExchange).with("XB");
}
@Bean("queueD")
public Queue queueD(){
return new Queue(DEAD_LETTER_QUEUE);
}
@Bean
public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD,@Qualifier("yExchange") DirectExchange yExchange){
return BindingBuilder.bind(queueD).to(yExchange).with("YD");
}
}
添加生产者代码
@Slf4j
@RequestMapping("ttl")
@RestController
public class SendMsgController {
@Resource
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMsg/{message}")
public void sendMsg(@PathVariable String message){
log.info("当前时间:{},发送一条消息给两个TTL队列:{}",new Date(),message);
rabbitTemplate.convertAndSend("X","XA","消息来自ttl为10s的队列:"+message);
rabbitTemplate.convertAndSend("X","XB","消息来自ttl为40s的队列:"+message);
}
}
添加消费者代码
@Slf4j
@Component
public class DeadLetterQueueConsumer {
@RabbitListener(queues = "QD")
public void receiveD(Message message){
String msg = new String( message.getBody());
log.info("当前时间:{},收到死信队列信息:{}",new Date(),msg);
}
}
发起一个请求localhost:8080/ttl/sendMsg/hello-world

不过,如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有 10S 和 40S 两个时间选项,如果需要一个小时后处理,那么就需要增加 TTL 为一个小时的队列,如果是预定会议室然 后提前通知这样的场景,岂不是要增加无数个队列才能满足需求
延迟队列优化
在这里新增一个队列QC,绑定关系如下,该队列不设置TTL时间

创建配置类MsgTtlQueueConfig
@Component
public class MsgTtlQueueConfig {
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
public static final String QUEUE_C = "QC";
@Bean("queueC")
public Queue queueC(){
Map<String,Object> params = new HashMap<>();
params.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
params.put("x-dead-letter-routing-key","YD");
return QueueBuilder.durable(QUEUE_C).withArguments(params).build();
}
@Bean
public Binding queueCBindingX(@Qualifier("queueC") Queue queueC, @Qualifier("xExchange")DirectExchange xExchange){
return BindingBuilder.bind(queueC).to(xExchange).with("XC");
}
}
添加消息生产者代码
@GetMapping("/sendExpirationMsg/{message}/{ttlTime}")
public void sendMsg(@PathVariable String message,@PathVariable String ttlTime){
log.info("当前时间:{},发送一条消息给{}毫秒TTL队列:{}",new Date(),ttlTime,message);
rabbitTemplate.convertAndSend("X","XC",message,correlationData ->{
correlationData.getMessageProperties().setExpiration(ttlTime);
return correlationData;
});
}
发起请求
localhost:8080/ttl/sendExpirationMsg/hello-world-1/20000localhost:8080/ttl/sendExpirationMsg/hello-world-2/2000

看起来似乎没什么问题,但是在最开始的时候,就介绍过如果使用在消息属性上设置 TTL 的方式,消 息可能并不会按时“死亡“,因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列, 如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行
延迟插件实现延迟队列
如果不能实现在消息粒度上的 TTL,并使其在设置的 TTL 时间及时死亡,就无法设计成一个通用的延时队列
安装延迟队列插件
下载地址:Releases · rabbitmq/rabbitmq-delayed-message-exchange · GitHub
启动命令
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
图示
在这里新增了一个队列 delayed.queue,一个自定义交换机 delayed.exchange,绑定关系如下

案例演示
在我们自定义的交换机中,这是一种新的交换类型,该类型消息支持延迟投递机制 消息传递后并 不会立即投递到目标队列中,而是存储在 mnesia(一个分布式数据系统)表中,当达到投递时间时,才 投递到目标队列中
添加配置文件代码
@Configuration
public class DelayedQueueConfig {
public static final String DELAYED_QUEUE_NAME="delayed.queue";
public static final String DELAYED_EXCHANGE_NAME ="delayed.exchange";
public static final String DELAYED_ROUTING_KEY = "delayed.routingKey";
@Bean
public Queue delayedQueue(){
return new Queue(DELAYED_QUEUE_NAME);
}
@Bean
public CustomExchange delayedExchange(){
Map<String,Object> params = new HashMap<>();
params.put("x-delayed-type","direct");
return new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",true,false,params);
}
@Bean
public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue, @Qualifier("delayedExchange") CustomExchange delayedExchange){
return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
}
}
添加生产者代码
@GetMapping("sendDelayMsg/{message}/{delayTime}")
public void sendMsg(@PathVariable String message , @PathVariable Integer delayTime){
rabbitTemplate.convertAndSend("delayed.exchange","delayed.routingKey",message,correlationData->{
correlationData.getMessageProperties().setDelay(delayTime);
return correlationData;
});
log.info("当前时间:{},发送一条消息给{}毫秒队列delayed.queue:{}",new Date(),delayTime,message);
}
添加消费者代码
@RabbitListener(queues = "delayed.queue")
public void receiveDelayedQueue(Message message){
String msg = new String( message.getBody());
log.info("当前时间:{},收到延时队列信息:{}",new Date(),msg);
}
发起请求
localhost:8080/ttl/sendDelayMsg/hello-world-1/20000localhost:8080/ttl/sendDelayMsg/hello-world-2/2000

第二个消息先被消费掉,符合预期。
总结
延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延时队列可以很好的利用 RabbitMQ 的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正 确处理的消息不会被丢弃。另外,通过 RabbitMQ 集群的特性,可以很好的解决单点故障问题,不会因为 单个节点挂掉导致延时队列不可用或者消息丢失。
当然,延时队列还有很多其它选择,比如利用 Java 的 DelayQueue,利用 Redis 的 zset,利用 Quartz 或者利用 kafka 的时间轮,这些方式各有特点,看需要适用的场景
边栏推荐
- Pointer variables -printf%d and%p meaning
- [kvm] create virtual machine from kickstart file
- 基于STM32和阿里云的环境检测系统设计
- STM32F103ZET6程序移植为C8T6+C8T6下载程序flash timeout的解决方案
- Installation and use of stm32cubemx (5.3.0)
- pat A1041 Be Unique
- 优炫数据库有办法查到主集群每天传给备集群的日志量吗?
- C语言:枚举知识点总结
- Solution: module 'xlrd' has no attribute 'open_ Error reporting of workbook '
- Lua语言(stm32+2G/4G模块)和C语言(stm32+esp8266)从字符串中提取相关数据的方法-整理
猜你喜欢

Applet: Area scrolling, pull-down refresh, pull-up load more

Jenkins 参数化构建中 各参数介绍与示例

安装ros的laser_scan_matche库所遇到的问题(一)

Machine vision Series 2: vs DLL debugging

不会就坚持62天吧 单词之和

基于STM32和阿里云的环境检测系统设计

不会就坚持60天吧 神奇的字典

UnicodeDecodeError: ‘ascii‘ codec can‘t decode byte 0x90 in position 614: ordinal not in range(128)

Whole house WiFi solution: mesh router networking and ac+ap

不会就坚持69天吧 合并区间
随机推荐
不会就坚持61天吧 最短的单词编码
Lua language (stm32+2g/4g module) and C language (stm32+esp8266) methods of extracting relevant data from strings - collation
Pat a1069/b1019 the black hole of numbers
opengauss预检查安装
C语言力扣第61题之旋转链表。双端队列与构造循环链表
The data source is SQL server. I want to configure the incremental data of the last two days of the date field updatedate to add
Code or script to speed up the video playback of video websites
The return value of the function is the attention of the pointer, the local variables inside the static limit sub function, and how the pointer to the array represents the array elements
[kvm] install KVM
Mmdetection preliminary use
Rhel8 patch package production
安装ros的laser_scan_matche库所遇到的问题(一)
Blood cases caused by < meta charset=UTF-8> -- Analysis of common character codes
Locker 2022.1.1
Asp.net MVC中文件夹中的控制器如何跳转到根目录的控制器中?
有一种密码学专用语言叫做ASN.1
不会就坚持58天吧 实现前缀树
[kvm] create virtual machine from kickstart file
Problems encountered in vscode connection SSH
(.*?) regular expression