
Shh! Is This Really a Good Way to Use Asynchronous Events?

2020-11-06 01:33:00 Yin Jihuan

Background

At the beginning of this year I wrote an article, "Gather Round: Internal Decoupling Based on an Event Mechanism". It dealt with heterogeneous data in ES: a program subscribes to MySQL binlog changes and then uses Spring Event to dispatch specific events, because a change to one table may require updating more than one ES index.

To make this easier to follow, I have copied the diagram of the earlier design below:

The design in the diagram above has a problem, and that is today's topic.

The problem is that once the MQ consumer receives a message, it publishes the event directly. If the event is published synchronously, there is no issue: when an EventListener fails, the exception reaches the consumer and the message is not ACKed.

If the event is published asynchronously, the message is ACKed as soon as the event has been published. Even if an EventListener fails, MQ never learns of it and the message is not redelivered. That is the problem.
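The failure mode can be reproduced without Spring or MQ. The following is a minimal sketch in plain Java, assuming a thread pool stands in for asynchronous event publishing; `AsyncAckProblemDemo`, `consume` and `run` are hypothetical names used only for illustration, not part of the original project.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class AsyncAckProblemDemo {

    /** Hypothetical consumer: returns true when it would ACK, i.e. no exception reached it. */
    static boolean consume(ExecutorService pool, Runnable listener) {
        try {
            pool.execute(listener); // "asynchronous publish": returns immediately
            return true;            // nothing was thrown on this thread, so the message is ACKed
        } catch (RuntimeException e) {
            return false;           // only reached if the hand-off itself fails
        }
    }

    static String run() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch consumerReturned = new CountDownLatch(1);
        AtomicBoolean listenerFailed = new AtomicBoolean(false);

        boolean acked = consume(pool, () -> {
            try {
                // Keep the listener busy until the consumer has already returned.
                consumerReturned.await(1, TimeUnit.SECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            listenerFailed.set(true); // stands in for an exception inside the listener
        });
        consumerReturned.countDown();

        pool.shutdown();
        try {
            pool.awaitTermination(2, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "acked=" + acked + ", listenerFailed=" + listenerFailed.get();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "acked=true, listenerFailed=true"
    }
}
```

The consumer reports success even though the listener failed afterwards, which is exactly the gap between ACK and business processing described above.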

Solutions

Option 1

Since the message has already been ACKed, why not skip MQ's retry mechanism and have the consumer retry on its own?

That certainly works. The application always knows whether its own processing succeeded, so on failure it can retry immediately or retry according to some policy. If that still fails, it can write the failure to a database to keep a record.

The trouble is that this is tedious: every place that publishes events has to do the same thing. And for whoever takes over your code later, it is likely to make their hair slowly fall out.

Losing hair is one thing. The real problem is that they will not know this logic is there, and one day they may end up taking the blame for it. Poor thing.
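For comparison, the do-it-yourself retry of Option 1 might look roughly like the sketch below. It assumes a fixed number of attempts and uses an in-memory list in place of the database table; `LocalRetry`, `runWithRetry` and `failedRecords` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class LocalRetry {

    /** Stand-in for a database table that records messages whose processing kept failing. */
    static final List<String> failedRecords = Collections.synchronizedList(new ArrayList<>());

    /** Runs the task up to maxAttempts times; records the message ID if every attempt fails. */
    static boolean runWithRetry(String messageId, int maxAttempts, Runnable task) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                task.run();
                return true;
            } catch (RuntimeException e) {
                // Try again; a real policy might wait or back off between attempts.
            }
        }
        failedRecords.add(messageId);
        return false;
    }

    public static void main(String[] args) {
        int[] calls = {0};
        boolean recovered = runWithRetry("m-1", 3, () -> {
            if (++calls[0] < 3) {
                throw new IllegalStateException("transient failure");
            }
        });
        boolean gaveUp = !runWithRetry("m-2", 2, () -> {
            throw new IllegalStateException("permanent failure");
        });
        System.out.println(recovered + " " + gaveUp + " " + failedRecords); // prints "true true [m-2]"
    }
}
```

Every listener would need to be wrapped this way, which is the maintenance cost that makes this option unattractive.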

Option 2

To keep the message and the business processing consistent, the consumer must not ACK right away. It has to wait for the business processing to finish and then decide whether to ACK.

If any processing fails, the message should not be ACKed, so MQ's retry mechanism can be reused.

On analysis, this is a typical asynchronous-to-synchronous scenario. Dubbo has the same scenario, so we can borrow the idea from Dubbo's implementation.

Create a DefaultFuture that waits synchronously for the result of the task execution, then use the DefaultFuture in the MQ consumer.

  
    import lombok.extern.slf4j.Slf4j;
    import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
    import org.apache.rocketmq.spring.core.RocketMQListener;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.ApplicationContext;
    import org.springframework.stereotype.Service;

    @Slf4j // assumption: the `log` field used below comes from Lombok
    @Service
    @RocketMQMessageListener(topic = "${rocketmq.topic.data_change}", consumerGroup = "${rocketmq.group.data_change_consumer}")
    public class DataChangeConsume implements RocketMQListener<DataChangeRequest> {

        @Autowired
        private ApplicationContext applicationContext;

        @Autowired
        private CustomApplicationContextAware customApplicationContextAware;

        @Override
        public void onMessage(DataChangeRequest dataChangeRequest) {
            log.info("Received message {}, thread {}", dataChangeRequest, Thread.currentThread().getName());
            DataChangeEvent event = new DataChangeEvent(this);
            event.setChangeType(dataChangeRequest.getChangeType());
            event.setTable(dataChangeRequest.getTable());
            event.setMessageId(dataChangeRequest.getMessageId());
            // Create the future before publishing, so that feedback from fast listeners is not lost.
            DefaultFuture defaultFuture = DefaultFuture.newFuture(dataChangeRequest, customApplicationContextAware.getTaskCount(), 6000 * 10);
            applicationContext.publishEvent(event);
            // Block until every listener has reported back or the timeout expires.
            Boolean result = defaultFuture.get();
            log.info("MessageId {} processing result {}", dataChangeRequest.getMessageId(), result);
            if (!result) {
                // Throwing here means the message is not ACKed, so MQ redelivers it.
                throw new RuntimeException("Processing failed; the message is not ACKed and will be retried on redelivery");
            }
        }
    }

newFuture() takes the request, the number of tasks, and the timeout (6000 * 10 ms here, i.e. 60 seconds). The number of tasks is used to determine whether every EventListener has finished.

defaultFuture.get() blocks until all tasks have completed and then returns the result. If all of the business processing succeeded, it returns true, the flow ends, and the message is ACKed automatically.

If it returns false, some processing failed or timed out, so the message must not be ACKed: throw an exception and wait for the retry.

  
    public Boolean get() {
        // Already finished: the result still depends on whether any task failed.
        if (isDone()) {
            return isSuccessDone();
        }
        long start = System.currentTimeMillis();
        lock.lock();
        try {
            while (!isDone()) {
                done.await(timeout, TimeUnit.MILLISECONDS);
                // At least one task reported a failure
                if (!isSuccessDone()) {
                    return false;
                }
                // All tasks executed successfully
                if (isDone()) {
                    return true;
                }
                // Timed out
                if (System.currentTimeMillis() - start > timeout) {
                    return false;
                }
            }
        } catch (InterruptedException e) {
            // Restore the interrupt flag before propagating
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        } finally {
            lock.unlock();
        }
        // The tasks finished between the first check and taking the lock
        return isSuccessDone();
    }

isDone() checks whether the number of tasks that have reported a result equals the total number of tasks. If so, all tasks have finished executing.

  
    public boolean isDone() {
        return feedbackResultCount.get() == taskCount;
    }
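The article does not show isSuccessDone(). The sketch below isolates the two counters and assumes that isSuccessDone() simply means "no failure has been reported so far"; `FeedbackCounter` and `record` are hypothetical names. It also shows why the failure counter should be incremented before the total: any thread that observes isDone() as true is then guaranteed to see the failure as well.

```java
import java.util.concurrent.atomic.AtomicInteger;

class FeedbackCounter {
    private final int taskCount;
    private final AtomicInteger feedbackResultCount = new AtomicInteger();
    private final AtomicInteger feedbackFailResultCount = new AtomicInteger();

    FeedbackCounter(int taskCount) {
        this.taskCount = taskCount;
    }

    /** Records the outcome of one listener. */
    void record(boolean success) {
        // Count the failure first, then the completion (see the note above).
        if (!success) {
            feedbackFailResultCount.incrementAndGet();
        }
        feedbackResultCount.incrementAndGet();
    }

    /** True once every task has reported, regardless of outcome. */
    boolean isDone() {
        return feedbackResultCount.get() == taskCount;
    }

    /** Assumed meaning: no task has reported a failure so far. */
    boolean isSuccessDone() {
        return feedbackFailResultCount.get() == 0;
    }

    public static void main(String[] args) {
        FeedbackCounter counter = new FeedbackCounter(2);
        counter.record(true);
        counter.record(false);
        System.out.println(counter.isDone() + " " + counter.isSuccessDone()); // prints "true false"
    }
}
```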

How is the result reported once a task finishes? It is not practical for every listener method to take care of this itself, so we define an aspect to do it.

  
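    import org.aspectj.lang.ProceedingJoinPoint;
    import org.aspectj.lang.annotation.Around;
    import org.aspectj.lang.annotation.Aspect;
    import org.springframework.context.event.EventListener;
    import org.springframework.stereotype.Component;

    @Aspect
    @Component
    public class EventListenerAspect {

        // Wraps every method annotated with @EventListener.
        @Around(value = "@annotation(eventListener)")
        public Object aroundAdvice(ProceedingJoinPoint joinpoint, EventListener eventListener) throws Throwable {
            Object[] args = joinpoint.getArgs();
            // Listeners for other event types pass through without reporting.
            if (args.length == 0 || !(args[0] instanceof DataChangeEvent)) {
                return joinpoint.proceed();
            }
            DataChangeEvent event = (DataChangeEvent) args[0];
            boolean executeResult = true;
            try {
                return joinpoint.proceed();
            } catch (Throwable t) {
                // Catch Throwable so that Errors also count as failures.
                executeResult = false;
                throw t;
            } finally {
                // Always report, whether the listener succeeded or failed.
                DefaultFuture.received(event.getMessageId(), executeResult);
            }
        }
    }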

Execution results are reported through DefaultFuture.received().

  
    public static void received(String id, boolean result) {
        DefaultFuture future = FUTURES.get(id);
        if (future != null) {
            // Add up the number of failed tasks
            if (!result) {
                future.feedbackFailResultCount.incrementAndGet();
            }
            // Add up the number of completed tasks
            future.feedbackResultCount.incrementAndGet();
            if (future.isDone()) {
                FUTURES.remove(id);
                future.doReceived();
            }
        }
    }

    private void doReceived() {
        lock.lock();
        try {
            if (done != null) {
                // Wake up the blocked thread
                done.signal();
            }
        } finally {
            lock.unlock();
        }
    }

To summarize the whole flow:

  • On receiving an MQ message, build a DefaultFuture and call get() to obtain the execution result. This method blocks while execution is unfinished.
  • An aspect wraps the methods annotated with @EventListener and uses the presence or absence of an exception to determine the task's execution result.
  • The result is reported through DefaultFuture.received().
  • On each report, check whether all tasks have completed. When they have, wake the blocked thread so that DefaultFuture.get() can return the result.
  • Based on the result, decide whether to ACK the message.
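The steps above can be tried end to end without Spring or RocketMQ. The following is a simplified sketch, not the project's DefaultFuture: `MiniFuture`, its field names and `simulate` are hypothetical, a thread pool stands in for asynchronous listeners, and the wait uses `Condition.awaitNanos` to track the remaining time. Unlike the listing in the article, it also removes the registry entry when get() returns, so that a timed-out message does not leave an entry behind.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class MiniFuture {
    private static final Map<String, MiniFuture> FUTURES = new ConcurrentHashMap<>();

    private final String id;
    private final int taskCount;
    private final long timeoutMillis;
    private final AtomicInteger finished = new AtomicInteger();
    private final AtomicInteger failed = new AtomicInteger();
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition done = lock.newCondition();

    private MiniFuture(String id, int taskCount, long timeoutMillis) {
        this.id = id;
        this.taskCount = taskCount;
        this.timeoutMillis = timeoutMillis;
    }

    /** Registers a future for the message before any listener can report. */
    static MiniFuture newFuture(String id, int taskCount, long timeoutMillis) {
        MiniFuture future = new MiniFuture(id, taskCount, timeoutMillis);
        FUTURES.put(id, future);
        return future;
    }

    private boolean isDone() {
        return finished.get() == taskCount;
    }

    /** Blocks until every task has reported or the timeout elapses; true only if all succeeded. */
    boolean get() {
        long remainingNanos = TimeUnit.MILLISECONDS.toNanos(timeoutMillis);
        lock.lock();
        try {
            while (!isDone() && remainingNanos > 0) {
                remainingNanos = done.awaitNanos(remainingNanos);
            }
            return isDone() && failed.get() == 0;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            lock.unlock();
            FUTURES.remove(id); // late reports for this message are ignored from here on
        }
    }

    /** Called once per listener with its outcome. */
    static void received(String id, boolean success) {
        MiniFuture future = FUTURES.get(id);
        if (future == null) {
            return;
        }
        if (!success) {
            future.failed.incrementAndGet();
        }
        future.finished.incrementAndGet();
        if (future.isDone()) {
            future.lock.lock();
            try {
                future.done.signalAll(); // wake the consumer thread blocked in get()
            } finally {
                future.lock.unlock();
            }
        }
    }

    /** Simulates one message handled by two asynchronous listeners. */
    static boolean simulate(String id, boolean secondListenerSucceeds) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            MiniFuture future = newFuture(id, 2, 2000);
            pool.execute(() -> received(id, true));
            pool.execute(() -> received(id, secondListenerSucceeds));
            return future.get();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(simulate("m-1", true));  // prints "true": the consumer would ACK
        System.out.println(simulate("m-2", false)); // prints "false": the consumer would throw
    }
}
```

In a real consumer, a `false` result is where the exception is thrown so that MQ redelivers the message.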

Note that the consumption logic inside each EventListener must be idempotent, because a redelivered message is dispatched to every listener again, including those that already succeeded.
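One possible shape for such idempotency control is sketched below. It assumes each listener can be identified by a name and that an in-memory set is acceptable for illustration; a real system would persist the keys, for example in a database. `IdempotentGuard` and `runOnce` are hypothetical names.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

class IdempotentGuard {

    /** Keys of (message, listener) pairs that have been claimed or completed. */
    private static final Set<String> processed = ConcurrentHashMap.newKeySet();

    /**
     * Runs the action unless this listener has already handled this message.
     * Returns true if the action ran, false if it was skipped.
     */
    static boolean runOnce(String messageId, String listenerName, Runnable action) {
        String key = messageId + ":" + listenerName;
        if (!processed.add(key)) {
            return false; // already handled (or in progress): skip on redelivery
        }
        try {
            action.run();
            return true;
        } catch (RuntimeException e) {
            processed.remove(key); // failed: let the redelivered message try again
            throw e;
        }
    }

    public static void main(String[] args) {
        int[] updates = {0};
        runOnce("m-1", "indexA", () -> updates[0]++);
        runOnce("m-1", "indexA", () -> updates[0]++); // redelivery: skipped
        System.out.println(updates[0]); // prints "1"
    }
}
```

With a guard like this, listeners that already succeeded skip the redelivered message while the one that failed gets another attempt.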

Source code: https://github.com/yinjihuan/kitty-cloud/tree/master/kitty-cloud-mqconsume

About the author: Yin Jihuan is a technology enthusiast, the author of "Spring Cloud Microservices: Full-Stack Technology and Case Analysis" and "Spring Cloud Microservices: Introduction, Practice, and Advanced Topics", and the founder of the official account Ape World. Personal WeChat: jihuan900.


Copyright notice
This article was written by [Yin Jihuan]. Please include a link to the original when reposting. Thank you.