当前位置:网站首页>Distributed transaction solution 2: message queue to achieve final consistency
Distributed transaction solution 2: message queue to achieve final consistency
2022-06-12 09:09:00 【Eden Garden】
1、 Reliable messages achieve ultimate consistency
We have learned before CAP theory , Know that we usually guarantee P and A, Abandon C, Ensure ultimate consistency .
Distributed transactions
The core of using message queue to achieve final consistency is to split distributed transactions into multiple local transactions , Then all transactions are coordinated by the message queue through the network , Achieve ultimate consistency .
This plan is easy to understand , But it will also face many problems :
1. The atomicity of the message sender executing local transactions and sending messages , That is, how to ensure successful execution of local transactions , The message must have been sent successfully
begin transaction
1. Database operation
2. Send a message
commit transation
In this case , There seems to be no problem , If sending a message fails , Will throw an exception , Cause database transaction rollback . But if the timeout is abnormal , Database rollback , But the message has been sent normally , It also leads to inconsistencies .
2. The atomicity of message receiving and local transaction , That is, how to ensure that the message is received successfully , The local transaction must be executed successfully
3. Because messages may be sent repeatedly , This requires that the message receiver must implement idempotency
Because in a production environment , The consumer is likely to be a cluster , If a consumption node times out but the consumption is successful , This will cause the cluster to consume this message repeatedly by other nodes in the same group . In addition, recovery after unexpected downtime , The consumption progress is not written to the disk in time , It will cause partial loss of consumption progress , This leads to repeated consumption of messages .
2、RocketMQ
RocketMQ It is a distributed message middleware from Alibaba , On 2012 In open source , And in 2017 Officially became Apache Top projects .Apache RocketMQ 4.3 Later versions officially support transaction messages , It provides convenient support for distributed transaction implementation . therefore , We go through RocketMQ We can solve the previous problem .
1. The atomicity of the message sender executing local transactions and sending messages , That is, how to ensure successful execution of local transactions , The message must have been sent successfully
RocketMQ Medium Broker And The sender has two-way communication capability , bring broker Born as a transaction coordinator ; also RocketMQ It provides a storage mechanism , Make transaction messages persistent ; These excellent designs ensure that even if something goes wrong ,RocketMQ It can still guarantee the final consistency of the transaction .
- The sender sends a transaction message to Broker,RocketMQ Will mark the message status as “Prepared”, At this time, this message cannot be consumed by the receiver for the time being . Such news is called Half Message, Half message .
- Broker Return the successful sending to the sender
- The sender performs local transactions , For example, operating a database
- If the local transaction is executed successfully , send out commit A message to Broker,RocketMQ Will mark the message status as “ Consumable ”, At this point, the message can be consumed by the receiver ; If the local transaction fails , send out rollback A message to Broker,RocketMQ The message will be deleted .
- If the sender is in a local transaction , The service hangs up , Network flash off or timeout , that Broker No confirmation will be received
- here RocketMQ The sender will be constantly asked to obtain the execution status of the local transaction ( I.e. transaction query )
- Determine according to the result of the transaction review Commit or Rollback, This ensures that both the message sending and the local transaction succeed or fail at the same time .
The above main process has been completed by RocketMQ Realization , For us, we only need to implement the local transaction execution method and the local transaction backlookup method respectively , Specifically, it implements the following interface :
public interface TransactionListener {
/** - send out prepare After the message succeeds, the method is called back to execute the local transaction - @param msg The news back , utilize transactionId You can get the only Id - @param arg call send Parameters passed on method , When send If there are additional parameters, they can be passed to send In the method , You can get - @return Return transaction status ,COMMIT: Submit ROLLBACK: Roll back UNKNOW: Unknown , Need to check back */
LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);
/** - @param msg By acquiring transactionId To determine the local transaction execution status of this message - @return Return transaction status ,COMMIT: Submit ROLLBACK: Roll back UNKNOW: Unknown , Need to check back */
LocalTransactionState checkLocalTransaction(final MessageExt msg);
}
2. The atomicity of message receiving and local transaction , That is, how to ensure that the message is received successfully , The local transaction must be executed successfully
If there is an exception ,RocketMQ Through the retry mechanism , Consume messages every once in a while , Then execute the local transaction ; If it is timeout ,RocketMQ There will be unlimited consumption messages , Continue to execute local affairs , Until we succeed .
3、 a master hand 's first small display
Environmental requirements
- database :MySQL-5.7+
- JDK:64 position jdk1.8+
- Microservices :spring-boot-2.1.3、spring-cloud-Greenwich.RELEASE
- RocketMQ Server side :RocketMQ-4.5.0
- RocketMQ client :RocketMQ-spring-boot-starter.2.0.2-RELEASE
Create database
This case requires two databases , One is bank1, One is bank2, No need to create , Use it directly Hmily The database in the quick start case is just . in addition , In order to realize idempotence , Need to be separately in bank1、bank2 Add... To the database de_duplication surface , That is, the transaction record table ( De duplication ).
DROP TABLE IF EXISTS `de_duplication`;
CREATE TABLE `de_duplication` (
`tx_no` bigint(20) NOT NULL,
`create_time` datetime(0) NULL DEFAULT NULL,
PRIMARY KEY (`tx_no`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
start-up RocketMQ
start-up nameserver:
set ROCKETMQ_HOME=[RocketMQ Server decompression path ]
start [RocketMQ Server decompression path ]/bin/mqnamesrv.cmd
start-up broker:
set ROCKETMQ_HOME=[RocketMQ Server decompression path ]
start [RocketMQ Server decompression path ]/bin/mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
Maven engineering
Create two maven engineering , Connect to different databases
Function realization
Message sender bank1
- Define a class to encapsulate the transfer message :
@Data
@AllArgsConstructor
@NoArgsConstructor
public class AccountChangeEvent implements Serializable {
/** * account number */
private String accountNo;
/** * Change amount */
private double amount;
/** * Transaction number , Time stamp */
private long txNo;
}
- Implement the data access layer , There are four functions
@Mapper
@Component
public interface AccountInfoDao {
/** * Modify the balance of an account * @param accountNo account number * @param amount Change amount * @return */
@Update("update account_info set account_balance=account_balance+#{amount} where account_no=#{accountNo}")
int updateAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount);
/** * Query the information of an account * @param accountNo account number * @return */
@Select("select * from account_info where where account_no=#{accountNo}")
AccountInfo findByIdAccountNo(@Param("accountNo") String accountNo);
/** * Query whether a transaction record has been executed * @param txNo Transaction number * @return */
@Select("select count(1) from de_duplication where tx_no = #{txNo}")
int isExistTx(long txNo);
/** * Save a transaction execution record * @param txNo Transaction number * @return */
@Insert("insert into de_duplication values(#{txNo},now());")
int addTx(long txNo);
}
- Send transfer message
@Component
@Slf4j
public class BankMessageProducer {
@Resource
private RocketMQTemplate rocketMQTemplate;
public void sendAccountChangeEvent(AccountChangeEvent accountChangeEvent) {
// 1. To construct a message
JSONObject object = new JSONObject();
object.put("accountChange", accountChangeEvent);
Message<String> msg = MessageBuilder.withPayload(object.toJSONString()).build();
// 2. Send a message
rocketMQTemplate.sendMessageInTransaction("producer_ensure_transfer",
"topic_ensure_transfer",
msg, null);
}
}
- Implement the business layer code , It realizes the deduction of transaction message and local transaction , Be careful doUpdateAccountBalance If the local transaction of is executed successfully , It will be repeated in the transaction record (de_duplication) Save the data .
public interface AccountInfoService {
/** * Update account balance - Send a message * @param accountChange */
void updateAccountBalance(AccountChangeEvent accountChange);
/** * Update account balance - Local transactions * @param accountChange */
void doUpdateAccountBalance(AccountChangeEvent accountChange);
}
@Service
@Slf4j
public class AccountInfoServiceImpl implements AccountInfoService {
@Autowired
private BankMessageProducer bankMessageProducer;
@Autowired
private AccountInfoDao accountInfoDao;
/** * Update account balance - Sending notice * @param accountChange */
@Override
public void updateAccountBalance(AccountChangeEvent accountChange) {
bankMessageProducer.sendAccountChangeEvent(accountChange);
}
/** * Update account balance - Local transactions * @param accountChange */
@Override
@Transactional(isolation = Isolation.SERIALIZABLE)
public void doUpdateAccountBalance(AccountChangeEvent accountChange) {
accountInfoDao.updateAccountBalance(accountChange.getAccountNo(),accountChange.getAmount() * -1);
accountInfoDao.addTx(accountChange.getTxNo());
}
}
- Realization RocketMQ Transaction message listener , There are two functions :
(1)executeLocalTransaction, This method performs local transactions , Will be RocketMQ Automatically call
(2)checkLocalTransaction, This method implements the transaction check , Utilizing the transaction de duplication table (de_duplication), Will be RocketMQ Automatically call
@Component
@Slf4j
@RocketMQTransactionListener(txProducerGroup = "producer_ensure_transfer")
public class TransferTransactionListenerImpl implements RocketMQLocalTransactionListener {
@Autowired
private AccountInfoService accountInfoService;
@Autowired
private AccountInfoDao accountInfoDao;
/** * Performing local transactions * @param msg * @param arg * @return */
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
//1. Receive and parse messages
final JSONObject jsonObject = JSON.parseObject(new String((byte[])
msg.getPayload()));
AccountChangeEvent accountChangeEvent =
JSONObject.parseObject(jsonObject.getString("accountChange"),AccountChangeEvent.
class);
//2. Performing local transactions
Boolean isCommit = true;
try {
accountInfoService.doUpdateAccountBalance(accountChangeEvent);
}catch (Exception e){
isCommit = false;
}
//3. Return execution result
if(isCommit){
return RocketMQLocalTransactionState.COMMIT;
}else {
return RocketMQLocalTransactionState.ROLLBACK;
}
}
/** * Back to business * @param msg * @return */
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
//1. Receive and parse messages
final JSONObject jsonObject = JSON.parseObject(new String((byte[])
msg.getPayload()));
AccountChangeEvent accountChangeEvent =
JSONObject.parseObject(jsonObject.getString("accountChange"),AccountChangeEvent.
class);
//2. Inquire about de_duplication surface
int isExistTx = accountInfoDao.isExistTx(accountChangeEvent.getTxNo());
//3. Return the value according to the query result
if(isExistTx>0){
return RocketMQLocalTransactionState.COMMIT;
}else {
return RocketMQLocalTransactionState.ROLLBACK;
}
}
}
- perfect Controller Code
@RestController
@Slf4j
public class AccountInfoController {
@Autowired
private AccountInfoService accountInfoService;
@GetMapping(value = "/transfer")
public String transfer(){
accountInfoService.updateAccountBalance(new AccountChangeEvent("1",100,System.currentTimeMillis()));
return " Transfer succeeded ";
}
}
The receiver of the message bank2
- Implement the data access layer , and bank1 equally , You can use it directly
- Implement business layer functions , Increase the account balance , Note that the transaction record de duplication table is used here (de_duplication) Achieve idempotency control
public interface AccountInfoService {
/** * Update account balance * @param accountChange */
void updateAccountBalance(AccountChangeEvent accountChange);
}
@Service
@Slf4j
public class AccountInfoServiceImpl implements AccountInfoService {
@Autowired
private AccountInfoDao accountInfoDao;
@Override
@Transactional(isolation = Isolation.SERIALIZABLE)
public void updateAccountBalance(AccountChangeEvent accountChange) {
int isExistTx = accountInfoDao.isExistTx(accountChange.getTxNo());
if(isExistTx == 0){
accountInfoDao.updateAccountBalance(accountChange.getAccountNo(),accountChange.getAmount());
accountInfoDao.addTx(accountChange.getTxNo());
}
}
}
- Realization RocketMQ Transaction message listener , After receiving the message , Parsing message , Call the business layer for processing
@Component
@RocketMQMessageListener(topic = "topic_ensure_transfer", consumerGroup = "consumer_ensure_transfer")
@Slf4j
public class EnsureMessageConsumer implements RocketMQListener<String>{
@Autowired
private AccountInfoService accountInfoService;
@Override
public void onMessage(String projectStr) {
System.out.println(" Start consuming messages :" + projectStr);
final JSONObject jsonObject = JSON.parseObject(projectStr);
AccountChangeEvent accountChangeEvent = JSONObject.parseObject(jsonObject.getString("accountChange"),AccountChangeEvent.class);
accountChangeEvent.setAccountNo("2");
accountInfoService.updateAccountBalance(accountChangeEvent);
}
}
A functional test
- bank1 and bank2 They all succeeded
- bank1 Failed to execute local transaction , be bank2 Transfer message not received .
- bank1 After executing the local transaction , Don't return any information , be Broker Will conduct transaction back check .
- bank2 Failed to execute local transaction , Will retry consumption .
Finally, reliable message consistent transactions are suitable for scenarios with long execution cycle and low real-time requirements . After introducing this mechanism , Synchronous transaction operations become asynchronous operations based on message execution , Avoid the impact of synchronous blocking operations in distributed transactions , And realize the decoupling of two services .
-------------------------- The content of the article comes from hmB Station courses , Learn to use --------------------------
边栏推荐
- Introduction to Chang'an chain node certificate, role and authority management
- Jenkins pipeline syntax
- 软件测试报告中常见的疏漏,给自己提个醒
- Diff prime pairs
- mySql学习记录——二、mySql建表命令
- 利用nvm动态调整nodejs版本,解决因为node版本过高或过低导致项目无法运行和打包
- (14) Inputfield logic analysis
- (13) Text rendering text
- Chapter 8 - two basic problems of data processing
- Basic exercise decomposing prime factors
猜你喜欢
![(node:22344) [DEP0123] DeprecationWarning: Setting the TLS ServerName to an IP address is not permit](/img/c1/d56ec09663857afa52f20848aeadac.png)
(node:22344) [DEP0123] DeprecationWarning: Setting the TLS ServerName to an IP address is not permit
![Sword finger offer:[day 9 dynamic planning (medium)] --- > maximum sum of continuous subarrays](/img/6b/6dcc86bfe0f48103ef8420b9996c1e.jpg)
Sword finger offer:[day 9 dynamic planning (medium)] --- > maximum sum of continuous subarrays

90%以上软件公司都会问的软件测试面试题赶紧来背吧

Binlog in mysql:

【字符集九】gbk拷贝到Unicode会乱码?

第三章 寄存器 (内存访问)

node示例后台搭建

MySQL learning record - II. MySQL create table command

After receiving the picture, caigou was very happy and played with PDF. The submission format was flag{xxx}, and the decryption characters should be in lowercase

mySql学习记录——二、mySql建表命令
随机推荐
MFS详解(四)——MFS管理服务器安装与配置
Introduction to applet cloud development -- questionnaire evaluation applet practice (7)
Solution of hmaster process flash back after starting
Minimum transfer times
目标识别、检测和 6D 姿态估算源码与方案(最先进的方法和数据集)
第五章-[bx]和Loop指令
Chapter 3 registers (memory access)
L1-019 who goes first
Several ways to restart kubernetes pod
(13) Text rendering text
Chapter 7 - more flexible location of memory addresses
(js)三位用逗号隔开,保留两位小数(or 四舍五入取整)
Introduction Fibonacci series
Filters and listeners
Source code and scheme for target recognition, detection and 6D attitude estimation (the most advanced method and data set)
MySQL - Import / export operation
Detailed explanation of iSCSI (V) -- actual operation of iSCSI client configuration
Summary of common character sets
Sword finger offer:[day 8 dynamic planning (simple)] --- > frog jumping on steps
(十四)InputField逻辑分析