Shh! Is it really OK to use asynchronous events this way?
2020-11-06 01:33:00 【Yin Jihuan】
Background
Earlier this year I wrote an article, 《Gather and watch: the process of internal decoupling based on an event mechanism》. It was mainly about using ES (Elasticsearch) for heterogeneous data. The program subscribes to MySQL binlog changes and then uses Spring Event to dispatch specific events, because a change to one table may require updating more than one ES index.
To make this easier to follow, I have copied the diagram of the earlier design here:
The design in the diagram above has a problem, and that problem is today's topic.
When the MQ consumer receives a message, it publishes an Event directly. If the event is published synchronously, there is no problem: if any EventListener fails, the exception reaches the consumer and the message is not ACKed.
If the event is published asynchronously, the message is ACKed as soon as the event has been published. Even if an EventListener fails afterwards, MQ cannot know about it and the message is never redelivered. That is the problem.
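The difference between the two publishing modes can be shown without any framework. The following is a minimal sketch, not Spring's API: `PublishDemo`, `publishSync` and `publishAsync` are hypothetical names, and the boolean return value stands in for "the consumer would ACK".

```java
import java.util.concurrent.ExecutorService;
import java.util.function.Consumer;

// Hypothetical stand-in for an event publisher; this is not Spring's API.
class PublishDemo {

    /** Runs the listener on the caller's thread, so a failure propagates to the caller. */
    static boolean publishSync(Consumer<String> listener, String event) {
        try {
            listener.accept(event);
            return true;   // the consumer would ACK
        } catch (RuntimeException e) {
            return false;  // the consumer would not ACK, so MQ redelivers
        }
    }

    /** Hands the listener to another thread, so the caller returns before any failure occurs. */
    static boolean publishAsync(ExecutorService pool, Consumer<String> listener, String event) {
        try {
            pool.submit(() -> listener.accept(event));
            return true;   // reached even if the listener fails later on the pool thread
        } catch (RuntimeException e) {
            return false;  // only reached if the pool rejects the task
        }
    }
}
```

With a listener that always throws, `publishSync` reports the failure while `publishAsync` still reports success, which is exactly why the message gets ACKed too early.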
Solution
Option 1
Since the message has already been ACKed, can we skip MQ's retry feature and retry ourselves?
Certainly. The program always knows whether its own processing succeeded. If processing fails, it can retry immediately or retry according to some strategy. If that still does not work, the failure can be written to the database as a record.
The problem is that this is tedious: every place that uses events has to do it. And for the programmer who takes over your code later, it may well make his hair slowly fall out...
Losing hair is one thing. The real issue is that he will not know any of this is needed, and one day he may end up taking the blame for it. Poor guy...
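For illustration, the self-managed retry from Option 1 might look roughly like the sketch below. `LocalRetry`, `handleWithRetry` and `FAILED_RECORDS` are hypothetical names, and the in-memory list merely stands in for a database table of failed messages.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical helper illustrating Option 1; the names are not from the original project.
class LocalRetry {

    // Stand-in for a database table that records messages which could not be processed.
    static final List<String> FAILED_RECORDS = new CopyOnWriteArrayList<>();

    /** Tries the handler up to maxAttempts times and records the message if every attempt fails. */
    static boolean handleWithRetry(String message, Consumer<String> handler, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                handler.accept(message);
                return true;
            } catch (RuntimeException e) {
                // Fall through and retry; a real strategy might wait between attempts.
            }
        }
        FAILED_RECORDS.add(message); // last resort: keep a record for later follow-up
        return false;
    }
}
```

Every listener would have to route its work through a helper like this, which is the repetition the text complains about.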
Option 2
To keep the message and the business processing consistent, the message must not be ACKed immediately. The consumer has to wait for the business processing to finish before deciding whether to ACK.
If any processing fails, the message should not be ACKed, so MQ's own retry mechanism can be reused.
Looked at closely, this is a typical asynchronous-to-synchronous scenario. Dubbo has the same scenario, so we can borrow the idea from Dubbo's implementation.
Create a DefaultFuture that waits synchronously for the result of task execution, then use the DefaultFuture in the MQ consumer.
@Service
@RocketMQMessageListener(topic = "${rocketmq.topic.data_change}", consumerGroup = "${rocketmq.group.data_change_consumer}")
public class DataChangeConsume implements RocketMQListener<DataChangeRequest> {

    // `log` is assumed to be an SLF4J logger field (for example, generated by Lombok's @Slf4j)

    @Autowired
    private ApplicationContext applicationContext;

    @Autowired
    private CustomApplicationContextAware customApplicationContextAware;

    @Override
    public void onMessage(DataChangeRequest dataChangeRequest) {
        log.info("Received message {}, thread {}", dataChangeRequest, Thread.currentThread().getName());
        DataChangeEvent event = new DataChangeEvent(this);
        event.setChangeType(dataChangeRequest.getChangeType());
        event.setTable(dataChangeRequest.getTable());
        event.setMessageId(dataChangeRequest.getMessageId());
        // Register the future before publishing so that listeners can report back (60-second timeout)
        DefaultFuture defaultFuture = DefaultFuture.newFuture(dataChangeRequest, customApplicationContextAware.getTaskCount(), 6000 * 10);
        applicationContext.publishEvent(event);
        // Block until every listener has reported or the timeout expires
        Boolean result = defaultFuture.get();
        log.info("MessageId {} processing result {}", dataChangeRequest.getMessageId(), result);
        if (!result) {
            // Throwing here prevents the ACK, so MQ redelivers the message
            throw new RuntimeException("Processing failed; message not ACKed, waiting for redelivery");
        }
    }
}
newFuture() takes the request, the number of tasks, and the timeout. The number of tasks is used to determine whether every EventListener has finished.
defaultFuture.get() blocks until all tasks have completed and then returns the result. If all the business processing succeeded, it returns true, the method ends normally, and the message is ACKed automatically.
If it returns false, some processing failed or timed out, so the message must not be ACKed. Throw an exception and wait for the redelivery.
public Boolean get() {
    // Everything finished before we started waiting: the result still depends on failures
    if (isDone()) {
        return isSuccessDone();
    }
    long start = System.currentTimeMillis();
    lock.lock();
    try {
        while (!isDone()) {
            done.await(timeout, TimeUnit.MILLISECONDS);
            // A task has reported a failure
            if (!isSuccessDone()) {
                return false;
            }
            // All tasks executed successfully
            if (isDone()) {
                return true;
            }
            // Timed out
            if (System.currentTimeMillis() - start >= timeout) {
                return false;
            }
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // preserve the interrupt status
        throw new RuntimeException(e);
    } finally {
        lock.unlock();
    }
    // The tasks finished between the first check and acquiring the lock
    return isSuccessDone();
}
isDone() checks whether the number of tasks that have reported a result equals the total number of tasks. If they are equal, all tasks have finished executing.
public boolean isDone() {
    return feedbackResultCount.get() == taskCount;
}
How does a task report its result when it finishes? It is not practical for every listener method to take care of this itself, so we define an aspect to do it.
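@Aspect
@Component
public class EventListenerAspect {

    @Around(value = "@annotation(eventListener)")
    public Object aroundAdvice(ProceedingJoinPoint joinpoint, EventListener eventListener) throws Throwable {
        Object[] args = joinpoint.getArgs();
        // Only DataChangeEvent listeners report back; other @EventListener methods pass through
        if (args.length == 0 || !(args[0] instanceof DataChangeEvent)) {
            return joinpoint.proceed();
        }
        DataChangeEvent event = (DataChangeEvent) args[0];
        boolean executeResult = true;
        try {
            return joinpoint.proceed();
        } catch (Throwable e) {
            // Any failure, including an Error, counts as a failed task
            executeResult = false;
            throw e;
        } finally {
            // Always report, whether the listener succeeded or failed
            DefaultFuture.received(event.getMessageId(), executeResult);
        }
    }
}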
The execution result is reported through DefaultFuture.received().
public static void received(String id, boolean result) {
    DefaultFuture future = FUTURES.get(id);
    if (future != null) {
        // Count the failed tasks
        if (!result) {
            future.feedbackFailResultCount.incrementAndGet();
        }
        // Count the completed tasks
        future.feedbackResultCount.incrementAndGet();
        if (future.isDone()) {
            FUTURES.remove(id);
            future.doReceived();
        }
    }
}

private void doReceived() {
    lock.lock();
    try {
        if (done != null) {
            // Wake up the blocked thread
            done.signal();
        }
    } finally {
        lock.unlock();
    }
}
Let's summarize the whole process:
- On receiving an MQ message, create a DefaultFuture for it and call get() to obtain the execution result. This method blocks while execution is unfinished.
- An aspect wraps every method annotated with @EventListener and uses whether an exception was thrown to determine the result of the task.
- The result is reported through DefaultFuture.received().
- Each report checks whether all tasks have completed. Once they have, the blocked thread is woken up and DefaultFuture.get() returns the result.
- The result decides whether the message is ACKed.
Note that the consumption logic inside each EventListener must be idempotent, because a redelivered message triggers every listener again, including the ones that already succeeded.
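The steps above can be put together in one small, framework-free class. This is a simplified sketch rather than the project's DefaultFuture: the class name `SketchFuture`, its fields, and the body of `newFuture` are assumptions, since the article does not show those parts. Unlike the article's version, it waits on the remaining time and removes its registry entry on timeout.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Simplified stand-in for the article's DefaultFuture; names and bodies are assumptions.
class SketchFuture {
    private static final Map<String, SketchFuture> FUTURES = new ConcurrentHashMap<>();

    private final String id;
    private final int taskCount;
    private final long timeoutMillis;
    private final AtomicInteger finished = new AtomicInteger();
    private final AtomicInteger failed = new AtomicInteger();
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();

    private SketchFuture(String id, int taskCount, long timeoutMillis) {
        this.id = id;
        this.taskCount = taskCount;
        this.timeoutMillis = timeoutMillis;
    }

    /** Registers a future under the message ID so that listeners can find it later. */
    static SketchFuture newFuture(String id, int taskCount, long timeoutMillis) {
        SketchFuture future = new SketchFuture(id, taskCount, timeoutMillis);
        FUTURES.put(id, future);
        return future;
    }

    /** Called once per listener, for example from an aspect, to report its outcome. */
    static void received(String id, boolean success) {
        SketchFuture future = FUTURES.get(id);
        if (future == null) {
            return; // unknown ID, or the waiter already timed out
        }
        if (!success) {
            future.failed.incrementAndGet();
        }
        if (future.finished.incrementAndGet() == future.taskCount) {
            FUTURES.remove(id);
            future.lock.lock();
            try {
                future.done.signalAll(); // wake up the thread blocked in get()
            } finally {
                future.lock.unlock();
            }
        }
    }

    /** Blocks until every listener has reported or the timeout elapses. */
    boolean get() {
        long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        lock.lock();
        try {
            while (finished.get() < taskCount) {
                if (remainingNanos <= 0) {
                    FUTURES.remove(id); // timed out: drop the entry so it does not leak
                    return false;
                }
                remainingNanos = done.awaitNanos(remainingNanos);
            }
            return failed.get() == 0;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt status
            return false;
        } finally {
            lock.unlock();
        }
    }
}
```

A consumer would call `SketchFuture.newFuture(messageId, listenerCount, timeout)` before publishing the event, let each listener call `received(messageId, success)`, and throw an exception when `get()` returns false so that the message is not ACKed.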
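One way to picture the idempotency requirement is a guard keyed by listener and message ID. The sketch below is only an in-memory illustration with hypothetical names (`IdempotentGuard`, `runOnce`); a real deployment would need a shared store, such as a database unique key, so that the check survives restarts and works across instances.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory guard; the names are not from the original project.
class IdempotentGuard {
    private final Set<String> processed = ConcurrentHashMap.newKeySet();

    /** Runs the action only if this (listener, messageId) pair has not already succeeded. */
    boolean runOnce(String listenerName, String messageId, Runnable action) {
        String key = listenerName + ":" + messageId;
        if (!processed.add(key)) {
            return false; // already handled on an earlier delivery
        }
        try {
            action.run();
            return true;
        } catch (RuntimeException e) {
            processed.remove(key); // allow the redelivered message to retry this listener
            throw e;
        }
    }
}
```

With such a guard, a listener that already updated its ES index on the first delivery is skipped on redelivery, while the listener that failed runs again.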
Source code address :https://github.com/yinjihuan/kitty-cloud/tree/master/kitty-cloud-mqconsume
About the author: Yin Jihuan, a plain technology enthusiast, author of 《Spring Cloud Microservices - Full stack technology and case analysis》 and 《Spring Cloud Microservices introduction, actual combat and advanced》, and founder of the WeChat official account Ape world. Personal WeChat: jihuan900. Feel free to get in touch.
I have compiled a complete set of learning materials. If you are interested, search WeChat for 「Ape world」 and reply with the keyword 「Learning materials」 to get what I have put together on Spring Cloud, Spring Cloud Alibaba, Sharding-JDBC database and table sharding, the task scheduling framework XXL-JOB, MongoDB, web crawlers, and related topics.
Copyright notice
This article was written by [Yin Jihuan]. Please include a link to the original when reposting. Thank you.