当前位置:网站首页>Distributed transaction solution 2: message queue to achieve final consistency

Distributed transaction solution 2: message queue to achieve final consistency

2022-06-12 09:09:00 Eden Garden

1、 Reliable messages achieve ultimate consistency

We have learned before CAP theory , Know that we usually guarantee P and A, Abandon C, Ensure ultimate consistency .
Distributed transactions
The core of using message queue to achieve final consistency is to split distributed transactions into multiple local transactions , Then all transactions are coordinated by the message queue through the network , Achieve ultimate consistency .

This plan is easy to understand , But it will also face many problems :
1. The atomicity of the message sender executing local transactions and sending messages , That is, how to ensure successful execution of local transactions , The message must have been sent successfully

begin transaction
	1. Database operation 
	2. Send a message 
commit transation

In this case , There seems to be no problem , If sending a message fails , Will throw an exception , Cause database transaction rollback . But if the timeout is abnormal , Database rollback , But the message has been sent normally , It also leads to inconsistencies .

2. The atomicity of message receiving and local transaction , That is, how to ensure that the message is received successfully , The local transaction must be executed successfully

3. Because messages may be sent repeatedly , This requires that the message receiver must implement idempotency

​ Because in a production environment , The consumer is likely to be a cluster , If a consumption node times out but the consumption is successful , This will cause the cluster to consume this message repeatedly by other nodes in the same group . In addition, recovery after unexpected downtime , The consumption progress is not written to the disk in time , It will cause partial loss of consumption progress , This leads to repeated consumption of messages .

2、RocketMQ

RocketMQ It is a distributed message middleware from Alibaba , On 2012 In open source , And in 2017 Officially became Apache Top projects .Apache RocketMQ 4.3 Later versions officially support transaction messages , It provides convenient support for distributed transaction implementation . therefore , We go through RocketMQ We can solve the previous problem .

1. The atomicity of the message sender executing local transactions and sending messages , That is, how to ensure successful execution of local transactions , The message must have been sent successfully

​ RocketMQ Medium Broker And The sender has two-way communication capability , bring broker Born as a transaction coordinator ; also RocketMQ It provides a storage mechanism , Make transaction messages persistent ; These excellent designs ensure that even if something goes wrong ,RocketMQ It can still guarantee the final consistency of the transaction .
 Insert picture description here

  1. The sender sends a transaction message to Broker,RocketMQ Will mark the message status as “Prepared”, At this time, this message cannot be consumed by the receiver for the time being . Such news is called Half Message, Half message .
  2. Broker Return the successful sending to the sender
  3. The sender performs local transactions , For example, operating a database
  4. If the local transaction is executed successfully , send out commit A message to Broker,RocketMQ Will mark the message status as “ Consumable ”, At this point, the message can be consumed by the receiver ; If the local transaction fails , send out rollback A message to Broker,RocketMQ The message will be deleted .
  5. If the sender is in a local transaction , The service hangs up , Network flash off or timeout , that Broker No confirmation will be received
  6. here RocketMQ The sender will be constantly asked to obtain the execution status of the local transaction ( I.e. transaction query )
  7. Determine according to the result of the transaction review Commit or Rollback, This ensures that both the message sending and the local transaction succeed or fail at the same time .

The above main process has been completed by RocketMQ Realization , For us, we only need to implement the local transaction execution method and the local transaction backlookup method respectively , Specifically, it implements the following interface :

public interface TransactionListener {
    
    /** -  send out prepare After the message succeeds, the method is called back to execute the local transaction  - @param msg  The news back , utilize transactionId You can get the only Id - @param arg  call send Parameters passed on method , When send If there are additional parameters, they can be passed to send In the method , You can get  - @return  Return transaction status ,COMMIT: Submit  ROLLBACK: Roll back  UNKNOW: Unknown , Need to check back  */
    LocalTransactionState executeLocalTransaction(final Message msg, final Object arg);
    
    /** - @param msg  By acquiring transactionId To determine the local transaction execution status of this message  - @return  Return transaction status ,COMMIT: Submit  ROLLBACK: Roll back  UNKNOW: Unknown , Need to check back  */
    LocalTransactionState checkLocalTransaction(final MessageExt msg);
}

2. The atomicity of message receiving and local transaction , That is, how to ensure that the message is received successfully , The local transaction must be executed successfully

If there is an exception ,RocketMQ Through the retry mechanism , Consume messages every once in a while , Then execute the local transaction ; If it is timeout ,RocketMQ There will be unlimited consumption messages , Continue to execute local affairs , Until we succeed .

3、 a master hand 's first small display

Environmental requirements
  • database :MySQL-5.7+
  • JDK:64 position jdk1.8+
  • Microservices :spring-boot-2.1.3、spring-cloud-Greenwich.RELEASE
  • RocketMQ Server side :RocketMQ-4.5.0
  • RocketMQ client :RocketMQ-spring-boot-starter.2.0.2-RELEASE
Create database

This case requires two databases , One is bank1, One is bank2, No need to create , Use it directly Hmily The database in the quick start case is just . in addition , In order to realize idempotence , Need to be separately in bank1、bank2 Add... To the database de_duplication surface , That is, the transaction record table ( De duplication ).

DROP TABLE IF EXISTS `de_duplication`;
CREATE TABLE `de_duplication`  (
  `tx_no` bigint(20) NOT NULL,
  `create_time` datetime(0) NULL DEFAULT NULL,
  PRIMARY KEY (`tx_no`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
start-up RocketMQ

start-up nameserver:

set ROCKETMQ_HOME=[RocketMQ Server decompression path ]
start [RocketMQ Server decompression path ]/bin/mqnamesrv.cmd

start-up broker:

set ROCKETMQ_HOME=[RocketMQ Server decompression path ]
start [RocketMQ Server decompression path ]/bin/mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
Maven engineering

Create two maven engineering , Connect to different databases

Function realization

Message sender bank1
  1. Define a class to encapsulate the transfer message :
@Data
@AllArgsConstructor
@NoArgsConstructor
public class AccountChangeEvent implements Serializable {
    
    /** *  account number  */
    private String accountNo;
    /** *  Change amount  */
    private double amount;
    /** *  Transaction number , Time stamp  */
    private long txNo;
}
  1. Implement the data access layer , There are four functions
@Mapper
@Component
public interface AccountInfoDao {
    

    /** *  Modify the balance of an account  * @param accountNo  account number  * @param amount  Change amount  * @return */
    @Update("update account_info set account_balance=account_balance+#{amount} where account_no=#{accountNo}")
    int updateAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount);

    /** *  Query the information of an account  * @param accountNo  account number  * @return */
    @Select("select * from account_info where where account_no=#{accountNo}")
    AccountInfo findByIdAccountNo(@Param("accountNo") String accountNo);

    /** *  Query whether a transaction record has been executed  * @param txNo  Transaction number  * @return */
    @Select("select count(1) from de_duplication where tx_no = #{txNo}")
    int isExistTx(long txNo);

    /** *  Save a transaction execution record  * @param txNo  Transaction number  * @return */
    @Insert("insert into de_duplication values(#{txNo},now());")
    int addTx(long txNo);

}
  1. Send transfer message
@Component
@Slf4j
public class BankMessageProducer {
    
   @Resource
   private RocketMQTemplate rocketMQTemplate;

   public void sendAccountChangeEvent(AccountChangeEvent accountChangeEvent) {
    
      // 1. To construct a message 
      JSONObject object = new JSONObject();
      object.put("accountChange", accountChangeEvent);
      Message<String> msg = MessageBuilder.withPayload(object.toJSONString()).build();
      // 2. Send a message 
      rocketMQTemplate.sendMessageInTransaction("producer_ensure_transfer",
            "topic_ensure_transfer",
            msg, null);
   }
}
  1. Implement the business layer code , It realizes the deduction of transaction message and local transaction , Be careful doUpdateAccountBalance If the local transaction of is executed successfully , It will be repeated in the transaction record (de_duplication) Save the data .
public interface AccountInfoService {
    
	/** *  Update account balance - Send a message  * @param accountChange */
	void updateAccountBalance(AccountChangeEvent accountChange);

	/** *  Update account balance - Local transactions  * @param accountChange */
	void doUpdateAccountBalance(AccountChangeEvent accountChange);
}

@Service
@Slf4j
public class AccountInfoServiceImpl implements AccountInfoService {
    

   @Autowired
   private BankMessageProducer bankMessageProducer;

   @Autowired
   private AccountInfoDao accountInfoDao;
    
	/** *  Update account balance - Sending notice  * @param accountChange */
   @Override
   public void updateAccountBalance(AccountChangeEvent accountChange) {
    
      bankMessageProducer.sendAccountChangeEvent(accountChange);
   }
    
    /** *  Update account balance - Local transactions  * @param accountChange */
   @Override
   @Transactional(isolation = Isolation.SERIALIZABLE)
   public void doUpdateAccountBalance(AccountChangeEvent accountChange) {
    
      accountInfoDao.updateAccountBalance(accountChange.getAccountNo(),accountChange.getAmount() * -1);
      accountInfoDao.addTx(accountChange.getTxNo());
   }
}
  1. Realization RocketMQ Transaction message listener , There are two functions :

​ (1)executeLocalTransaction, This method performs local transactions , Will be RocketMQ Automatically call

​ (2)checkLocalTransaction, This method implements the transaction check , Utilizing the transaction de duplication table (de_duplication), Will be RocketMQ Automatically call

@Component
@Slf4j
@RocketMQTransactionListener(txProducerGroup = "producer_ensure_transfer")
public class TransferTransactionListenerImpl implements RocketMQLocalTransactionListener {
    

    @Autowired
    private AccountInfoService accountInfoService;

    @Autowired
    private AccountInfoDao accountInfoDao;

    /** *  Performing local transactions  * @param msg * @param arg * @return */
    @Override
    public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
    
 		//1. Receive and parse messages 
        final JSONObject jsonObject = JSON.parseObject(new String((byte[])
                msg.getPayload()));
        AccountChangeEvent accountChangeEvent =
                JSONObject.parseObject(jsonObject.getString("accountChange"),AccountChangeEvent.
                        class);

        //2. Performing local transactions 
        Boolean isCommit = true;
        try {
    
            accountInfoService.doUpdateAccountBalance(accountChangeEvent);
        }catch (Exception e){
    
            isCommit = false;
        }

        //3. Return execution result 
        if(isCommit){
    
            return RocketMQLocalTransactionState.COMMIT;
        }else {
    
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }

    /** *  Back to business  * @param msg * @return */
    @Override
    public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
    
        //1. Receive and parse messages 
        final JSONObject jsonObject = JSON.parseObject(new String((byte[])
                msg.getPayload()));
        AccountChangeEvent accountChangeEvent =
                JSONObject.parseObject(jsonObject.getString("accountChange"),AccountChangeEvent.
                        class);

        //2. Inquire about de_duplication surface 
        int isExistTx = accountInfoDao.isExistTx(accountChangeEvent.getTxNo());

        //3. Return the value according to the query result 
        if(isExistTx>0){
    
            return RocketMQLocalTransactionState.COMMIT;
        }else {
    
            return RocketMQLocalTransactionState.ROLLBACK;
        }
    }
}
  1. perfect Controller Code
@RestController
@Slf4j
public class AccountInfoController {
    
    @Autowired
    private AccountInfoService accountInfoService;

    @GetMapping(value = "/transfer")
    public String transfer(){
    
        accountInfoService.updateAccountBalance(new AccountChangeEvent("1",100,System.currentTimeMillis()));
        return " Transfer succeeded ";
    }
}
The receiver of the message bank2
  1. Implement the data access layer , and bank1 equally , You can use it directly
  2. Implement business layer functions , Increase the account balance , Note that the transaction record de duplication table is used here (de_duplication) Achieve idempotency control
public interface AccountInfoService {
    
    /** *  Update account balance  * @param accountChange */
    void updateAccountBalance(AccountChangeEvent accountChange);
} 

@Service
@Slf4j
public class AccountInfoServiceImpl implements AccountInfoService {
    

   @Autowired
   private AccountInfoDao accountInfoDao;

   @Override
   @Transactional(isolation = Isolation.SERIALIZABLE)
   public void updateAccountBalance(AccountChangeEvent accountChange) {
    
      int isExistTx = accountInfoDao.isExistTx(accountChange.getTxNo());
      if(isExistTx == 0){
    
         accountInfoDao.updateAccountBalance(accountChange.getAccountNo(),accountChange.getAmount());
         accountInfoDao.addTx(accountChange.getTxNo());
      }
   }
}
  1. Realization RocketMQ Transaction message listener , After receiving the message , Parsing message , Call the business layer for processing
@Component
@RocketMQMessageListener(topic = "topic_ensure_transfer", consumerGroup = "consumer_ensure_transfer")
@Slf4j
public class EnsureMessageConsumer implements RocketMQListener<String>{
    

   @Autowired
   private AccountInfoService accountInfoService;

   @Override
   public void onMessage(String  projectStr) {
    
      System.out.println(" Start consuming messages :" + projectStr);
      final JSONObject jsonObject = JSON.parseObject(projectStr);
      AccountChangeEvent accountChangeEvent = JSONObject.parseObject(jsonObject.getString("accountChange"),AccountChangeEvent.class);
      accountChangeEvent.setAccountNo("2");
      accountInfoService.updateAccountBalance(accountChangeEvent);
   }
}

A functional test

  • bank1 and bank2 They all succeeded
  • bank1 Failed to execute local transaction , be bank2 Transfer message not received .
  • bank1 After executing the local transaction , Don't return any information , be Broker Will conduct transaction back check .
  • bank2 Failed to execute local transaction , Will retry consumption .

​ Finally, reliable message consistent transactions are suitable for scenarios with long execution cycle and low real-time requirements . After introducing this mechanism , Synchronous transaction operations become asynchronous operations based on message execution , Avoid the impact of synchronous blocking operations in distributed transactions , And realize the decoupling of two services .

-------------------------- The content of the article comes from hmB Station courses , Learn to use --------------------------

原网站

版权声明
本文为[Eden Garden]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/03/202203010531311447.html