当前位置:网站首页>Observer mode (listener mode) + thread pool to realize asynchronous message sending
Observer mode (listener mode) + thread pool to realize asynchronous message sending
2022-06-11 05:59:00 【Mrcao Jerrold】
1. Use scenarios
Everybody knows MQ There are three usage scenarios :1. decoupling ,2. Synchronous mutation step ,3. Peak elimination . In many simple scenarios , We don't need to use MQ To achieve asynchronous , We can go through spring Annotations @Async Or through multithreading . I encountered such a usage scenario in development : We need to meet the conditions , Push messages to many user applets and official account at the same time , Due to the processing of push messages , It's very time consuming , It is obviously unreasonable to synchronize . We need to process sending messages asynchronously . Of course , We can use MQ To achieve this scenario . But I found the scene in my project , Using the observer pattern is also possible . We know , There are also so-called publishers and subscribers in the observer mode , But our traditional observer model is actually synchronous , It's still blocking , As long as we change it a little , Asynchronous observation can be realized .
2. Define the observed
There are two observers here : One is used to send messages to the official account , The other is used to send messages to applets . Because in java.util The following provides interfaces and methods to implement the observer pattern , We can use it directly .
/**
* Send a message to the applet topic
*
* @author caoyong
* @version 1.0.0
* @since 2019-01-25 10:41
**/
@Component
@Getter
@Setter
public class SendMiniProgramSubject extends Observable {
private String userId;
/**
* Notice sent
*/
public void send() {
this.setChanged();
this.notifyObservers();
}
}
/**
* Send message to official account subject
*
* @author caoyong
* @version 1.0.0
* @since 2019-01-25 10:40
**/
@Getter
@Setter
@Component
public class SendPublicAccountSubject extends Observable {
private String userId;
/**
* Notice sent
*/
public void send() {
this.setChanged();
this.notifyObservers();
}
}
3. Define the observer
When the observer is dealing with more tasks , In order to improve processing efficiency , We can use thread pools .
/**
* Subscription observer
*
* @author caoyong
* @version 1.0.0
* @since 2019-01-25 10:34
**/
@Slf4j
@Component
// Flag is multiple
@Scope(value = ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public class SubscriberObserver implements Observer {
private ExecutorService executorService = Executors.newFixedThreadPool(5);
@Autowired
private WechatMessageService wechatMessageService;
@Override
public void update(Observable o, Object arg) {
// Use multithreading to send messages
executorService.execute(() -> {
// The processing applet sends messages
if (o instanceof SendMiniProgramSubject) {
SendMiniProgramSubject miniProgramSubject = (SendMiniProgramSubject) o;
log.info("send miniProgram message end.");
}
// Handle messages sent by official account
if (o instanceof SendPublicAccountSubject) {
SendPublicAccountSubject publicAccountSubject = (SendPublicAccountSubject) o;
wechatMessageService.sendCustomerdPublicAccount(publicAccountSubject.getUserId());
log.info("send publicAccount message end.");
}
});
}
}
4. Define the message sender
When sending messages here , We can use Spring Notes provided @Async To easily implement asynchrony .
/**
* Message sender
*
* @author caoyong
* @version 1.0.0
* @since 2019-01-25 15:08
**/
@Component
public class MessageSender {
@Autowired
private SendMiniProgramSubject sendMiniProgramSubject;
@Autowired
private SendPublicAccountSubject sendPublicAccountSubject;
@Autowired
private SubscriberObserver subscriberObserver;
/**
* Send to official account
*
* @param userId user id
*/
@Async
public void sendPublicAccount(String userId) {
sendPublicAccountSubject.setUserId(userId);
sendPublicAccountSubject.addObserver(subscriberObserver);
sendPublicAccountSubject.send();
}
/**
* Send it to the applet
*
* @param userId user id
*/
@Async
public void sendMiniProgram(String userId) {
sendMiniProgramSubject.setUserId(userId);
sendMiniProgramSubject.addObserver(subscriberObserver);
sendMiniProgramSubject.send();
}
}
To achieve asynchrony , If you're using Spring Boot, It needs to be added in the startup class :@EnableAsync
@SpringBootApplication
@EnableCaching
@EnableAsync
public class Application {
public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}
}
5. Test sending messages
adopt SpringJUnit Tools to test whether our message sending is successful , View log output location , You can see whether asynchrony is implemented .
/**
* @author caoyong
* @version 1.0.0
* @since 2019-01-25 16:21
**/
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = Application.class)
public class TestObserver {
@Autowired
private MessageSender messageSender;
@Test
public void testSendMessage() {
String userId = "testUserId";
messageSender.sendPublicAccount(userId );
}
}
边栏推荐
- Which company is better in JIRA organizational structure management?
- Do you know the functions of getbit and setbit in redis?
- ImageView supporting single finger sliding and double finger scaling
- Sign for this "plug-in" before returning home for the new year
- Jsonobject jsonarray for parsing
- Sword finger offer 32: print binary tree from top to bottom
- Database basic instruction set
- What do you need to know about Amazon evaluation?
- “All in ONE”一个平台解决所有需求,运维监控3.0时代已来
- NLP-D46-nlp比赛D15
猜你喜欢

How to use perforce helix core with CI build server

Error:Execution failed for task ':app:buildNative'. & gt; A problem occurred'x/x/x/'NDK build' error resolution

What should the cross-border e-commerce evaluation team do?

Don't be afraid of xxE vulnerabilities: understand their ferocity and detection methods

Cocoapods installation error
![Experimental report on information management and information system [information security and confidentiality] of Huazhong Agricultural University](/img/f6/e58196aeac85178f6603cea1962a6e.jpg)
Experimental report on information management and information system [information security and confidentiality] of Huazhong Agricultural University

Pycharm usage experience

Wechat custom component - style - slot

NDK learning notes (I)

Thymeleafengine template engine
随机推荐
What is a thread pool?
JS -- reference type
Multithreading tutorial (XXIII) thread safety without lock
Multithreading tutorial (XXV) atomic array
Devsecops in Agile Environment
微信自定义组件---样式--插槽
数据接入平台方案实现(游族网络)
Functional interface lambda, elegant code development
Install Oracle Database
Jsonobject jsonarray for parsing
Informatica: six steps of data quality management
What do you need to know about Amazon evaluation?
Principle of copyonwritearraylist copy on write
亚马逊、速卖通、Lazada、虾皮平台在用911+VM的环境可以进行产号、养号、补单等操作吗?
Multi threading tutorial (XXIV) cas+volatile
VSCode插件开发
NFC Development -- utility tools and development documents (IV)
修复【无 Internet, 安全】问题
Warmly celebrate that yeyanxiu, senior consultant of Longzhi, won the title of "atlassian Certified Expert"
What happened to the young man who loved to write code -- approaching the "Yao Guang young man" of Huawei cloud