当前位置:网站首页>9.延迟队列
9.延迟队列
2022-07-29 04:14:00 【Machoul】
延迟队列
延迟队列的概念
延时队列,队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素是希望 在指定时间到了以后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的 元素的队列。
队列TTL实现延迟队列
创建两个队列 QA 和 QB,两者队列 TTL 分别设置为 10S 和 40S,然后在创建一个交换机 X 和死信交 换机 Y,它们的类型都是 direct,创建一个死信队列 QD,它们的绑定关系如下:

创建springboot项目,修改pom.xml文件
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
</dependencies>
修改配置文件application.properties
server.port=8080
spring.rabbitmq.host=172.16.140.133
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
添加rabbitmq的配置类TtlQueueConfig
@Configuration
public class TtlQueueConfig {
public static final String X_EXCHANGE = "X";
public static final String QUEUE_A = "QA";
public static final String QUEUE_B = "QB";
public static final String Y_DEAD_LETTER_EXCHANGE ="Y";
public static final String DEAD_LETTER_QUEUE ="QD";
@Bean("xExchange")
public DirectExchange xExchange(){
return new DirectExchange(X_EXCHANGE);
}
@Bean("yExchange")
public DirectExchange yExchange(){
return new DirectExchange(Y_DEAD_LETTER_EXCHANGE);
}
@Bean("queueA")
public Queue queueA(){
Map<String,Object> params = new HashMap<>();
params.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
params.put("x-dead-letter-routing-key","YD");
params.put("x-message-ttl",10000);
return QueueBuilder.durable(QUEUE_A).withArguments(params).build();
}
@Bean
public Binding queueABindingX(@Qualifier("queueA") Queue queueA,@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueA).to(xExchange).with("XA");
}
@Bean("queueB")
public Queue queueB(){
Map<String,Object> params = new HashMap<>();
params.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
params.put("x-dead-letter-routing-key","YD");
params.put("x-message-ttl",40000);
return QueueBuilder.durable(QUEUE_B).withArguments(params).build();
}
@Bean
public Binding queueBBindingX(@Qualifier("queueB") Queue queueB,@Qualifier("xExchange") DirectExchange xExchange){
return BindingBuilder.bind(queueB).to(xExchange).with("XB");
}
@Bean("queueD")
public Queue queueD(){
return new Queue(DEAD_LETTER_QUEUE);
}
@Bean
public Binding deadLetterBindingQAD(@Qualifier("queueD") Queue queueD,@Qualifier("yExchange") DirectExchange yExchange){
return BindingBuilder.bind(queueD).to(yExchange).with("YD");
}
}
添加生产者代码
@Slf4j
@RequestMapping("ttl")
@RestController
public class SendMsgController {
@Resource
private RabbitTemplate rabbitTemplate;
@GetMapping("/sendMsg/{message}")
public void sendMsg(@PathVariable String message){
log.info("当前时间:{},发送一条消息给两个TTL队列:{}",new Date(),message);
rabbitTemplate.convertAndSend("X","XA","消息来自ttl为10s的队列:"+message);
rabbitTemplate.convertAndSend("X","XB","消息来自ttl为40s的队列:"+message);
}
}
添加消费者代码
@Slf4j
@Component
public class DeadLetterQueueConsumer {
@RabbitListener(queues = "QD")
public void receiveD(Message message){
String msg = new String( message.getBody());
log.info("当前时间:{},收到死信队列信息:{}",new Date(),msg);
}
}
发起一个请求localhost:8080/ttl/sendMsg/hello-world

不过,如果这样使用的话,岂不是每增加一个新的时间需求,就要新增一个队列,这里只有 10S 和 40S 两个时间选项,如果需要一个小时后处理,那么就需要增加 TTL 为一个小时的队列,如果是预定会议室然 后提前通知这样的场景,岂不是要增加无数个队列才能满足需求
延迟队列优化
在这里新增一个队列QC,绑定关系如下,该队列不设置TTL时间

创建配置类MsgTtlQueueConfig
@Component
public class MsgTtlQueueConfig {
public static final String Y_DEAD_LETTER_EXCHANGE = "Y";
public static final String QUEUE_C = "QC";
@Bean("queueC")
public Queue queueC(){
Map<String,Object> params = new HashMap<>();
params.put("x-dead-letter-exchange",Y_DEAD_LETTER_EXCHANGE);
params.put("x-dead-letter-routing-key","YD");
return QueueBuilder.durable(QUEUE_C).withArguments(params).build();
}
@Bean
public Binding queueCBindingX(@Qualifier("queueC") Queue queueC, @Qualifier("xExchange")DirectExchange xExchange){
return BindingBuilder.bind(queueC).to(xExchange).with("XC");
}
}
添加消息生产者代码
@GetMapping("/sendExpirationMsg/{message}/{ttlTime}")
public void sendMsg(@PathVariable String message,@PathVariable String ttlTime){
log.info("当前时间:{},发送一条消息给{}毫秒TTL队列:{}",new Date(),ttlTime,message);
rabbitTemplate.convertAndSend("X","XC",message,correlationData ->{
correlationData.getMessageProperties().setExpiration(ttlTime);
return correlationData;
});
}
发起请求
localhost:8080/ttl/sendExpirationMsg/hello-world-1/20000localhost:8080/ttl/sendExpirationMsg/hello-world-2/2000

看起来似乎没什么问题,但是在最开始的时候,就介绍过如果使用在消息属性上设置 TTL 的方式,消 息可能并不会按时“死亡“,因为 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列, 如果第一个消息的延时时长很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行
延迟插件实现延迟队列
如果不能实现在消息粒度上的 TTL,并使其在设置的 TTL 时间及时死亡,就无法设计成一个通用的延时队列
安装延迟队列插件
下载地址:Releases · rabbitmq/rabbitmq-delayed-message-exchange · GitHub
启动命令
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
图示
在这里新增了一个队列 delayed.queue,一个自定义交换机 delayed.exchange,绑定关系如下

案例演示
在我们自定义的交换机中,这是一种新的交换类型,该类型消息支持延迟投递机制 消息传递后并 不会立即投递到目标队列中,而是存储在 mnesia(一个分布式数据系统)表中,当达到投递时间时,才 投递到目标队列中
添加配置文件代码
@Configuration
public class DelayedQueueConfig {
public static final String DELAYED_QUEUE_NAME="delayed.queue";
public static final String DELAYED_EXCHANGE_NAME ="delayed.exchange";
public static final String DELAYED_ROUTING_KEY = "delayed.routingKey";
@Bean
public Queue delayedQueue(){
return new Queue(DELAYED_QUEUE_NAME);
}
@Bean
public CustomExchange delayedExchange(){
Map<String,Object> params = new HashMap<>();
params.put("x-delayed-type","direct");
return new CustomExchange(DELAYED_EXCHANGE_NAME,"x-delayed-message",true,false,params);
}
@Bean
public Binding bindingDelayedQueue(@Qualifier("delayedQueue") Queue queue, @Qualifier("delayedExchange") CustomExchange delayedExchange){
return BindingBuilder.bind(queue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();
}
}
添加生产者代码
@GetMapping("sendDelayMsg/{message}/{delayTime}")
public void sendMsg(@PathVariable String message , @PathVariable Integer delayTime){
rabbitTemplate.convertAndSend("delayed.exchange","delayed.routingKey",message,correlationData->{
correlationData.getMessageProperties().setDelay(delayTime);
return correlationData;
});
log.info("当前时间:{},发送一条消息给{}毫秒队列delayed.queue:{}",new Date(),delayTime,message);
}
添加消费者代码
@RabbitListener(queues = "delayed.queue")
public void receiveDelayedQueue(Message message){
String msg = new String( message.getBody());
log.info("当前时间:{},收到延时队列信息:{}",new Date(),msg);
}
发起请求
localhost:8080/ttl/sendDelayMsg/hello-world-1/20000localhost:8080/ttl/sendDelayMsg/hello-world-2/2000

第二个消息先被消费掉,符合预期。
总结
延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延时队列可以很好的利用 RabbitMQ 的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正 确处理的消息不会被丢弃。另外,通过 RabbitMQ 集群的特性,可以很好的解决单点故障问题,不会因为 单个节点挂掉导致延时队列不可用或者消息丢失。
当然,延时队列还有很多其它选择,比如利用 Java 的 DelayQueue,利用 Redis 的 zset,利用 Quartz 或者利用 kafka 的时间轮,这些方式各有特点,看需要适用的场景
边栏推荐
- Svg -- loading animation
- Shielding ODBC load balancing mode in gbase 8A special scenarios?
- Jenkins 参数化构建中 各参数介绍与示例
- 这个报错是什么鬼啊,不影响执行结果,但是在执行sql时一直报错。。。连接maxComputer是使用
- opengauss预检查安装
- Methods of using multiple deformations on an element
- Note: restframe work records many to one tables, how to serialize in that table (reverse query)
- Pat a1069/b1019 the black hole of numbers
- 数据库SQL语句实现数据分解的函数查询
- Openfeign asynchronous call problem
猜你喜欢

SVG--loading动画

MPU6050

Value transmission and address transmission of C language, pointer of pointer

顺序表和链表
![[Openstack] keystone,nova](/img/de/70b654a29a813c8fe828c4018bd4e7.png)
[Openstack] keystone,nova

编译与链接

The principle of inverse Fourier transform (IFFT) in signal processing
![[kvm] create virtual machine from kickstart file](/img/0e/292ccb6862e29d948ad6ece86b7945.png)
[kvm] create virtual machine from kickstart file

Common components of solder pad (2021.4.6)

通过js来实现一元二次方程的效果,输入a,b,c系数后可计算出x1和x2的值
随机推荐
Lua语言(stm32+2G/4G模块)和C语言(stm32+esp8266)从字符串中提取相关数据的方法-整理
What the hell is this error? It doesn't affect the execution result, but it always reports errors when executing SQL... Connecting maxcomputer uses
The return value of the function is the attention of the pointer, the local variables inside the static limit sub function, and how the pointer to the array represents the array elements
MPU6050
The solution of porting stm32f103zet6 program to c8t6+c8t6 download program flash timeout
不会就坚持67天吧 平方根
flink-sql 如何设置 sql执行超时时间
Change the value of the argument by address through malloc and pointer
有一种密码学专用语言叫做ASN.1
请问,在sql client中,执行insert into select from job时,如何单
Copy products with one click from Taobao, tmall, 1688, wechat, jd.com, Suning, taote and other platforms to pinduoduo platform (batch upload baby details Interface tutorial)
A little understanding of pointer, secondary pointer, wild pointer, pointer as function return value
对一个元素使用多种变形的方法
When array is used as a function parameter, it is better to use the array size as a function parameter
数据集成这个地方的过滤条件该咋写,用的啥语法?sql语法处理bizdate可以不
[kvm] install KVM
[untitled]
The pit I walked through: the first ad Sketchpad
Note: restframe work records many to one tables, how to serialize in that table (reverse query)
Press the missing number of interview question 17.04 | | 260. the number that appears only once (including bit operation knowledge points)