Message Queues: The Problems They Introduce (Repeated Consumption, Ordered Consumption, and Distributed Transactions)
2022-07-26 20:34:00, by Qihai Jianren
In my previous article, 《Getting started with message queuing》, I said that technology is a double-edged sword. We use message queues because they bring many benefits: decoupling between systems, asynchronous execution, and peak shaving with rate limiting. But message queues can also introduce problems: messages may be consumed more than once, message order has to be guaranteed, and asynchronous execution can leave data inconsistent.
This article covers the problems that come with introducing a message queue, together with common solutions.
1. Repeated message consumption
Repeated consumption is an issue you must consider as soon as you adopt a message queue; it is both serious and common.
Take an order scenario as an example. Order results are usually notified asynchronously: the order center receives the receipt from the third-party payment channel, wraps it into a business order result, and delivers it to the business side. The delivery methods I have worked with so far are HTTP/Dubbo callbacks and message queues (MQ).
Given the consumption capacity (consumption speed) of downstream business parties, a message queue is the better choice. Consider a cumulative-spending promotion page: after you complete an order and look at the page, sometimes the accumulated amount updates immediately and sometimes only after a long delay. Why? The update speed depends on how fast the queue is consumed; if consumption is slow or backed up, you see the result later.
Generally, consumers acknowledge messages manually, which means that if the consumer's processing logic throws an exception, the message will be redelivered.
In addition, payment-result messages usually have several downstream subscribers. When one downstream service is briefly unavailable, after recovering it may want the messages from a certain time window delivered again. Other systems will then also receive messages they have already consumed, so you have to consider whether these duplicates produce dirty data. For example, if a payment-success message is delivered twice, will the user get two prizes?
In fact, message redelivery is very common: network jitter, bugs in our own code, and similar issues can all make processing fail and trigger a resend. This requires our interfaces to have one property: idempotency.
Idempotence is a concept from mathematics and computer science, common in abstract algebra.
In programming, an idempotent operation is one where executing it any number of times has the same effect as executing it once.
An idempotent function (or method) can be executed repeatedly with the same parameters and always yields the same result. Repeated executions do not change the system state beyond what the first execution did, so you do not have to worry that a retry will alter the system.
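To make the definition concrete, here is a minimal sketch contrasting a non-idempotent operation with an idempotent one. The function names `add_points` and `mark_paid` are hypothetical and only illustrate the property; they are not part of any real system described here.

```python
def add_points(balance: dict, user: str, amount: int) -> None:
    # NOT idempotent: every duplicate delivery adds the amount again.
    balance[user] = balance.get(user, 0) + amount


def mark_paid(orders: dict, order_id: str) -> None:
    # Idempotent: setting a field to a fixed value has the same effect
    # no matter how many times the same message is processed.
    orders[order_id] = "PAID"
```

Processing the same message twice with `add_points` doubles the points, while `mark_paid` leaves the order in exactly the same state as after the first call.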
How to achieve idempotency?
Idempotency has to be designed per scenario: decide whether you need a strong check or a weak check. Scenarios involving money are critical and call for a strong check; less important scenarios, such as message frequency control (fatigue control), can use a weak check.
(1) Strong check: for example, when you receive a user's payment-success message, you update the business order, grant benefits, and persist the downstream message locally in the DB. These operations can all be placed in one transaction.
For every incoming message, use a unique identifier such as the business order number to look up the order flow table and see whether this flow has already been processed. If it has, end the process and return immediately without running the rest of the flow; if not, execute the subsequent processing logic.
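A minimal sketch of the strong check, using an in-memory SQLite database as a stand-in for the business DB. The table names (`consumed_flow`, `orders`, `rewards`) and the function `handle_payment_success` are hypothetical. As a variant of the "look up, then process" check described above, it relies on a PRIMARY KEY on the order number so that checking and recording the flow happen atomically inside the same transaction as the business updates.

```python
import sqlite3


def init_db() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE consumed_flow (order_no TEXT PRIMARY KEY);  -- one row per processed message
        CREATE TABLE orders (order_no TEXT PRIMARY KEY, status TEXT);
        CREATE TABLE rewards (order_no TEXT, prize TEXT);
    """)
    conn.execute("INSERT INTO orders VALUES ('A100', 'UNPAID')")
    conn.commit()
    return conn


def handle_payment_success(conn: sqlite3.Connection, order_no: str) -> bool:
    """Return True if the message was processed, False if it was a duplicate."""
    try:
        with conn:  # one transaction: commits on success, rolls back on any exception
            # The PRIMARY KEY turns "already processed?" and "record it" into one atomic step.
            conn.execute("INSERT INTO consumed_flow VALUES (?)", (order_no,))
            conn.execute("UPDATE orders SET status = 'PAID' WHERE order_no = ?", (order_no,))
            conn.execute("INSERT INTO rewards VALUES (?, 'coupon')", (order_no,))
        return True
    except sqlite3.IntegrityError:
        return False  # already handled: stop and skip the rest of the flow
```

Delivering the same payment-success message twice grants the reward only once; the second call returns `False` and the whole transaction is rolled back.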
(2) Weak check: this is simpler and suits unimportant scenarios, such as handling SMS-sending messages. Use {receiver ID + business scenario ID} as a unique identifier, store it in Redis as a key with an expiration time (for fatigue control), and check Redis each time an SMS-sending message arrives to decide whether the SMS still needs to be sent to the user.
It is called a weak check because, with a KV store like Redis, it does not matter much if the data is lost: at worst one extra SMS is sent, which costs a little but does not hurt the business.
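The weak check boils down to "set the key only if it is absent, with a TTL". In Redis that is the `SET key value NX EX seconds` command; the sketch below uses a small in-memory class instead so it runs without a Redis server. `TtlDedupCache` and `should_send_sms` are hypothetical names, and the injectable clock exists only to make expiry testable.

```python
import time
from typing import Callable, Dict


class TtlDedupCache:
    """In-memory stand-in for Redis `SET key 1 NX EX ttl` (hypothetical helper, not a Redis client)."""

    def __init__(self, clock: Callable[[], float] = time.monotonic):
        self._expiry: Dict[str, float] = {}
        self._clock = clock

    def set_if_absent(self, key: str, ttl_seconds: float) -> bool:
        now = self._clock()
        exp = self._expiry.get(key)
        if exp is not None and exp > now:
            return False  # key still alive: the SMS was already sent in this window
        self._expiry[key] = now + ttl_seconds
        return True


def should_send_sms(cache: TtlDedupCache, receiver_id: str, scene_id: str,
                    ttl_seconds: float = 3600) -> bool:
    # Key = {receiver ID + business scenario ID}, as described above.
    return cache.set_if_absent(f"sms:{receiver_id}:{scene_id}", ttl_seconds)
```

If the cache is wiped, the only consequence is that `should_send_sms` returns `True` again and one extra SMS goes out, which is exactly why this check is acceptable only for low-stakes scenarios.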
2. Ordered message consumption
Scenarios that need ordered consumption are not that common in everyday development; most articles online discuss binlog synchronization, and beyond that there seem to be few use cases. Normal business flows proceed step by step with a natural time order, so their messages are rarely delivered in parallel.
For example, with large data volumes, data synchronization is under heavy pressure; sometimes a large table has to synchronize hundreds of millions of rows downstream. In that case the data to be synchronized can be pushed into a queue and consumed downstream at its own pace.
Here the database may insert, update, and delete the same row in quick succession, but if the messages arrive as update, delete, insert, the downstream data ends up wrong.
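A small sketch of why order matters for change-data replay. The event format `(op, key, value)` and the functions `apply`/`replay` are hypothetical; updating or deleting a missing row is treated as a no-op, which is one common (assumed) way a downstream sync applies changes.

```python
def apply(table: dict, event: tuple) -> None:
    op, key, value = event
    if op == "insert":
        table[key] = value
    elif op == "update":
        if key in table:          # updating a missing row is a no-op
            table[key] = value
    elif op == "delete":
        table.pop(key, None)      # deleting a missing row is a no-op


def replay(events) -> dict:
    table = {}
    for e in events:
        apply(table, e)
    return table
```

Replaying insert, update, delete leaves the row deleted (an empty table), but replaying the same three events as update, delete, insert leaves a row that should not exist.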
How to guarantee ordered consumption?
For how different MQs implement ordered messages, see 《Implementations of ordered messages in several MQs》; here I briefly introduce a simple implementation in RocketMQ.
RocketMQ's official documentation defines "message ordering" as follows:
Message ordering means that messages can be consumed in the order in which they were sent.
For example, an order generates three messages: order created, order paid, and order completed. Consuming them only makes sense in this order, while different orders can still be consumed in parallel. RocketMQ can strictly guarantee message ordering.
Ordered messages come in two kinds: globally ordered messages and partitioned ordered messages. Global ordering means all messages under a topic must be kept in order; partitioned ordering only requires each group of messages to be consumed in order.
- Global order: for a given topic, all messages are published and consumed in strict first-in, first-out (FIFO) order. Suitable for scenarios with low performance requirements where every message must be published and consumed strictly FIFO.
- Partitioned order: for a given topic, all messages are partitioned by sharding key, and messages in the same partition are published and consumed in strict FIFO order. The sharding key is the field of an ordered message that distinguishes partitions; it is a completely different concept from the Key of a normal message. Suitable for scenarios with high performance requirements, using the sharding key as the partition field and publishing and consuming strictly FIFO within each partition.
To implement globally ordered messages you can use only one queue and a single producer, which severely hurts performance.

Therefore, when people talk about ordered messages they usually mean partitioned ordered messages. In the example above, we do not care about the overall consumption order across different order IDs; we only need the messages of the same order ID to be consumed in the order created, paid, completed.
Ordered consumption has two core points: the producer stores messages in order, and the consumer consumes them in order.
(1) The producer sends in order
In RocketMQ, messages produced by a producer are placed in a queue. Because a queue is first-in, first-out, the order in which messages are stored matches the order in which they are pulled. So we only need to make sure that a group of related messages is stored, in the given order, into the same queue, and the producer side is ordered.
In the normal sending mode, a topic has multiple queues, and the producer distributes messages evenly across them by round robin, after which different consumers consume them. Since one group of messages ends up in different queues, the queues' natural FIFO property can no longer guarantee their order.
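A tiny simulation of the default round-robin spreading described above. `round_robin_dispatch` is a hypothetical helper, not RocketMQ's actual load-balancing code; it only shows that consecutive messages of one order end up in different queues.

```python
from itertools import cycle


def round_robin_dispatch(messages, num_queues: int):
    queues = [[] for _ in range(num_queues)]
    targets = cycle(range(num_queues))  # 0, 1, 2, 0, 1, 2, ...
    for msg in messages:
        queues[next(targets)].append(msg)
    return queues
```

With three queues, the create, pay, and finish messages of the same order land in three different queues, and three different consumers may process them in any order.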
RocketMQ lets the producer customize the delivery strategy, that is, choose the queue for the current message, through the MessageQueueSelector mechanism, which ships with three implementations (SelectMessageQueueByHash, SelectMessageQueueByRandom, and SelectMessageQueueByMachineRoom):

We implement the MessageQueueSelector interface with hash modulo to keep the same order in the same queue: order ID % number of queues gives the index, in the queue list, of the queue for that order, and every message of that order goes into that queue.
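The hash-modulo selection can be sketched in a few lines. In RocketMQ itself this logic goes into `MessageQueueSelector.select(mqs, msg, arg)` in Java, with the order ID passed as `arg`; the Python functions `select_queue` and `dispatch` below are hypothetical stand-ins that reproduce only the index arithmetic, and queues are represented by integer IDs.

```python
from typing import Dict, List, Tuple


def select_queue(queue_ids: List[int], order_id: int) -> int:
    # order ID % queue count = index into the queue list, so one order always maps to one queue.
    return queue_ids[order_id % len(queue_ids)]


def dispatch(messages: List[Tuple[int, str]], queue_ids: List[int]) -> Dict[int, list]:
    queues = {q: [] for q in queue_ids}
    for order_id, step in messages:  # sent one at a time, in business order
        queues[select_queue(queue_ids, order_id)].append((order_id, step))
    return queues
```

With two queues and orders 1, 2, 3, orders 1 and 3 share one queue and order 2 gets the other; inside each queue every order's create, pay, finish steps keep their relative order even though orders 1 and 3 interleave.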
In addition, the producer must send ordered messages synchronously, so that each message is stored before the next one is sent; with asynchronous sends, a later message could be stored before an earlier one that is still in flight or being retried.
For example, with 2 queues and three groups of messages for order IDs 1, 2 and 3, groups 1 and 3 are stored in the first queue according to the hash-modulo result, and group 2 is stored in the second queue. The following figure shows one possible storage order:

Note: in practice, the queue selector approach sometimes has problems. Our goal is to send related messages to the same queue, but if a broker goes down, the number of queues shrinks. If you still deliver by hash modulo, messages of the same business flow sent before and after the change in queue count may land in different queues, so some messages are briefly out of order. Likewise, adding queue servers will also cause temporary disorder for some messages.
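A quick way to see how much a change in queue count disturbs the mapping: count which keys get a different queue index before and after. `queue_for` and `moved_keys` are hypothetical helpers using plain `order_id % num_queues`, as in the selector above.

```python
def queue_for(order_id: int, num_queues: int) -> int:
    return order_id % num_queues


def moved_keys(order_ids, before: int, after: int):
    # Orders whose messages would go to a different queue after the queue count changes.
    return [o for o in order_ids if queue_for(o, before) != queue_for(o, after)]
```

Going from 4 queues to 3, orders 0 to 11 keep their queue only for IDs 0, 1 and 2; the other nine move. If an order's "create" message was sent before the change and its "pay" message after, the two sit in different queues and may be consumed out of order.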
(2) The consumer consumes in order
With ordered storage on the producer side in place, how does the consumer consume in order?
RocketMQ's MessageListener callback offers two consumption modes: the orderly mode MessageListenerOrderly and the concurrent mode MessageListenerConcurrently.
Consumers achieve ordered consumption by registering a callback of type MessageListenerOrderly; if a consumer uses MessageListenerConcurrently for parallel consumption, the consumption order is still not guaranteed.
In fact, every consumer uses a thread pool, so consumption on the consumer side is always multi-threaded. Although MessageListenerOrderly is called the orderly mode, it still consumes messages through a thread pool.
MessageListenerConcurrently submits newly pulled messages to the thread pool for consumption, whereas MessageListenerOrderly uses a distributed lock and a local lock to ensure that only one thread consumes a given queue at any time.
The locking mechanism of MessageListenerOrderly:
- Before pulling messages from a queue, the consumer asks the broker for a lock on that queue. If it gets the lock, it pulls messages; otherwise it skips the pull and tries again in the next rebalance cycle (20 s). This lock ensures that a MessageQueue is consumed by only one consumer client at a time.
- Once the consumer has locked the messageQueue, it starts pulling messages and submits them to the consumer-side thread pool. Before consuming locally, however, it first obtains the lock object of that messageQueue (each messageQueue has its own lock object) and uses synchronized to acquire a blocking, thread-level exclusive lock on it. This lock ensures that messages from the same messageQueue are consumed by only one thread within a consumer client at a time.
- After the local synchronized lock is acquired, the consumer checks its mode: in broadcasting mode it consumes directly; in clustering mode, if the messageQueue is not locked or the lock has expired (default 30000 ms), it waits 100 ms, asks the broker to lock the messageQueue again, and resubmits the consumption request once the lock succeeds.
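The local-lock half of this mechanism can be sketched with a thread pool plus one lock per queue. This is a simplified Python model, not RocketMQ's `ConsumeMessageOrderlyService`: the broker-side queue lock is not modeled, `threading.Lock` stands in for Java's `synchronized`, and `OrderlyConsumer`/`on_pulled` are hypothetical names. The key idea is that a worker takes messages from a queue's buffer only while holding that queue's lock, so even with many pool threads each queue is drained in FIFO order by one thread at a time.

```python
import threading
from collections import deque
from concurrent.futures import ThreadPoolExecutor


class OrderlyConsumer:
    def __init__(self, queue_ids, workers: int = 4):
        self.buffers = {q: deque() for q in queue_ids}           # pulled, not yet consumed (offset order)
        self.locks = {q: threading.Lock() for q in queue_ids}    # one local lock object per queue
        self.consumed = {q: [] for q in queue_ids}
        self.pool = ThreadPoolExecutor(max_workers=workers)      # still a multi-threaded pool

    def on_pulled(self, queue_id, msgs):
        # Called by the (single) pull thread of this queue; deque appends are thread-safe.
        self.buffers[queue_id].extend(msgs)
        return self.pool.submit(self._consume, queue_id)

    def _consume(self, queue_id):
        # Only one thread at a time may take and process messages from this queue.
        with self.locks[queue_id]:
            buf = self.buffers[queue_id]
            while buf:
                self.consumed[queue_id].append(buf.popleft())

    def shutdown(self):
        self.pool.shutdown(wait=True)
```

Different queues are still consumed in parallel by different threads; only messages of the same queue are serialized, which is also where the throughput cost mentioned next comes from.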
At present, consuming in order with MessageListenerOrderly has two problems:
- It uses many locks, which reduces throughput.
- When consumption of an earlier message blocks, all subsequent messages are blocked too. A message that fails is retried automatically (at 1-second intervals) and cannot be skipped automatically; the maximum number of retries is Integer.MAX_VALUE, which can stall the current queue indefinitely. So you usually need to set a maximum number of consumption attempts and handle every possible exception.
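A minimal sketch of capping retries so one poison message cannot stall a queue forever. `consume_with_limit` and the dead-letter list are hypothetical; RocketMQ would suspend the queue between attempts (about 1 s by default), which this sketch omits. Whether parking a message and moving on is acceptable is a business decision, because it gives up strict ordering for that message.

```python
from typing import Callable, List


def consume_with_limit(msgs: List[str], handler: Callable[[str], None],
                       max_attempts: int, dead_letters: List[str]) -> List[str]:
    done = []
    for msg in msgs:  # messages of one queue, strictly in order
        for attempt in range(1, max_attempts + 1):
            try:
                handler(msg)
                done.append(msg)
                break
            except Exception:
                if attempt == max_attempts:
                    # Give up on this message so later messages are not blocked forever.
                    dead_letters.append(msg)
                # A real consumer would pause before the next attempt; omitted here.
    return done
```

With a limit of 3, a message that always throws is attempted three times, parked in the dead-letter list, and the next message in the queue is consumed normally.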
3. Messages and distributed transactions
What is a transaction?
A transaction is a series of operations that either all succeed or all fail. To guarantee that such a series of operations executes correctly, a transaction must satisfy the ACID properties.
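A local transaction illustrates the "all succeed or all fail" property. This hypothetical transfer between two accounts uses an in-memory SQLite database; the CHECK constraint makes an overdraft fail, and the earlier credit is rolled back together with the failed debit.

```python
import sqlite3


def make_bank() -> sqlite3.Connection:
    conn = sqlite3.connect(":memory:")
    # CHECK makes an overdraft fail inside the transaction.
    conn.execute("CREATE TABLE account (name TEXT PRIMARY KEY, balance INTEGER CHECK (balance >= 0))")
    conn.executemany("INSERT INTO account VALUES (?, ?)", [("alice", 100), ("bob", 0)])
    conn.commit()
    return conn


def transfer(conn: sqlite3.Connection, src: str, dst: str, amount: int) -> bool:
    try:
        with conn:  # commit if both statements succeed, roll back if either raises
            conn.execute("UPDATE account SET balance = balance + ? WHERE name = ?", (amount, dst))
            conn.execute("UPDATE account SET balance = balance - ? WHERE name = ?", (amount, src))
        return True
    except sqlite3.IntegrityError:
        return False  # the credit to dst was undone together with the failed debit
```

This works because both rows live in one database. Once the two updates live in different services, no single local transaction can cover them, which is the problem distributed transactions address.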
What is a distributed transaction?
Think about it: your order flow may involve more than ten steps. You have paid for the order, but the coupon deduction failed and the points were not added. In the first case the company loses money; in the second the user is unhappy. These steps all live in different services, so how do we make sure they all succeed? With distributed transactions.
Distributed transactions can be implemented roughly in the following ways:
- 2PC (two-phase commit)
- 3PC (three-phase commit)
- TCC (Try, Confirm, Cancel)
- Best-effort notification
- XA
- Local message table (developed by eBay)
- Half messages / eventual consistency (MQ)
Distributed transactions were introduced to solve the transaction problem of a business scenario in a distributed environment, but they come with various drawbacks, for example:
(1) Database resources stay locked for a long time, so the system responds slowly and concurrency cannot scale.
(2) Network jitter can cause split-brain, so transaction participants cannot reliably execute the coordinator's instructions, which leads to inconsistent data.
(3) Single point of failure: for example, the transaction coordinator may go down at some point. Although a new leader can be elected, problems are bound to occur during the election.
Below I describe the simplest scheme, 2PC (two-phase commit), and the MQ-based eventual-consistency scheme commonly used in order scenarios, to understand distributed transactions and the role of message middleware.
2PC (two-phase commit)

2PC can be considered the starting point of distributed transactions. As the figure shows, multiple systems are coordinated through the message middleware: each system executes its transaction and locks resources without committing; once both are ready, they notify the message middleware, and then each commits its transaction.
The problem is just as clear: if system A commits successfully but system B fails to commit because of network fluctuation or some other reason, the transaction as a whole has effectively failed and the data is inconsistent.
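A toy model of 2PC that reproduces the failure just described. `Participant` and `two_phase_commit` are hypothetical classes, not any real transaction manager; the coordinator here is just the calling function.

```python
class Participant:
    """Hypothetical resource manager taking part in 2PC."""

    def __init__(self, name: str, vote_yes: bool = True, fail_on_commit: bool = False):
        self.name = name
        self.vote_yes = vote_yes
        self.fail_on_commit = fail_on_commit
        self.state = "INIT"

    def prepare(self) -> bool:
        # Phase 1: do the work and lock resources, but do not commit yet.
        if self.vote_yes:
            self.state = "PREPARED"
        return self.vote_yes

    def commit(self) -> None:
        if self.fail_on_commit:
            raise ConnectionError(f"{self.name}: commit request lost")
        self.state = "COMMITTED"

    def rollback(self) -> None:
        self.state = "ROLLED_BACK"


def two_phase_commit(participants) -> dict:
    # Phase 1: every participant must vote yes, otherwise everyone rolls back.
    if not all(p.prepare() for p in participants):
        for p in participants:
            p.rollback()
        return {p.name: p.state for p in participants}
    # Phase 2: tell every participant to commit.
    for p in participants:
        try:
            p.commit()
        except ConnectionError:
            pass  # earlier participants may already be committed; that cannot be undone
    return {p.name: p.state for p in participants}
```

If B votes no in phase 1, both roll back cleanly. If B's commit is lost in phase 2, A ends up COMMITTED while B is stuck in PREPARED, still holding its locks, which is exactly the inconsistency described above.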
Eventual consistency

As the figure shows, the whole process guarantees that:
- If the business initiator fails to commit its local transaction, the business receiver will not receive the message.
- As long as the initiator's local transaction succeeds, the message service delivers the message to the downstream passive party and ultimately ensures that the passive party consumes it (consumption must reach a final state, success or failure).
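The two guarantees above can be modeled with a "half message" that stays invisible until the producer reports the outcome of its local transaction. RocketMQ's transactional messages follow this pattern (its Java `TransactionListener` returns `COMMIT_MESSAGE`, `ROLLBACK_MESSAGE` or `UNKNOW`, and the broker calls back to check unresolved transactions), but the Python classes below are a hypothetical toy model, not the broker's implementation.

```python
from enum import Enum


class TxState(Enum):
    COMMIT = "commit"
    ROLLBACK = "rollback"
    UNKNOWN = "unknown"


class HalfMessageBroker:
    """Toy broker: half messages are stored but invisible to consumers until committed."""

    def __init__(self):
        self.half = {}      # msg_id -> body, not deliverable yet
        self.visible = []   # messages consumers can see

    def send_half(self, msg_id, body):
        self.half[msg_id] = body

    def end_transaction(self, msg_id, state: TxState):
        if state is TxState.COMMIT:
            self.visible.append(self.half.pop(msg_id))
        elif state is TxState.ROLLBACK:
            self.half.pop(msg_id)
        # UNKNOWN: keep it as a half message and ask the producer again later.

    def check_back(self, checker):
        # Periodic check for transactions whose outcome was never reported.
        for msg_id in list(self.half):
            self.end_transaction(msg_id, checker(msg_id))


def place_order(broker, db: set, order_id: str, local_ok: bool, report: bool = True):
    broker.send_half(order_id, {"order_id": order_id})  # 1. half message first
    if local_ok:
        db.add(order_id)                                  # 2. local transaction commits
        state = TxState.COMMIT
    else:
        state = TxState.ROLLBACK
    if report:
        broker.end_transaction(order_id, state)           # 3. report the outcome
    # report=False models a producer crash: the broker must resolve it via check_back.
```

A failed local transaction never becomes visible, and a committed one is delivered even if the producer crashed before reporting. Downstream consumers must still be idempotent and retried until they reach a final state.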
That is how technology works: we have to consider all kinds of extreme cases, and a perfect solution is rare. That is why there are so many distributed transaction schemes, such as three-phase commit, TCC, and best-effort notification. You just need to understand why each one exists, what it is good for, and what its drawbacks are, keep that in mind during development, and design the system around the business scenario. Technology detached from the business is meaningless, and a business detached from technology has no foundation.
References:
《Business needs ordered consumption: how to guarantee ordering?》
《Ordering of RocketMQ messages (ordered consumption)》: most of the figures in this article come from it; it is very well written.