
Two solutions for reliable message consistency

2022-06-11 03:41:00 PostTruth

"Reliable-message eventual consistency" solves the atomicity problem between sending a message and executing the local transaction on the Producer side. It is a flexible transaction of the asynchronous-assurance type: it tolerates a soft (intermediate) state and guarantees eventual consistency.

**Typical scenario:** a service inserts a record into the local DB and, at the same time, sends a message to MQ, and the two must either both succeed or both fail. Because the DB and the MQ are separate systems, the DB insert may succeed while sending to MQ fails, or the insert may fail after the message has already been sent. Keeping the two consistent is the problem we have to solve.

This article explains in depth the following approaches to reliable-message consistency:

  • Local transaction table
  • Transaction messages in RocketMQ
  • Binlog subscription and parsing

1 Local transaction table

To make message sending and local transaction execution atomic on the Producer side, a typical idea is to first **stage** the message somewhere. Until the local transaction completes, the message is invisible to consumers; only after the local transaction is confirmed to have succeeded can consumers consume it.

The process can be illustrated with the following pseudocode:

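[Figure: pseudocode for pre-sending a message, executing the local transaction, then committing or rolling back the message]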

  • Pre-send the message. Note the keyword **"pre-send"**: consumers cannot see a pre-sent message, so they will not consume it.
  • Execute the local business logic, for example insert a record into the database.
  • Based on the result of the local transaction, either commit or roll back the message. Consumers can consume only committed messages.
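The three steps above can be sketched in Java against an in-memory stand-in for the message store. `StagingStore` and `TransactionalSender` are hypothetical names, not any MQ client's API; the sketch only shows that a pre-sent message stays invisible until it is committed and is discarded on rollback.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory message store: pre-sent messages are staged and hidden from consumers.
class StagingStore {
    private final Map<Long, String> staged = new HashMap<>();
    private final List<String> visible = new ArrayList<>();
    private long nextId = 1;

    // Step 1: pre-send. The message is stored, but consumers cannot see it yet.
    long preSend(String payload) {
        long id = nextId++;
        staged.put(id, payload);
        return id;
    }

    // Step 3a: commit. Only now does the message become consumable.
    void commit(long id) {
        String payload = staged.remove(id);
        if (payload != null) {
            visible.add(payload);
        }
    }

    // Step 3b: rollback. The staged message is discarded.
    void rollback(long id) {
        staged.remove(id);
    }

    List<String> visibleMessages() {
        return new ArrayList<>(visible);
    }

    int stagedCount() {
        return staged.size();
    }
}

class TransactionalSender {
    // Pre-send, run the local transaction, then commit or roll back based on its outcome.
    static boolean send(StagingStore store, String payload, Runnable localTransaction) {
        long id = store.preSend(payload);
        try {
            localTransaction.run();   // Step 2: e.g. insert a row into the database
        } catch (RuntimeException e) {
            store.rollback(id);
            return false;
        }
        store.commit(id);
        return true;
    }
}
```

The sketch assumes each call succeeds once it is made. If the process dies between the local transaction and `commit`, the message stays staged forever, which is exactly the failure mode that section 1.3 addresses.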

Clearly, if every step above executes correctly, the local transaction and the message send stay consistent. Reality is harsher, though: each step can fail, and the process is full of challenges.

1.1 How to implement pre-sending

With most MQs, a message can be consumed as soon as it is sent. That is not what we want here: consumption must wait until the local transaction has succeeded.

An intuitive idea is to store the message temporarily somewhere first, for example in a database table, which we call the "local transaction table". The table can have a state field that records the status of each message; in the pre-send phase we set it to UNKNOWN.

1.2 How to confirm or roll back

Based on the result of the local transaction, update the state field in the local transaction table: if the local transaction succeeded, set it to LOCAL_COMMIT; if it failed, set it to LOCAL_ROLLBACK.
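The state field behaves like a small state machine. The sketch below is a hypothetical helper that uses the state names from this article and encodes which transitions the scheme allows, so that, for example, a GLOBAL_COMMIT message can never move back to UNKNOWN.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Values of the `state` column in the local message table, as named in this article.
enum MessageState {
    UNKNOWN, LOCAL_COMMIT, LOCAL_ROLLBACK, GLOBAL_COMMIT, MESSAGE_ERROR
}

// Hypothetical helper that rejects state changes the scheme does not allow.
class MessageStateMachine {
    private static final Map<MessageState, Set<MessageState>> ALLOWED =
            new EnumMap<>(MessageState.class);

    static {
        // Pre-sent message: decided by the local transaction result (section 1.2).
        ALLOWED.put(MessageState.UNKNOWN,
                EnumSet.of(MessageState.LOCAL_COMMIT, MessageState.LOCAL_ROLLBACK));
        // Committed locally: the async thread either delivers it or gives up.
        ALLOWED.put(MessageState.LOCAL_COMMIT,
                EnumSet.of(MessageState.GLOBAL_COMMIT, MessageState.MESSAGE_ERROR));
        // Terminal states.
        ALLOWED.put(MessageState.LOCAL_ROLLBACK, EnumSet.noneOf(MessageState.class));
        ALLOWED.put(MessageState.GLOBAL_COMMIT, EnumSet.noneOf(MessageState.class));
        ALLOWED.put(MessageState.MESSAGE_ERROR, EnumSet.noneOf(MessageState.class));
    }

    static boolean canMove(MessageState from, MessageState to) {
        return ALLOWED.get(from).contains(to);
    }

    // Next state once the local transaction has finished.
    static MessageState afterLocalTransaction(boolean succeeded) {
        return succeeded ? MessageState.LOCAL_COMMIT : MessageState.LOCAL_ROLLBACK;
    }
}
```

In the database the same guard is usually written as a conditional update such as `UPDATE local_message SET state = 'LOCAL_COMMIT' WHERE id = ? AND state = 'UNKNOWN'`, so that a late or duplicate writer cannot overwrite a state that has already moved on.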

In addition, an asynchronous thread keeps querying this table for messages in the LOCAL_COMMIT state and sends them to MQ. Each send can succeed or fail:

Sending to MQ succeeds:

From the sender's point of view the whole transaction is now complete, so the message is marked GLOBAL_COMMIT. The next step is consumption on the consumer side.

Sending to MQ fails:

The thread must retry until it succeeds. To cap the number of retries, add a retries field to the table and increment it on every retry; once it exceeds a threshold, stop sending. Alternatively, set a message timeout and stop sending once it is exceeded. Mark messages that could not be sent as MESSAGE_ERROR, and optionally add a cause field to the table that records why sending failed.
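A minimal sketch of one polling round of the asynchronous thread, assuming an in-memory list in place of the table and a hypothetical `MqSender` interface in place of a real MQ client. It applies both limits described above: a maximum retry count and a message timeout.

```java
import java.util.List;

// Hypothetical in-memory row of the local message table (columns as described above).
class LocalMessage {
    final long id;
    final String payload;
    final long createdAtMillis;
    String state = "LOCAL_COMMIT";
    int retries = 0;
    String cause;

    LocalMessage(long id, String payload, long createdAtMillis) {
        this.id = id;
        this.payload = payload;
        this.createdAtMillis = createdAtMillis;
    }
}

// Hypothetical MQ client abstraction; send throws when delivery to MQ fails.
interface MqSender {
    void send(String payload) throws Exception;
}

class MessageRelay {
    private final MqSender sender;
    private final int maxRetries;
    private final long timeoutMillis;

    MessageRelay(MqSender sender, int maxRetries, long timeoutMillis) {
        this.sender = sender;
        this.maxRetries = maxRetries;
        this.timeoutMillis = timeoutMillis;
    }

    // One polling round: only LOCAL_COMMIT rows are picked up.
    void pollOnce(List<LocalMessage> table, long nowMillis) {
        for (LocalMessage m : table) {
            if (!"LOCAL_COMMIT".equals(m.state)) {
                continue;
            }
            if (nowMillis - m.createdAtMillis > timeoutMillis) {
                m.state = "MESSAGE_ERROR";   // too old: stop sending
                m.cause = "timeout";
                continue;
            }
            try {
                sender.send(m.payload);
                m.state = "GLOBAL_COMMIT";   // producer side is finished
            } catch (Exception e) {
                m.retries++;
                m.cause = e.getMessage();
                if (m.retries >= maxRetries) {
                    m.state = "MESSAGE_ERROR";   // retry budget exhausted
                }
            }
        }
    }
}
```

A real relay would run `pollOnce` on a schedule and fetch `state = 'LOCAL_COMMIT'` rows in batches. Because a send can succeed while the following update to GLOBAL_COMMIT fails, the same message may be sent twice, so consumers should tolerate duplicates.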

The local message table can therefore be designed as follows:

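```sql
CREATE TABLE `local_message` (
  `id` bigint NOT NULL,
  `content` text COMMENT 'message body to send to MQ',
  `state` varchar(32) NOT NULL DEFAULT 'UNKNOWN' COMMENT 'UNKNOWN, LOCAL_COMMIT, LOCAL_ROLLBACK, GLOBAL_COMMIT, MESSAGE_ERROR',
  `cause` varchar(255) DEFAULT NULL COMMENT 'reason for the last send failure',
  `retries` int NOT NULL DEFAULT 0 COMMENT 'retry count (NOT NULL so that retries + 1 works)',
  `create_time` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'used to find stale UNKNOWN messages',
  PRIMARY KEY (`id`),
  KEY `idx_state_create_time` (`state`, `create_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
```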

1.3 How to avoid state loss

As mentioned above, in the pre-send phase the state field of the local transaction table is set to UNKNOWN, and after the local transaction finishes it is changed to LOCAL_COMMIT or LOCAL_ROLLBACK.

**However, after the local transaction has executed, the update of the message state in the local message table may itself fail.** In that case the message stays in the UNKNOWN state forever. Because the asynchronous thread sends only messages in the LOCAL_COMMIT state, this message is always ignored; in other words, the message state is lost. There are three ways to solve this.

Option 1: Expand the transaction boundary

Merge the three operations (pre-sending the message, executing the local transaction, and updating the message state in the local transaction table) into one database transaction.

Start the transaction before the first step (pre-sending the message) and commit or roll it back after the third step. Because all operations run in the same transaction, the message record in the local transaction table and the records written by the business operation always succeed or fail together, and their states stay consistent. This works only because the local transaction table lives in the same database as the business tables.

Option 2: Merge the transaction state

Going one step further, the message does not need a pre-send state at all. Write it to the database in the same transaction as the normal business operations, directly in the LOCAL_COMMIT state; the asynchronous sending logic stays unchanged.
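Options 1 and 2 both rely on one local database transaction. The sketch below simulates that with a hypothetical `MiniDb` whose `inTransaction` applies all writes or none, standing in for a real JDBC transaction. `placeOrderOption1` writes an UNKNOWN message row and updates it to LOCAL_COMMIT inside the transaction; `placeOrderOption2` writes the LOCAL_COMMIT row directly. Order and message names are illustrative only.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical in-memory database with two tables and all-or-nothing commits,
// standing in for one real database transaction.
class MiniDb {
    final List<String> orders = new ArrayList<>();
    final List<String> localMessages = new ArrayList<>();   // "<state>:<payload>"

    // Runs the work on a copy and publishes the copy only if no exception escapes.
    void inTransaction(Consumer<MiniDb> work) {
        MiniDb draft = new MiniDb();
        draft.orders.addAll(orders);
        draft.localMessages.addAll(localMessages);
        work.accept(draft);              // an exception here leaves this instance untouched
        orders.clear();
        orders.addAll(draft.orders);
        localMessages.clear();
        localMessages.addAll(draft.localMessages);
    }
}

class OrderService {
    // Option 1: pre-send (UNKNOWN row), business write, state update, all in one transaction.
    static void placeOrderOption1(MiniDb db, String orderId, boolean failBeforeCommit) {
        db.inTransaction(tx -> {
            tx.localMessages.add("UNKNOWN:order-created:" + orderId);
            int row = tx.localMessages.size() - 1;
            tx.orders.add(orderId);
            maybeFail(failBeforeCommit);   // e.g. the state update itself fails
            tx.localMessages.set(row, "LOCAL_COMMIT:order-created:" + orderId);
        });
    }

    // Option 2: no pre-send state; the message row is written as LOCAL_COMMIT directly.
    static void placeOrderOption2(MiniDb db, String orderId, boolean failBeforeCommit) {
        db.inTransaction(tx -> {
            tx.orders.add(orderId);
            tx.localMessages.add("LOCAL_COMMIT:order-created:" + orderId);
            maybeFail(failBeforeCommit);
        });
    }

    private static void maybeFail(boolean fail) {
        if (fail) {
            throw new IllegalStateException("simulated failure before commit");
        }
    }
}
```

If the state update fails in Option 1, the whole transaction is rolled back, so no UNKNOWN row is left behind to be lost.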

Option 3: Also check messages in the UNKNOWN (pre-send) state

Options 1 and 2 share one characteristic: whether the message can be committed is decided exactly once, when the business method runs, and the asynchronous thread sends only messages in the LOCAL_COMMIT state to MQ. This covers most scenarios.

**However, some scenarios are more complex.** Suppose the business logic is complicated: in addition to operating on its local database, initiator A must call service B over RPC to obtain information that the MQ message has to carry. B may not have this information yet and may only be able to provide it some time later. A wants to save the message and send it once B can supply enough information. Options 1 and 2 cannot handle this, so the approach must be extended.

**The strategy is:** building on the previous options, the asynchronous thread not only sends LOCAL_COMMIT messages but also checks messages in the UNKNOWN state. A filter condition is needed: for example, compare the creation time of the UNKNOWN message with the current time, and try to query the message's real status only when the difference exceeds a time threshold.

The threshold prevents confusion with UNKNOWN messages that a new transaction has only just inserted, which may be changed to LOCAL_COMMIT a moment later. Only messages that have stayed in the UNKNOWN state for a long time need checking: either the local transaction succeeded but the state update failed, or, as in the scenario above, the message is still waiting for information that was not yet available.

In summary, the key feature of Option 3 is this: if the conditions are met right away, the message is marked sendable immediately; if not, the system can still determine asynchronously, later, whether the conditions for sending are met. This gives a great deal of flexibility, whereas Options 1 and 2 support only the first case.
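A sketch of Option 3, assuming a hypothetical `StatusChecker` callback (for example an RPC to service B) that reports the real status of a pending message. Fresh UNKNOWN rows are left alone, only rows older than the threshold are checked, and LOCAL_COMMIT rows are sent as before. Sending to MQ is reduced to appending to a list.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical callback that asks the business side (e.g. service B over RPC) for the real status.
interface StatusChecker {
    // Returns "LOCAL_COMMIT", "LOCAL_ROLLBACK", or "UNKNOWN" if still undecided.
    String check(String payload);
}

class PendingMessage {
    final String payload;
    final long createdAtMillis;
    String state;

    PendingMessage(String payload, long createdAtMillis, String state) {
        this.payload = payload;
        this.createdAtMillis = createdAtMillis;
        this.state = state;
    }
}

class CheckingRelay {
    private final StatusChecker checker;
    private final long thresholdMillis;
    final List<String> sent = new ArrayList<>();   // stands in for the MQ

    CheckingRelay(StatusChecker checker, long thresholdMillis) {
        this.checker = checker;
        this.thresholdMillis = thresholdMillis;
    }

    void pollOnce(List<PendingMessage> table, long nowMillis) {
        for (PendingMessage m : table) {
            // Check only UNKNOWN rows older than the threshold; fresh ones may still be in flight.
            if ("UNKNOWN".equals(m.state) && nowMillis - m.createdAtMillis > thresholdMillis) {
                m.state = checker.check(m.payload);
            }
            // LOCAL_COMMIT rows (decided immediately or just now) are sent as before.
            if ("LOCAL_COMMIT".equals(m.state)) {
                sent.add(m.payload);   // sending is assumed to succeed in this sketch
                m.state = "GLOBAL_COMMIT";
            }
        }
    }
}
```

A row the checker still reports as UNKNOWN simply waits for a later round; combining this with the retry and timeout limits from section 1.2 keeps such rows from being checked forever.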

2 RocketMQ transaction messages

Apache RocketMQ introduced transaction messages in version 4.3. They solve the same problem we addressed with the local transaction table: the atomicity of sending a message and executing the local transaction on the Producer side.

The idea is the same as with the local transaction table: first **stage** the message somewhere. Only the staging place differs: RocketMQ stores the message temporarily in an internal Topic.

To support transaction messages, RocketMQ introduces two internal topics, the Half Topic and the Operation Topic (RMQ_SYS_TRANS_HALF_TOPIC and RMQ_SYS_TRANS_OP_HALF_TOPIC in the source code), which store the intermediate state of transaction messages:

  • The Half Topic queue stores prepare messages, i.e. pre-sent messages. They are not delivered directly to the target Topic, so consumers cannot see them; this is how staging is implemented.
  • The Operation Topic queue stores the commit/rollback records for the prepare messages; the body of each record is the offset of the corresponding prepare message.

The figure below shows the sending flow of a RocketMQ transaction message; each step is explained in the following subsections:

[Figure: sending flow of a RocketMQ transaction message]

2.1 The transaction producer pre-sends the message

Transaction messages are sent with a TransactionMQProducer. This producer adds metadata to an ordinary Message that marks it as a pre-sent transaction message. When the broker sees that a message is a transaction message, it stores it in the Half Topic. A producer group must also be specified, so that if the sending instance fails or even goes down, the broker can query the transaction status from another instance in the same producer group.

2.2 Executing the local transaction

After the prepare message has been sent successfully, the local transaction must be executed. For this you implement RocketMQ's TransactionListener interface:

[Figure: the TransactionListener interface, declaring executeLocalTransaction and checkLocalTransaction]

Where:

  • **executeLocalTransaction method:** executes the local transaction. This is where our business logic goes: operating on the database or doing other work.
  • **checkLocalTransaction method:** checks the transaction status; its role is explained later.

Both methods return a LocalTransactionState that represents the execution status of the local transaction, and the transaction producer reports it to the broker. There are three states, analyzed below:

```java
public enum LocalTransactionState {
    COMMIT_MESSAGE,
    ROLLBACK_MESSAGE,
    UNKNOW
}
```
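To make the producer-side flow concrete without a broker, the sketch below mirrors the shape of the listener with hypothetical types. `TxState` copies the three values above, and `TxListener` copies the two method names but takes a `String`, whereas the real `TransactionListener` declares `executeLocalTransaction(Message msg, Object arg)` and `checkLocalTransaction(MessageExt msg)`. The simulated producer pre-sends, runs the local transaction, and reports the result; in this sketch an exception from the local transaction is reported as UNKNOW so that the broker will check back later.

```java
import java.util.ArrayList;
import java.util.List;

// Copies the three values of LocalTransactionState shown above (RocketMQ spells the last one UNKNOW).
enum TxState { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

// Hypothetical stand-in for TransactionListener; the real methods take Message / MessageExt.
interface TxListener {
    TxState executeLocalTransaction(String msg, Object arg);
    TxState checkLocalTransaction(String msg);
}

// Simulated producer: no network, it only records what it would send to the broker.
class SimulatedTxProducer {
    final List<String> brokerLog = new ArrayList<>();

    TxState sendInTransaction(String msg, Object arg, TxListener listener) {
        brokerLog.add("HALF:" + msg);                 // 2.1: pre-send to the Half Topic
        TxState state;
        try {
            state = listener.executeLocalTransaction(msg, arg);   // 2.2: local transaction
        } catch (RuntimeException e) {
            state = TxState.UNKNOW;                   // outcome unclear: let the broker check back
        }
        if (state == null) {
            state = TxState.UNKNOW;
        }
        brokerLog.add(state + ":" + msg);             // 2.3: report the state to the broker
        return state;
    }
}
```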

2.3 Handling the local transaction state

Whatever state the client returns, the producer reports it to the broker. When the broker recognizes the message as a transaction-status report, it first checks the state value and then handles it accordingly.

2.3.1 COMMIT & ROLLBACK state

After receiving the status of a transaction message, the broker records it in the internal Operation Topic; the message body is the offset of the corresponding prepare message in the Half Topic, as shown in the figure below:

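[Figure: Half Topic and Operation Topic queues; each Operation Topic entry records the offset of a prepare message in the Half Topic]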

In addition, an internal service in the broker consumes the messages in the Operation Topic. Specifically:

  • For a rollback record, the broker deletes the prepare message from the Half Topic, and the message is never delivered.
  • For a commit record, the broker takes the message out and **delivers it to the original target Topic, where consumers can now consume it**.
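The bookkeeping between the two internal topics can be modelled with three lists. `TxBroker` below is a hypothetical simplification: it delivers committed messages directly inside `endTransaction` instead of through a separate internal consumer, and treats the Operation Topic record itself as the (logical) deletion of a rolled-back message. `unresolvedOffsets` computes the difference between the two queues that the next paragraph discusses.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical, simplified model of the two internal topics plus the target topic.
class TxBroker {
    final List<String> halfTopic = new ArrayList<>();        // list index = offset of a prepare message
    final List<Integer> operationTopic = new ArrayList<>();  // offsets whose outcome is known
    final List<String> targetTopic = new ArrayList<>();      // what consumers can see

    // 2.1: store a prepare message and return its offset in the Half Topic.
    int storeHalf(String body) {
        halfTopic.add(body);
        return halfTopic.size() - 1;
    }

    // 2.3.1: record the outcome in the Operation Topic, then deliver (commit) or drop (rollback).
    void endTransaction(int halfOffset, boolean commit) {
        operationTopic.add(halfOffset);
        if (commit) {
            targetTopic.add(halfTopic.get(halfOffset));
        }
        // On rollback the Operation Topic entry alone marks the prepare message as deleted.
    }

    // Half Topic offsets with no Operation Topic entry: messages whose state is still unknown.
    List<Integer> unresolvedOffsets() {
        Set<Integer> resolved = new HashSet<>(operationTopic);
        List<Integer> pending = new ArrayList<>();
        for (int offset = 0; offset < halfTopic.size(); offset++) {
            if (!resolved.contains(offset)) {
                pending.add(offset);
            }
        }
        return pending;
    }
}
```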

Careful readers will notice that the two queues in the figure deliberately have different lengths. This illustrates that in abnormal cases the transaction status may fail to be reported, so the Operation Topic has no record for that message. The difference between the two queues may therefore consist of UNKNOW messages whose intermediate state has not been confirmed, and these need special handling.

2.3.2 UNKNOW state

A message in the UNKNOW intermediate state means that the transaction status cannot be determined yet, so the broker must actively ask the producer client. UNKNOW can arise in the following scenarios:

  • **Abnormal state:** while the local transaction is executing, the executing process crashes or times out.
  • **Intentional:** recall the local transaction table scheme above. In some special cases a message may be handed to consumers only after waiting for a specific condition to be met, so we return UNKNOW on purpose.

Because messages in the UNKNOW intermediate state are not committed to the Operation Topic, the broker can find uncommitted, timed-out transactions by comparing the two internal topics, Half Topic and Operation Topic, and then check back on them.

**Checking back means that the business side must provide a method that RocketMQ can call back.** The TransactionListener interface seen earlier has two methods; the other one, checkLocalTransaction, is used for checking back.

We implement this method, and RocketMQ passes in the message we sent earlier as the parameter. Based on the message content, we query the related business records to determine the transaction state.
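A sketch of the check-back loop, assuming in-memory half messages and a `checkLocalTransaction` callback passed as a `Function`. It is not RocketMQ's broker code: `timeoutMillis` plays the role of the check period and `maxChecks` that of the per-message check limit listed in section 2.4 (15 by default). What a real broker does with a message that exceeds the limit is not modelled; this sketch simply marks it as rolled back.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical check-back loop; it illustrates the idea and is not RocketMQ's broker code.
class CheckBackService {
    enum Result { COMMIT_MESSAGE, ROLLBACK_MESSAGE, UNKNOW }

    // A prepare message that has no Operation Topic record yet.
    static final class Half {
        final String body;
        final long storedAtMillis;
        int checkTimes = 0;
        Result state = Result.UNKNOW;

        Half(String body, long storedAtMillis) {
            this.body = body;
            this.storedAtMillis = storedAtMillis;
        }
    }

    private final long timeoutMillis;   // only messages at least this old are checked
    private final int maxChecks;        // per-message check limit
    final List<String> delivered = new ArrayList<>();

    CheckBackService(long timeoutMillis, int maxChecks) {
        this.timeoutMillis = timeoutMillis;
        this.maxChecks = maxChecks;
    }

    // One round: ask the producer side (checkLocalTransaction) about each old, unresolved message.
    void checkOnce(List<Half> unresolved, long nowMillis, Function<String, Result> checkLocalTransaction) {
        for (Half h : unresolved) {
            if (h.state != Result.UNKNOW || nowMillis - h.storedAtMillis < timeoutMillis) {
                continue;
            }
            if (h.checkTimes >= maxChecks) {
                h.state = Result.ROLLBACK_MESSAGE;   // this sketch gives up after too many checks
                continue;
            }
            h.checkTimes++;
            h.state = checkLocalTransaction.apply(h.body);
            if (h.state == Result.COMMIT_MESSAGE) {
                delivered.add(h.body);
            }
        }
    }
}
```

In a real listener, `checkLocalTransaction` would typically extract a business key (such as an order ID) from the message and query the business table: a found record means COMMIT_MESSAGE, a record that certainly will never exist means ROLLBACK_MESSAGE, and anything else means UNKNOW, which asks the broker to try again later.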

Finally, for the broker to actively ask the producer client about the transaction status, the broker and the producer need two-way communication: the broker sends requests to the producer client on its own initiative. This two-way communication is built on the rocketmq-remoting module.

2.4 Summary

As the design above shows, RocketMQ transaction messages handle eventual consistency well. The transaction initiator only needs to focus on executing the local transaction and implementing the status-check callback. When upstream traffic peaks, the message queue also absorbs the load and keeps it from overwhelming downstream services. The official RocketMQ website provides complete examples of transaction messages for reference.

Of course, it is not all rosy. RocketMQ transaction messages have the following usage restrictions:

  • Transaction messages support neither delay nor batching: you cannot use delayed messages or batch sending with them.
  • To avoid checking a single message too many times and piling up messages in the Half Topic, the number of checks per message is limited to 15 by default; the broker parameter transactionCheckMax changes this limit.
  • Transaction messages are checked after a fixed period configured by the broker parameter "transactionTimeout". A producer can also override it for a single message by setting the user property CHECK_IMMUNITY_TIME_IN_SECONDS when sending.
  • Transaction messages may be checked or consumed more than once, so consumers must be idempotent.
  • Committing a transaction message to the user's target topic may fail. RocketMQ's own high-availability mechanism provides availability, but if you need to guarantee that transaction messages are never lost and transactional integrity is preserved, synchronous double writing is recommended.
  • The producer ID used for transaction messages cannot be shared with producers of other message types. Unlike other message types, transaction messages allow backward queries: the MQ server looks up clients by their producer ID.
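Because transaction messages may be consumed more than once, the consumer side should be idempotent. A minimal sketch, assuming each message carries a unique business key (the hypothetical `messageKey` parameter) and using an in-memory set where a production system would use a table with a unique constraint:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical idempotent consumer: remembers processed keys and skips duplicate deliveries.
class IdempotentConsumer {
    private final Set<String> processedKeys = new HashSet<>();   // in production: a unique-key table
    final List<String> applied = new ArrayList<>();              // stands in for the business side effect

    // Returns true if the message was applied, false if it was a duplicate.
    boolean onMessage(String messageKey, String body) {
        if (!processedKeys.add(messageKey)) {
            return false;   // already handled: acknowledge without repeating the side effect
        }
        applied.add(body);
        return true;
    }
}
```

To stay correct across crashes, the deduplication record and the business side effect should be written in the same local transaction.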

Copyright notice
This article was written by [PostTruth]; please include a link to the original when reposting. Thanks.
https://yzsam.com/2022/03/202203020553546628.html