当前位置:网站首页>[real case] how to deal with the failure of message consumption?

[real case] how to deal with the failure of message consumption?

2022-07-04 07:19:00 Java geek Technology


One 、 Introduce

In the introduction of message middleware MQ Before , Let's have a brief understanding of , Why reference message middleware .

for example , In the e-commerce platform , Common user orders , It will go through the following processes .

When the user places an order , After creating the order , Will call the third party payment platform , Deduct the amount of the user's account , If the platform payment deduction is successful , The result will be notified to the corresponding business system , Then the business system will update the order status , Call the warehouse interface at the same time , Reduce inventory , Inform logistics to deliver goods !

【 Real case 】 How to deal with the failure of message consumption ?_mongodb

Just imagine , Update from order status 、 To reduce inventory 、 Inform logistics to deliver goods in one method Synchronization complete , If the user pays successfully 、 Order status update is also successful , But the steps of deducting inventory or notifying logistics to deliver goods failed , Then there's a problem , The user has paid successfully , It's just a failure to deduct inventory from the warehouse , Which leads to the failure of the whole deal !

One order failed , The boss can pretend to be invisible , But if thousands of lists fail , So the business loss caused by the system , It will be huge , The boss may not be able to sit down !

therefore , For this business scenario , Architects introduce asynchronous communication Technical solution , In order to ensure the high availability of services , The general process is as follows :

【 Real case 】 How to deal with the failure of message consumption ?_ Message store _02

When the order system receives the deduction result from the payment platform , An order message will be sent to MQ Message middleware , It also updates the order status .

On the other end , The warehouse system monitors the messages sent by the order system asynchronously , After receiving the order message , And then operate to deduct inventory 、 Inform the logistics company to deliver goods and other services !

Under the optimized process , Even if the inventory deduction service fails , And it won't affect user transactions .

just as 《 One month myth 》 What we say , Software Engineering , There is no silver bullet

When introduced MQ After message middleware , There's another problem as well , If MQ Message middleware is down all of a sudden , The message cannot be sent , Then the warehouse system can't receive the order message , And then we can't deliver !

In response to this question , The mainstream solution in the industry is to adopt Cluster deployment , One master and many slaves , In order to achieve high availability of services , Even if a machine suddenly goes down , And still keep the service available , During a server failure , By means of operation and maintenance , Restart the service , After that, the service can still run normally !

But there's another problem , If the warehouse system has received the order message , But business processing exception , Or the server is abnormal , As a result, there is no deduction for the current inventory of goods , There was no delivery !

How to deal with it at this time ?

Today we are going to introduce this kind of scene , If news consumption fails , What should we do with ?

Two 、 Solution

For the scenario of message consumption failure , We usually deal with it in the following way :


  • When message consumption fails , The message will be re pushed
  • If the number of retries exceeds the maximum , Exception messages are stored in the database , And then manual intervention to check the problem , Make manual retries

【 Real case 】 How to deal with the failure of message consumption ?_ Message store _03

When a message fails to be consumed on the client , We will add the exception message to a message retrial object , At the same time, set the maximum number of retries , And push the message back to MQ In message middleware , When the number of retries exceeds the maximum , The exception message will be stored in ​​MongoDB​​ In the database , It is convenient for subsequent query of abnormal information .

Based on the above system model , We can write a public retry component , Don't talk much , Direct drying !

3、 ... and 、 Code practice

This compensation service adopts rabbitmq Message middleware , Other message middleware processing ideas are similar !

3.1、 Create a message retrial entity class

       
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
public class MessageRetryDTO implements Serializable {

private static final long serialVersionUID = 1L;

/**
* Original news body
*/
private String bodyMsg;

/**
* Source ID
*/
private String sourceId;

/**
* Source description
*/
private String sourceDesc;

/**
* exchanger
*/
private String exchangeName;

/**
* Routing key
*/
private String routingKey;

/**
* queue
*/
private String queueName;

/**
* state ,1: initialization ,2: success ,3: Failure
*/
private Integer status = 1;

/**
* max retries
*/
private Integer maxTryCount = 3;

/**
* Current retries
*/
private Integer currentRetryCount = 0;

/**
* Retry interval ( millisecond )
*/
private Long retryIntervalTime = 0L;

/**
* Mission failure information
*/
private String errorMsg;

/**
* Creation time
*/
private Date createTime;

@Override
public String toString() {
return "MessageRetryDTO{" +
"bodyMsg='" + bodyMsg + '\'' +
", sourceId='" + sourceId + '\'' +
", sourceDesc='" + sourceDesc + '\'' +
", exchangeName='" + exchangeName + '\'' +
", routingKey='" + routingKey + '\'' +
", queueName='" + queueName + '\'' +
", status=" + status +
", maxTryCount=" + maxTryCount +
", currentRetryCount=" + currentRetryCount +
", retryIntervalTime=" + retryIntervalTime +
", errorMsg='" + errorMsg + '\'' +
", createTime=" + createTime +
'}';
}

/**
* Check if the number of retries exceeds the maximum
*
* @return
*/
public boolean checkRetryCount() {
retryCountCalculate();
// Check if the number of retries exceeds the maximum
if (this.currentRetryCount < this.maxTryCount) {
return true;
}
return false;
}

/**
* Recalculate the number of retries
* /
private void retryCountCalculate() {
this.currentRetryCount = this.currentRetryCount + 1;
}

}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.
  • 52.
  • 53.
  • 54.
  • 55.
  • 56.
  • 57.
  • 58.
  • 59.
  • 60.
  • 61.
  • 62.
  • 63.
  • 64.
  • 65.
  • 66.
  • 67.
  • 68.
  • 69.
  • 70.
  • 71.
  • 72.
  • 73.
  • 74.
  • 75.
  • 76.
  • 77.
  • 78.
  • 79.
  • 80.
  • 81.
  • 82.
  • 83.
  • 84.
  • 85.
  • 86.
  • 87.
  • 88.
  • 89.
  • 90.
  • 91.
  • 92.
  • 93.
  • 94.
  • 95.
  • 96.
  • 97.
  • 98.
  • 99.
  • 100.
  • 101.
  • 102.
  • 103.
  • 104.
  • 105.
  • 106.
  • 107.

3.2、 Write abstract classes for service retries

       
public abstract class CommonMessageRetryService {

private static final Logger log = LoggerFactory.getLogger(CommonMessageRetryService.class);

@Autowired
private RabbitTemplate rabbitTemplate;

@Autowired
private MongoTemplate mongoTemplate;


/**
* Initialization message
*
* @param message
*/
public void initMessage(Message message) {
log.info("{} Received a message : {}, Business data :{}", this.getClass().getName(), message.toString(), new String(message.getBody()));
try {
// Encapsulate message
MessageRetryDTO messageRetryDto = buildMessageRetryInfo(message);
if (log.isInfoEnabled()) {
log.info(" Deserialize message :{}", messageRetryDto.toString());
}
prepareAction(messageRetryDto);
} catch (Exception e) {
log.warn(" Handle message exception , error message :", e);
}
}

/**
* Prepare to carry out
*
* @param retryDto
*/
protected void prepareAction(MessageRetryDTO retryDto) {
try {
execute(retryDto);
doSuccessCallBack(retryDto);
} catch (Exception e) {
log.error(" The current task execution is abnormal , Business data :" + retryDto.toString(), e);
// Execution failure , Calculate if you still need to try again
if (retryDto.checkRetryCount()) {
if (log.isInfoEnabled()) {
log.info(" Retry message :{}", retryDto.toString());
}
retrySend(retryDto);
} else {
if (log.isWarnEnabled()) {
log.warn(" The current task has reached the maximum number of retries , Business data :" + retryDto.toString(), e);
}
doFailCallBack(retryDto.setErrorMsg(e.getMessage()));
}
}
}

/**
* The task was executed successfully , Callback service ( Rewrite as needed )
*
* @param messageRetryDto
*/
private void doSuccessCallBack(MessageRetryDTO messageRetryDto) {
try {
successCallback(messageRetryDto);
} catch (Exception e) {
log.warn(" Successful execution, callback exception , Queue description :{}, The reason for the error :{}", messageRetryDto.getSourceDesc(), e.getMessage());
}
}

/**
* Task execution failed , Callback service ( Rewrite as needed )
*
* @param messageRetryDto
*/
private void doFailCallBack(MessageRetryDTO messageRetryDto) {
try {
saveMessageRetryInfo(messageRetryDto.setErrorMsg(messageRetryDto.getErrorMsg()));
failCallback(messageRetryDto);
} catch (Exception e) {
log.warn(" Execution failure callback exception , Queue description :{}, The reason for the error :{}", messageRetryDto.getSourceDesc(), e.getMessage());
}
}

/**
* Perform tasks
*
* @param messageRetryDto
*/
protected abstract void execute(MessageRetryDTO messageRetryDto);

/**
* Successful callback
*
* @param messageRetryDto
*/
protected abstract void successCallback(MessageRetryDTO messageRetryDto);

/**
* Failed callback
*
* @param messageRetryDto
*/
protected abstract void failCallback(MessageRetryDTO messageRetryDto);

/**
* Build a message compensation entity
* @param message
* @return
*/
private MessageRetryDTO buildMessageRetryInfo(Message message){
// If the header contains a compensation message entity , Go straight back to
Map < String, Object > messageHeaders = message.getMessageProperties().getHeaders();
if(messageHeaders.containsKey("message_retry_info")){
Object retryMsg = messageHeaders.get("message_retry_info");
if(Objects.nonNull(retryMsg)){
return JSONObject.parseObject(String.valueOf(retryMsg), MessageRetryDTO.class);
}
}
// Automatically add business messages to the compensation entity
MessageRetryDTO messageRetryDto = new MessageRetryDTO();
messageRetryDto.setBodyMsg(new String(message.getBody(), StandardCharsets.UTF_8));
messageRetryDto.setExchangeName(message.getMessageProperties().getReceivedExchange());
messageRetryDto.setRoutingKey(message.getMessageProperties().getReceivedRoutingKey());
messageRetryDto.setQueueName(message.getMessageProperties().getConsumerQueue());
messageRetryDto.setCreateTime(new Date());
return messageRetryDto;
}

/**
* The exception message is put into storage again
* @param retryDto
*/
private void retrySend(MessageRetryDTO retryDto){
// Put the compensation message entity in the header , The original message content remains unchanged
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
messageProperties.setHeader("message_retry_info", JSONObject.toJSON(retryDto));
Message message = new Message(retryDto.getBodyMsg().getBytes(), messageProperties);
rabbitTemplate.convertAndSend(retryDto.getExchangeName(), retryDto.getRoutingKey(), message);
}



/**
* Store exception messages in mongodb in
* @param retryDto
*/
private void saveMessageRetryInfo(MessageRetryDTO retryDto){
try {
mongoTemplate.save(retryDto, "message_retry_info");
} catch (Exception e){
log.error(" Store exception messages in mongodb Failure , The message data :" + retryDto.toString(), e);
}
}
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.
  • 32.
  • 33.
  • 34.
  • 35.
  • 36.
  • 37.
  • 38.
  • 39.
  • 40.
  • 41.
  • 42.
  • 43.
  • 44.
  • 45.
  • 46.
  • 47.
  • 48.
  • 49.
  • 50.
  • 51.
  • 52.
  • 53.
  • 54.
  • 55.
  • 56.
  • 57.
  • 58.
  • 59.
  • 60.
  • 61.
  • 62.
  • 63.
  • 64.
  • 65.
  • 66.
  • 67.
  • 68.
  • 69.
  • 70.
  • 71.
  • 72.
  • 73.
  • 74.
  • 75.
  • 76.
  • 77.
  • 78.
  • 79.
  • 80.
  • 81.
  • 82.
  • 83.
  • 84.
  • 85.
  • 86.
  • 87.
  • 88.
  • 89.
  • 90.
  • 91.
  • 92.
  • 93.
  • 94.
  • 95.
  • 96.
  • 97.
  • 98.
  • 99.
  • 100.
  • 101.
  • 102.
  • 103.
  • 104.
  • 105.
  • 106.
  • 107.
  • 108.
  • 109.
  • 110.
  • 111.
  • 112.
  • 113.
  • 114.
  • 115.
  • 116.
  • 117.
  • 118.
  • 119.
  • 120.
  • 121.
  • 122.
  • 123.
  • 124.
  • 125.
  • 126.
  • 127.
  • 128.
  • 129.
  • 130.
  • 131.
  • 132.
  • 133.
  • 134.
  • 135.
  • 136.
  • 137.
  • 138.
  • 139.
  • 140.
  • 141.
  • 142.
  • 143.
  • 144.
  • 145.
  • 146.
  • 147.
  • 148.
  • 149.
  • 150.
  • 151.
  • 152.
  • 153.
  • 154.
  • 155.

3.3、 Write listening service class

When it comes to consumer applications , It's very simple , for example , For inventory deduction operations , We can deal with it in the following ways !

       
@Component
public class OrderServiceListener extends CommonMessageRetryService {

private static final Logger log = LoggerFactory.getLogger(OrderServiceListener.class);

/**
* Monitor the order system order success message
* @param message
*/
@RabbitListener(queues = "mq.order.add")
public void consume(Message message) {
log.info(" Received the message of successful order : {}", message.toString());
super.initMessage(message);
}


@Override
protected void execute(MessageRetryDTO messageRetryDto) {
// Call the deduction inventory service , Throw business exceptions out
}

@Override
protected void successCallback(MessageRetryDTO messageRetryDto) {
// Successful business processing , Callback
}

@Override
protected void failCallback(MessageRetryDTO messageRetryDto) {
// Business processing failed , Callback
}
}
  • 1.
  • 2.
  • 3.
  • 4.
  • 5.
  • 6.
  • 7.
  • 8.
  • 9.
  • 10.
  • 11.
  • 12.
  • 13.
  • 14.
  • 15.
  • 16.
  • 17.
  • 18.
  • 19.
  • 20.
  • 21.
  • 22.
  • 23.
  • 24.
  • 25.
  • 26.
  • 27.
  • 28.
  • 29.
  • 30.
  • 31.

When message consumption fails , And exceed the maximum number of times , The message will be stored in mongodb in , And then, like normal database operations , Can pass web Interface query exception message , And try again for specific scenarios !

Four 、 Summary

Some students may ask , Why not store exception messages in the database ?

At first it was stored in MYSQL in , But with the rapid development of business , Order message data structure is more and more complex , The amount of data is also very large , Even as big as MYSQL Medium text Types cannot be stored , At the same time, this data structure is not suitable for MYSQL Storage in , So move it to mongodb!

This paper focuses on the scenario of message consumption failure , Explain the basic program and code practice , There may be some misunderstandings , Welcome the criticism that !

5、 ... and 、 Reference resources

1、 Notes on Shishan's structure - How to deal with the problem of message consumption failure

       
PS: Reply within company number 「java」 You can enter Java Novice learning communication group , Grow and progress together !
Old rules , Brothers, remember , If you think the content of the article is good , Remember to share the circle of friends and let more people know !
  • 1.
  • 2.


Please pay attention to

Recently, you should find that the information flow of WeChat official account has been revised , No longer in chronological order . It's a challenge to the original trumpet owners like a fan , It can be said that it was a great blow , Reading has plummeted , Positive feedback continues to weaken .


If you want to receive a fan's article at the first time , Not affected by the public number's information flow , Then you can give Java Geek technology is set as a

Finally, thank you for reading , Pretty good , It's hard to avoid mistakes , If you find something wrong , Leave a message to tell a fan , Ah fan dotes on you so much , It's bound to change ~

Thank you for your support ~




原网站

版权声明
本文为[Java geek Technology]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/02/202202141533196886.html