Message queuing: how to ensure that messages are not lost
2022-07-07 05:40:00 【Qin Tian】
Catalog
One 、 Preface
Two 、 Ways to detect message loss
Three 、 Make sure the message is delivered reliably
Four 、 Summary
One 、 Preface
The most common problem when using a message queue, and the biggest headache, is message loss. For most business systems, losing a message means losing data, which is completely unacceptable.
In fact, today's mainstream message queue products provide mature reliability mechanisms. They can guarantee reliable delivery, without losing messages, even if a network outage or hardware failure occurs while a message is in transit.
Most message loss happens because developers are unfamiliar with message queues and do not use or configure them correctly. Different message queues expose different APIs and different configuration items, but the principles behind reliable delivery are the same.
The rest of this article explains how message queues ensure reliable delivery and what the underlying principles are. Once you know the principles, a brief look at the API and configuration items of whichever message queue you use is enough to configure it and write reliable code that avoids message loss.
Two 、 Ways to detect message loss
The most awkward situation with a message queue is not losing messages, but not knowing whether any were lost. A newly launched system is usually not yet stable and needs a running-in period. During that period in particular, you need to monitor whether your system is losing messages.
Companies with good IT infrastructure generally have a distributed tracing system, which makes it easy to trace every message. If you have no such system, here is a relatively simple method for checking for lost messages.
We can use the ordering guarantee of the message queue to verify whether messages are lost. The principle is simple: on the Producer side, attach a monotonically increasing sequence number to every message sent, then check the continuity of the sequence numbers on the Consumer side.
If no message is lost, the sequence numbers the Consumer receives must increase continuously; that is, each message's sequence number must equal the previous message's sequence number plus 1. A gap in the sequence means a message was lost, and the missing sequence number tells you which message it was, which helps further investigation.
Most message queue clients support an interceptor mechanism. You can inject the sequence number in an interceptor that runs before the Producer sends a message, and check continuity in an interceptor that runs when the Consumer receives one. The advantage of this approach is that the detection code does not intrude into your business code, and it is easy to disable or delete once the system has stabilized.
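The sequence-number check can be sketched in a few lines of Python. This is only an illustration under simple assumptions: one Producer, one ordered stream, and plain functions standing in for the client's interceptor hooks. The names `make_stamper`, `make_checker`, and the `seq` field are made up for this example and do not belong to any message queue API.

```python
from itertools import count

def make_stamper():
    """Return a producer-side hook that adds an increasing `seq` field."""
    counter = count(1)
    def stamp(message: dict) -> dict:
        return {**message, "seq": next(counter)}
    return stamp

def make_checker():
    """Return a consumer-side hook that reports missing sequence numbers."""
    last_seq = 0
    def check(message: dict) -> list:
        nonlocal last_seq
        seq = message["seq"]
        # Empty when seq == last_seq + 1, otherwise the numbers that were skipped
        missing = list(range(last_seq + 1, seq))
        last_seq = max(last_seq, seq)
        return missing
    return check

stamp = make_stamper()
check = make_checker()
sent = [stamp({"body": f"m{i}"}) for i in range(1, 6)]  # seq 1..5
received = [m for m in sent if m["seq"] != 3]           # simulate losing seq 3
gaps = [check(m) for m in received]
print(gaps)  # [[], [], [3], []]
```

The gap is reported when the message after the lost one arrives, so the last message of a stream can only be confirmed once a later message shows up.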
If you implement this detection method in a distributed system, there are a few issues to watch.
First, message queues such as Kafka and RocketMQ do not guarantee strict ordering across a Topic; they only guarantee ordering within a partition (queue). So you must specify the partition when sending a message, and check sequence continuity separately for each partition.
Second, if your Producer runs as multiple instances, it is hard to coordinate the send order among them. Each Producer therefore needs to generate its own sequence numbers and attach its Producer identifier, and the Consumer checks continuity separately for each Producer.
Finally, it is best for the number of Consumer instances to equal the number of partitions, so that Consumers and partitions correspond one to one. This makes it easier to check sequence continuity inside each Consumer.
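For the distributed case, the check has to be keyed by Producer and partition. The sketch below assumes each Producer keeps a separate counter, starting at 1, for every partition it writes to; `GapDetector` and the event tuples are hypothetical names used only for illustration.

```python
from collections import defaultdict

class GapDetector:
    """Tracks the last sequence number seen per (producer_id, partition)."""

    def __init__(self):
        # 0 means "nothing seen yet"; sequences are assumed to start at 1
        self._last = defaultdict(int)

    def observe(self, producer_id: str, partition: int, seq: int) -> list:
        key = (producer_id, partition)
        missing = list(range(self._last[key] + 1, seq))
        self._last[key] = max(self._last[key], seq)
        return missing

detector = GapDetector()
# (producer_id, partition, seq) in the order the consumer receives them
events = [("p1", 0, 1), ("p2", 0, 1), ("p1", 0, 2), ("p2", 0, 3), ("p1", 1, 1)]
report = {event: detector.observe(*event) for event in events}
lost = {event: gap for event, gap in report.items() if gap}
print(lost)  # {('p2', 0, 3): [2]}
```

Keeping one counter per key means interleaved messages from different Producers on the same partition do not raise false alarms.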
Three 、 Make sure the message is delivered reliably
Having covered how to detect message loss, let's look at the whole path of a message from production to consumption: where a message can be lost, and how to avoid it.
The path of a message from production to consumption can be divided into three stages. For ease of description, each stage is given a name.
- Production stage: the message is created in the Producer and sent over the network to the Broker.
- Storage stage: the message is stored on the Broker; in a cluster, it is also copied to the other replicas at this stage.
- Consumption stage: the Consumer pulls the message from the Broker, and the message travels over the network to the Consumer.
1. Production stage
In the production stage, the message queue uses the most common request-acknowledgement mechanism to ensure reliable transmission. When your code calls the send method, the message queue client sends the message to the Broker. After receiving the message, the Broker returns an acknowledgement to the client, indicating that the message has been received. Once the client receives that response, one normal send is complete.
As long as the Producer receives the Broker's acknowledgement, the message cannot be lost in the production stage. Some message queues retry automatically if no acknowledgement arrives for a long time; if the retries also fail, the user is informed through a return value or an exception.
So when you write code that sends messages, handle the return value or catch the exception correctly, and messages will not be lost at this stage.
Taking Kafka as an example, let's see how to send messages reliably:
1) When sending synchronously, just make sure to catch exceptions.
    try {
        // send() returns a Future; get() blocks until the Broker acknowledges or the send fails
        RecordMetadata metadata = producer.send(record).get();
        System.out.println("Message sent successfully.");
    } catch (Throwable e) {
        System.out.println("Message delivery failed!");
        System.out.println(e);
    }
2) When sending asynchronously, you need to check the result in the callback method. This deserves special attention: many lost messages are caused by code that sends asynchronously and never checks the send result in the callback.
    producer.send(record, (metadata, exception) -> {
        // Test the exception rather than the metadata: it is null only when the send
        // succeeded, whereas some client versions pass a placeholder metadata object on failure
        if (exception == null) {
            System.out.println("Message sent successfully.");
        } else {
            System.out.println("Message delivery failed!");
            System.out.println(exception);
        }
    });
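The snippets above detect a failed send but do not resend the message. A minimal, client-agnostic sketch of retry-on-failure might look like the following. The `send` callable, `SendFailed`, and `flaky_send` are hypothetical stand-ins for illustration, not part of Kafka or any other client library.

```python
import time

class SendFailed(Exception):
    """Raised when every attempt to send a message has failed."""

def send_with_retry(send, message, max_attempts=3, base_delay=0.0):
    """Call send(message) until it succeeds or max_attempts is exhausted."""
    last_error = None
    for attempt in range(1, max_attempts + 1):
        try:
            return send(message)              # the broker's acknowledgement
        except Exception as exc:              # real code should catch the client's specific errors
            last_error = exc
            time.sleep(base_delay * attempt)  # linear back-off; zero in this demo
    raise SendFailed(f"gave up after {max_attempts} attempts") from last_error

# Demo: a hypothetical send function that fails twice, then is acknowledged.
attempts = []
def flaky_send(message):
    attempts.append(message)
    if len(attempts) < 3:
        raise ConnectionError("no acknowledgement from broker")
    return "ack"

result = send_with_retry(flaky_send, "order-42")
print(result, len(attempts))  # ack 3
```

A resend after a lost acknowledgement can deliver the same message twice, so this pattern is usually paired with consumption logic that tolerates duplicates.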
2. Storage stage
In the storage stage, as long as the Broker is running normally, messages are not lost. But if the Broker fails, for example because the process dies or the server goes down, messages can still be lost.
If your reliability requirements are very high, you can configure Broker parameters to avoid losing messages when the Broker goes down.
For a single-node Broker, configure it to write each message to disk before returning the acknowledgement to the Producer. Then even if the Broker crashes, the message is already on disk and is not lost, and consumption can continue after recovery. For example, in RocketMQ you set the disk flush mode flushDiskType to SYNC_FLUSH (synchronous flush).
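The idea behind synchronous flushing can be illustrated with ordinary file I/O: write the message, force it to disk, and only then acknowledge. This is a sketch of the principle, not how any particular Broker is implemented; `append_durably` and the `commit.log` file name are invented for the example.

```python
import os
import tempfile

def append_durably(path: str, payload: bytes) -> str:
    """Append payload to the log file and force it to disk before acknowledging."""
    with open(path, "ab") as log:
        log.write(payload + b"\n")
        log.flush()             # push Python's buffer to the operating system
        os.fsync(log.fileno())  # ask the OS to write its cache to the storage device
    return "ack"                # only now is it safe to confirm to the producer

with tempfile.TemporaryDirectory() as tmp:
    log_path = os.path.join(tmp, "commit.log")
    ack = append_durably(log_path, b"order-42")
    with open(log_path, "rb") as log:
        stored = log.read()
print(ack, stored)  # ack b'order-42\n'
```

Forcing every write to disk costs throughput, which is why synchronous flushing is typically something you opt into when reliability matters more than speed.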
If the Broker is a cluster of multiple nodes, configure the cluster to send each message to at least two nodes before returning the acknowledgement to the client. Then when one Broker goes down, another Broker can take its place, and no message is lost.
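The "at least two nodes before acknowledging" rule can be modeled with a toy cluster. The classes below are purely illustrative and do not mirror any real Broker's internals. In Kafka, for instance, comparable behavior is commonly configured with the producer setting `acks=all` together with the topic or broker setting `min.insync.replicas=2`.

```python
class Replica:
    """A toy broker node that stores messages while it is online."""

    def __init__(self, name, online=True):
        self.name, self.online, self.log = name, online, []

    def append(self, message) -> bool:
        if self.online:
            self.log.append(message)
        return self.online

def replicate_then_ack(replicas, message, min_copies=2) -> bool:
    """Acknowledge only if at least min_copies replicas stored the message."""
    copies = sum(replica.append(message) for replica in replicas)
    return copies >= min_copies

cluster = [Replica("broker-1"), Replica("broker-2"), Replica("broker-3", online=False)]
acked = replicate_then_ack(cluster, "order-42")                # two copies stored
cluster[1].online = False                                      # a second node goes down
acked_after_failure = replicate_then_ack(cluster, "order-43")  # only one copy stored
print(acked, acked_after_failure)  # True False
```

When the second call returns False, the Producer sees a failed send and can resend, so a message is never reported as delivered while it exists on only one node.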
3. Consumption stage
The consumption stage uses an acknowledgement mechanism similar to the one in the production stage. After pulling a message from the Broker, the client executes the user's consumption business logic and, once that succeeds, sends a consumption acknowledgement to the Broker.
If the Broker does not receive the consumption acknowledgement, it returns the same message on the next pull. This ensures that the message is lost neither during network transmission nor because the client failed while executing the consumption logic.
When writing consumer code, do not send the consumption acknowledgement immediately after receiving the message. Send it only after all of the consumption business logic has finished.
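To see why acknowledging last prevents loss, here is a small simulation of a Broker that keeps returning a message until it is acknowledged. `ToyBroker`, `consume_once`, and `save` are hypothetical names; real Brokers track unacknowledged messages in more elaborate ways.

```python
from collections import deque

class ToyBroker:
    """Keeps a message at the head of the queue until it is acknowledged."""

    def __init__(self, messages):
        self._queue = deque(messages)

    def pull(self):
        return self._queue[0] if self._queue else None

    def ack(self, message):
        if self._queue and self._queue[0] == message:
            self._queue.popleft()

def consume_once(broker, handle) -> bool:
    """Pull one message; return False when the queue is empty."""
    message = broker.pull()
    if message is None:
        return False
    try:
        handle(message)      # business logic first
    except Exception:
        return True          # no ack: the broker will return the same message again
    broker.ack(message)      # acknowledge only after the logic succeeded
    return True

saved, failures = [], iter([True, False, False])
def save(message):
    if next(failures):       # the first attempt simulates a database outage
        raise IOError("database unavailable")
    saved.append(message)

broker = ToyBroker(["m1", "m2"])
pulls = 0
while consume_once(broker, save):
    pulls += 1
print(saved, pulls)  # ['m1', 'm2'] 3
```

The first pull of `m1` fails and is not acknowledged, so the second pull returns `m1` again and nothing is lost.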
As another example, let's see how to write reliable consumption code, using Python to consume RabbitMQ messages:
    import pika

    def callback(ch, method, properties, body):
        print(" [x] Received a message %r" % body)
        # Process the incoming message here (`database` stands for your own storage layer)
        database.save(body)
        print(" [x] Consumption complete")
        # Send the consumption acknowledgement only after the business logic has finished
        ch.basic_ack(delivery_tag=method.delivery_tag)

    # The connection details below are assumptions; adjust them to your environment
    connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
    channel = connection.channel()
    # auto_ack defaults to False, so the Broker waits for the explicit basic_ack above
    channel.basic_consume(queue='hello', on_message_callback=callback)
    channel.start_consuming()
As you can see, in the consumption callback method callback, the correct order is to save the message to the database first and then send the acknowledgement. If saving to the database fails, the acknowledgement code is never executed, so the same message is delivered again next time, until consumption succeeds.
Four 、 Summary
This article followed a message through the whole process from sending to consumption and showed how a message queue keeps it from being lost. The process has three stages, and each stage needs the right code and the right configuration to cooperate with the message queue's reliability mechanism.
- In the production stage, catch send errors and resend the message.
- In the storage stage, configure the parameters related to disk flushing and replication so that messages are written to disk on multiple replicas and are not lost when a Broker goes down or a disk is damaged.
- In the consumption stage, send the consumption acknowledgement only after all of the consumption business logic has finished.
Once you understand the principles of these stages, if messages are lost again you should be able to add some logging to the code, quickly determine which stage went wrong, and then analyze further to find the cause.