当前位置:网站首页>[real case] how to deal with the failure of message consumption?
[real case] how to deal with the failure of message consumption?
2022-07-04 07:19:00 【Java geek Technology】
One 、 Introduce
In the introduction of message middleware MQ Before , Let's have a brief understanding of , Why reference message middleware .
for example , In the e-commerce platform , Common user orders , It will go through the following processes .
When the user places an order , After creating the order , Will call the third party payment platform , Deduct the amount of the user's account , If the platform payment deduction is successful , The result will be notified to the corresponding business system , Then the business system will update the order status , Call the warehouse interface at the same time , Reduce inventory , Inform logistics to deliver goods !
Just imagine , Update from order status 、 To reduce inventory 、 Inform logistics to deliver goods in one method Synchronization complete , If the user pays successfully 、 Order status update is also successful , But the steps of deducting inventory or notifying logistics to deliver goods failed , Then there's a problem , The user has paid successfully , It's just a failure to deduct inventory from the warehouse , Which leads to the failure of the whole deal !
One order failed , The boss can pretend to be invisible , But if thousands of lists fail , So the business loss caused by the system , It will be huge , The boss may not be able to sit down !
therefore , For this business scenario , Architects introduce asynchronous communication Technical solution , In order to ensure the high availability of services , The general process is as follows :
When the order system receives the deduction result from the payment platform , An order message will be sent to MQ Message middleware , It also updates the order status .
On the other end , The warehouse system monitors the messages sent by the order system asynchronously , After receiving the order message , And then operate to deduct inventory 、 Inform the logistics company to deliver goods and other services !
Under the optimized process , Even if the inventory deduction service fails , And it won't affect user transactions .
just as 《 One month myth 》 What we say , Software Engineering , There is no silver bullet !
When introduced MQ After message middleware , There's another problem as well , If MQ Message middleware is down all of a sudden , The message cannot be sent , Then the warehouse system can't receive the order message , And then we can't deliver !
In response to this question , The mainstream solution in the industry is to adopt Cluster deployment , One master and many slaves , In order to achieve high availability of services , Even if a machine suddenly goes down , And still keep the service available , During a server failure , By means of operation and maintenance , Restart the service , After that, the service can still run normally !
But there's another problem , If the warehouse system has received the order message , But business processing exception , Or the server is abnormal , As a result, there is no deduction for the current inventory of goods , There was no delivery !
How to deal with it at this time ?
Today we are going to introduce this kind of scene , If news consumption fails , What should we do with ?
Two 、 Solution
For the scenario of message consumption failure , We usually deal with it in the following way :
- When message consumption fails , The message will be re pushed
- If the number of retries exceeds the maximum , Exception messages are stored in the database , And then manual intervention to check the problem , Make manual retries
When a message fails to be consumed on the client , We will add the exception message to a message retrial object , At the same time, set the maximum number of retries , And push the message back to MQ In message middleware , When the number of retries exceeds the maximum , The exception message will be stored in MongoDB
In the database , It is convenient for subsequent query of abnormal information .
Based on the above system model , We can write a public retry component , Don't talk much , Direct drying !
3、 ... and 、 Code practice
This compensation service adopts rabbitmq Message middleware , Other message middleware processing ideas are similar !
3.1、 Create a message retrial entity class
@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
public class MessageRetryDTO implements Serializable {
private static final long serialVersionUID = 1L;
/**
* Original news body
*/
private String bodyMsg;
/**
* Source ID
*/
private String sourceId;
/**
* Source description
*/
private String sourceDesc;
/**
* exchanger
*/
private String exchangeName;
/**
* Routing key
*/
private String routingKey;
/**
* queue
*/
private String queueName;
/**
* state ,1: initialization ,2: success ,3: Failure
*/
private Integer status = 1;
/**
* max retries
*/
private Integer maxTryCount = 3;
/**
* Current retries
*/
private Integer currentRetryCount = 0;
/**
* Retry interval ( millisecond )
*/
private Long retryIntervalTime = 0L;
/**
* Mission failure information
*/
private String errorMsg;
/**
* Creation time
*/
private Date createTime;
@Override
public String toString() {
return "MessageRetryDTO{" +
"bodyMsg='" + bodyMsg + '\'' +
", sourceId='" + sourceId + '\'' +
", sourceDesc='" + sourceDesc + '\'' +
", exchangeName='" + exchangeName + '\'' +
", routingKey='" + routingKey + '\'' +
", queueName='" + queueName + '\'' +
", status=" + status +
", maxTryCount=" + maxTryCount +
", currentRetryCount=" + currentRetryCount +
", retryIntervalTime=" + retryIntervalTime +
", errorMsg='" + errorMsg + '\'' +
", createTime=" + createTime +
'}';
}
/**
* Check if the number of retries exceeds the maximum
*
* @return
*/
public boolean checkRetryCount() {
retryCountCalculate();
// Check if the number of retries exceeds the maximum
if (this.currentRetryCount < this.maxTryCount) {
return true;
}
return false;
}
/**
* Recalculate the number of retries
* /
private void retryCountCalculate() {
this.currentRetryCount = this.currentRetryCount + 1;
}
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
- 32.
- 33.
- 34.
- 35.
- 36.
- 37.
- 38.
- 39.
- 40.
- 41.
- 42.
- 43.
- 44.
- 45.
- 46.
- 47.
- 48.
- 49.
- 50.
- 51.
- 52.
- 53.
- 54.
- 55.
- 56.
- 57.
- 58.
- 59.
- 60.
- 61.
- 62.
- 63.
- 64.
- 65.
- 66.
- 67.
- 68.
- 69.
- 70.
- 71.
- 72.
- 73.
- 74.
- 75.
- 76.
- 77.
- 78.
- 79.
- 80.
- 81.
- 82.
- 83.
- 84.
- 85.
- 86.
- 87.
- 88.
- 89.
- 90.
- 91.
- 92.
- 93.
- 94.
- 95.
- 96.
- 97.
- 98.
- 99.
- 100.
- 101.
- 102.
- 103.
- 104.
- 105.
- 106.
- 107.
3.2、 Write abstract classes for service retries
public abstract class CommonMessageRetryService {
private static final Logger log = LoggerFactory.getLogger(CommonMessageRetryService.class);
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private MongoTemplate mongoTemplate;
/**
* Initialization message
*
* @param message
*/
public void initMessage(Message message) {
log.info("{} Received a message : {}, Business data :{}", this.getClass().getName(), message.toString(), new String(message.getBody()));
try {
// Encapsulate message
MessageRetryDTO messageRetryDto = buildMessageRetryInfo(message);
if (log.isInfoEnabled()) {
log.info(" Deserialize message :{}", messageRetryDto.toString());
}
prepareAction(messageRetryDto);
} catch (Exception e) {
log.warn(" Handle message exception , error message :", e);
}
}
/**
* Prepare to carry out
*
* @param retryDto
*/
protected void prepareAction(MessageRetryDTO retryDto) {
try {
execute(retryDto);
doSuccessCallBack(retryDto);
} catch (Exception e) {
log.error(" The current task execution is abnormal , Business data :" + retryDto.toString(), e);
// Execution failure , Calculate if you still need to try again
if (retryDto.checkRetryCount()) {
if (log.isInfoEnabled()) {
log.info(" Retry message :{}", retryDto.toString());
}
retrySend(retryDto);
} else {
if (log.isWarnEnabled()) {
log.warn(" The current task has reached the maximum number of retries , Business data :" + retryDto.toString(), e);
}
doFailCallBack(retryDto.setErrorMsg(e.getMessage()));
}
}
}
/**
* The task was executed successfully , Callback service ( Rewrite as needed )
*
* @param messageRetryDto
*/
private void doSuccessCallBack(MessageRetryDTO messageRetryDto) {
try {
successCallback(messageRetryDto);
} catch (Exception e) {
log.warn(" Successful execution, callback exception , Queue description :{}, The reason for the error :{}", messageRetryDto.getSourceDesc(), e.getMessage());
}
}
/**
* Task execution failed , Callback service ( Rewrite as needed )
*
* @param messageRetryDto
*/
private void doFailCallBack(MessageRetryDTO messageRetryDto) {
try {
saveMessageRetryInfo(messageRetryDto.setErrorMsg(messageRetryDto.getErrorMsg()));
failCallback(messageRetryDto);
} catch (Exception e) {
log.warn(" Execution failure callback exception , Queue description :{}, The reason for the error :{}", messageRetryDto.getSourceDesc(), e.getMessage());
}
}
/**
* Perform tasks
*
* @param messageRetryDto
*/
protected abstract void execute(MessageRetryDTO messageRetryDto);
/**
* Successful callback
*
* @param messageRetryDto
*/
protected abstract void successCallback(MessageRetryDTO messageRetryDto);
/**
* Failed callback
*
* @param messageRetryDto
*/
protected abstract void failCallback(MessageRetryDTO messageRetryDto);
/**
* Build a message compensation entity
* @param message
* @return
*/
private MessageRetryDTO buildMessageRetryInfo(Message message){
// If the header contains a compensation message entity , Go straight back to
Map < String, Object > messageHeaders = message.getMessageProperties().getHeaders();
if(messageHeaders.containsKey("message_retry_info")){
Object retryMsg = messageHeaders.get("message_retry_info");
if(Objects.nonNull(retryMsg)){
return JSONObject.parseObject(String.valueOf(retryMsg), MessageRetryDTO.class);
}
}
// Automatically add business messages to the compensation entity
MessageRetryDTO messageRetryDto = new MessageRetryDTO();
messageRetryDto.setBodyMsg(new String(message.getBody(), StandardCharsets.UTF_8));
messageRetryDto.setExchangeName(message.getMessageProperties().getReceivedExchange());
messageRetryDto.setRoutingKey(message.getMessageProperties().getReceivedRoutingKey());
messageRetryDto.setQueueName(message.getMessageProperties().getConsumerQueue());
messageRetryDto.setCreateTime(new Date());
return messageRetryDto;
}
/**
* The exception message is put into storage again
* @param retryDto
*/
private void retrySend(MessageRetryDTO retryDto){
// Put the compensation message entity in the header , The original message content remains unchanged
MessageProperties messageProperties = new MessageProperties();
messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
messageProperties.setHeader("message_retry_info", JSONObject.toJSON(retryDto));
Message message = new Message(retryDto.getBodyMsg().getBytes(), messageProperties);
rabbitTemplate.convertAndSend(retryDto.getExchangeName(), retryDto.getRoutingKey(), message);
}
/**
* Store exception messages in mongodb in
* @param retryDto
*/
private void saveMessageRetryInfo(MessageRetryDTO retryDto){
try {
mongoTemplate.save(retryDto, "message_retry_info");
} catch (Exception e){
log.error(" Store exception messages in mongodb Failure , The message data :" + retryDto.toString(), e);
}
}
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
- 32.
- 33.
- 34.
- 35.
- 36.
- 37.
- 38.
- 39.
- 40.
- 41.
- 42.
- 43.
- 44.
- 45.
- 46.
- 47.
- 48.
- 49.
- 50.
- 51.
- 52.
- 53.
- 54.
- 55.
- 56.
- 57.
- 58.
- 59.
- 60.
- 61.
- 62.
- 63.
- 64.
- 65.
- 66.
- 67.
- 68.
- 69.
- 70.
- 71.
- 72.
- 73.
- 74.
- 75.
- 76.
- 77.
- 78.
- 79.
- 80.
- 81.
- 82.
- 83.
- 84.
- 85.
- 86.
- 87.
- 88.
- 89.
- 90.
- 91.
- 92.
- 93.
- 94.
- 95.
- 96.
- 97.
- 98.
- 99.
- 100.
- 101.
- 102.
- 103.
- 104.
- 105.
- 106.
- 107.
- 108.
- 109.
- 110.
- 111.
- 112.
- 113.
- 114.
- 115.
- 116.
- 117.
- 118.
- 119.
- 120.
- 121.
- 122.
- 123.
- 124.
- 125.
- 126.
- 127.
- 128.
- 129.
- 130.
- 131.
- 132.
- 133.
- 134.
- 135.
- 136.
- 137.
- 138.
- 139.
- 140.
- 141.
- 142.
- 143.
- 144.
- 145.
- 146.
- 147.
- 148.
- 149.
- 150.
- 151.
- 152.
- 153.
- 154.
- 155.
3.3、 Write listening service class
When it comes to consumer applications , It's very simple , for example , For inventory deduction operations , We can deal with it in the following ways !
@Component
public class OrderServiceListener extends CommonMessageRetryService {
private static final Logger log = LoggerFactory.getLogger(OrderServiceListener.class);
/**
* Monitor the order system order success message
* @param message
*/
@RabbitListener(queues = "mq.order.add")
public void consume(Message message) {
log.info(" Received the message of successful order : {}", message.toString());
super.initMessage(message);
}
@Override
protected void execute(MessageRetryDTO messageRetryDto) {
// Call the deduction inventory service , Throw business exceptions out
}
@Override
protected void successCallback(MessageRetryDTO messageRetryDto) {
// Successful business processing , Callback
}
@Override
protected void failCallback(MessageRetryDTO messageRetryDto) {
// Business processing failed , Callback
}
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
When message consumption fails , And exceed the maximum number of times , The message will be stored in mongodb in , And then, like normal database operations , Can pass web Interface query exception message , And try again for specific scenarios !
Four 、 Summary
Some students may ask , Why not store exception messages in the database ?
At first it was stored in MYSQL in , But with the rapid development of business , Order message data structure is more and more complex , The amount of data is also very large , Even as big as MYSQL Medium text Types cannot be stored , At the same time, this data structure is not suitable for MYSQL Storage in , So move it to mongodb!
This paper focuses on the scenario of message consumption failure , Explain the basic program and code practice , There may be some misunderstandings , Welcome the criticism that !
5、 ... and 、 Reference resources
1、 Notes on Shishan's structure - How to deal with the problem of message consumption failure
PS: Reply within company number 「java」 You can enter Java Novice learning communication group , Grow and progress together !
Old rules , Brothers, remember , If you think the content of the article is good , Remember to share the circle of friends and let more people know !
- 1.
- 2.
Please pay attention to
Recently, you should find that the information flow of WeChat official account has been revised , No longer in chronological order . It's a challenge to the original trumpet owners like a fan , It can be said that it was a great blow , Reading has plummeted , Positive feedback continues to weaken .
If you want to receive a fan's article at the first time , Not affected by the public number's information flow , Then you can give Java Geek technology is set as a
Finally, thank you for reading , Pretty good , It's hard to avoid mistakes , If you find something wrong , Leave a message to tell a fan , Ah fan dotes on you so much , It's bound to change ~
Thank you for your support ~
边栏推荐
- Deep profile data leakage prevention scheme
- Status of the thread
- It's healthy to drink medicinal wine like this. Are you drinking it right
- Pangu open source: multi support and promotion, the wave of chip industry
- Selection (023) - what are the three stages of event propagation?
- Since DMS is upgraded to a new version, my previous SQL is in the old version of DMS. In this case, how can I retrieve my previous SQL?
- How to buy financial products in 2022?
- [freertos] freertos Learning notes (7) - written freertos bidirectionnel Link LIST / source analysis
- Basic DOS commands
- 两年前美国芯片扭捏着不卖芯片,如今芯片堆积如山祈求中国帮忙
猜你喜欢
Bottom problem of figure
Zephyr 学习笔记2,Scheduling
Cell reports: Wei Fuwen group of the Institute of zoology, Chinese Academy of Sciences analyzes the function of seasonal changes in the intestinal flora of giant pandas
What is the use of cloud redis? How to use cloud redis?
the input device is not a TTY. If you are using mintty, try prefixing the command with ‘winpty‘
关于IDEA如何设置快捷键集
Summary of MySQL common judgment functions!! Have you used it
Splicing plain text into JSON strings - easy language method
Vulhub vulnerability recurrence 76_ XXL-JOB
Uniapp applet subcontracting
随机推荐
The most effective futures trend strategy: futures reverse merchandising
2022-021ARTS:下半年开始
【森城市】GIS数据漫谈(一)
移动适配:vw/vh
Basic DOS commands
How to share the source code anti disclosure scheme
[thread pool]
How to input single quotation marks and double quotation marks in latex?
com. alibaba. nacos. api. exception. NacosException
关于IDEA如何设置快捷键集
CMS source code of multi wechat management system developed based on thinkphp6, with one click curd and other functions
2022-021ARTS:下半年開始
Since DMS is upgraded to a new version, my previous SQL is in the old version of DMS. In this case, how can I retrieve my previous SQL?
what the fuck! If you can't grab it, write it yourself. Use code to realize a Bing Dwen Dwen. It's so beautiful ~!
Finishing (III) - Exercise 2
Selenium driver ie common problem solving message: currently focused window has been closed
Bottom problem of figure
Selection (021) - what is the output of the following code?
kubernetes集群之Label管理
Novel website program source code that can be automatically collected