Message queuing: how to ensure that messages are not lost

2022-07-07 05:40:00 Qin Tian

Contents

I. Preface

II. Ways to detect message loss

III. Ensuring reliable message delivery

1. Production stage

2. Storage phase

3. Consumption stage

IV. Summary


I. Preface

The most common and most painful problem when using a message queue is message loss. For most business systems, losing a message means losing data, which is completely unacceptable.

In fact, today's mainstream message queue products provide mature reliability mechanisms. Even if a network interruption or hardware failure occurs during delivery, they can still deliver messages reliably without losing them.

Most message loss happens because developers are unfamiliar with the message queue and do not use or configure it correctly. Although different message queues expose different APIs and configuration items, the principles behind reliable delivery are the same.

The rest of this article explains how a message queue ensures reliable delivery and the principles behind it. Once you understand those principles, a brief look at the API and configuration items of whichever message queue you use is enough to configure it correctly and write reliable code that avoids message loss.

II. Ways to detect message loss

The most awkward situation with a message queue is not losing messages but not knowing whether any were lost. A newly launched system is usually not very stable and needs a run-in period. During that time in particular, you should monitor whether your system is losing messages.

Companies with good IT infrastructure usually have a distributed tracing system, which makes it easy to trace every message. If you have no such system, here is a relatively simple way to check for lost messages.

We can use the ordering of the message queue to verify whether messages are lost. The principle is simple: on the Producer side, attach a monotonically increasing sequence number to every message sent, and on the Consumer side, check that the sequence numbers are consecutive.

If no message is lost, the sequence numbers the Consumer receives must increase consecutively; that is, each message's sequence number must be the previous one plus 1. A gap in the sequence means a message was lost, and the missing number tells you which message it was, which helps further investigation.

Most message queue clients support an interceptor mechanism. You can inject the sequence number in an interceptor that runs before the Producer sends a message, and check continuity in an interceptor that runs when the Consumer receives one. The advantage is that the detection code does not intrude into your business code, and once the system is stable it is easy to disable or delete.

If you implement this detection method in a distributed system, a few issues need attention.

First, message queues such as Kafka and RocketMQ do not guarantee strict ordering across a Topic; they only guarantee ordering within a partition (queue). So you must specify the partition when sending, and check sequence continuity separately for each partition.

If your Producer runs as multiple instances, coordinating the send order among them is difficult, so each Producer should generate its own sequence numbers and attach its own Producer identifier. The Consumer then checks continuity separately for each Producer.

The number of Consumer instances should preferably match the number of partitions, so that Consumers and partitions correspond one to one. This makes it easier to check sequence continuity inside each Consumer.
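
The detection scheme described above can be sketched in a few lines of Python. This is only an illustration, not tied to any particular message queue client: the class names SequenceStamper and GapDetector, the message dictionary layout, and the producer name "producer-a" are all assumptions made for the example. In a real system the two classes would live in the client's Producer and Consumer interceptors.

```python
from collections import defaultdict
from itertools import count


class SequenceStamper:
    """Producer-side hook: tags each message with (producer_id, partition, seq)."""

    def __init__(self, producer_id):
        self.producer_id = producer_id
        # One independent counter per partition, each starting at 0.
        self._counters = defaultdict(count)

    def stamp(self, partition, payload):
        seq = next(self._counters[partition])
        return {"producer_id": self.producer_id, "partition": partition,
                "seq": seq, "payload": payload}


class GapDetector:
    """Consumer-side hook: checks continuity per (producer_id, partition)."""

    def __init__(self):
        self._last_seen = {}
        self.missing = []  # (producer_id, partition, seq) that never arrived

    def check(self, message):
        key = (message["producer_id"], message["partition"])
        last = self._last_seen.get(key, -1)
        seq = message["seq"]
        # Every number between the expected one and seq was skipped.
        for lost in range(last + 1, seq):
            self.missing.append((*key, lost))
        self._last_seen[key] = max(seq, last)


# Simulate one producer sending five messages to partition 0,
# with the message carrying seq 2 lost on the way.
stamper = SequenceStamper("producer-a")
sent = [stamper.stamp(0, f"order-{i}") for i in range(5)]

detector = GapDetector()
for message in sent[:2] + sent[3:]:
    detector.check(message)

print(detector.missing)
```

Keying the detector on both the Producer identifier and the partition is what lets several Producer instances and several partitions be checked independently, as the points above require.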

III. Ensuring reliable message delivery

Having covered how to detect message loss, let's look at the whole path of a message from production to consumption: where it can be lost, and how to avoid that.

As the figure shows, the path of a message from production to consumption can be divided into three stages. For ease of description, I have given each stage a name.

  • Production stage: the message is created in the Producer and travels over the network to the Broker.
  • Storage phase: the message is stored on the Broker. If the Broker is a cluster, the message is copied to the other replicas at this stage.
  • Consumption stage: the Consumer pulls the message from the Broker, and it travels over the network to the Consumer.

1. Production stage

In the production stage, the message queue uses the most common request-acknowledgement mechanism to ensure reliable transmission: when your code calls the send method, the message queue client sends the message to the Broker, and the Broker, after receiving it, returns an acknowledgement to the client. Once the client receives that acknowledgement, one normal send is complete.

As long as the Producer receives the Broker's acknowledgement, the message is not lost in the production stage. Some message queue clients retry automatically if no acknowledgement arrives for a long time; if the retries also fail, the user is informed through a return value or an exception.

When you write the sending code, handle return values correctly or catch exceptions, and messages will not be lost at this stage.

Taking Kafka as an example, here is how to send messages reliably:

1) When sending synchronously, just make sure to catch exceptions.

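try {
    // get() blocks until the Broker acknowledges the record or the send fails
    RecordMetadata metadata = producer.send(record).get();
    System.out.println("Message sent successfully.");
} catch (Exception e) {
    // Covers InterruptedException and ExecutionException thrown by get()
    System.out.println("Message delivery failed!");
    System.out.println(e);
}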

2) When sending asynchronously, check the result in the callback. This deserves special attention: many lost messages are caused by sending asynchronously without checking the send result in the callback.

producer.send(record, (metadata, exception) -> {
    // A null exception means the Broker acknowledged the record
    if (exception == null) {
        System.out.println("Message sent successfully.");
    } else {
        System.out.println("Message delivery failed!");
        System.out.println(exception);
    }
});
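
The two snippets above only report a failed send; they do not resend the message. A minimal sketch of bounded resending, written in Python and independent of any client library, might look like the following. The names send_with_retry and FlakySender are hypothetical, and `send` stands for any callable that returns once the Broker has acknowledged the message and raises otherwise.

```python
import time


def send_with_retry(send, message, max_attempts=3, backoff_seconds=0.0):
    """Call send(message) until it succeeds or max_attempts is reached.

    Returns the number of attempts used. If every attempt fails, the last
    error is re-raised so the caller still learns about the failure.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            send(message)
            return attempt
        except Exception:
            if attempt == max_attempts:
                raise  # surface the failure instead of hiding it
            time.sleep(backoff_seconds * attempt)


class FlakySender:
    """Stand-in for a producer whose first few sends get no acknowledgement."""

    def __init__(self, failures):
        self.failures = failures
        self.delivered = []

    def __call__(self, message):
        if self.failures > 0:
            self.failures -= 1
            raise ConnectionError("no acknowledgement from broker")
        self.delivered.append(message)


sender = FlakySender(failures=2)
attempts_used = send_with_retry(sender, "order-42")
print(attempts_used, sender.delivered)
```

Note that resending can deliver the same message more than once when only the acknowledgement was lost, so this approach trades possible duplicates for not losing messages.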

2. Storage phase

In the storage phase, as long as the Broker runs normally, messages are not lost. But if the Broker fails, for example because the process dies or the server goes down, messages can still be lost.

If your reliability requirements are very high, you can configure Broker parameters to avoid losing messages to downtime.

For a single-node Broker, configure it to write each message to disk before returning the acknowledgement to the Producer. Then even if the Broker goes down, the message is already on disk and is not lost, and consumption can continue after recovery. For example, in RocketMQ you need to set the flush mode flushDiskType to SYNC_FLUSH (synchronous flush).
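
To make "write to disk before acknowledging" concrete, here is a small Python sketch of the idea at the operating-system level. It is not how RocketMQ or any other Broker is implemented; the function name append_durably and the file name commit.log are assumptions for the example. The point is only the ordering: the acknowledgement is produced after os.fsync returns.

```python
import os
import tempfile


def append_durably(path, message: bytes) -> bool:
    """Append one message and force it to stable storage before returning."""
    with open(path, "ab") as log:
        log.write(message + b"\n")
        log.flush()             # push Python's buffer to the operating system
        os.fsync(log.fileno())  # ask the OS to write its cache to the disk
    return True  # only at this point would a Broker send its acknowledgement


log_path = os.path.join(tempfile.mkdtemp(), "commit.log")
acked = append_durably(log_path, b"order-42")

with open(log_path, "rb") as log:
    stored = log.read()
print(acked, stored)
```

Synchronous flushing costs throughput because every send waits for the disk, which is why it is usually a setting you enable only when the reliability requirement justifies it.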

If the Broker is a cluster of multiple nodes, configure the cluster to send each message to at least two nodes before returning the acknowledgement to the client. Then when one Broker goes down, another Broker can take over for it, and no message is lost.
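
The "at least two nodes before acknowledging" rule can be illustrated with a toy model in Python. This is a simplified sketch, not the replication protocol of any real Broker; the names Replica and replicate, the broker names, and the min_acks parameter are all assumptions for the example.

```python
class Replica:
    """A toy Broker node that stores messages in memory."""

    def __init__(self, name, online=True):
        self.name = name
        self.online = online
        self.log = []

    def append(self, message):
        if not self.online:
            raise ConnectionError(f"{self.name} is down")
        self.log.append(message)


def replicate(replicas, message, min_acks=2):
    """Acknowledge (return True) only if at least min_acks replicas stored it."""
    acks = 0
    for replica in replicas:
        try:
            replica.append(message)
            acks += 1
        except ConnectionError:
            continue  # a node that is down simply does not count
    return acks >= min_acks


cluster = [Replica("broker-1"), Replica("broker-2"),
           Replica("broker-3", online=False)]

ok_with_one_down = replicate(cluster, "order-42")   # two nodes stored it
cluster[1].online = False
ok_with_two_down = replicate(cluster, "order-43")   # only one node stored it
print(ok_with_one_down, ok_with_two_down)
```

In the second case the client receives no acknowledgement, so it knows the message is not yet safe and can resend it once the cluster recovers.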

3. Consumption stage

The consumption stage uses an acknowledgement mechanism similar to the production stage to ensure reliable transmission: after the client pulls a message from the Broker, it executes the user's consumption business logic and, once that succeeds, sends a consumption acknowledgement to the Broker.

If the Broker does not receive the consumption acknowledgement, it returns the same message on the next pull. This ensures the message is lost neither in network transmission nor because the client failed while executing the consumption logic.

When writing consumer code, do not send the consumption acknowledgement immediately after receiving the message. Send it only after all the consumption business logic has completed.
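
The redelivery behaviour described above can be seen from the Broker's side with a toy model in Python. It is a sketch under simplifying assumptions (a single in-memory queue, one consumer), and the names ToyBroker, consume, and save_to_database are hypothetical.

```python
from collections import deque


class ToyBroker:
    """Offers the message at the head of the queue until it is acknowledged."""

    def __init__(self, messages):
        self._queue = deque(messages)

    def pull(self):
        return self._queue[0] if self._queue else None

    def ack(self, message):
        if self._queue and self._queue[0] == message:
            self._queue.popleft()


def consume(broker, handle):
    """Pull one message, process it, and acknowledge only after success."""
    message = broker.pull()
    if message is None:
        return False
    handle(message)      # may raise; in that case ack() is never reached
    broker.ack(message)
    return True


saved = []
calls = {"count": 0}


def save_to_database(message):
    calls["count"] += 1
    if calls["count"] == 1:
        raise RuntimeError("database unavailable")  # first attempt fails
    saved.append(message)


broker = ToyBroker(["order-42"])
try:
    consume(broker, save_to_database)  # fails, so no acknowledgement is sent
except RuntimeError:
    pass
redelivered = broker.pull()            # the same message is offered again
consume(broker, save_to_database)      # succeeds and acknowledges
print(redelivered, saved, broker.pull())
```

Because the acknowledgement comes after the business logic, a failure in the handler leaves the message on the Broker instead of losing it.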

Here we use consuming RabbitMQ messages in Python as an example of how to write reliable consumption code:

def callback(ch, method, properties, body):
    print(" [x] Received a message %r" % body)
    # Process the incoming message here
    # (database stands for the application's own storage object)
    database.save(body)
    print(" [x] Consumption complete")

    # Send the acknowledgement only after the business logic has finished
    ch.basic_ack(delivery_tag=method.delivery_tag)

# auto_ack=False keeps acknowledgement manual, so it happens inside callback
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=False)

As you can see, in the consumption callback the correct order is to save the message to the database first and then send the acknowledgement. If saving the message fails, the acknowledgement code is never executed, so the same message is delivered again next time, until consumption succeeds.

IV. Summary

This article walked through the whole path of a message from sending to consumption and how a message queue ensures that messages are reliable and not lost. The path can be divided into three stages, and each stage requires correct code and correct configuration to work with the message queue's reliability mechanisms.

  • In the production stage, catch message sending errors and resend the message.
  • In the storage phase, configure the flush and replication parameters so that messages are written to disk on multiple replicas, ensuring that they are not lost to Broker downtime or disk damage.
  • In the consumption stage, send the consumption acknowledgement only after all the consumption business logic has been processed.

Once you understand the principles of these stages, if messages are lost again, you should be able to add some logging to the code, quickly determine which stage went wrong, and then analyze further to find the cause.

Copyright notice
This article was written by [Qin Tian]. Please include a link to the original when reposting. Thank you.
https://yzsam.com/2022/188/202207062341084995.html