当前位置:网站首页>死信队列 和消息TTL过期代码
死信队列 和消息TTL过期代码
2022-07-25 23:41:00 【一个风轻云淡】
死信的概念
先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer将消息投递到broker或者直接到queue里了,consumer从queue取出消息进行消费,但某些时候由于特定的原因导致queue中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
应用场景:为了保证订单业务的消息数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效
死信的来源
消息TTL过期
队列达到最大长度(队列满了,无法再添加数据到mq中)
消息被拒绝(basic.reject或basic.nack)并且requeue=false.
消息TTL过期代码实战:
工具类:
public class untils {
public static Channel getChannel() throws IOException, TimeoutException {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.231.133");
factory.setUsername("admin");
factory.setPassword("123");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
return channel;
}
}
生产者:
public class Producer {
private static final String NORMAL_EXCHANGE="normal_exchange";
public static void main(String[] args) throws Exception{
Channel channel = untils.getChannel();
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
//设置TTL时间
AMQP.BasicProperties properties=new AMQP.BasicProperties().builder().expiration("10000").build();
//该消息用作队列的个数限制
for(int i=0;i<10;i++)
{
String message="info"+i;
channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes(StandardCharsets.UTF_8));
System.out.println("生产者发送消息"+message);
}
}
}消费者:
public class Consumer01 {
//普通交换机
private static final String NORMAL_EXCHANGE="normal_exchange";
//死信交换机
private static final String DEAD_EXCHANGE="dead_exchange";
public static void main(String[] args) throws Exception{
Channel channel = untils.getChannel();
//声明死信交换机,类型为direct
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);
//声明死信队列
String deadQueue="dead_queue";
channel.queueDeclare(deadQueue,false,false,false,null);
//死信队列绑定交换和routingKey值
channel.queueBind(deadQueue,DEAD_EXCHANGE,"lisi");
//正常队列绑定死信队列
Map<String,Object> params=new HashMap<>();
//正常队列设置死信交换机,参数key是固定值
params.put("x-dead-letter-exchange",DEAD_EXCHANGE);
//正常队列设置死信routing-key,参数key是固定值
params.put("x-dead-letter-routing-key", "lisi");
System.out.println("等待接收消息....");
String normalQueue="normal_queue";
channel.queueDeclare(normalQueue,false,false,false,params);
channel.queueBind(normalQueue,NORMAL_EXCHANGE,"zhangsan");
DeliverCallback deliverCallback=(consumerTag, message) -> {
String s = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("01接收到消息"+message);
};
channel.basicConsume(normalQueue,true,deliverCallback,consumerTag -> {});
}
}
/**
* 死信队列消费者
*/
public class Consumer02 {
//死信交换机
private static final String DEAD_EXCHANGE="dead_exchange";
public static void main(String[] args) throws Exception{
Channel channel = untils.getChannel();
//声明死信交换机,类型为direct
channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);
String dealQueue="dead_queue";
DeliverCallback deliverCallback=(consumerTag, message) -> {
String s = new String(message.getBody(), StandardCharsets.UTF_8);
System.out.println("01接收到消息"+s);
};
channel.basicConsume(dealQueue,true,deliverCallback,consumerTag -> {});
}
}正常情况:(不关闭消费者,只开启消费者1.消费者2不开启)

正常情况结束以后:
关闭消费者01,再一次开启生产者:

可以看到正常队列这个时候还存在10个等待消费
10s以后可以发现,这个队列跑到了死信队列

这个时候开启消费者02,即死信队列的消费者

可以看到死信队列的被消费变为0
边栏推荐
- 《数据密集型应用系统设计》 - 应用系统概览
- S4/hana mm & SD EDI Nast based integrated configuration (orders, ordrsp, desadv, invoice)
- Write a select drop-down list
- 在应用中使用 Jetpack 库
- 新手开户选择哪个券商公司好呢?安全吗
- Data intensive application system design - Application System Overview
- How to solve cross domain problems
- [QNX Hypervisor 2.2用户手册]9.8 load
- Moment.js
- 热部署和热加载有什么区别?
猜你喜欢

Qt风格(QSS)应用之QProgressBar

redis-扩展数据类型(跳跃表/BitMaps/HyperLogLog/GeoSpatial)

S4/HANA ME21N创建PO 输出控制消息按钮丢失解决方法(切换EDI 输出模式BRF+至NAST模式)

XXE&XML-外部实体注入-利用和绕过

Numerical learning iota, accumulate

S4/hana mm & SD EDI Nast based integrated configuration (orders, ordrsp, desadv, invoice)

疫情之下的好消息

ES6 syntax (difference between let, const, VaR, deconstruction assignment, arrow function, residual parameters, extension method of array)

LeetCode 0919. 完全二叉树插入器:完全二叉树的数组表示

图的遍历-DFS,BFS(代码详解)
随机推荐
Good news under the epidemic
TS class
ratio学习之ratio_add,ratio_subtract,ratio_multiply,ratio_divide的使用
[testing technology automated testing pytest] basic summary of pytest
Learning exploration-3d rotation card
Promise resolve callback hell, async await modifier
Leetcode 0136. numbers that appear only once: XOR
赋值时'1和'b1有什么区别
Docker 安装 Redis-5.0.12(远程访问)
TS function
从哪些维度评判代码质量的好坏?如何具备写出高质量代码的能力?
Recursion of function (use recursion to find the factorial of 1-N) (use recursion to find Fibonacci sequence) (use recursion to traverse data)
【MUDUO】EventLoopThreadPool
ABAP 代码中读取会计科目的字段状态(隐藏、可选、必输)
Multimodal deep multi modal sets
疫情之下的好消息
Taobao flexible.js file realizes flexible layout
2022 Niuke multi School Game 2
Imitating the magnifying glass effect of JD products -- JS Foundation
[QNX hypervisor 2.2 user manual]9.7 generate