Dead letter queue and message TTL expiration code
2022-07-25 23:48:00 【A light wind and light clouds】
The concept of a dead letter
Let's start with the definition. A dead letter, as the name suggests, is a message that cannot be consumed. Normally a producer delivers a message to the broker (or directly to a queue), and a consumer takes the message from the queue and consumes it. Sometimes, however, certain messages in a queue cannot be consumed for some reason. If such a message receives no further processing, it becomes a dead letter, and where there are dead letters there is a dead letter queue.
Application scenarios: to make sure that the message data of an order business is not lost, use RabbitMQ's dead letter queue mechanism, so that a message whose consumption fails is put into the dead letter queue. Another example: after a user places an order in an online shop and clicks pay, the order automatically becomes invalid if the payment is not completed within the specified time.
Sources of dead letters
The message TTL expires.
The queue reaches its maximum length (the queue is full and no more messages can be added to it).
The message is rejected (basic.reject or basic.nack) with requeue=false.
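The first two sources can also be configured on the queue itself through optional queue arguments. The sketch below only builds such an argument map; it does not talk to a broker. `x-dead-letter-exchange`, `x-dead-letter-routing-key`, `x-message-ttl` and `x-max-length` are standard RabbitMQ argument keys, while the helper class `DeadLetterArgs` and its `build` method are hypothetical names introduced for illustration.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: collects the optional queue arguments related to dead-lettering.
class DeadLetterArgs {

    // queueTtlMillis and maxLength may be null, meaning "do not set this argument".
    static Map<String, Object> build(String deadExchange, String deadRoutingKey,
                                     Integer queueTtlMillis, Integer maxLength) {
        Map<String, Object> params = new HashMap<>();
        // Where dead letters are republished, and with which routing key.
        params.put("x-dead-letter-exchange", deadExchange);
        params.put("x-dead-letter-routing-key", deadRoutingKey);
        // Source 1: a TTL (in milliseconds) applied to every message in the queue.
        if (queueTtlMillis != null) {
            params.put("x-message-ttl", queueTtlMillis);
        }
        // Source 2: the maximum number of messages the queue may hold.
        if (maxLength != null) {
            params.put("x-max-length", maxLength);
        }
        // Source 3 is not a queue argument: the consumer rejects the delivery,
        // for example with channel.basicReject(deliveryTag, false).
        return params;
    }

    public static void main(String[] args) {
        Map<String, Object> params = build("dead_exchange", "lisi", 10000, 6);
        System.out.println("x-message-ttl = " + params.get("x-message-ttl"));
        System.out.println("x-max-length  = " + params.get("x-max-length"));
    }
}
```

The resulting map would be passed as the last argument of `channel.queueDeclare(...)`, in the same position as the `params` map used by consumer 01 in this article. The values 10000 and 6 are arbitrary example values.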
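Message TTL expiration in practice:
Utility class:
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;

public class untils {
    // Open a connection to the RabbitMQ server and return a new channel on it
    public static Channel getChannel() throws IOException, TimeoutException {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.231.133");
        factory.setUsername("admin");
        factory.setPassword("123");
        Connection connection = factory.newConnection();
        return connection.createChannel();
    }
}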
Producer:
import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import java.nio.charset.StandardCharsets;

public class Producer {
    private static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] args) throws Exception {
        Channel channel = untils.getChannel();
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        // Set a per-message TTL of 10000 ms (10 s); the value is a string of milliseconds
        AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration("10000").build();
        // Send 10 messages to the normal exchange with routing key "zhangsan"
        for (int i = 0; i < 10; i++) {
            String message = "info" + i;
            channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", properties, message.getBytes(StandardCharsets.UTF_8));
            System.out.println("Producer sends message " + message);
        }
    }
}
Consumers:
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class Consumer01 {
    // Normal exchange
    private static final String NORMAL_EXCHANGE = "normal_exchange";
    // Dead letter exchange
    private static final String DEAD_EXCHANGE = "dead_exchange";

    public static void main(String[] args) throws Exception {
        Channel channel = untils.getChannel();
        // Declare the normal exchange and the dead letter exchange, both of type direct
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        // Declare the dead letter queue
        String deadQueue = "dead_queue";
        channel.queueDeclare(deadQueue, false, false, false, null);
        // Bind the dead letter queue to the dead letter exchange with routing key "lisi"
        channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
        // Arguments that link the normal queue to the dead letter exchange
        Map<String, Object> params = new HashMap<>();
        // Dead letter exchange of the normal queue; the argument key is a fixed value
        params.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        // Dead letter routing key of the normal queue; the argument key is a fixed value
        params.put("x-dead-letter-routing-key", "lisi");
        System.out.println("Waiting to receive messages....");
        // Declare the normal queue with the dead letter arguments and bind it
        String normalQueue = "normal_queue";
        channel.queueDeclare(normalQueue, false, false, false, params);
        channel.queueBind(normalQueue, NORMAL_EXCHANGE, "zhangsan");
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            String s = new String(message.getBody(), StandardCharsets.UTF_8);
            // Print the decoded body, not the Delivery object itself
            System.out.println("01 Message received " + s);
        };
        channel.basicConsume(normalQueue, true, deliverCallback, consumerTag -> {});
    }
}
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import java.nio.charset.StandardCharsets;

/**
 * Consumer of the dead letter queue
 */
public class Consumer02 {
    // Dead letter exchange
    private static final String DEAD_EXCHANGE = "dead_exchange";

    public static void main(String[] args) throws Exception {
        Channel channel = untils.getChannel();
        // Declare the dead letter exchange, of type direct
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
        // The dead letter queue itself is declared and bound by Consumer01
        String deadQueue = "dead_queue";
        DeliverCallback deliverCallback = (consumerTag, message) -> {
            String s = new String(message.getBody(), StandardCharsets.UTF_8);
            System.out.println("02 Message received " + s);
        };
        channel.basicConsume(deadQueue, true, deliverCallback, consumerTag -> {});
    }
}
Normal case (start only consumer 01 and keep it running; do not start consumer 02):

After the normal case has finished:
Shut down consumer 01 and start the producer again:

You can see that the normal queue now holds 10 messages waiting to be consumed.
After 10 s, the messages have moved from the normal queue to the dead letter queue.

Now start consumer 02, the consumer of the dead letter queue.

You can see that the number of messages in the dead letter queue drops to 0.
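The timing of this walkthrough can be reproduced with a toy in-memory model. This is only a sketch: it does not use RabbitMQ. The class `TtlDeadLetterModel` and all of its methods are hypothetical, and the clock is passed in explicitly so that the result is deterministic.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy in-memory model of TTL-based dead-lettering; this is not RabbitMQ code.
class TtlDeadLetterModel {

    // A queued message together with the absolute time at which it expires.
    private static final class Entry {
        final String body;
        final long expiresAtMillis;

        Entry(String body, long expiresAtMillis) {
            this.body = body;
            this.expiresAtMillis = expiresAtMillis;
        }
    }

    private final Deque<Entry> normalQueue = new ArrayDeque<>();
    private final List<String> deadLetterQueue = new ArrayList<>();

    // Publish a message at time nowMillis with the given TTL.
    void publish(String body, long nowMillis, long ttlMillis) {
        normalQueue.addLast(new Entry(body, nowMillis + ttlMillis));
    }

    // Move every expired message at the head of the normal queue to the dead letter queue.
    void expire(long nowMillis) {
        while (!normalQueue.isEmpty() && normalQueue.peekFirst().expiresAtMillis <= nowMillis) {
            deadLetterQueue.add(normalQueue.pollFirst().body);
        }
    }

    int normalSize() {
        return normalQueue.size();
    }

    List<String> deadLetters() {
        return deadLetterQueue;
    }

    public static void main(String[] args) {
        TtlDeadLetterModel model = new TtlDeadLetterModel();
        // The producer sends info0..info9 with a 10 s TTL while no consumer is running.
        for (int i = 0; i < 10; i++) {
            model.publish("info" + i, 0L, 10_000L);
        }
        model.expire(9_999L);
        // prints "10 waiting, 0 dead-lettered"
        System.out.println(model.normalSize() + " waiting, " + model.deadLetters().size() + " dead-lettered");
        model.expire(10_000L);
        // prints "0 waiting, 10 dead-lettered"
        System.out.println(model.normalSize() + " waiting, " + model.deadLetters().size() + " dead-lettered");
    }
}
```

The model checks for expiry only at the head of the queue. That is enough here because every message carries the same TTL, so the messages expire in the order in which they were published.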