[Cloud native] Event publishing and subscription in Nacos -- the observer pattern
2022-07-05 12:51:00 【Shizhenzhen's grocery store】
EventDispatcher
EventDispatcher is the class in Nacos that publishes and subscribes to events. It implements a design pattern we use often in Java: the observer pattern.
Publish/subscribe generally involves three main roles:
Event: represents a certain type of event action, for example the local data change event in Nacos, LocalDataChangeEvent.
Event source: the event source can be seen as the action that makes an event happen. For example, when local data in Nacos changes, the source notifies all listeners listening for this event.
Event listener: once notified by the event source, the listener performs its own business processing. The listener must expose a callback method for the event source to call.
A listener can listen to multiple events, and an event can be listened to by multiple listeners.
Let's look at these roles in this class.
event
Event
/** Event definition interface; every event implements this empty marker interface. */
public interface Event {
}
LocalDataChangeEvent
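/**
 * Local data change event.
 *
 * @author Nacos
 */
public class LocalDataChangeEvent implements Event {
    // Fields (such as groupKey, isBeta and betaIps, used below) and constructors are omitted in this excerpt.
}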
Event listener
AbstractEventListener
/** Abstract event listener. Each listener implements onEvent() to handle events and interest() to list the events it listens for. */
static public abstract class AbstractEventListener {
    public AbstractEventListener() {
        /* Automatically register with the EventDispatcher. */
        EventDispatcher.addEventListener(this);
    }

    /** List of events of interest. */
    abstract public List<Class<? extends Event>> interest();

    /** Handle an event. */
    abstract public void onEvent(Event event);
}
LongPollingService
/**
 * Long polling service. Responsible for handling
 *
 * @author Nacos
 */
@Service
public class LongPollingService extends AbstractEventListener {
    @Override
    public List<Class<? extends Event>> interest() {
        List<Class<? extends Event>> eventTypes = new ArrayList<Class<? extends Event>>();
        eventTypes.add(LocalDataChangeEvent.class);
        return eventTypes;
    }

    @Override
    public void onEvent(Event event) {
        if (isFixedPolling()) {
            // ignore
        } else {
            if (event instanceof LocalDataChangeEvent) {
                LocalDataChangeEvent evt = (LocalDataChangeEvent)event;
                scheduler.execute(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));
            }
        }
    }
}
Event dispatcher class
EventDispatcher
public class EventDispatcher {
    /** Registry of events and their listeners; one event can have multiple listeners. */
    static final CopyOnWriteArrayList<Entry> LISTENER_HUB = new CopyOnWriteArrayList<Entry>();

    /** Add a listener. */
    static public void addEventListener(AbstractEventListener listener) {
        for (Class<? extends Event> type : listener.interest()) {
            getEntry(type).listeners.addIfAbsent(listener);
        }
    }

    /** Called by the event source to announce that an event has occurred. */
    static public void fireEvent(Event event) {
        if (null == event) {
            throw new IllegalArgumentException();
        }
        for (AbstractEventListener listener : getEntry(event.getClass()).listeners) {
            try {
                listener.onEvent(event);
            } catch (Exception e) {
                // A failing listener is logged and does not stop the remaining listeners.
                log.error(e.toString(), e);
            }
        }
    }

    // getEntry(...) and the Entry class are omitted in this excerpt.
}
Event source
For example, deleting a configuration file has to trigger a local data change event, so the code calls:
EventDispatcher.fireEvent(new LocalDataChangeEvent(groupKey));
After fireEvent is called, every listener that listens for this Event executes listener.onEvent(event);
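The register-then-fire flow can be sketched with the standard library alone. This is a minimal illustration, not the Nacos source: DemoEvent, ConfigRemoved, DemoListener, DemoDispatcher and DispatcherDemo are hypothetical names, and a ConcurrentHashMap keyed by event class stands in for the LISTENER_HUB list of entries.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;

/** Marker interface for events (hypothetical, mirrors the empty Event interface). */
interface DemoEvent {}

/** Hypothetical event used only in this sketch. */
final class ConfigRemoved implements DemoEvent {
    final String groupKey;

    ConfigRemoved(String groupKey) {
        this.groupKey = groupKey;
    }
}

/** Listener contract: declare the event types of interest and handle them. */
interface DemoListener {
    List<Class<? extends DemoEvent>> interest();

    void onEvent(DemoEvent event);
}

/** Simplified dispatcher: one listener list per event class. */
final class DemoDispatcher {
    private final Map<Class<? extends DemoEvent>, CopyOnWriteArrayList<DemoListener>> hub =
            new ConcurrentHashMap<>();

    void addListener(DemoListener listener) {
        for (Class<? extends DemoEvent> type : listener.interest()) {
            hub.computeIfAbsent(type, k -> new CopyOnWriteArrayList<>()).addIfAbsent(listener);
        }
    }

    /** Notifies every listener registered for the event's exact class; returns how many succeeded. */
    int fire(DemoEvent event) {
        if (event == null) {
            throw new IllegalArgumentException("event must not be null");
        }
        List<DemoListener> listeners = hub.get(event.getClass());
        if (listeners == null) {
            return 0;
        }
        int notified = 0;
        for (DemoListener listener : listeners) {
            try {
                listener.onEvent(event);
                notified++;
            } catch (RuntimeException e) {
                // One failing listener must not prevent the others from being notified.
                System.err.println("listener failed: " + e);
            }
        }
        return notified;
    }
}

/** Registers two listeners for the same event and fires it once. */
final class DispatcherDemo {
    static List<String> run() {
        List<String> received = new ArrayList<>();
        DemoDispatcher dispatcher = new DemoDispatcher();
        for (String name : new String[] {"A", "B"}) {
            dispatcher.addListener(new DemoListener() {
                @Override
                public List<Class<? extends DemoEvent>> interest() {
                    List<Class<? extends DemoEvent>> types = new ArrayList<>();
                    types.add(ConfigRemoved.class);
                    return types;
                }

                @Override
                public void onEvent(DemoEvent event) {
                    received.add(name + ":" + ((ConfigRemoved) event).groupKey);
                }
            });
        }
        dispatcher.fire(new ConfigRemoved("dataId+group"));
        return received;
    }
}
```

Calling DispatcherDemo.run() delivers the single event to both listeners in registration order. Firing an event type nobody registered for simply notifies no one.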
There are many ways to implement event publishing and subscription, but the basic pattern is the same: the observer pattern.
Let's look at some other implementations.
EventBus in Google Guava
EventBus is Guava's event handling mechanism, an elegant implementation of the observer pattern (the producer/consumer programming model). For event listening and publish/subscribe, EventBus is a simple solution: we don't have to create complex class and interface hierarchies.
EventBucket
Let's define a custom class, EventBucket, that creates the event bus and registers the listeners (subscribers).
@Component
public class EventBucket {
    private static Logger logger = LoggerFactory.getLogger(EventBucket.class);

    /** Event bus. */
    private static EventBus asyncEventBus = new AsyncEventBus("asyncEventBus", Executors.newFixedThreadPool(5));

    private static AtomicBoolean isInit = new AtomicBoolean(false);

    private final List<AsyncListener> asyncListenerList;

    /** Inject every listener bean of type AsyncListener. */
    @Autowired
    public EventBucket(List<AsyncListener> asyncListenerList) {
        this.asyncListenerList = asyncListenerList;
    }

    @PostConstruct
    public synchronized void init() {
        // Register every event listener with the EventBus, once only
        if (isInit.compareAndSet(false, true)) {
            asyncListenerList.forEach(a -> asyncEventBus.register(a));
        }
    }

    /** Send an event. */
    public static void post(BaseEvent event) {
        try {
            asyncEventBus.post(event);
        } catch (Throwable e) {
            logger.error("EventBucket failed to send event: " + e.getMessage(), e);
        }
    }
}
BaseEvent
Define BaseEvent. The class has a post method used to send the event; all events must inherit from this class.
public class BaseEvent {
    public void post() {
        EventBucket.post(this);
    }
}
AsyncListener
Define an empty listener interface; every listener class that implements this interface is registered with the EventBus.
public interface AsyncListener {
}
With the basic classes defined, let's test how publish and subscribe is used.
First define an event, TestEvent
public class TestEvent extends BaseEvent {
    private Integer id;

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }
}
Then define a listener, TestAsyncListener
@Component
public class TestAsyncListener implements AsyncListener {
    @Subscribe
    public void testEvent(TestEvent testEvent) {
        System.out.print("TestAsyncListener received a TestEvent notification: " + testEvent.toString());
    }

    // Test2Event is a second event type that is not shown in this article.
    @Subscribe
    public void test2Event(Test2Event test2Event) {
        System.out.print("TestAsyncListener received a Test2Event notification: " + test2Event.toString());
    }
}
Annotating a method with @Subscribe subscribes it to the event type of its parameter.
Now send an event. Here we send the event notification once the Spring Boot application has started.
@SpringBootApplication
public class ClientsApplication {
    public static void main(String[] args) {
        SpringApplication.run(ClientsApplication.class, args);
        TestEvent event = new TestEvent();
        event.setId(1);
        // Publish the event
        event.post();
    }
}
Once startup completes, the listener's message is printed immediately.
Spring's event-driven mechanism
The following blog post covers it in detail and is worth reading:
Spring's event-driven mechanism
Listener extension interfaces used in Nacos
- SpringApplicationRunListener
- ApplicationListener
SpringApplicationRunListener
The main purpose of the SpringApplicationRunListener interface is to let users add their own logic at each stage of startup: during initialization, Spring Boot calls back through this interface.
So this is also the observer pattern. Spring provides this listener extension interface, and what it listens for are the following events during Spring Boot startup and initialization.
The key events in the Spring Boot startup process (in trigger order) are:
1. Startup begins
2. Environment is built
3. ApplicationContext is built
4. ApplicationContext is loaded
5. ApplicationContext is refreshed and started
6. Startup is complete
7. Startup failed
package org.springframework.boot;

public interface SpringApplicationRunListener {
    // Called immediately when run() starts executing; can be used for the earliest initialization work
    void starting();

    // Called once the Environment has been built, before the ApplicationContext is created
    void environmentPrepared(ConfigurableEnvironment environment);

    // Called once the ApplicationContext has been built
    void contextPrepared(ConfigurableApplicationContext context);

    // Called after the ApplicationContext has been loaded, but before it is refreshed
    void contextLoaded(ConfigurableApplicationContext context);

    // Called after the ApplicationContext has been refreshed and started, before CommandLineRunners and ApplicationRunners are called
    void started(ConfigurableApplicationContext context);

    // Called just before run() finishes
    void running(ConfigurableApplicationContext context);

    // Called when the application fails while running
    void failed(ConfigurableApplicationContext context, Throwable exception);
}
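To make the callback order concrete, here is a small standard-library sketch of a launcher that notifies its listeners phase by phase. It is only an illustration of the observer idea behind the interface, not Spring Boot's implementation: PhaseListener, PhaseRunner, RecordingListener and PhaseDemo are hypothetical names, the callbacks take no context arguments, and a plain Runnable stands in for refreshing the context.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, much-reduced stand-in for a run listener: one callback per phase. */
interface PhaseListener {
    default void starting() {}
    default void environmentPrepared() {}
    default void contextPrepared() {}
    default void contextLoaded() {}
    default void started() {}
    default void running() {}
    default void failed(Throwable cause) {}
}

/** Hypothetical launcher that walks the phases in order and notifies every listener. */
final class PhaseRunner {
    private final List<PhaseListener> listeners;

    PhaseRunner(List<PhaseListener> listeners) {
        this.listeners = listeners;
    }

    void run(Runnable refresh) {
        listeners.forEach(PhaseListener::starting);
        try {
            listeners.forEach(PhaseListener::environmentPrepared);
            listeners.forEach(PhaseListener::contextPrepared);
            listeners.forEach(PhaseListener::contextLoaded);
            refresh.run(); // stands in for refreshing the application context
            listeners.forEach(PhaseListener::started);
            listeners.forEach(PhaseListener::running);
        } catch (RuntimeException e) {
            // Any failure along the way is reported through the failed() callback.
            listeners.forEach(l -> l.failed(e));
        }
    }
}

/** Listener that records the name of every callback it receives. */
final class RecordingListener implements PhaseListener {
    final List<String> phases = new ArrayList<>();

    @Override public void starting() { phases.add("starting"); }
    @Override public void environmentPrepared() { phases.add("environmentPrepared"); }
    @Override public void contextPrepared() { phases.add("contextPrepared"); }
    @Override public void contextLoaded() { phases.add("contextLoaded"); }
    @Override public void started() { phases.add("started"); }
    @Override public void running() { phases.add("running"); }
    @Override public void failed(Throwable cause) { phases.add("failed"); }
}

final class PhaseDemo {
    /** Runs the launcher once and returns the callbacks seen by a recording listener. */
    static List<String> trace(boolean failDuringRefresh) {
        RecordingListener recorder = new RecordingListener();
        List<PhaseListener> listeners = new ArrayList<>();
        listeners.add(recorder);
        new PhaseRunner(listeners).run(() -> {
            if (failDuringRefresh) {
                throw new IllegalStateException("refresh failed");
            }
        });
        return recorder.phases;
    }
}
```

PhaseDemo.trace(false) records the six success callbacks in order, while PhaseDemo.trace(true) stops after contextLoaded and records failed instead.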
StartingSpringApplicationRunListener
- This listener class mainly sets some system properties, such as:
nacos.mode=stand alone / cluster
nacos.function.mode=All/config/naming
nacos.local.ip=InetUtils.getSelfIp()
@Override
public void environmentPrepared(ConfigurableEnvironment environment) {
    if (STANDALONE_MODE) {
        System.setProperty(MODE_PROPERTY_KEY_STAND_MODE, "stand alone");
    } else {
        System.setProperty(MODE_PROPERTY_KEY_STAND_MODE, "cluster");
    }
    if (FUNCTION_MODE == null) {
        System.setProperty(MODE_PROPERTY_KEY_FUNCTION_MODE, "All");
    } else if (SystemUtils.FUNCTION_MODE_CONFIG.equals(FUNCTION_MODE)) {
        System.setProperty(MODE_PROPERTY_KEY_FUNCTION_MODE, SystemUtils.FUNCTION_MODE_CONFIG);
    } else if (SystemUtils.FUNCTION_MODE_NAMING.equals(FUNCTION_MODE)) {
        System.setProperty(MODE_PROPERTY_KEY_FUNCTION_MODE, SystemUtils.FUNCTION_MODE_NAMING);
    }
    System.setProperty(LOCAL_IP_PROPERTY_KEY, LOCAL_IP);
}
- In addition, until startup finishes, it logs "Nacos is starting…" once per second
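A periodic "still starting" message like this can be produced with a scheduled executor. The sketch below is an assumption about how such a heartbeat might look, not the Nacos source: StartupHeartbeat and its methods are hypothetical names, the period is a parameter rather than a fixed one second, and the message goes to standard output instead of a logger.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper that repeats a task until startup is reported as finished. */
final class StartupHeartbeat {
    // Daemon thread so the heartbeat never keeps the JVM alive on its own.
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "startup-heartbeat");
                t.setDaemon(true);
                return t;
            });

    private volatile boolean starting = true;

    /** Schedules the tick with a fixed delay between runs. */
    void begin(Runnable tick, long periodMillis) {
        scheduler.scheduleWithFixedDelay(() -> {
            if (starting) {
                tick.run();
            }
        }, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    /** Called once startup has finished (or failed): stops the heartbeat. */
    void finish() {
        starting = false;
        scheduler.shutdownNow();
    }

    boolean isStopped() {
        return scheduler.isShutdown();
    }

    /** Simulates a startup that takes startupMillis and returns how many ticks were printed. */
    static int demo(long periodMillis, long startupMillis) {
        AtomicInteger ticks = new AtomicInteger();
        StartupHeartbeat heartbeat = new StartupHeartbeat();
        heartbeat.begin(() -> {
            System.out.println("Nacos is starting...");
            ticks.incrementAndGet();
        }, periodMillis);
        try {
            Thread.sleep(startupMillis); // stands in for the real startup work
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            heartbeat.finish();
        }
        return ticks.get();
    }
}
```

In a run listener, begin would be called from an early callback such as starting() and finish from the callbacks that mark the end of startup. StartupHeartbeat.demo(1000, 3500) mimics a startup of about 3.5 seconds with a one-second heartbeat.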
ApplicationListener
ApplicationListener is Spring's listener interface. It is used to listen for events and is a typical use of the observer pattern.
@FunctionalInterface
public interface ApplicationListener<E extends ApplicationEvent> extends EventListener {
    /**
     * Handle an application event.
     * @param event the event to respond to
     */
    void onApplicationEvent(E event);
}
ApplicationEvent
This is the abstract base class for events; concrete events must extend it.
StandaloneProfileApplicationListener in Nacos
public class StandaloneProfileApplicationListener implements ApplicationListener<ApplicationEnvironmentPreparedEvent>,
        PriorityOrdered {
    private static final Logger logger = LoggerFactory.getLogger(StandaloneProfileApplicationListener.class);

    @Override
    public void onApplicationEvent(ApplicationEnvironmentPreparedEvent event) {
        ConfigurableEnvironment environment = event.getEnvironment();
        if (environment.getProperty(STANDALONE_MODE_PROPERTY_NAME, boolean.class, false)) {
            environment.addActiveProfile(STANDALONE_SPRING_PROFILE);
        }
        if (logger.isInfoEnabled()) {
            logger.info("Spring Environment's active profiles : {} in standalone mode : {}",
                Arrays.asList(environment.getActiveProfiles()),
                STANDALONE_MODE
            );
        }
    }

    @Override
    public int getOrder() {
        return HIGHEST_PRECEDENCE;
    }
}
The generic type of this listener is ApplicationEnvironmentPreparedEvent, which is a built-in Spring Boot event.
It is published when a SpringApplication is starting up and the Environment is first available for inspection and modification, so the properties of the Environment can be obtained through ApplicationEnvironmentPreparedEvent.
The listener here gets the ConfigurableEnvironment and, if Nacos is running in standalone mode, adds the standalone profile to the active profiles.
EnvironmentPostProcessor: loading external configuration files
Spring Boot supports reading configuration files dynamically through the extension interface org.springframework.boot.env.EnvironmentPostProcessor. This interface ships with Spring Boot. Using it lets configuration files be managed centrally, so they do not have to be configured separately in every project.
NacosDefaultPropertySourceEnvironmentPostProcessor loads the Nacos configuration file
This class loads the META-INF/nacos-default.properties configuration file under the core module.
public class NacosDefaultPropertySourceEnvironmentPostProcessor implements EnvironmentPostProcessor, Ordered {
    @Override
    public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application) {
        ResourceLoader resourceLoader = getResourceLoader(application);
        processPropertySource(environment, resourceLoader);
    }

    // getResourceLoader(...), processPropertySource(...) and getOrder() are omitted in this excerpt.
}
Spring Factories SPI extension mechanism
Spring Boot has a highly decoupled extension mechanism: Spring Factories. This mechanism is modeled on the SPI extension mechanism in Java.
Here is a brief summary of the idea behind Java SPI. The abstract modules in a system often have many different implementations, for example logging, XML parsing and JDBC modules. In object-oriented design we generally recommend programming to interfaces between modules, with no implementation classes hard-coded between them. Once code refers to a specific implementation class, it violates the pluggability principle: replacing an implementation means changing the code. To assemble modules without naming the implementation in the program, a service discovery mechanism is needed.
Dubbo also defines its own SPI mechanism.
Spring has a loading mechanism similar to Java SPI as well. The implementation class names for an interface are configured in the META-INF/spring.factories file, and the program then reads these configuration files and instantiates the classes.
This custom SPI mechanism is the foundation on which Spring Boot Starters are built.
Looking back at the Nacos classes above, none of them is annotated with @Component or any other Spring annotation. Without these annotations, how are they loaded and managed by Spring?
Let's open the file core/resources/META-INF/spring.factories
# ApplicationListener
org.springframework.context.ApplicationListener=\
com.alibaba.nacos.core.listener.StandaloneProfileApplicationListener
# EnvironmentPostProcessor
org.springframework.boot.env.EnvironmentPostProcessor=\
com.alibaba.nacos.core.env.NacosDefaultPropertySourceEnvironmentPostProcessor
# SpringApplicationRunListener
org.springframework.boot.SpringApplicationRunListener=\
com.alibaba.nacos.core.listener.LoggingSpringApplicationRunListener,\
com.alibaba.nacos.core.listener.StartingSpringApplicationRunListener
The fully qualified names of the classes mentioned above are all in this file.
- How is Spring Factories implemented?
The spring-core package defines the SpringFactoriesLoader class. This class locates the META-INF/spring.factories files and retrieves the implementations configured for a specified interface.
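The core idea, a properties file that maps an interface name to a comma-separated list of implementation class names which are then instantiated by reflection, can be sketched with the standard library. This is a simplified illustration and not Spring's code: MiniFactoriesLoader, Greeter, EnglishGreeter, FrenchGreeter and FactoriesDemo are hypothetical names, and the configuration is read from a string rather than from META-INF/spring.factories resources on the classpath.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

/** Hypothetical extension point used only in this sketch. */
interface Greeter {
    String greet();
}

final class EnglishGreeter implements Greeter {
    public String greet() { return "hello"; }
}

final class FrenchGreeter implements Greeter {
    public String greet() { return "bonjour"; }
}

/** Hypothetical loader: looks up the interface name as a key and instantiates the listed classes. */
final class MiniFactoriesLoader {
    static <T> List<T> load(Class<T> type, String factoriesText) {
        Properties props = new Properties();
        try {
            // Properties handles the trailing-backslash line continuations used in spring.factories.
            props.load(new StringReader(factoriesText));
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
        List<T> result = new ArrayList<>();
        String names = props.getProperty(type.getName(), "");
        for (String name : names.split(",")) {
            String trimmed = name.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            try {
                Class<?> impl = Class.forName(trimmed);
                // cast() fails fast if a configured class does not implement the interface.
                result.add(type.cast(impl.getDeclaredConstructor().newInstance()));
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Cannot instantiate " + trimmed, e);
            }
        }
        return result;
    }
}

final class FactoriesDemo {
    /** Builds a spring.factories-style entry for Greeter and loads both implementations. */
    static List<String> run() {
        String text = Greeter.class.getName() + "=\\\n"
                + EnglishGreeter.class.getName() + ",\\\n"
                + FrenchGreeter.class.getName() + "\n";
        List<String> greetings = new ArrayList<>();
        for (Greeter greeter : MiniFactoriesLoader.load(Greeter.class, text)) {
            greetings.add(greeter.greet());
        }
        return greetings;
    }
}
```

Because the caller only names the interface, swapping or adding an implementation is a change to the configuration text, not to the calling code. That is the pluggability the SPI approach is after.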
The implementation details can be covered in a later article.