当前位置:网站首页>Distributed transaction solution 2: message queue to achieve final consistency
Distributed transaction solution 2: message queue to achieve final consistency
2022-06-12 09:09:00 【Eden Garden】
1、 Reliable messages achieve ultimate consistency
We have learned before CAP theory , Know that we usually guarantee P and A, Abandon C, Ensure ultimate consistency .
Distributed transactions
The core of using message queue to achieve final consistency is to split distributed transactions into multiple local transactions , Then all transactions are coordinated by the message queue through the network , Achieve ultimate consistency .
This plan is easy to understand , But it will also face many problems :
1. The atomicity of the message sender executing local transactions and sending messages , That is, how to ensure successful execution of local transactions , The message must have been sent successfully
begin transaction
1. Database operation
2. Send a message
commit transation
In this case , There seems to be no problem , If sending a message fails , Will throw an exception , Cause database transaction rollback . But if the timeout is abnormal , Database rollback , But the message has been sent normally , It also leads to inconsistencies .
2. The atomicity of message receiving and local transaction , That is, how to ensure that the message is received successfully , The local transaction must be executed successfully
3. Because messages may be sent repeatedly , This requires that the message receiver must implement idempotency
Because in a production environment , The consumer is likely to be a cluster , If a consumption node times out but the consumption is successful , This will cause the cluster to consume this message repeatedly by other nodes in the same group . In addition, recovery after unexpected downtime , The consumption progress is not written to the disk in time , It will cause partial loss of consumption progress , This leads to repeated consumption of messages .
2、RocketMQ
RocketMQ It is a distributed message middleware from Alibaba , On 2012 In open source , And in 2017 Officially became Apache Top projects .Apache RocketMQ 4.3 Later versions officially support transaction messages , It provides convenient support for distributed transaction implementation . therefore , We go through RocketMQ We can solve the previous problem .
1. The atomicity of the message sender executing local transactions and sending messages , That is, how to ensure successful execution of local transactions , The message must have been sent successfully
RocketMQ Medium Broker And The sender has two-way communication capability , bring broker Born as a transaction coordinator ; also RocketMQ It provides a storage mechanism , Make transaction messages persistent ; These excellent designs ensure that even if something goes wrong ,RocketMQ It can still guarantee the final consistency of the transaction .
- The sender sends a transaction message to Broker,RocketMQ Will mark the message status as “Prepared”, At this time, this message cannot be consumed by the receiver for the time being . Such news is called Half Message, Half message .
- Broker Return the successful sending to the sender
- The sender performs local transactions , For example, operating a database
- If the local transaction is executed successfully , send out commit A message to Broker,RocketMQ Will mark the message status as “ Consumable ”, At this point, the message can be consumed by the receiver ; If the local transaction fails , send out rollback A message to Broker,RocketMQ The message will be deleted .
- If the sender is in a local transaction , The service hangs up , Network flash off or timeout , that Broker No confirmation will be received
- here RocketMQ The sender will be constantly asked to obtain the execution status of the local transaction ( I.e. transaction query )
- Determine according to the result of the transaction review Commit or Rollback, This ensures that both the message sending and the local transaction succeed or fail at the same time .
The above main process has been completed by RocketMQ Realization , For us, we only need to implement the local transaction execution method and the local transaction backlookup method respectively , Specifically, it implements the following interface :
public interface TransactionListener {
/** - send out prepare After the message succeeds, the method is called back to execute the local transaction - @param msg The news back , utilize transactionId You can get the only Id - @param arg call send Parameters passed on method , When send If there are additional parameters, they can be passed to send In the method , You can get - @return Return transaction status ,COMMIT: Submit ROLLBACK: Roll back UNKNOW: Unknown , Need to check back */
LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);
/** - @param msg By acquiring transactionId To determine the local transaction execution status of this message - @return Return transaction status ,COMMIT: Submit ROLLBACK: Roll back UNKNOW: Unknown , Need to check back */
LocalTransactionState checkLocalTransaction(final MessageExt msg);
}
2. The atomicity of message receiving and local transaction , That is, how to ensure that the message is received successfully , The local transaction must be executed successfully
If there is an exception ,RocketMQ Through the retry mechanism , Consume messages every once in a while , Then execute the local transaction ; If it is timeout ,RocketMQ There will be unlimited consumption messages , Continue to execute local affairs , Until we succeed .
3、 a master hand 's first small display
Environmental requirements
- database :MySQL-5.7+
- JDK:64 position jdk1.8+
- Microservices :spring-boot-2.1.3、spring-cloud-Greenwich.RELEASE
- RocketMQ Server side :RocketMQ-4.5.0
- RocketMQ client :RocketMQ-spring-boot-starter.2.0.2-RELEASE
Create database
This case requires two databases , One is bank1, One is bank2, No need to create , Use it directly Hmily The database in the quick start case is just . in addition , In order to realize idempotence , Need to be separately in bank1、bank2 Add... To the database de_duplication surface , That is, the transaction record table ( De duplication ).
DROP TABLE IF EXISTS `de_duplication`;
CREATE TABLE `de_duplication` (
`tx_no` bigint(20) NOT NULL,
`create_time` datetime(0) NULL DEFAULT NULL,
PRIMARY KEY (`tx_no`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
start-up RocketMQ
start-up nameserver:
set ROCKETMQ_HOME=[RocketMQ Server decompression path ]
start [RocketMQ Server decompression path ]/bin/mqnamesrv.cmd
start-up broker:
set ROCKETMQ_HOME=[RocketMQ Server decompression path ]
start [RocketMQ Server decompression path ]/bin/mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
Maven engineering
Create two maven engineering , Connect to different databases
Function realization
Message sender bank1
- Define a class to encapsulate the transfer message :
@Data
@AllArgsConstructor
@NoArgsConstructor
public class AccountChangeEvent implements Serializable {
/** * account number */
private String accountNo;
/** * Change amount */
private double amount;
/** * Transaction number , Time stamp */
private long txNo;
}
- Implement the data access layer , There are four functions
@Mapper
@Component
public interface AccountInfoDao {
/** * Modify the balance of an account * @param accountNo account number * @param amount Change amount * @return */
@Update("update account_info set account_balance=account_balance+#{amount} where account_no=#{accountNo}")
int updateAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount);
/** * Query the information of an account * @param accountNo account number * @return */
@Select("select * from account_info where where account_no=#{accountNo}")
AccountInfo findByIdAccountNo(@Param("accountNo") String accountNo);
/** * Query whether a transaction record has been executed * @param txNo Transaction number * @return */
@Select("select count(1) from de_duplication where tx_no = #{txNo}")
int isExistTx(long txNo);
/** * Save a transaction execution record * @param txNo Transaction number * @return */
@Insert("insert into de_duplication values(#{txNo},now());")
int addTx(long txNo);
}
- Send transfer message
@Component
@Slf4j
public class BankMessageProducer {
@Resource
private RocketMQTemplate rocketMQTemplate;
public void sendAccountChangeEvent(AccountChangeEvent accountChangeEvent) {
// 1. To construct a message
JSONObject object = new JSONObject();
object.put("accountChange", accountChangeEvent);
Message<String> msg = MessageBuilder.withPayload(object.toJSONString()).build();
// 2. Send a message
rocketMQTemplate.sendMessageInTransaction("producer_ensure_transfer",
"topic_ensure_transfer",
msg, null);
}
}
- Implement the business layer code , It realizes the deduction of transaction message and local transaction , Be careful doUpdateAccountBalance If the local transaction of is executed successfully , It will be repeated in the transaction record (de_duplication) Save the data .
public interface AccountInfoService {
/** * Update account balance - Send a message * @param accountChange */
void updateAccountBalance(AccountChangeEvent accountChange);
/** * Update account balance - Local transactions * @param accountChange */
void doUpdateAccountBalance(AccountChangeEvent accountChange);
}
@Service
@Slf4j
public class AccountInfoServiceImpl implements AccountInfoService {
@Autowired
private BankMessageProducer bankMessageProducer;
@Autowired
private AccountInfoDao accountInfoDao;
/** * Update account balance - Sending notice * @param accountChange */
@Override
public void updateAccountBalance(AccountChangeEvent accountChange) {
bankMessageProducer.sendAccountChangeEvent(accountChange);
}
/** * Update account balance - Local transactions * @param accountChange */
@Override
@Transactional(isolation = Isolation.SERIALIZABLE)
public void doUpdateAccountBalance(AccountChangeEvent accountChange) {
accountInfoDao.updateAccountBalance(accountChange.getAccountNo(),accountChange.getAmount() * -1);
accountInfoDao.addTx(accountChange.getTxNo());
}
}
- Realization RocketMQ Transaction message listener , There are two functions :
(1)executeLocalTransaction, This method performs local transactions , Will be RocketMQ Automatically call
(2)checkLocalTransaction, This method implements the transaction check , Utilizing the transaction de duplication table (de_duplication), Will be RocketMQ Automatically call
@Component
@Slf4j
@RocketMQTransactionListener(txProducerGroup = "producer_ensure_transfer")
public class TransferTransactionListenerImpl implements RocketMQLocalTransactionListener {
@Autowired
private AccountInfoService accountInfoService;
@Autowired
private AccountInfoDao accountInfoDao;
/** * Performing local transactions * @param msg * @param arg * @return */
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
//1. Receive and parse messages
final JSONObject jsonObject = JSON.parseObject(new String((byte[])
msg.getPayload()));
AccountChangeEvent accountChangeEvent =
JSONObject.parseObject(jsonObject.getString("accountChange"),AccountChangeEvent.
class);
//2. Performing local transactions
Boolean isCommit = true;
try {
accountInfoService.doUpdateAccountBalance(accountChangeEvent);
}catch (Exception e){
isCommit = false;
}
//3. Return execution result
if(isCommit){
return RocketMQLocalTransactionState.COMMIT;
}else {
return RocketMQLocalTransactionState.ROLLBACK;
}
}
/** * Back to business * @param msg * @return */
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
//1. Receive and parse messages
final JSONObject jsonObject = JSON.parseObject(new String((byte[])
msg.getPayload()));
AccountChangeEvent accountChangeEvent =
JSONObject.parseObject(jsonObject.getString("accountChange"),AccountChangeEvent.
class);
//2. Inquire about de_duplication surface
int isExistTx = accountInfoDao.isExistTx(accountChangeEvent.getTxNo());
//3. Return the value according to the query result
if(isExistTx>0){
return RocketMQLocalTransactionState.COMMIT;
}else {
return RocketMQLocalTransactionState.ROLLBACK;
}
}
}
- perfect Controller Code
@RestController
@Slf4j
public class AccountInfoController {
@Autowired
private AccountInfoService accountInfoService;
@GetMapping(value = "/transfer")
public String transfer(){
accountInfoService.updateAccountBalance(new AccountChangeEvent("1",100,System.currentTimeMillis()));
return " Transfer succeeded ";
}
}
The receiver of the message bank2
- Implement the data access layer , and bank1 equally , You can use it directly
- Implement business layer functions , Increase the account balance , Note that the transaction record de duplication table is used here (de_duplication) Achieve idempotency control
public interface AccountInfoService {
/** * Update account balance * @param accountChange */
void updateAccountBalance(AccountChangeEvent accountChange);
}
@Service
@Slf4j
public class AccountInfoServiceImpl implements AccountInfoService {
@Autowired
private AccountInfoDao accountInfoDao;
@Override
@Transactional(isolation = Isolation.SERIALIZABLE)
public void updateAccountBalance(AccountChangeEvent accountChange) {
int isExistTx = accountInfoDao.isExistTx(accountChange.getTxNo());
if(isExistTx == 0){
accountInfoDao.updateAccountBalance(accountChange.getAccountNo(),accountChange.getAmount());
accountInfoDao.addTx(accountChange.getTxNo());
}
}
}
- Realization RocketMQ Transaction message listener , After receiving the message , Parsing message , Call the business layer for processing
@Component
@RocketMQMessageListener(topic = "topic_ensure_transfer", consumerGroup = "consumer_ensure_transfer")
@Slf4j
public class EnsureMessageConsumer implements RocketMQListener<String>{
@Autowired
private AccountInfoService accountInfoService;
@Override
public void onMessage(String projectStr) {
System.out.println(" Start consuming messages :" + projectStr);
final JSONObject jsonObject = JSON.parseObject(projectStr);
AccountChangeEvent accountChangeEvent = JSONObject.parseObject(jsonObject.getString("accountChange"),AccountChangeEvent.class);
accountChangeEvent.setAccountNo("2");
accountInfoService.updateAccountBalance(accountChangeEvent);
}
}
A functional test
- bank1 and bank2 They all succeeded
- bank1 Failed to execute local transaction , be bank2 Transfer message not received .
- bank1 After executing the local transaction , Don't return any information , be Broker Will conduct transaction back check .
- bank2 Failed to execute local transaction , Will retry consumption .
Finally, reliable message consistent transactions are suitable for scenarios with long execution cycle and low real-time requirements . After introducing this mechanism , Synchronous transaction operations become asynchronous operations based on message execution , Avoid the impact of synchronous blocking operations in distributed transactions , And realize the decoupling of two services .
-------------------------- The content of the article comes from hmB Station courses , Learn to use --------------------------
边栏推荐
- Unittest测试框架
- Leetcode689 wrong greedy thinking
- Redis installation test
- L1-019 who goes first
- 【字符集九】gbk拷贝到Unicode会乱码?
- Codecraft-22 and codeforces round 795 (Div. 2)
- Does database and table splitting cause reading diffusion problems? How to solve it?
- (十二)交互组件Selectable
- ERROR 1630 (42000): FUNCTION a.avg does not exist. Check the ‘Function Name Parsing and Resolution‘
- [character set 7] what are the wide character codes and multi byte codes of Chinese characters
猜你喜欢

Analysis of 43 cases of MATLAB neural network: Chapter 7 regression of RBF Network -- Realization of nonlinear function regression

Analysis of 43 cases of MATLAB neural network: Chapter 8 prediction of GRNN Network - Freight Volume Prediction Based on generalized regression neural network

清华大学数据挖掘笔记(一)

Node sample background setup

Problems that cannot be resolved by tar command

Chapter 3 registers (memory access)
![(node:22344) [DEP0123] DeprecationWarning: Setting the TLS ServerName to an IP address is not permit](/img/c1/d56ec09663857afa52f20848aeadac.png)
(node:22344) [DEP0123] DeprecationWarning: Setting the TLS ServerName to an IP address is not permit

Unittest测试框架

Black screen solution for computer boot
还在原地踏步,提高软件测试能力的方法你知道吗?
随机推荐
Sword finger offer:[day 8 dynamic planning (simple)] --- > frog jumping on steps
(十二)交互组件Selectable
Complete knapsack problem 1
[computer use] how to change a computer disk into a mobile disk?
机器学习笔记 - 循环神经网络备忘清单
Jupyter notebook sets the default browser to open with an error syntaxerror: (Unicode error) 'UTF-8' codec can't decode byte 0xd4
【字符集九】gbk拷贝到Unicode会乱码?
(十五) TweenRunner
目标识别、检测和 6D 姿态估算源码与方案(最先进的方法和数据集)
Tool classes for extracting zip files
Bash tutorial
Application method of new version UI of idea + use method of non test qualification and related introduction
利用nvm动态调整nodejs版本,解决因为node版本过高或过低导致项目无法运行和打包
90%以上软件公司都会问的软件测试面试题赶紧来背吧
Notes on data mining in Tsinghua University (1)
软件测试报告中常见的疏漏,给自己提个醒
Basic knowledge of Linear Algebra -- concepts and relationships of common matrices
Introduction to Chang'an chain node certificate, role and authority management
测试计划应该怎么写?一个思路教会你
Solution of hmaster process flash back after starting