[Cloud Native] Event publishing and subscription in Nacos: the Observer pattern
2022-07-05 12:51:00 【Shizhenzhen's grocery store】
EventDispatcher
EventDispatcher is the Nacos class that publishes events and manages subscriptions to them. It implements a design pattern we use all the time in Java: the Observer pattern.
Publish and subscribe generally involves three main roles:
Event: represents a certain type of event action, for example a local data change in Nacos, LocalDataChangeEvent
Event source: the event source can be seen as the action that triggers an event. For example, when local data in Nacos changes, the event source notifies all listeners listening for this event
Event listener: after being notified by the event source, the listener performs its own business processing. A listener must provide a callback method for the event source to call
A listener can listen to multiple events, and an event can be listened to by multiple listeners.
Let's look at these roles in the class.
Events
Event
/** Event definition interface. All events implement this empty interface. */
public interface Event {
}
LocalDataChangeEvent
/**
 * Local data change event.
 *
 * @author Nacos
 */
public class LocalDataChangeEvent implements Event {
    // fields such as groupKey, isBeta and betaIps (used by the listener) are omitted in this excerpt
}
Event listener
AbstractEventListener
/** Abstract event listener. Each listener implements onEvent() to handle events and interest() to list the events it listens for. */
static public abstract class AbstractEventListener {
    public AbstractEventListener() {
        /* Automatically register with the EventDispatcher */
        EventDispatcher.addEventListener(this);
    }

    /** List of events of interest */
    abstract public List<Class<? extends Event>> interest();

    /** Handle an event */
    abstract public void onEvent(Event event);
}
LongPollingService
/**
 * Long polling service. Responsible for handling
 *
 * @author Nacos
 */
@Service
public class LongPollingService extends AbstractEventListener {
    @Override
    public List<Class<? extends Event>> interest() {
        List<Class<? extends Event>> eventTypes = new ArrayList<Class<? extends Event>>();
        eventTypes.add(LocalDataChangeEvent.class);
        return eventTypes;
    }

    @Override
    public void onEvent(Event event) {
        if (isFixedPolling()) {
            // ignore
        } else {
            if (event instanceof LocalDataChangeEvent) {
                LocalDataChangeEvent evt = (LocalDataChangeEvent) event;
                scheduler.execute(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));
            }
        }
    }
}
Event dispatcher class
EventDispatcher
public class EventDispatcher {
    /** Registry of events and their listeners. One event can have multiple listeners. */
    static final CopyOnWriteArrayList<Entry> LISTENER_HUB = new CopyOnWriteArrayList<Entry>();

    /** Register a listener. */
    static public void addEventListener(AbstractEventListener listener) {
        for (Class<? extends Event> type : listener.interest()) {
            getEntry(type).listeners.addIfAbsent(listener);
        }
    }

    /** Entry point called by the event source to announce that an event has occurred. */
    static public void fireEvent(Event event) {
        if (null == event) {
            throw new IllegalArgumentException();
        }
        for (AbstractEventListener listener : getEntry(event.getClass()).listeners) {
            try {
                listener.onEvent(event);
            } catch (Exception e) {
                // A failing listener is logged and does not stop the remaining listeners
                log.error(e.toString(), e);
            }
        }
    }

    // getEntry(), the Entry class and the log field are not shown in this excerpt
}
Event source
For example, when a configuration file is deleted, a local data change event must be triggered, which is done by calling
EventDispatcher.fireEvent(new LocalDataChangeEvent(groupKey));
After fireEvent is called, every listener registered for this Event executes listener.onEvent(event);
There are many ways to implement event publishing and subscription, but the basic pattern is always the same: the Observer pattern.
Let's look at some other implementations.
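To make the flow from registration to callback concrete, here is a minimal, self-contained sketch of the same idea. It is not the Nacos code: the names SketchEvent, DataChanged, SketchListener, SketchDispatcher and RecordingListener are made up for illustration, listeners register explicitly instead of in their constructor, and a map replaces the LISTENER_HUB list.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

/** Marker interface for events (hypothetical counterpart of Event). */
interface SketchEvent {}

/** Hypothetical event carrying the key of the configuration that changed. */
final class DataChanged implements SketchEvent {
    final String groupKey;
    DataChanged(String groupKey) { this.groupKey = groupKey; }
}

/** A listener names the event types it cares about and handles them. */
interface SketchListener {
    List<Class<? extends SketchEvent>> interest();
    void onEvent(SketchEvent event);
}

/** Simplified dispatcher: maps each event type to its listeners. */
final class SketchDispatcher {
    private final Map<Class<? extends SketchEvent>, CopyOnWriteArrayList<SketchListener>> hub =
            new ConcurrentHashMap<>();

    /** Registers the listener for every event type it is interested in (duplicates are ignored). */
    void addEventListener(SketchListener listener) {
        for (Class<? extends SketchEvent> type : listener.interest()) {
            hub.computeIfAbsent(type, t -> new CopyOnWriteArrayList<>()).addIfAbsent(listener);
        }
    }

    /** Called by the event source; notifies every listener registered for the event's class. */
    void fireEvent(SketchEvent event) {
        if (event == null) {
            throw new IllegalArgumentException("event must not be null");
        }
        List<SketchListener> listeners = hub.get(event.getClass());
        if (listeners == null) {
            return;
        }
        for (SketchListener listener : listeners) {
            try {
                listener.onEvent(event);
            } catch (RuntimeException e) {
                // One failing listener must not prevent the others from being notified
                System.err.println("listener failed: " + e);
            }
        }
    }
}

/** Listener that records the keys of the changes it has been notified about. */
final class RecordingListener implements SketchListener {
    final List<String> seen = new ArrayList<>();

    @Override
    public List<Class<? extends SketchEvent>> interest() {
        List<Class<? extends SketchEvent>> types = new ArrayList<>();
        types.add(DataChanged.class);
        return types;
    }

    @Override
    public void onEvent(SketchEvent event) {
        if (event instanceof DataChanged) {
            seen.add(((DataChanged) event).groupKey);
        }
    }
}
```

Calling fireEvent(new DataChanged("someKey")) on a dispatcher with two registered RecordingListener instances adds "someKey" to the seen list of both, which is the one-event-to-many-listeners relationship described above.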
EventBus in Google Guava
EventBus is Guava's event handling mechanism, an elegant implementation of the Observer pattern (a producer/consumer programming model). For event listening and publish/subscribe, EventBus is a simple and elegant solution that spares us from creating complex class and interface hierarchies.
EventBucket
Let's define a custom class, EventBucket, that initializes the event bus and registers the listeners (subscribers).
@Component
public class EventBucket {
    private static Logger logger = LoggerFactory.getLogger(EventBucket.class);

    /** Event bus; handlers run on a fixed pool of 5 threads */
    private static EventBus asyncEventBus = new AsyncEventBus("asyncEventBus", Executors.newFixedThreadPool(5));

    private static AtomicBoolean isInit = new AtomicBoolean(false);

    private final List<AsyncListener> asyncListenerList;

    /** Inject all listeners of type AsyncListener */
    @Autowired
    public EventBucket(List<AsyncListener> asyncListenerList) {
        this.asyncListenerList = asyncListenerList;
    }

    @PostConstruct
    public synchronized void init() {
        // Every event listener must be registered with the EventBus (only once)
        if (isInit.compareAndSet(false, true)) {
            asyncListenerList.forEach(a -> asyncEventBus.register(a));
        }
    }

    /** Send an event */
    public static void post(BaseEvent event) {
        try {
            asyncEventBus.post(event);
        } catch (Throwable e) {
            logger.error("EventBucket failed to send event: " + e.getMessage(), e);
        }
    }
}
BaseEvent
BaseEvent defines a post method that sends the event. All events must extend this class.
public class BaseEvent {
    public void post() {
        EventBucket.post(this);
    }
}
AsyncListener
Define an empty listener interface. Every listener class that implements this interface is registered with the EventBus.
public interface AsyncListener {
}
With the basic classes defined above, let's try out publish and subscribe.
First, define an event, TestEvent
public class TestEvent extends BaseEvent {
    private Integer id;

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }
}
Then define a listener, TestAsyncListener
@Component
public class TestAsyncListener implements AsyncListener {
    @Subscribe
    public void testEvent(TestEvent testEvent) {
        System.out.print("TestAsyncListener received a TestEvent notification: " + testEvent.toString());
    }

    // Test2Event is a second event type that is not shown in this article
    @Subscribe
    public void test2Event(Test2Event test2Event) {
        System.out.print("TestAsyncListener received a Test2Event notification: " + test2Event.toString());
    }
}
The @Subscribe annotation is all it takes to subscribe to an event. The event type is taken from the method's parameter.
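How can an annotation on a method be enough to subscribe? The rough idea can be sketched in plain Java with reflection. This is a simplified illustration, not Guava's implementation: the @OnEvent annotation, MiniBus, OrderCreated, OrderCancelled and OrderListener are all hypothetical names, and delivery here is synchronous with no thread pool.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a subscribe annotation; must be retained at runtime to be visible to reflection. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface OnEvent {}

/** Hypothetical miniature bus that discovers handler methods by annotation. */
final class MiniBus {
    private static final class Handler {
        final Object target;
        final Method method;
        final Class<?> eventType;

        Handler(Object target, Method method, Class<?> eventType) {
            this.target = target;
            this.method = method;
            this.eventType = eventType;
        }
    }

    private final List<Handler> handlers = new ArrayList<>();

    /** Scans the subscriber for annotated one-argument methods; the argument type is the event type. */
    void register(Object subscriber) {
        for (Method m : subscriber.getClass().getDeclaredMethods()) {
            if (m.isAnnotationPresent(OnEvent.class) && m.getParameterCount() == 1) {
                m.setAccessible(true);
                handlers.add(new Handler(subscriber, m, m.getParameterTypes()[0]));
            }
        }
    }

    /** Invokes every handler whose parameter type accepts the event; returns the number of deliveries. */
    int post(Object event) {
        int delivered = 0;
        for (Handler h : handlers) {
            if (h.eventType.isInstance(event)) {
                try {
                    h.method.invoke(h.target, event);
                    delivered++;
                } catch (ReflectiveOperationException e) {
                    System.err.println("handler failed: " + e);
                }
            }
        }
        return delivered;
    }
}

/** Hypothetical event types. */
final class OrderCreated {
    final int id;
    OrderCreated(int id) { this.id = id; }
}

final class OrderCancelled {
    final int id;
    OrderCancelled(int id) { this.id = id; }
}

/** Subscriber with one handler per event type. */
final class OrderListener {
    final List<String> log = new ArrayList<>();

    @OnEvent
    void created(OrderCreated e) { log.add("created:" + e.id); }

    @OnEvent
    void cancelled(OrderCancelled e) { log.add("cancelled:" + e.id); }
}
```

Posting an OrderCreated reaches only the created handler, because dispatch is driven by the parameter type of each annotated method. That is the same reason the listener above can handle TestEvent and Test2Event in separate methods.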
Next, send an event. Here the event notification is sent right after the Spring Boot application has started.
@SpringBootApplication
public class ClientsApplication {
    public static void main(String[] args) {
        SpringApplication.run(ClientsApplication.class, args);
        TestEvent event = new TestEvent();
        event.setId(1);
        // Publish the event
        event.post();
    }
}
Once startup completes, the listener's message is printed immediately.
Spring event-driven mechanism
The following blog post covers it in detail and is worth reading:
Spring Event driven mechanism
Listener extension interfaces used in Nacos
- SpringApplicationRunListener
- ApplicationListener
SpringApplicationRunListener
The SpringApplicationRunListener interface receives callbacks during Spring Boot startup and initialization, so that users can add their own logic at each stage of the startup process.
This is the Observer pattern again: Spring provides a listener extension interface, and what it listens to are the events of Spring Boot startup and initialization.
The key events in the Spring Boot startup process, in the order they are triggered, are:
1. Startup begins
2. Environment built
3. ApplicationContext built
4. ApplicationContext loaded
5. ApplicationContext refreshed and started
6. Startup complete
7. Startup failed
package org.springframework.boot;

public interface SpringApplicationRunListener {
    // Called immediately when run() starts executing; can be used for very early initialization work
    void starting();

    // Called once the Environment has been built, before the ApplicationContext is created
    void environmentPrepared(ConfigurableEnvironment environment);

    // Called once the ApplicationContext has been built
    void contextPrepared(ConfigurableApplicationContext context);

    // Called after the ApplicationContext has been loaded but before it is refreshed
    void contextLoaded(ConfigurableApplicationContext context);

    // Called after the ApplicationContext has been refreshed and started, before CommandLineRunners and ApplicationRunners are called
    void started(ConfigurableApplicationContext context);

    // Called just before run() finishes
    void running(ConfigurableApplicationContext context);

    // Called when the application fails while running
    void failed(ConfigurableApplicationContext context, Throwable exception);
}
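The callback order can be illustrated with a small framework-free model. This is only a sketch of the sequence listed above, not Spring Boot's implementation: RunListenerSketch, RunSketch and TraceListener are hypothetical names, the callbacks take no context arguments, and the refresh parameter merely stands in for the context refresh step.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, argument-free mirror of the callbacks in the interface above. */
interface RunListenerSketch {
    void starting();
    void environmentPrepared();
    void contextPrepared();
    void contextLoaded();
    void started();
    void running();
    void failed(Throwable error);
}

/** Simplified model of a run() method that notifies one listener at each stage. */
final class RunSketch {
    static void run(RunListenerSketch listener, Runnable refresh) {
        try {
            listener.starting();
            listener.environmentPrepared();
            listener.contextPrepared();
            listener.contextLoaded();
            refresh.run(); // stands in for refreshing the ApplicationContext
            listener.started();
            listener.running();
        } catch (RuntimeException e) {
            // Any failure along the way is reported through failed()
            listener.failed(e);
        }
    }
}

/** Listener that records the name of every callback it receives. */
final class TraceListener implements RunListenerSketch {
    final List<String> trace = new ArrayList<>();

    @Override public void starting() { trace.add("starting"); }
    @Override public void environmentPrepared() { trace.add("environmentPrepared"); }
    @Override public void contextPrepared() { trace.add("contextPrepared"); }
    @Override public void contextLoaded() { trace.add("contextLoaded"); }
    @Override public void started() { trace.add("started"); }
    @Override public void running() { trace.add("running"); }
    @Override public void failed(Throwable error) { trace.add("failed"); }
}
```

With a refresh step that throws, the trace in this model ends with contextLoaded followed by failed, and neither started nor running is recorded.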
StartingSpringApplicationRunListener
- This listener class mainly sets some system properties, such as:
nacos.mode=stand alone / cluster
nacos.function.mode=All/config/naming
nacos.local.ip=InetUtils.getSelfIp()
@Override
public void environmentPrepared(ConfigurableEnvironment environment) {
    if (STANDALONE_MODE) {
        System.setProperty(MODE_PROPERTY_KEY_STAND_MODE, "stand alone");
    } else {
        System.setProperty(MODE_PROPERTY_KEY_STAND_MODE, "cluster");
    }
    if (FUNCTION_MODE == null) {
        System.setProperty(MODE_PROPERTY_KEY_FUNCTION_MODE, "All");
    } else if (SystemUtils.FUNCTION_MODE_CONFIG.equals(FUNCTION_MODE)) {
        System.setProperty(MODE_PROPERTY_KEY_FUNCTION_MODE, SystemUtils.FUNCTION_MODE_CONFIG);
    } else if (SystemUtils.FUNCTION_MODE_NAMING.equals(FUNCTION_MODE)) {
        System.setProperty(MODE_PROPERTY_KEY_FUNCTION_MODE, SystemUtils.FUNCTION_MODE_NAMING);
    }
    System.setProperty(LOCAL_IP_PROPERTY_KEY, LOCAL_IP);
}
- In addition, until startup has finished, it prints the log message Nacos is starting… once per second
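The Nacos code for this periodic log is not shown here. One way to get such behavior with the JDK alone is a scheduled task that is cancelled once startup has finished. The sketch below assumes a hypothetical StartupTicker class; begin() would be called from an early callback such as starting(), and finish() from started() or failed(). The period is a parameter so that the once-per-second interval is not hard-coded.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper that prints a progress message periodically until startup is over. */
final class StartupTicker {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "startup-ticker");
                t.setDaemon(true); // never keep the JVM alive just for progress logging
                return t;
            });
    private final AtomicBoolean starting = new AtomicBoolean(false);

    /** Number of messages printed so far. */
    final AtomicInteger ticks = new AtomicInteger();

    /** Starts printing; calling it again has no effect. Use 1000 for once per second. */
    void begin(long periodMillis) {
        if (!starting.compareAndSet(false, true)) {
            return;
        }
        scheduler.scheduleWithFixedDelay(() -> {
            if (starting.get()) {
                ticks.incrementAndGet();
                System.out.println("Nacos is starting...");
            }
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
    }

    /** Stops printing and waits briefly for a message that is currently being printed. */
    void finish() {
        starting.set(false);
        scheduler.shutdownNow();
        try {
            scheduler.awaitTermination(1, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```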
ApplicationListener
ApplicationListener is Spring's listener interface. It is used to listen for events and is a typical example of the Observer pattern.
@FunctionalInterface
public interface ApplicationListener<E extends ApplicationEvent> extends EventListener {
    /**
     * Handle an application event.
     * @param event the event to respond to
     */
    void onApplicationEvent(E event);
}
ApplicationEvent is the abstract base class for events. Concrete events must extend this class.
StandaloneProfileApplicationListener in Nacos
public class StandaloneProfileApplicationListener implements ApplicationListener<ApplicationEnvironmentPreparedEvent>,
    PriorityOrdered {

    private static final Logger logger = LoggerFactory.getLogger(StandaloneProfileApplicationListener.class);

    @Override
    public void onApplicationEvent(ApplicationEnvironmentPreparedEvent event) {
        ConfigurableEnvironment environment = event.getEnvironment();
        if (environment.getProperty(STANDALONE_MODE_PROPERTY_NAME, boolean.class, false)) {
            environment.addActiveProfile(STANDALONE_SPRING_PROFILE);
        }
        if (logger.isInfoEnabled()) {
            logger.info("Spring Environment's active profiles : {} in standalone mode : {}",
                Arrays.asList(environment.getActiveProfiles()),
                STANDALONE_MODE
            );
        }
    }

    @Override
    public int getOrder() {
        return HIGHEST_PRECEDENCE;
    }
}
The generic type of this listener is ApplicationEnvironmentPreparedEvent.
This is an event built into Spring Boot.
It is published when SpringApplication is starting and the Environment is first available for inspection and modification. In other words, ApplicationEnvironmentPreparedEvent gives access to the Environment and its properties.
The job of this listener is to obtain the ConfigurableEnvironment and, if Nacos runs in standalone mode, add the standalone profile to the active profiles.
EnvironmentPostProcessor: loading external configuration files
Spring Boot supports loading configuration files dynamically and provides an extension interface for this purpose:
org.springframework.boot.env.EnvironmentPostProcessor. As the package name shows, the interface ships with Spring Boot. It can be used to manage configuration files centrally, so that not every project has to carry its own configuration files.
NacosDefaultPropertySourceEnvironmentPostProcessor: loading the Nacos configuration file
This class loads the META-INF/nacos-default.properties configuration file from the core module.
public class NacosDefaultPropertySourceEnvironmentPostProcessor implements EnvironmentPostProcessor, Ordered {
    @Override
    public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application) {
        ResourceLoader resourceLoader = getResourceLoader(application);
        processPropertySource(environment, resourceLoader);
    }
}
Spring Factories SPI extension mechanism
Spring Boot has a highly decoupled extension mechanism: Spring Factories. It is modeled on the SPI extension mechanism in Java.
A brief summary of the idea behind Java SPI: the abstract modules in a system often have several different implementations, for example for logging, XML parsing or JDBC. Object-oriented design generally recommends programming against interfaces between modules rather than hard-coding implementation classes. Once code refers to a specific implementation class, it violates the pluggability principle: replacing the implementation means changing the code. To assemble modules without naming the implementation in the program, a service discovery mechanism is needed.
Dubbo also defines its own SPI mechanism.
Spring has a loading mechanism similar to Java SPI as well. The implementation class names for an interface are configured in the META-INF/spring.factories file, and the program reads these configuration files and instantiates the classes.
This custom SPI mechanism is the foundation on which Spring Boot Starters are built.
Some of the Nacos classes shown above are not marked with @Component or any other Spring annotation. Without these annotations, how does Spring find and load them?
Let's open the file core/resources/META-INF/spring.factories
# ApplicationListener
org.springframework.context.ApplicationListener=\
com.alibaba.nacos.core.listener.StandaloneProfileApplicationListener
# EnvironmentPostProcessor
org.springframework.boot.env.EnvironmentPostProcessor=\
com.alibaba.nacos.core.env.NacosDefaultPropertySourceEnvironmentPostProcessor
# SpringApplicationRunListener
org.springframework.boot.SpringApplicationRunListener=\
com.alibaba.nacos.core.listener.LoggingSpringApplicationRunListener,\
com.alibaba.nacos.core.listener.StartingSpringApplicationRunListener
The fully qualified names of the classes mentioned above are all listed in this file.
- How is Spring Factories implemented?
The spring-core package defines the SpringFactoriesLoader class, which locates the META-INF/spring.factories files and returns the implementations configured for a given interface.
The implementation details can be covered in a separate article later.
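The core idea of reading interface-to-implementation mappings from a properties-style file and instantiating the listed classes can be sketched with the JDK alone. This is an illustration of the idea, not the SpringFactoriesLoader code: FactoriesSketch, Greeter, EnglishGreeter and FrenchGreeter are hypothetical names, and the file content is passed in as a string instead of being read from META-INF/spring.factories on the classpath.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Hypothetical loader for spring.factories-style text. */
final class FactoriesSketch {
    /**
     * Looks up the comma-separated class names configured under the interface's
     * fully qualified name and creates one instance of each via its no-arg constructor.
     */
    static <T> List<T> load(Class<T> type, String factoriesText) {
        Properties props = new Properties();
        try {
            // java.util.Properties already understands '#' comments and '\' line continuations
            props.load(new StringReader(factoriesText));
        } catch (IOException e) {
            throw new IllegalStateException("cannot parse factories text", e);
        }
        List<T> result = new ArrayList<>();
        String names = props.getProperty(type.getName(), "");
        for (String name : names.split(",")) {
            String className = name.trim();
            if (className.isEmpty()) {
                continue;
            }
            try {
                Class<?> impl = Class.forName(className);
                result.add(type.cast(impl.getDeclaredConstructor().newInstance()));
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("cannot instantiate " + className, e);
            }
        }
        return result;
    }
}

/** Hypothetical extension interface with two implementations. */
interface Greeter {
    String greet();
}

final class EnglishGreeter implements Greeter {
    @Override public String greet() { return "hello"; }
}

final class FrenchGreeter implements Greeter {
    @Override public String greet() { return "bonjour"; }
}
```

Given text in the same layout as the file above, with Greeter's fully qualified name as the key and the two implementation class names as the value, load(Greeter.class, text) returns one EnglishGreeter and one FrenchGreeter in the configured order. The calling code never names an implementation class, which is exactly the decoupling that SPI aims for.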