[Real case] How to deal with message consumption failures
2022-07-04 07:19:00 【Java geek Technology】
1. Introduction
Before introducing message middleware (MQ), let's briefly look at why message middleware is needed in the first place.
Take the common case of a user placing an order on an e-commerce platform. The order goes through the following process.
After the user places an order and the order is created, the system calls a third-party payment platform to deduct the amount from the user's account. If the deduction succeeds, the payment platform notifies the corresponding business system of the result. The business system then updates the order status, calls the warehouse interface to reduce inventory, and notifies logistics to ship the goods.
Now imagine that updating the order status, reducing inventory, and notifying logistics are all completed synchronously in one method. If the payment and the order status update succeed but the inventory deduction or the logistics notification fails, there is a problem: the user has paid, yet the whole transaction fails just because the warehouse could not deduct inventory.
If one order fails, the boss may turn a blind eye. But if thousands of orders fail, the business loss caused by the system will be huge, and the boss will not be able to sit still.
For this business scenario, architects therefore introduced asynchronous communication to keep the service highly available. The general process is as follows:
When the order system receives the deduction result from the payment platform, it sends an order message to the MQ middleware and updates the order status.
On the other end, the warehouse system asynchronously listens for messages sent by the order system. After receiving an order message, it deducts inventory, notifies the logistics company to ship the goods, and so on.
With this optimized process, a failure in the inventory deduction service no longer affects the user's transaction.
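The decoupling described above can be sketched roughly as follows. This is only an illustration under stated assumptions: an in-memory queue stands in for the MQ broker, and every name in it (AsyncOrderSketch, onPaymentSuccess, consumeOne) is hypothetical rather than taken from the original system.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: an in-memory queue stands in for the MQ broker.
class AsyncOrderSketch {
    // Stand-in for the broker queue between the order and warehouse systems.
    static final BlockingQueue<String> ORDER_QUEUE = new LinkedBlockingQueue<>();
    // IDs of orders whose status has been updated to "paid".
    static final List<String> PAID_ORDERS = new ArrayList<>();

    // Order system: update the order status, publish a message, return at once.
    static void onPaymentSuccess(String orderId) {
        PAID_ORDERS.add(orderId);
        ORDER_QUEUE.offer(orderId);
    }

    // Warehouse system: consume one message; a failure here stays local.
    // Returns true only if the inventory deduction succeeded.
    static boolean consumeOne(boolean inventoryServiceUp) {
        String orderId = ORDER_QUEUE.poll();
        if (orderId == null) {
            return false; // nothing to consume
        }
        try {
            if (!inventoryServiceUp) {
                throw new IllegalStateException("inventory service unavailable");
            }
            System.out.println("inventory deducted for " + orderId);
            return true;
        } catch (IllegalStateException e) {
            System.out.println("deduction failed for " + orderId + ": " + e.getMessage());
            return false;
        }
    }

    public static void main(String[] args) {
        onPaymentSuccess("order-1");
        consumeOne(false); // the warehouse fails, the order stays paid
        System.out.println("paid orders: " + PAID_ORDERS);
    }
}
```

Note that in this sketch a failed message is simply dropped by the consumer, so the order stays paid but the inventory is never deducted.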
As The Mythical Man-Month puts it, there is no silver bullet in software engineering!
Introducing MQ middleware brings a new problem: if the MQ middleware suddenly goes down, messages cannot be sent, the warehouse system cannot receive order messages, and the goods cannot be shipped.
The mainstream industry answer to this is cluster deployment with one master and multiple slaves, which keeps the service highly available. Even if one machine suddenly goes down, the service remains available, and operations staff can restart the failed server so that it runs normally again afterwards.
But there is yet another problem: what if the warehouse system has received the order message, but a business-processing exception or a server fault means that the inventory is not deducted and nothing is shipped?
How should this be handled?
That is the scenario this article covers: what should we do when message consumption fails?
2. Solution
For message consumption failures, we usually handle it as follows:
- When message consumption fails, the message is pushed again.
- If the number of retries exceeds the maximum, the failed message is stored in a database. Someone then investigates the problem and retries manually.
When a message fails to be consumed on the client, we wrap the failed message in a message retry object, set the maximum number of retries, and push the message back to the MQ middleware. When the number of retries exceeds the maximum, the failed message is stored in a MongoDB database so that it can be queried later.
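The model above can be sketched in a few lines before looking at the real component. This is a minimal, hedged illustration, not the component itself: a Deque stands in for the MQ queue, a List stands in for MongoDB, and the names RetryFlowSketch, Envelope, publish and drain are all hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of the retry model; no real broker or database is involved.
class RetryFlowSketch {
    // A message plus the retry state that travels with it.
    static final class Envelope {
        final String body;
        final int maxTryCount;
        int currentRetryCount = 0;
        String errorMsg;

        Envelope(String body, int maxTryCount) {
            this.body = body;
            this.maxTryCount = maxTryCount;
        }
    }

    final Deque<Envelope> queue = new ArrayDeque<>();     // stands in for the MQ queue
    final List<Envelope> failedStore = new ArrayList<>(); // stands in for MongoDB

    void publish(String body, int maxTryCount) {
        queue.addLast(new Envelope(body, maxTryCount));
    }

    // Drain the queue, re-pushing failed messages until the limit is reached.
    // Returns the total number of handler invocations.
    int drain(Consumer<String> handler) {
        int attempts = 0;
        Envelope env;
        while ((env = queue.pollFirst()) != null) {
            attempts++;
            try {
                handler.accept(env.body);
            } catch (RuntimeException e) {
                env.currentRetryCount++;
                if (env.currentRetryCount < env.maxTryCount) {
                    queue.addLast(env);      // push back for another attempt
                } else {
                    env.errorMsg = e.getMessage();
                    failedStore.add(env);    // give up; keep for manual handling
                }
            }
        }
        return attempts;
    }

    public static void main(String[] args) {
        RetryFlowSketch sketch = new RetryFlowSketch();
        sketch.publish("order-1", 3);
        int attempts = sketch.drain(body -> {
            throw new IllegalStateException("inventory service unavailable");
        });
        System.out.println("attempts=" + attempts + ", stored=" + sketch.failedStore.size());
    }
}
```

With a limit of 3 and a handler that always fails, the handler runs three times in total and the message then lands in the failure store. Keeping the retry state on the message itself means the consumer does not need any local bookkeeping between attempts.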
Based on the model above, we can write a common retry component. Without further ado, here is the code!
3. Code practice
This compensation service uses RabbitMQ as the message middleware. The approach is similar for other message middleware!
3.1 Create a message retry entity class
import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.experimental.Accessors;

import java.io.Serializable;
import java.util.Date;

@Data
@EqualsAndHashCode(callSuper = false)
@Accessors(chain = true)
public class MessageRetryDTO implements Serializable {

    private static final long serialVersionUID = 1L;

    /** Original message body */
    private String bodyMsg;

    /** Source ID */
    private String sourceId;

    /** Source description */
    private String sourceDesc;

    /** Exchange */
    private String exchangeName;

    /** Routing key */
    private String routingKey;

    /** Queue */
    private String queueName;

    /** Status: 1 = initialized, 2 = success, 3 = failure */
    private Integer status = 1;

    /** Maximum number of retries */
    private Integer maxTryCount = 3;

    /** Current number of retries */
    private Integer currentRetryCount = 0;

    /** Retry interval (milliseconds) */
    private Long retryIntervalTime = 0L;

    /** Task failure information */
    private String errorMsg;

    /** Creation time */
    private Date createTime;

    @Override
    public String toString() {
        return "MessageRetryDTO{" +
                "bodyMsg='" + bodyMsg + '\'' +
                ", sourceId='" + sourceId + '\'' +
                ", sourceDesc='" + sourceDesc + '\'' +
                ", exchangeName='" + exchangeName + '\'' +
                ", routingKey='" + routingKey + '\'' +
                ", queueName='" + queueName + '\'' +
                ", status=" + status +
                ", maxTryCount=" + maxTryCount +
                ", currentRetryCount=" + currentRetryCount +
                ", retryIntervalTime=" + retryIntervalTime +
                ", errorMsg='" + errorMsg + '\'' +
                ", createTime=" + createTime +
                '}';
    }

    /**
     * Increment the retry counter, then check whether another retry is allowed.
     * Note that every call consumes one retry.
     *
     * @return true if the message may be retried, false once the maximum is reached
     */
    public boolean checkRetryCount() {
        retryCountCalculate();
        return this.currentRetryCount < this.maxTryCount;
    }

    /**
     * Increment the number of retries by one.
     */
    private void retryCountCalculate() {
        this.currentRetryCount = this.currentRetryCount + 1;
    }
}
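One detail of checkRetryCount() is easy to miss: it increments the counter before comparing, so each call uses up one retry. The following stand-alone sketch copies only that counting logic into a hypothetical class (RetryCounterSketch is not part of the original code) to show what the default maxTryCount of 3 means in practice.

```java
// Hypothetical stand-in that copies only the counting logic of MessageRetryDTO.
class RetryCounterSketch {
    private final int maxTryCount;
    private int currentRetryCount = 0;

    RetryCounterSketch(int maxTryCount) {
        this.maxTryCount = maxTryCount;
    }

    // Same shape as checkRetryCount(): increment first, then compare.
    boolean checkRetryCount() {
        currentRetryCount++;
        return currentRetryCount < maxTryCount;
    }

    public static void main(String[] args) {
        RetryCounterSketch counter = new RetryCounterSketch(3);
        for (int failure = 1; failure <= 3; failure++) {
            System.out.println("failure " + failure + " -> retry? " + counter.checkRetryCount());
        }
    }
}
```

The first and second failures allow a retry and the third does not, so a message with maxTryCount = 3 is executed three times in total and re-sent twice.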
3.2 Write an abstract class for service retries
// Assumption: JSONObject is fastjson's class; the original listing does not show its imports.
import com.alibaba.fastjson.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageProperties;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.mongodb.core.MongoTemplate;

import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.Map;
import java.util.Objects;

public abstract class CommonMessageRetryService {

    private static final Logger log = LoggerFactory.getLogger(CommonMessageRetryService.class);

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private MongoTemplate mongoTemplate;

    /**
     * Entry point: wrap the received message and process it.
     *
     * @param message the raw message received from the queue
     */
    public void initMessage(Message message) {
        log.info("{} received a message: {}, business data: {}", this.getClass().getName(), message,
                new String(message.getBody(), StandardCharsets.UTF_8));
        try {
            // Wrap the raw message in a retry entity
            MessageRetryDTO messageRetryDto = buildMessageRetryInfo(message);
            log.info("Deserialized message: {}", messageRetryDto);
            prepareAction(messageRetryDto);
        } catch (Exception e) {
            log.warn("Exception while handling message:", e);
        }
    }

    /**
     * Execute the task and decide between retrying and giving up on failure.
     *
     * @param retryDto the retry entity
     */
    protected void prepareAction(MessageRetryDTO retryDto) {
        try {
            execute(retryDto);
            doSuccessCallBack(retryDto);
        } catch (Exception e) {
            log.error("Task execution failed, business data: " + retryDto, e);
            // Execution failed: check whether another retry is allowed
            if (retryDto.checkRetryCount()) {
                log.info("Retrying message: {}", retryDto);
                retrySend(retryDto);
            } else {
                log.warn("Maximum number of retries reached, business data: " + retryDto, e);
                doFailCallBack(retryDto.setErrorMsg(e.getMessage()));
            }
        }
    }

    /**
     * The task succeeded: invoke the success callback and swallow its exceptions.
     *
     * @param messageRetryDto the retry entity
     */
    private void doSuccessCallBack(MessageRetryDTO messageRetryDto) {
        try {
            successCallback(messageRetryDto);
        } catch (Exception e) {
            log.warn("Success callback threw an exception, queue description: {}, cause: {}",
                    messageRetryDto.getSourceDesc(), e.getMessage());
        }
    }

    /**
     * The task failed for good: persist the message, then invoke the failure callback.
     *
     * @param messageRetryDto the retry entity, with errorMsg already set
     */
    private void doFailCallBack(MessageRetryDTO messageRetryDto) {
        try {
            saveMessageRetryInfo(messageRetryDto);
            failCallback(messageRetryDto);
        } catch (Exception e) {
            log.warn("Failure callback threw an exception, queue description: {}, cause: {}",
                    messageRetryDto.getSourceDesc(), e.getMessage());
        }
    }

    /**
     * Execute the business task. Throw an exception to signal failure.
     *
     * @param messageRetryDto the retry entity
     */
    protected abstract void execute(MessageRetryDTO messageRetryDto);

    /**
     * Success callback (override as needed).
     *
     * @param messageRetryDto the retry entity
     */
    protected abstract void successCallback(MessageRetryDTO messageRetryDto);

    /**
     * Failure callback (override as needed).
     *
     * @param messageRetryDto the retry entity
     */
    protected abstract void failCallback(MessageRetryDTO messageRetryDto);

    /**
     * Build the retry entity for a received message.
     *
     * @param message the raw message
     * @return the entity carried in the header, or a new one for a first delivery
     */
    private MessageRetryDTO buildMessageRetryInfo(Message message) {
        // If the header already carries a retry entity, return it directly
        Map<String, Object> messageHeaders = message.getMessageProperties().getHeaders();
        if (messageHeaders.containsKey("message_retry_info")) {
            Object retryMsg = messageHeaders.get("message_retry_info");
            if (Objects.nonNull(retryMsg)) {
                return JSONObject.parseObject(String.valueOf(retryMsg), MessageRetryDTO.class);
            }
        }
        // First delivery: wrap the business message in a new retry entity
        MessageRetryDTO messageRetryDto = new MessageRetryDTO();
        messageRetryDto.setBodyMsg(new String(message.getBody(), StandardCharsets.UTF_8));
        messageRetryDto.setExchangeName(message.getMessageProperties().getReceivedExchange());
        messageRetryDto.setRoutingKey(message.getMessageProperties().getReceivedRoutingKey());
        messageRetryDto.setQueueName(message.getMessageProperties().getConsumerQueue());
        messageRetryDto.setCreateTime(new Date());
        return messageRetryDto;
    }

    /**
     * Push the failed message back to the queue.
     *
     * @param retryDto the retry entity
     */
    private void retrySend(MessageRetryDTO retryDto) {
        // Put the retry entity in the header; the original message body stays unchanged
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType(MessageProperties.CONTENT_TYPE_JSON);
        // Serialize to a JSON string so the header is read back as plain text
        messageProperties.setHeader("message_retry_info", JSONObject.toJSONString(retryDto));
        Message message = new Message(retryDto.getBodyMsg().getBytes(StandardCharsets.UTF_8), messageProperties);
        rabbitTemplate.convertAndSend(retryDto.getExchangeName(), retryDto.getRoutingKey(), message);
    }

    /**
     * Store the failed message in MongoDB.
     *
     * @param retryDto the retry entity
     */
    private void saveMessageRetryInfo(MessageRetryDTO retryDto) {
        try {
            mongoTemplate.save(retryDto, "message_retry_info");
        } catch (Exception e) {
            log.error("Failed to store the failed message in MongoDB, message data: " + retryDto, e);
        }
    }
}
3.3 Write the listener service class
Using the component on the consumer side is very simple. For the inventory deduction operation, for example, we can handle it as follows!
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class OrderServiceListener extends CommonMessageRetryService {

    private static final Logger log = LoggerFactory.getLogger(OrderServiceListener.class);

    /**
     * Listen for "order placed successfully" messages from the order system.
     *
     * @param message the raw message
     */
    @RabbitListener(queues = "mq.order.add")
    public void consume(Message message) {
        log.info("Received an order success message: {}", message);
        super.initMessage(message);
    }

    @Override
    protected void execute(MessageRetryDTO messageRetryDto) {
        // Call the inventory deduction service; let business exceptions propagate
    }

    @Override
    protected void successCallback(MessageRetryDTO messageRetryDto) {
        // Callback after successful business processing
    }

    @Override
    protected void failCallback(MessageRetryDTO messageRetryDto) {
        // Callback after business processing has failed for good
    }
}
When message consumption fails and the maximum number of retries is exceeded, the message is stored in MongoDB. From there, as with any ordinary database operation, the failed messages can be queried through a web interface and retried for specific scenarios!
4. Summary
Some readers may ask: why not store the failed messages in a relational database?
At first they were stored in MySQL. But as the business grew rapidly, the order message structure became more and more complex and the data volume very large, to the point that even MySQL's text type could not hold a message. The data structure was also a poor fit for MySQL, so the storage was moved to MongoDB!
This article explains the basic approach and code practice for the message consumption failure scenario. It may contain mistakes, and corrections are welcome!
5. References
1. Shishan's Architecture Notes - How to deal with the problem of message consumption failure