Two solutions for reliable message consistency
2022-06-11 03:41:00 【PostTruth】
"Reliable message eventual consistency" addresses the atomicity of two actions on the Producer side: sending a message and executing a local transaction. It is a flexible transaction of the asynchronous-assurance type, based on soft state and eventual consistency.
**Typical scenario:** a service inserts a record into its local DB and, at the same time, sends a message to MQ, and the two must either both succeed or both fail. Because the DB and MQ are separate systems, the DB insert may succeed while sending the message to MQ fails, or the DB insert may fail while the message reaches MQ. Keeping the two consistent is the problem we have to solve.
This article explains in depth how reliable message consistency can be implemented:
- Local transaction table
- Transaction messages in RocketMQ
- Binlog subscription and parsing
1 Local transaction table
To make message sending and local transaction execution atomic on the Producer side, a typical idea is to first stage the message somewhere. Until the local transaction completes, the message is invisible to consumers; only after the local transaction is confirmed to have succeeded can consumers consume it.
The process works as follows:
- Pre-send the message. Note the keyword **"pre-send"**: a pre-sent message is invisible to consumers, so it will not be consumed.
- Execute the local business logic, for example inserting a record into the database.
- Depending on the result of the local transaction, either commit or roll back the message. Only committed messages can be consumed.
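The three steps can be sketched in Java as a minimal in-memory illustration. This is not the article's original pseudocode: `StagingQueue` and `PreSendFlow` are hypothetical names, and the local transaction is modeled as a `BooleanSupplier`. The sketch deliberately ignores crashes between steps, which is exactly the weakness the next paragraphs deal with.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BooleanSupplier;

// Hypothetical staging area: pre-sent messages are stored here but stay hidden from consumers.
class StagingQueue {
    enum State { PREPARED, COMMITTED, ROLLED_BACK }

    private final Map<Long, String> bodies = new LinkedHashMap<>();
    private final Map<Long, State> states = new LinkedHashMap<>();
    private long nextId = 1;

    // Step 1: pre-send. The message is stored but not yet visible.
    long preSend(String body) {
        long id = nextId++;
        bodies.put(id, body);
        states.put(id, State.PREPARED);
        return id;
    }

    // Step 3: confirm or discard according to the local transaction result.
    void commit(long id) { states.put(id, State.COMMITTED); }

    void rollback(long id) { states.put(id, State.ROLLED_BACK); }

    // Consumers only ever see committed messages.
    List<String> visibleToConsumers() {
        List<String> visible = new ArrayList<>();
        for (Map.Entry<Long, State> e : states.entrySet()) {
            if (e.getValue() == State.COMMITTED) {
                visible.add(bodies.get(e.getKey()));
            }
        }
        return visible;
    }
}

class PreSendFlow {
    // Runs the three steps; localTx stands in for the business logic and returns true on success.
    static boolean send(StagingQueue queue, String body, BooleanSupplier localTx) {
        long id = queue.preSend(body);      // 1. pre-send (invisible to consumers)
        boolean ok;
        try {
            ok = localTx.getAsBoolean();     // 2. execute the local transaction
        } catch (RuntimeException e) {
            ok = false;
        }
        if (ok) {
            queue.commit(id);                // 3a. commit: now consumable
        } else {
            queue.rollback(id);              // 3b. roll back: never consumed
        }
        return ok;
    }
}
```

A failed or throwing local transaction leaves its message rolled back, so only messages whose transaction succeeded ever become visible.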
Clearly, if every step above is guaranteed to execute correctly, the local transaction and the message sending stay consistent. Reality is harsher, though: this process is full of challenges.
1.1 How to implement pre-sending
With most MQs, a message can be consumed as soon as it is sent. That is not what we want here: the message should become consumable only after the local transaction has succeeded.
An intuitive idea is to store the message somewhere temporarily first, for example in a database table, called the "local transaction table". The table can have a state field indicating the status of the message; in the pre-send phase it is set to UNKNOWN.
1.2 How to commit or roll back
Based on the result of the local transaction, update the status field in the local transaction table: if the local transaction succeeded, set it to LOCAL_COMMIT; if it failed, set it to LOCAL_ROLLBACK.
In addition, an asynchronous thread continuously queries the table for messages in the LOCAL_COMMIT state and sends them to MQ. Sending may succeed or fail:
Sending to MQ succeeds:
From the sender's point of view the whole transaction is now finished. The message is marked GLOBAL_COMMIT, and the next step is consumption on the consumer side.
Sending to MQ fails:
The message must be retried until it succeeds. To cap the number of retries, add a retries field to the table and increment it on every retry; once the threshold is exceeded, stop sending. Alternatively, give the message a timeout and stop sending once it expires. Mark messages that could not be sent as MESSAGE_ERROR, and optionally add a cause field to the table to record why sending failed.
The local message table can therefore be designed as follows:
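```sql
CREATE TABLE `local_message` (
  `id` bigint NOT NULL AUTO_INCREMENT,
  `content` text NOT NULL COMMENT 'message body',
  `state` varchar(32) NOT NULL DEFAULT 'UNKNOWN' COMMENT 'UNKNOWN PREPARED LOCAL_COMMIT LOCAL_SUCCESS LOCAL_ROLLBACK GLOBAL_COMMIT MESSAGE_ERROR',
  `cause` varchar(255) DEFAULT NULL COMMENT 'reason for the send failure',
  `retries` int NOT NULL DEFAULT 0 COMMENT 'retry count',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'used to find stale messages',
  PRIMARY KEY (`id`),
  KEY `idx_state_create_time` (`state`, `create_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
```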
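The asynchronous sending thread from 1.2 can be sketched as one polling round over rows of this table. Everything here is a hypothetical stand-in: `LocalMessage` is an in-memory row holding the message body, state, retry count and cause, `MqSender` represents whatever MQ client is used, and the retry cap is a constructor parameter. In a real service `pollOnce` would run periodically (for example on a `ScheduledExecutorService`) over rows loaded with something like `SELECT ... WHERE state = 'LOCAL_COMMIT'`.

```java
import java.util.List;

// In-memory stand-in for one row of the local_message table.
class LocalMessage {
    final long id;
    final String content;
    String state;     // UNKNOWN, LOCAL_COMMIT, LOCAL_ROLLBACK, GLOBAL_COMMIT, MESSAGE_ERROR
    int retries;
    String cause;

    LocalMessage(long id, String content, String state) {
        this.id = id;
        this.content = content;
        this.state = state;
    }
}

// Hypothetical MQ client abstraction: send throws if the message was not accepted.
interface MqSender {
    void send(String content) throws Exception;
}

class MessageRelay {
    private final MqSender sender;
    private final int maxRetries;

    MessageRelay(MqSender sender, int maxRetries) {
        this.sender = sender;
        this.maxRetries = maxRetries;
    }

    // One polling round of the asynchronous thread: only LOCAL_COMMIT rows are sent.
    void pollOnce(List<LocalMessage> table) {
        for (LocalMessage m : table) {
            if (!"LOCAL_COMMIT".equals(m.state)) {
                continue;
            }
            try {
                sender.send(m.content);
                m.state = "GLOBAL_COMMIT";       // the sender side is finished
            } catch (Exception e) {
                m.retries++;                     // stays LOCAL_COMMIT, so the next round retries
                m.cause = e.getMessage();
                if (m.retries >= maxRetries) {
                    m.state = "MESSAGE_ERROR";   // give up; cause explains why
                }
            }
        }
    }
}
```

If the send succeeds but the update to GLOBAL_COMMIT is lost (for example, the process crashes in between), the next round sends the message again. The relay therefore gives at-least-once delivery, and consumers should be idempotent.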
1.3 How to avoid state loss
As mentioned above, in the pre-send phase the status field of the local transaction table is set to UNKNOWN; after the local transaction runs, it is changed to LOCAL_COMMIT or LOCAL_ROLLBACK.
**However, the local transaction may complete while the subsequent update of the message status in the local message table fails.** The message then stays in the UNKNOWN state. Because the asynchronous thread only sends messages in the LOCAL_COMMIT state to MQ, this message is ignored forever: its state has been lost. There are three solutions:
Option 1: Expand the transaction boundary
Merge the three operations (pre-sending the message, executing the local transaction, and updating the message status in the local transaction table) into a single transaction.
Start the transaction before the first step and commit or roll it back after the third step. Because all operations run in the same transaction, the message record in the local transaction table and the records produced by the business operation always succeed or fail together, and their states stay consistent.
Option 2: Merge the transaction status
Going one step further, the message does not need a pre-sent status at all: it is written to the database in the same transaction as the ordinary business operations, directly in the LOCAL_COMMIT state. The logic of the asynchronous sending thread stays the same.
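Options 1 and 2 both rely on a single local database transaction. The sketch below models Option 2 with a toy in-memory database whose transaction buffers writes and applies them only on commit; with JDBC the same shape would be `Connection.setAutoCommit(false)`, the two INSERT statements, then `commit()` or `rollback()`. `ToyDatabase` and `OutboxWriter` are hypothetical names, and this arrangement is widely known as the transactional outbox pattern.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Toy database: a transaction buffers writes and applies all of them on commit, or none.
class ToyDatabase {
    final List<String> orders = new ArrayList<>();
    final List<String> localMessages = new ArrayList<>();   // local_message rows

    static class Tx {
        final List<String> pendingOrders = new ArrayList<>();
        final List<String> pendingMessages = new ArrayList<>();
    }

    boolean inTransaction(Consumer<Tx> work) {
        Tx tx = new Tx();
        try {
            work.accept(tx);
        } catch (RuntimeException e) {
            return false;                                    // rollback: buffered writes are dropped
        }
        orders.addAll(tx.pendingOrders);                     // commit: both tables change together
        localMessages.addAll(tx.pendingMessages);
        return true;
    }
}

class OutboxWriter {
    // Option 2: the business row and the message row (already LOCAL_COMMIT) share one transaction.
    static boolean placeOrder(ToyDatabase db, String orderId, boolean failAfterWrites) {
        return db.inTransaction(tx -> {
            tx.pendingOrders.add(orderId);
            tx.pendingMessages.add("LOCAL_COMMIT|order-created|" + orderId);
            if (failAfterWrites) {
                throw new IllegalStateException("simulated failure after both writes");
            }
        });
    }
}
```

Because the message row exists only if the business row was committed, there is no window in which a status update can be lost.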
Option 3: Also check messages in the PREPARED state
Options 1 and 2 have one thing in common: whether the message may be committed is decided exactly once, when the business method executes, and the asynchronous thread then sends only messages in the LOCAL_SUCCESS state (equivalent to LOCAL_COMMIT above) to MQ. This satisfies most scenarios.
**Sometimes, however, the scenario is more complex.** Suppose the business logic is complicated: the initiator A, besides operating on its local database, must make an RPC call to service B to obtain information that the MQ message has to contain. B may not be able to provide that information yet; it only becomes available after a while. A therefore wants to save the message and send it once B can supply enough information. Options 1 and 2 cannot handle this, so the design needs further improvement.
**The specific strategy:** building on the previous options, the asynchronous thread sends messages in the LOCAL_SUCCESS state and also checks messages in the PREPARED state (the pre-sent state, called UNKNOWN earlier). A filter condition is needed: only when the gap between a PREPARED message's creation time and the current time exceeds a threshold does the thread try to query the message's correct status.
The threshold mainly avoids interfering with PREPARED messages that were just inserted by new transactions, which may be changed to LOCAL_SUCCESS at any moment. Only messages that have stayed in PREPARED for a long time may be ones whose local transaction succeeded but whose status update failed (or, as in the scenario above, ones still waiting for the required information).
In short, the key feature of Option 3 is: if the conditions are met at the time, the message is immediately marked as sendable; if they are not, whether it can be sent is determined later, asynchronously. This clearly provides great flexibility, whereas Options 1 and 2 support only the former.
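The extra check in Option 3 can be sketched as follows, assuming each row records its creation time. `StagedMessage`, `PreparedChecker` and the `resolver` hook are hypothetical: the resolver stands for whatever re-queries the business data (locally, or via RPC to service B in the example above) and answers LOCAL_SUCCESS, LOCAL_ROLLBACK, or PREPARED when it still cannot decide.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.List;
import java.util.function.Function;

// Stand-in for a local_message row; only the fields the check needs.
class StagedMessage {
    final long id;
    final Instant createTime;
    String state;   // PREPARED, LOCAL_SUCCESS, LOCAL_ROLLBACK, ...

    StagedMessage(long id, Instant createTime, String state) {
        this.id = id;
        this.createTime = createTime;
        this.state = state;
    }
}

class PreparedChecker {
    private final Duration threshold;
    // Hypothetical hook: returns LOCAL_SUCCESS, LOCAL_ROLLBACK, or PREPARED if still undecided.
    private final Function<StagedMessage, String> resolver;

    PreparedChecker(Duration threshold, Function<StagedMessage, String> resolver) {
        this.threshold = threshold;
        this.resolver = resolver;
    }

    // Resolves PREPARED messages older than the threshold; returns how many changed state.
    int check(List<StagedMessage> table, Instant now) {
        int changed = 0;
        for (StagedMessage m : table) {
            if (!"PREPARED".equals(m.state)) {
                continue;
            }
            if (Duration.between(m.createTime, now).compareTo(threshold) <= 0) {
                continue;   // too new: its own transaction may still update it
            }
            String resolved = resolver.apply(m);
            if (!"PREPARED".equals(resolved)) {
                m.state = resolved;   // LOCAL_SUCCESS rows are then picked up by the sending thread
                changed++;
            }
        }
        return changed;
    }
}
```

Messages the resolver cannot decide yet simply stay PREPARED and are examined again on the next round.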
2 RocketMQ transaction messages
Apache RocketMQ introduced transaction messages in version 4.3. They address the same problem we solved earlier with the local transaction table: making sending a message and executing the local transaction atomic on the Producer side.
The idea is the same as with the local transaction table: first stage the message somewhere. Only the staging place differs: RocketMQ stores the message temporarily in an internal Topic.
To support transaction messages, RocketMQ introduces two internal topics, the Half Topic and the Operation Topic, which store the intermediate state of transaction messages:
- The queue of the Half Topic stores prepare messages, i.e. pre-sent messages. These are not delivered directly to the target Topic, so consumers cannot see them; this is how staging is implemented.
- The queue of the Operation Topic stores the commit/rollback record for each prepare message; the record's message body is the offset of the corresponding prepare message.
The following figure shows how a RocketMQ transaction message is sent:

2.1 The transaction producer pre-sends the message
Transaction messages are sent through TransactionMQProducer. This producer adds some metadata to an ordinary Message to mark it as a pre-sent transaction message. When the broker detects a transaction message, it stores it in the Half Topic. A producer group must also be specified, so that if the sending instance fails or even goes down, the broker can query the transaction status from another instance in the same producer group.
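The producer-side marking and broker-side redirection can be modeled as below. This is a simplified, hypothetical model rather than RocketMQ's actual code: `SimpleMessage` and `HalfMessageRouter` are invented names, and the property keys are illustrative (RocketMQ keeps comparable keys in its `MessageConst` class and names its internal half topic `RMQ_SYS_TRANS_HALF_TOPIC`). The point is that the real topic is remembered in the metadata while the message itself is parked in the half topic.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified message: a topic, a body, and string metadata.
class SimpleMessage {
    String topic;
    final String body;
    final Map<String, String> properties = new HashMap<>();

    SimpleMessage(String topic, String body) {
        this.topic = topic;
        this.body = body;
    }
}

class HalfMessageRouter {
    // Illustrative metadata keys and internal topic name.
    static final String PROP_TRANSACTION_PREPARED = "TRAN_MSG";
    static final String PROP_PRODUCER_GROUP = "PGROUP";
    static final String PROP_REAL_TOPIC = "REAL_TOPIC";
    static final String HALF_TOPIC = "RMQ_SYS_TRANS_HALF_TOPIC";

    // Producer side: mark an ordinary message as a pre-sent transaction message.
    static SimpleMessage markAsHalf(SimpleMessage msg, String producerGroup) {
        msg.properties.put(PROP_TRANSACTION_PREPARED, "true");
        msg.properties.put(PROP_PRODUCER_GROUP, producerGroup);   // who to ask during check-back
        return msg;
    }

    // Broker side: park transaction messages in the half topic, remembering the real topic.
    static SimpleMessage onReceive(SimpleMessage msg) {
        if ("true".equals(msg.properties.get(PROP_TRANSACTION_PREPARED))) {
            msg.properties.put(PROP_REAL_TOPIC, msg.topic);
            msg.topic = HALF_TOPIC;   // subscribers of the real topic cannot see it yet
        }
        return msg;
    }
}
```

Ordinary messages pass through unchanged; only marked messages are redirected.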
2.2 Executing the local transaction
After the prepare message has been sent successfully, the local transaction is executed. This requires implementing RocketMQ's TransactionListener interface:

Its two methods are:
- **executeLocalTransaction:** executes the local transaction. This is our own business logic, such as operating on the database or doing other work.
- **checkLocalTransaction:** checks the transaction status; its role is explained later.
Both methods return a LocalTransactionState, which represents the execution status of the local transaction; the transaction producer reports it to the broker. There are three states in total, analyzed below:
```java
public enum LocalTransactionState {
    COMMIT_MESSAGE,
    ROLLBACK_MESSAGE,
    UNKNOW,
}
```
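A TransactionListener implementation typically looks like the sketch below. So that it compiles without the RocketMQ client, it uses stand-ins: the enum is copied from above, `TxMessage` replaces RocketMQ's `Message`/`MessageExt`, and `TxListener` only mirrors the shape of the two callbacks (the real `checkLocalTransaction` receives a `MessageExt`). `OrderTxListener` and its in-memory `orders` map are hypothetical; in practice the map would be a database table.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Copy of RocketMQ's enum so that this sketch compiles on its own.
enum LocalTransactionState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

// Stand-in for RocketMQ's Message / MessageExt.
class TxMessage {
    final String orderId;

    TxMessage(String orderId) {
        this.orderId = orderId;
    }
}

// Mirrors the two callbacks of RocketMQ's TransactionListener (simplified parameter types).
interface TxListener {
    LocalTransactionState executeLocalTransaction(TxMessage msg, Object arg);

    LocalTransactionState checkLocalTransaction(TxMessage msg);
}

class OrderTxListener implements TxListener {
    // Stand-in for the orders table: orderId -> status.
    final Map<String, String> orders = new ConcurrentHashMap<>();
    // Test hook: makes the next local transaction fail.
    volatile boolean failNextTransaction;

    @Override
    public LocalTransactionState executeLocalTransaction(TxMessage msg, Object arg) {
        try {
            if (failNextTransaction) {
                failNextTransaction = false;
                throw new IllegalStateException("simulated database error");
            }
            orders.put(msg.orderId, "CREATED");   // the local transaction
            return LocalTransactionState.COMMIT_MESSAGE;
        } catch (RuntimeException e) {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }

    @Override
    public LocalTransactionState checkLocalTransaction(TxMessage msg) {
        // Decide from the business record (the orders stand-in), not from transient flags.
        if (orders.containsKey(msg.orderId)) {
            return LocalTransactionState.COMMIT_MESSAGE;
        }
        // Not found: the transaction may have failed or may still be running, so let the broker ask again.
        return LocalTransactionState.UNKNOW;
    }
}
```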
2.3 Handling the local transaction status
Whatever state the client returns, the producer reports it to the broker. When the broker recognizes the request as a transaction status report, it first examines the status value and then handles it accordingly.
2.3.1 COMMIT & ROLLBACK state
After receiving the status of a transaction message, the broker records it in the internal Operation Topic; the message body is the offset of the corresponding prepare message in the Half Topic, as shown in the figure below:

In addition, an internal service in the broker consumes the messages in the Operation Topic. Specifically:
- For a rollback record, the broker treats the prepare message in the Half Topic as deleted and never delivers it.
- For a commit record, the broker takes the message out and **delivers it to the original target Topic, where consumers can now consume it.**
Careful readers will have noticed that the two queues in the figure deliberately have different lengths. This illustrates that in some abnormal cases reporting the transaction status may fail, so the Operation Topic has no record for the message. The difference between the two queues therefore consists of messages in the unconfirmed intermediate UNKNOW state, which need special handling.
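The relationship between the two internal topics can be modeled with a toy broker. `TxBrokerModel` is hypothetical and greatly simplified: offsets are list indexes, the Operation Topic is a set of offsets that already have a decision, and a rollback only records the decision (the half message is not physically erased). Offsets present in the half queue but missing from the operation records are the unresolved messages described above.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of the two internal topics; offsets are list indexes.
class TxBrokerModel {
    final List<String> halfQueue = new ArrayList<>();      // prepare messages
    final Set<Long> opQueue = new HashSet<>();              // offsets that have a commit/rollback record
    final List<String> targetTopic = new ArrayList<>();     // what consumers can see

    long putHalf(String body) {
        halfQueue.add(body);
        return halfQueue.size() - 1;
    }

    void commit(long offset) {
        opQueue.add(offset);                                 // record the decision
        targetTopic.add(halfQueue.get((int) offset));        // deliver to the real topic
    }

    void rollback(long offset) {
        opQueue.add(offset);                                 // record the decision, never deliver
    }

    // Half messages without an operation record: their status is still unknown.
    List<Long> unresolved() {
        List<Long> result = new ArrayList<>();
        for (long offset = 0; offset < halfQueue.size(); offset++) {
            if (!opQueue.contains(offset)) {
                result.add(offset);
            }
        }
        return result;
    }
}
```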
2.3.2 UNKNOW state
If a message is in the intermediate UNKNOW state, its transaction status cannot be determined yet, and the broker has to actively ask the producer client. UNKNOW can occur in the following scenarios:
- **Abnormal:** the executing side crashes or times out while running the local transaction.
- **Intentional:** as discussed for the local transaction table, in some special cases the message must wait until specific conditions are met before being released to consumers. Here we return UNKNOW on purpose.
Because messages in the intermediate UNKNOW state are not committed to the Operation Topic, the broker can find uncommitted, timed-out transactions by comparing the two internal topics, Half Topic and Operation Topic, and then check back on them.
**Checking back means that the business side must provide a method that RocketMQ can call.** The TransactionListener interface introduced earlier has two methods; the other one, checkLocalTransaction, is the one used for check-backs.
When we implement this method, RocketMQ passes in the message we sent earlier as a parameter. Based on its content, we look up the related business records to determine the transaction status.
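The broker side of the check-back can be sketched as a periodic scan. `CheckBackService`, `HalfRecord` and the `producerCheck` function are hypothetical; in RocketMQ the request travels over the remoting layer to an instance of the producer group, which invokes `checkLocalTransaction`. The sketch only asks about messages older than a timeout and gives up after a maximum number of checks (RocketMQ's default limit is 15).

```java
import java.util.List;
import java.util.function.Function;

// A half message still waiting for a decision.
class HalfRecord {
    final String key;
    final long storeTimeMillis;
    int checkTimes;
    String outcome = "PENDING";   // PENDING, COMMIT, ROLLBACK, DISCARDED

    HalfRecord(String key, long storeTimeMillis) {
        this.key = key;
        this.storeTimeMillis = storeTimeMillis;
    }
}

class CheckBackService {
    private final long transactionTimeoutMillis;   // younger messages are not checked yet
    private final int maxCheckTimes;               // stop asking after this many checks
    // Hypothetical call to the producer group; returns COMMIT_MESSAGE, ROLLBACK_MESSAGE or UNKNOW.
    private final Function<String, String> producerCheck;

    CheckBackService(long transactionTimeoutMillis, int maxCheckTimes,
                     Function<String, String> producerCheck) {
        this.transactionTimeoutMillis = transactionTimeoutMillis;
        this.maxCheckTimes = maxCheckTimes;
        this.producerCheck = producerCheck;
    }

    // One scan over the unresolved half messages.
    void runOnce(List<HalfRecord> pending, long nowMillis) {
        for (HalfRecord r : pending) {
            if (!"PENDING".equals(r.outcome)) {
                continue;
            }
            if (nowMillis - r.storeTimeMillis < transactionTimeoutMillis) {
                continue;   // the producer may still report the status on its own
            }
            if (r.checkTimes >= maxCheckTimes) {
                r.outcome = "DISCARDED";   // give up instead of letting the half topic pile up
                continue;
            }
            r.checkTimes++;
            String answer = producerCheck.apply(r.key);
            if ("COMMIT_MESSAGE".equals(answer)) {
                r.outcome = "COMMIT";
            } else if ("ROLLBACK_MESSAGE".equals(answer)) {
                r.outcome = "ROLLBACK";
            }
            // UNKNOW: stays PENDING and is asked again in a later scan
        }
    }
}
```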
Finally, for the broker to ask the producer client about the transaction status, broker and producer need two-way communication: the broker actively sends requests to the producer client. This two-way communication is built on the rocketmq-remoting module.
2.4 Summary
The design above shows that RocketMQ transaction messages solve the eventual consistency problem well. The transaction initiator only needs to take care of executing the local transaction and implementing the callback that determines the transaction status. In addition, when upstream transaction traffic peaks, the message queue buffers it and prevents excessive pressure on downstream services. Complete examples of transaction messages are available on the RocketMQ official website for reference.
Of course, things are not perfect. RocketMQ transaction messages have the following usage restrictions:
- Transaction messages do not support delayed messages or batch sending.
- To prevent a single message from being checked too many times and causing messages to pile up in the Half Topic, the number of checks per message is limited to 15 by default (configurable via the broker parameter transactionCheckMax).
- A transaction message is checked only after a fixed period, configured by the transactionTimeout parameter in the broker configuration.
- Transaction messages may be checked or consumed more than once, so consumers should process them idempotently.
- Committing a transaction message to the user's target topic may fail; RocketMQ relies on its own high-availability mechanism here. To make sure transaction messages are not lost and transaction integrity is guaranteed, the synchronous double-write mechanism is recommended.
- The producer ID of transaction messages cannot be shared with the producer IDs of other message types. Unlike other message types, transaction messages allow back-checks: the MQ server queries the client by its producer ID.
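Because a transaction message may be consumed more than once, the consumer should make its processing idempotent. A minimal sketch, assuming each message carries a stable business key such as an order ID (`IdempotentConsumer` and its `balance` field are hypothetical): the effect is applied only the first time a key is seen.

```java
import java.util.HashSet;
import java.util.Set;

// Sketch of an idempotent consumer: duplicates of an already-processed key are ignored.
class IdempotentConsumer {
    // In production the processed keys would live in the consumer's database, e.g. under a
    // unique index, written in the same transaction as the business change.
    private final Set<String> processedKeys = new HashSet<>();
    int balance;

    // Returns true if the message was applied, false if it was a duplicate.
    boolean onMessage(String messageKey, int amount) {
        if (!processedKeys.add(messageKey)) {
            return false;
        }
        balance += amount;
        return true;
    }
}
```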