Observer mode (listener mode) + thread pool to realize asynchronous message sending

2022-06-11 05:59:00 Mrcao Jerrold

1. Use cases

A message queue (MQ) is commonly used for three things: 1. decoupling, 2. turning synchronous calls into asynchronous ones, 3. peak shaving. In many simple scenarios we do not need an MQ to get asynchrony; Spring's @Async annotation or plain multithreading is enough.

I ran into such a scenario in development: when certain conditions are met, we need to push messages to many users' mini programs and official accounts at the same time. Pushing messages is time-consuming, so doing it synchronously is clearly unreasonable, and the sending has to be handled asynchronously. An MQ could of course handle this, but I found that the observer pattern also works for this scenario in my project. The observer pattern has publishers and subscribers too, but the traditional observer pattern is synchronous and blocking. With a small change, it can notify observers asynchronously.
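
To make the difference between blocking and asynchronous notification concrete, here is a minimal sketch in plain Java. It is not the article's project code: the class name, the Listener interface and the "notify-worker" thread name are all made up for illustration. It only shows on which thread a listener body runs in each style.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo class; none of these names come from the article's project.
class SyncVsAsyncObserverDemo {

    /** A minimal listener contract standing in for java.util.Observer. */
    interface Listener {
        void onEvent(String event);
    }

    /** Classic notification: the listener runs on the publisher's own thread, so the publisher waits for it. */
    static String notifySynchronously(String event) {
        AtomicReference<String> ranOn = new AtomicReference<>();
        Listener listener = e -> ranOn.set(Thread.currentThread().getName());
        listener.onEvent(event);
        return ranOn.get();
    }

    /** Modified notification: the listener hands its work to a pool, so the publisher returns right away. */
    static String notifyAsynchronously(String event) {
        ExecutorService pool = Executors.newFixedThreadPool(1, r -> new Thread(r, "notify-worker"));
        AtomicReference<String> ranOn = new AtomicReference<>();
        Listener listener = e -> pool.execute(() -> ranOn.set(Thread.currentThread().getName()));
        listener.onEvent(event);
        // Only the demo waits here, so that it can report the worker's thread name.
        pool.shutdown();
        try {
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("listener did not finish in time");
            }
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println("caller thread:         " + Thread.currentThread().getName());
        System.out.println("sync listener ran on:  " + notifySynchronously("hello"));
        System.out.println("async listener ran on: " + notifyAsynchronously("hello"));
    }
}
```

In the synchronous case the listener reports the caller's own thread; in the asynchronous case it reports the pool thread, which is the "small change" the rest of the article builds on.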

2. Define the subjects (the observed)

There are two subjects here: one sends messages to the official account, the other sends messages to the mini program. java.util already provides a class and an interface for the observer pattern (Observable and Observer), so we can use them directly. Note that both have been deprecated since Java 9.

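import java.util.Observable;

import lombok.Getter;
import lombok.Setter;
import org.springframework.stereotype.Component;

/**
 * Subject for sending messages to the mini program
 *
 * @author caoyong
 * @version 1.0.0
 * @since 2019-01-25 10:41
 **/
@Component
@Getter
@Setter
public class SendMiniProgramSubject extends Observable {
    private String userId;

    /**
     * Notify the observers that a message should be sent
     */
    public void send() {
        // notifyObservers() does nothing unless the changed flag is set first
        this.setChanged();
        this.notifyObservers();
    }
}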

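import java.util.Observable;

import lombok.Getter;
import lombok.Setter;
import org.springframework.stereotype.Component;

/**
 * Subject for sending messages to the official account
 *
 * @author caoyong
 * @version 1.0.0
 * @since 2019-01-25 10:40
 **/
@Getter
@Setter
@Component
public class SendPublicAccountSubject extends Observable {
    private String userId;

    /**
     * Notify the observers that a message should be sent
     */
    public void send() {
        // notifyObservers() does nothing unless the changed flag is set first
        this.setChanged();
        this.notifyObservers();
    }
}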
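
The send() methods above always call setChanged() before notifyObservers(). The following sketch, with a hypothetical DemoSubject that is not part of the article's project, shows why: java.util.Observable only delivers a notification when its changed flag is set, and it ignores an observer that is registered a second time.

```java
import java.util.Arrays;
import java.util.Observable;
import java.util.Observer;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; java.util.Observable is deprecated since Java 9 but still available.
@SuppressWarnings("deprecation")
class SetChangedDemo {

    /** Hypothetical subject exposing both notification paths for comparison. */
    static class DemoSubject extends Observable {
        void notifyWithoutFlag() {
            notifyObservers("ignored");   // changed flag not set: nothing is delivered
        }

        void notifyWithFlag() {
            setChanged();                 // mark the subject as changed
            notifyObservers("delivered"); // delivers, then clears the flag again
        }
    }

    /** Returns {deliveries without flag, deliveries with flag, number of registered observers}. */
    static int[] countDeliveries() {
        DemoSubject subject = new DemoSubject();
        AtomicInteger calls = new AtomicInteger();
        Observer observer = (o, arg) -> calls.incrementAndGet();

        subject.addObserver(observer);
        subject.addObserver(observer); // same instance again: Observable keeps only one entry

        subject.notifyWithoutFlag();
        int withoutFlag = calls.get();

        subject.notifyWithFlag();
        int withFlag = calls.get() - withoutFlag;

        return new int[] {withoutFlag, withFlag, subject.countObservers()};
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(countDeliveries())); // prints [0, 1, 1]
    }
}
```

The last value also explains why it is harmless to call addObserver with the same observer instance on every send.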

3. Define the observer

When the observer has many tasks to handle, we can use a thread pool to improve throughput.

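import java.util.Observable;
import java.util.Observer;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

/**
 * Subscribing observer
 *
 * @author caoyong
 * @version 1.0.0
 * @since 2019-01-25 10:34
 **/
@Slf4j
@Component
// Prototype scope: Spring creates a new instance for each injection point
@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class SubscriberObserver implements Observer {
    private final ExecutorService executorService = Executors.newFixedThreadPool(5);
    @Autowired
    private WechatMessageService wechatMessageService;

    @Override
    public void update(Observable o, Object arg) {
        // Read the user ID on the calling thread, because the subject beans are shared singletons
        if (o instanceof SendMiniProgramSubject) {
            String userId = ((SendMiniProgramSubject) o).getUserId();
            // Send the mini program message on the thread pool
            executorService.execute(() -> {
                // The original post does not show the mini program send call; it would go here
                log.info("send miniProgram message end, userId={}", userId);
            });
        }
        if (o instanceof SendPublicAccountSubject) {
            String userId = ((SendPublicAccountSubject) o).getUserId();
            // Send the official account message on the thread pool
            executorService.execute(() -> {
                wechatMessageService.sendCustomerdPublicAccount(userId);
                log.info("send publicAccount message end.");
            });
        }
    }
}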
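
Because the subjects are shared beans that keep the user ID in a field, two concurrent sends could overwrite each other's value. One possible way around this, sketched below under assumptions, is to pass the user ID as the argument of notifyObservers and to make the send method synchronized so that setChanged and notifyObservers run as one step. UserSubject, PooledObserver and the pool sizes are hypothetical and not taken from the article's project.

```java
import java.util.Observable;
import java.util.Observer;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; java.util.Observable is deprecated since Java 9 but still available.
@SuppressWarnings("deprecation")
class PayloadObserverDemo {

    /** Hypothetical subject: the user ID travels as the notification argument, not as a field. */
    static class UserSubject extends Observable {
        synchronized void send(String userId) {
            setChanged();
            notifyObservers(userId);
        }
    }

    /** Hypothetical observer: takes the argument on the calling thread, then works on its pool. */
    static class PooledObserver implements Observer {
        final ExecutorService pool = Executors.newFixedThreadPool(5);
        final Set<String> handled = ConcurrentHashMap.newKeySet();

        @Override
        public void update(Observable o, Object arg) {
            String userId = (String) arg;
            pool.execute(() -> handled.add(userId)); // stands in for the slow push call
        }
    }

    /** Sends one notification per user from several caller threads and returns how many were handled. */
    static int deliverFromManyThreads(int users) {
        UserSubject subject = new UserSubject();
        PooledObserver observer = new PooledObserver();
        subject.addObserver(observer);

        ExecutorService callers = Executors.newFixedThreadPool(4);
        for (int i = 0; i < users; i++) {
            String userId = "user-" + i;
            callers.execute(() -> subject.send(userId));
        }
        shutdownAndWait(callers);       // every send() has returned after this
        shutdownAndWait(observer.pool); // every queued task has run after this
        return observer.handled.size();
    }

    static void shutdownAndWait(ExecutorService pool) {
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("pool did not terminate in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(deliverFromManyThreads(100)); // prints 100
    }
}
```

Each user ID reaches the pool exactly once, since no shared field is read after the notification has been dispatched.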

4. Define the message sender

When sending messages, we can use Spring's @Async annotation to make the call asynchronous with little effort.

/**
 *  Message sender 
 *
 * @author caoyong
 * @version 1.0.0
 * @since 2019-01-25 15:08
 **/
@Component
public class MessageSender {
    @Autowired
    private SendMiniProgramSubject sendMiniProgramSubject;
    @Autowired
    private SendPublicAccountSubject sendPublicAccountSubject;
    @Autowired
    private SubscriberObserver subscriberObserver;

    /**
     *  Send to official account 
     *
     * @param userId  user id
     */
    @Async
    public void sendPublicAccount(String userId) {
        sendPublicAccountSubject.setUserId(userId);
        sendPublicAccountSubject.addObserver(subscriberObserver);
        sendPublicAccountSubject.send();
    }

    /**
     *  Send to the mini program
     *
     * @param userId  user id
     */
    @Async
    public void sendMiniProgram(String userId) {
        sendMiniProgramSubject.setUserId(userId);
        sendMiniProgramSubject.addObserver(subscriberObserver);
        sendMiniProgramSubject.send();
    }
}

For @Async to take effect in a Spring Boot application, add @EnableAsync to the startup class:

@SpringBootApplication
@EnableCaching
@EnableAsync
public class Application {

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

5. Test sending messages

Use Spring's JUnit support to test whether the message is sent successfully. Checking where the log lines are printed (for example, which thread they come from) shows whether the work ran asynchronously.

/**
 * @author caoyong
 * @version 1.0.0
 * @since 2019-01-25 16:21
 **/
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = Application.class)
public class TestObserver {
    @Autowired
    private MessageSender messageSender;

    @Test
    public void testSendMessage() {
        String userId = "testUserId";
        messageSender.sendPublicAccount(userId);
    }
}
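
Since the send call returns immediately, a test method like the one above can finish before the background work has logged anything. A common way to make a test wait for asynchronous work is a CountDownLatch. The sketch below uses plain Java only; the class, the method and the sleep that stands in for the slow push call are hypothetical and not part of the article's project.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; none of these names come from the article's project.
class AwaitAsyncDemo {

    /** Runs a task on a background thread and reports whether it finished within the timeout. */
    static boolean sendAndAwait(long workMillis, long timeoutMillis) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch done = new CountDownLatch(1);
        try {
            pool.execute(() -> {
                try {
                    Thread.sleep(workMillis); // stands in for the slow push call
                    done.countDown();         // signal completion to the waiting test
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // Block the calling (test) thread until the work is done or the timeout expires
            return done.await(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        } finally {
            pool.shutdownNow(); // stop the worker if it is still running
        }
    }

    public static void main(String[] args) {
        System.out.println("finished in time: " + sendAndAwait(50, 5000));
        System.out.println("finished in time: " + sendAndAwait(2000, 50));
    }
}
```

In a real test the latch would be counted down by the code under test (for example through a test double of the sending service), and the assertion would fail if await returns false.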

Copyright notice
This article was written by [Mrcao Jerrold]. Please include a link to the original when reposting. Thank you.
https://yzsam.com/2022/03/202203020531258865.html