[Real Case] How to Deal with Message Consumption Failures?
2022-07-04 07:19:00 【Java geek Technology】
1. Introduction
Before introducing MQ message middleware, let's briefly look at why message middleware is needed in the first place.
For example, on an e-commerce platform, a typical user order goes through the following process.
When a user places an order, the order is created and the third-party payment platform is called to deduct the amount from the user's account. If the deduction succeeds, the payment platform notifies the corresponding business system of the result. The business system then updates the order status, calls the warehouse interface to reduce inventory, and notifies logistics to ship the goods!
Just imagine that updating the order status, reducing inventory, and notifying logistics are all done synchronously in one method. If the payment succeeds and the order status is updated, but the inventory deduction or the logistics notification fails, there is a problem: the user has already paid, yet the whole transaction fails merely because the warehouse could not deduct inventory!
If one order fails, the boss can pretend not to see it. But if thousands of orders fail, the business loss caused by the system will be huge, and the boss may not be able to sit still!
Therefore, for this kind of business scenario, architects introduce asynchronous communication to keep the service highly available. The general process is as follows:
When the order system receives the deduction result from the payment platform, it sends an order message to the MQ message middleware and updates the order status.
On the other end, the warehouse system listens asynchronously for messages sent by the order system. After receiving an order message, it deducts inventory, notifies the logistics company to ship the goods, and so on!
With this optimized process, even if the inventory deduction service fails, the user's transaction is not affected.
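The decoupling described above can be illustrated without a real broker. The following is a minimal sketch under stated assumptions: a `BlockingQueue` stands in for the MQ, and the names `AsyncOrderDemo`, `placeOrder`, and `consumeOne` are hypothetical, chosen only for illustration.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical in-memory stand-in for the order system, the MQ, and the warehouse system.
class AsyncOrderDemo {
    // Stand-in for the MQ queue that carries order messages.
    final BlockingQueue<String> orderQueue = new LinkedBlockingQueue<>();
    // Order status as seen by the user: orderId -> status.
    final Map<String, String> orderStatus = new ConcurrentHashMap<>();

    // Order system: update the order status and publish a message, then return immediately.
    void placeOrder(String orderId) {
        orderStatus.put(orderId, "PAID");
        orderQueue.offer(orderId);
    }

    // Warehouse system: take one message and try to deduct inventory.
    // Returns false when there is nothing to consume or processing fails;
    // the order status is not touched either way.
    boolean consumeOne(boolean warehouseHealthy) {
        String orderId = orderQueue.poll();
        if (orderId == null) {
            return false; // nothing to consume
        }
        return warehouseHealthy; // false models a failed inventory deduction
    }
}
```

In this sketch the order stays `PAID` even when `consumeOne(false)` fails, which is the benefit of the asynchronous design. The failed message, however, is simply gone, and that gap is what the rest of the article deals with.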
As The Mythical Man-Month says, there is no silver bullet in software engineering!
Introducing MQ message middleware brings a new problem: if the middleware suddenly goes down, messages cannot be sent, the warehouse system cannot receive order messages, and the goods cannot be shipped!
For this problem, the mainstream solution in the industry is cluster deployment with one master and multiple slaves, so that the service stays available even if one machine suddenly goes down. While a server is down, the operations team restarts it, after which the service runs normally again!
But there is yet another problem: what if the warehouse system has received the order message, but business processing throws an exception or the server fails, so that the inventory is not deducted and the goods are not shipped!
What should we do then?
That is the scenario we are going to look at today: when message consumption fails, how should we handle it?
2. Solution
For message consumption failures, we usually handle them as follows:
- When message consumption fails, the message is pushed again
- If the number of retries exceeds the maximum, the failed message is stored in a database, and then someone steps in to investigate the problem and retry manually
When a message fails to be consumed on the client, we wrap the failed message in a message retry object, set the maximum number of retries, and push the message back to the MQ message middleware. When the number of retries exceeds the maximum, the failed message is stored in a MongoDB database, which makes it easy to query the failure information later.
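Before looking at the real component, the flow can be modelled in a few lines of plain Java. This is only a sketch under stated assumptions: a `Deque` stands in for the MQ, a `List` stands in for MongoDB, and the names `RetryFlowSketch`, `RetryMessage`, and `drain` are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical in-memory model of the retry flow; no real broker or database is involved.
class RetryFlowSketch {
    // A message plus its retry bookkeeping.
    static final class RetryMessage {
        final String body;
        final int maxTryCount;
        int currentRetryCount = 0;

        RetryMessage(String body, int maxTryCount) {
            this.body = body;
            this.maxTryCount = maxTryCount;
        }
    }

    final Deque<RetryMessage> queue = new ArrayDeque<>();      // stand-in for the MQ
    final List<RetryMessage> failedStore = new ArrayList<>();  // stand-in for MongoDB
    int attempts = 0;                                          // how many times the handler ran

    // Drain the queue: on failure, re-push until the limit is reached, then store the message.
    void drain(Consumer<String> handler) {
        RetryMessage msg;
        while ((msg = queue.poll()) != null) {
            attempts++;
            try {
                handler.accept(msg.body);
            } catch (RuntimeException e) {
                msg.currentRetryCount++;
                if (msg.currentRetryCount < msg.maxTryCount) {
                    queue.offer(msg);       // push back for another attempt
                } else {
                    failedStore.add(msg);   // give up and keep it for manual handling
                }
            }
        }
    }
}
```

With `maxTryCount` set to 3 and a handler that always throws, the handler runs three times and the message then lands in `failedStore`; a handler that succeeds runs once and nothing is stored.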
Based on the model above, we can write a common retry component. Without further ado, here is the code!
3. Code practice
This compensation service uses RabbitMQ as the message middleware; the approach is similar for other message middleware!
3.1 Create a message retry entity class
import java.io.Serializable;
import java.util.Date;

import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;

@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
public class MessageRetryDTO implements Serializable {

    private static final long serialVersionUID = 1L;

    /**
     * Original message body
     */
    private String bodyMsg;

    /**
     * Source ID
     */
    private String sourceId;

    /**
     * Source description
     */
    private String sourceDesc;

    /**
     * Exchange
     */
    private String exchangeName;

    /**
     * Routing key
     */
    private String routingKey;

    /**
     * Queue
     */
    private String queueName;

    /**
     * Status: 1 = initialized, 2 = success, 3 = failure
     */
    private Integer status = 1;

    /**
     * Maximum number of attempts
     */
    private Integer maxTryCount = 3;

    /**
     * Current number of retries
     */
    private Integer currentRetryCount = 0;

    /**
     * Retry interval (milliseconds)
     */
    private Long retryIntervalTime = 0L;

    /**
     * Failure information of the task
     */
    private String errorMsg;

    /**
     * Creation time
     */
    private Date createTime;

    @Override
    public String toString() {
        return "MessageRetryDTO{" +
                "bodyMsg='" + bodyMsg + '\'' +
                ", sourceId='" + sourceId + '\'' +
                ", sourceDesc='" + sourceDesc + '\'' +
                ", exchangeName='" + exchangeName + '\'' +
                ", routingKey='" + routingKey + '\'' +
                ", queueName='" + queueName + '\'' +
                ", status=" + status +
                ", maxTryCount=" + maxTryCount +
                ", currentRetryCount=" + currentRetryCount +
                ", retryIntervalTime=" + retryIntervalTime +
                ", errorMsg='" + errorMsg + '\'' +
                ", createTime=" + createTime +
                '}';
    }

    /**
     * Increment the retry counter and check whether the message may be retried again.
     *
     * @return true if the counter is still below the maximum, false otherwise
     */
    public boolean checkRetryCount() {
        retryCountCalculate();
        // Retry only while the counter is below the maximum
        return this.currentRetryCount < this.maxTryCount;
    }

    /**
     * Increment the number of retries
     */
    private void retryCountCalculate() {
        this.currentRetryCount = this.currentRetryCount + 1;
    }
}
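One detail worth checking is what `maxTryCount` actually counts, because `checkRetryCount()` increments the counter before comparing. The sketch below copies only that counting logic into a small Lombok-free class (`RetryCounter` and `totalAttempts` are hypothetical names) and counts how often a handler that always fails would run.

```java
// Minimal copy of the counting logic from MessageRetryDTO, without Lombok, for illustration only.
class RetryCounter {
    private final int maxTryCount;
    private int currentRetryCount = 0;

    RetryCounter(int maxTryCount) {
        this.maxTryCount = maxTryCount;
    }

    // Same logic as checkRetryCount(): increment first, then compare.
    boolean checkRetryCount() {
        currentRetryCount++;
        return currentRetryCount < maxTryCount;
    }

    // Count how many times a handler that always fails would run in total.
    static int totalAttempts(int maxTryCount) {
        RetryCounter counter = new RetryCounter(maxTryCount);
        int attempts = 1;                     // the first delivery
        while (counter.checkRetryCount()) {   // called once after every failure
            attempts++;                       // each "true" leads to one more delivery
        }
        return attempts;
    }
}
```

With the default of 3, `totalAttempts(3)` is 3: one initial delivery plus two retries. In other words, `maxTryCount` bounds the total number of attempts rather than the number of retries after the first failure.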
3.2 Write an abstract class for the retry service
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.Map;
import java.util.Objects;

import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoTemplate;

public abstract class CommonMessageRetryService {

    private static final Logger log = LoggerFactory.getLogger(CommonMessageRetryService.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private MongoTemplate mongoTemplate;

    /**
     * Initialize the message
     *
     * @param message the message received from the queue
     */
    public void initMessage(Message message) {
        log.info("{} received a message: {}, business data: {}", this.getClass().getName(), message.toString(),
                new String(message.getBody(), StandardCharsets.UTF_8));
        try {
            // Wrap the message in a retry entity
            MessageRetryDTO messageRetryDto = buildMessageRetryInfo(message);
            if (log.isInfoEnabled()) {
                log.info("Deserialized message: {}", messageRetryDto.toString());
            }
            prepareAction(messageRetryDto);
        } catch (Exception e) {
            log.warn("Exception while handling the message, error:", e);
        }
    }

    /**
     * Execute the task and decide whether to retry on failure
     *
     * @param retryDto the retry entity
     */
    protected void prepareAction(MessageRetryDTO retryDto) {
        try {
            execute(retryDto);
            doSuccessCallBack(retryDto);
        } catch (Exception e) {
            log.error("The current task failed, business data: " + retryDto.toString(), e);
            // Execution failed: work out whether another retry is allowed
            if (retryDto.checkRetryCount()) {
                if (log.isInfoEnabled()) {
                    log.info("Retrying message: {}", retryDto.toString());
                }
                retrySend(retryDto);
            } else {
                if (log.isWarnEnabled()) {
                    log.warn("The current task has reached the maximum number of retries, business data: " + retryDto.toString(), e);
                }
                doFailCallBack(retryDto.setErrorMsg(e.getMessage()));
            }
        }
    }

    /**
     * The task succeeded: invoke the success callback (override as needed)
     *
     * @param messageRetryDto the retry entity
     */
    private void doSuccessCallBack(MessageRetryDTO messageRetryDto) {
        try {
            successCallback(messageRetryDto);
        } catch (Exception e) {
            log.warn("Exception in the success callback, queue description: {}, cause: {}", messageRetryDto.getSourceDesc(), e.getMessage());
        }
    }

    /**
     * The task failed: store the message and invoke the failure callback (override as needed)
     *
     * @param messageRetryDto the retry entity, with errorMsg already set by the caller
     */
    private void doFailCallBack(MessageRetryDTO messageRetryDto) {
        try {
            saveMessageRetryInfo(messageRetryDto);
            failCallback(messageRetryDto);
        } catch (Exception e) {
            log.warn("Exception in the failure callback, queue description: {}, cause: {}", messageRetryDto.getSourceDesc(), e.getMessage());
        }
    }

    /**
     * Execute the task
     *
     * @param messageRetryDto the retry entity
     */
    protected abstract void execute(MessageRetryDTO messageRetryDto);

    /**
     * Success callback
     *
     * @param messageRetryDto the retry entity
     */
    protected abstract void successCallback(MessageRetryDTO messageRetryDto);

    /**
     * Failure callback
     *
     * @param messageRetryDto the retry entity
     */
    protected abstract void failCallback(MessageRetryDTO messageRetryDto);

    /**
     * Build the message compensation entity
     *
     * @param message the message received from the queue
     * @return the retry entity carried in the header, or a new one built from the message
     */
    private MessageRetryDTO buildMessageRetryInfo(Message message) {
        // If the header already carries a compensation entity, return it directly
        Map<String, Object> messageHeaders = message.getMessageProperties().getHeaders();
        if (messageHeaders.containsKey("message_retry_info")) {
            Object retryMsg = messageHeaders.get("message_retry_info");
            if (Objects.nonNull(retryMsg)) {
                return JSONObject.parseObject(String.valueOf(retryMsg), MessageRetryDTO.class);
            }
        }
        // Otherwise wrap the business message in a new compensation entity
        MessageRetryDTO messageRetryDto = new MessageRetryDTO();
        messageRetryDto.setBodyMsg(new String(message.getBody(), StandardCharsets.UTF_8));
        messageRetryDto.setExchangeName(message.getMessageProperties().getReceivedExchange());
        messageRetryDto.setRoutingKey(message.getMessageProperties().getReceivedRoutingKey());
        messageRetryDto.setQueueName(message.getMessageProperties().getConsumerQueue());
        messageRetryDto.setCreateTime(new Date());
        return messageRetryDto;
    }

    /**
     * Push the failed message back to the queue
     *
     * @param retryDto the retry entity
     */
    private void retrySend(MessageRetryDTO retryDto) {
        // Put the compensation entity in the header; the original message body stays unchanged
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
        // Serialize to a JSON string so that the consumer can parse the header value back with parseObject
        messageProperties.setHeader("message_retry_info", JSONObject.toJSONString(retryDto));
        Message message = new Message(retryDto.getBodyMsg().getBytes(StandardCharsets.UTF_8), messageProperties);
        rabbitTemplate.convertAndSend(retryDto.getExchangeName(), retryDto.getRoutingKey(), message);
    }

    /**
     * Store the failed message in MongoDB
     *
     * @param retryDto the retry entity
     */
    private void saveMessageRetryInfo(MessageRetryDTO retryDto) {
        try {
            mongoTemplate.save(retryDto, "message_retry_info");
        } catch (Exception e) {
            log.error("Failed to store the failed message in MongoDB, message data: " + retryDto.toString(), e);
        }
    }
}
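The entity declares `retryIntervalTime`, but `retrySend` as written pushes the message back immediately and never reads that field. One way the interval might be honoured is to delay the re-publish step. The sketch below is an assumption, not part of the original component: `DelayedRetry` and `scheduleRetry` are hypothetical names, the `Runnable` stands in for the call to `retrySend`, and `CompletableFuture.delayedExecutor` requires Java 9 or later.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: delays the re-publish step by retryIntervalTime milliseconds.
class DelayedRetry {

    // Run the re-publish action after the given interval; an interval of 0 or less runs it at once.
    // The returned future completes when the action has finished.
    static CompletableFuture<Void> scheduleRetry(Runnable republish, long retryIntervalTimeMillis) {
        if (retryIntervalTimeMillis <= 0) {
            republish.run();
            return CompletableFuture.completedFuture(null);
        }
        return CompletableFuture.runAsync(
                republish,
                CompletableFuture.delayedExecutor(retryIntervalTimeMillis, TimeUnit.MILLISECONDS));
    }
}
```

An in-process delay like this loses the pending retry if the consumer crashes before the timer fires. Broker-side delays, for example a message TTL combined with a dead letter exchange in RabbitMQ, avoid that at the cost of extra queue configuration.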
3.3 Write the listener service class
On the consumer side, usage is very simple. For example, for the inventory deduction operation, we can handle it as follows!
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class OrderServiceListener extends CommonMessageRetryService {

    private static final Logger log = LoggerFactory.getLogger(OrderServiceListener.class);

    /**
     * Listen for "order placed successfully" messages from the order system
     *
     * @param message the message received from the queue
     */
    @RabbitListener(queues = "mq.order.add")
    public void consume(Message message) {
        log.info("Received an order success message: {}", message.toString());
        super.initMessage(message);
    }

    @Override
    protected void execute(MessageRetryDTO messageRetryDto) {
        // Call the inventory deduction service and let business exceptions propagate
    }

    @Override
    protected void successCallback(MessageRetryDTO messageRetryDto) {
        // Callback after successful business processing
    }

    @Override
    protected void failCallback(MessageRetryDTO messageRetryDto) {
        // Callback after failed business processing
    }
}
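The `execute` method is left empty in the listener above. What matters for the retry component is that a failure surfaces as an exception, because `prepareAction` only retries when `execute` throws. As a rough illustration, the inventory service it calls might behave like the following in-memory stand-in; `InventoryService`, `setStock`, `getStock`, and `deduct` are hypothetical names, not the article's real warehouse interface.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory inventory service, standing in for the real warehouse interface.
class InventoryService {
    private final Map<String, Integer> stock = new ConcurrentHashMap<>();

    void setStock(String sku, int quantity) {
        stock.put(sku, quantity);
    }

    int getStock(String sku) {
        return stock.getOrDefault(sku, 0);
    }

    // Deduct stock atomically, or throw so that the caller (execute) lets the exception propagate.
    void deduct(String sku, int quantity) {
        stock.compute(sku, (key, current) -> {
            int available = current == null ? 0 : current;
            if (available < quantity) {
                throw new IllegalStateException("Insufficient stock for " + sku);
            }
            return available - quantity;
        });
    }
}
```

An `execute` implementation would parse the order from `messageRetryDto.getBodyMsg()` and call something like `deduct` without catching the exception, so that the base class can decide between retrying and storing the message.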
When message consumption fails and the maximum number of attempts is exceeded, the message is stored in MongoDB. From there, as with any ordinary database operation, the failed messages can be queried through a web interface and retried for specific scenarios!
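The article does not show the manual retry step itself. A minimal sketch of what it might look like, assuming the stored messages are loaded into a list and re-published with a fresh retry counter, is given below; `ManualRetrySketch`, `FailedMessage`, and `retryAll` are hypothetical names, and the `Consumer` stands in for publishing back to the MQ.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical in-memory model of the manual retry step; the list stands in for the MongoDB collection.
class ManualRetrySketch {
    static final class FailedMessage {
        final String bodyMsg;
        int currentRetryCount;

        FailedMessage(String bodyMsg, int currentRetryCount) {
            this.bodyMsg = bodyMsg;
            this.currentRetryCount = currentRetryCount;
        }
    }

    final List<FailedMessage> failedStore = new ArrayList<>();

    // Reset the retry counter of every stored message and hand it to the publisher.
    // Returns the number of messages that were re-submitted.
    int retryAll(Consumer<FailedMessage> publisher) {
        List<FailedMessage> batch = new ArrayList<>(failedStore);
        failedStore.clear();
        for (FailedMessage message : batch) {
            message.currentRetryCount = 0; // give the message a fresh retry budget
            publisher.accept(message);
        }
        return batch.size();
    }
}
```

Resetting `currentRetryCount` matters here: without it, a re-submitted message would hit the limit on its first failure and go straight back into the store.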
4. Summary
Some readers may ask: why not store the failed messages in a relational database?
At first they were stored in MySQL. But as the business grew rapidly, the order message structure became more and more complex and the data volume became very large, to the point that even MySQL's text type could not hold a message. This kind of data structure is also not a good fit for MySQL, so the storage was moved to MongoDB!
This article focused on the message consumption failure scenario and explained a basic solution with code. There may be some mistakes; corrections are welcome!
5. References
1. Notes on Shishan's structure: How to deal with the problem of message consumption failure