当前位置:网站首页>【云原生】Nacos中的事件发布与订阅--观察者模式
【云原生】Nacos中的事件发布与订阅--观察者模式
2022-07-05 12:38:00 【石臻臻的杂货铺】
EventDispatcher
EventDispatcher在Nacos中是一个事件发布与订阅的类,也就是我们经常使用的Java设计模式——观察者模式
一般发布与订阅主要有三个角色
事件: 表示某些类型的事件动作,例如Nacos中的 本地数据发生变更事件
LocalDataChangeEvent
事件源 : 事件源可以看成是一个动作,某个事件发生的动作,例如Nacos中本地数据发生了变更,就会通知给所有监听该事件的监听器
事件监听器: 事件监听器监听到事件源之后,会执行自己的一些业务处理,监听器必须要有回调方法供事件源回调
一个监听器可以监听多个事件,一个事件也可以被多个监听器监听
那我们看看这个类中的角色
事件
Event
/**事件定义接口,所有事件继承这个空接口**/
public interface Event {
}
LocalDataChangeEvent
/** * 本地数据发生变更的事件。 * @author Nacos */
public class LocalDataChangeEvent implements Event {
}
事件监听器
AbstractEventListener
/**抽象事件监听器; 每个监听器需要实现onEvent()处理事件,和interest()将要监听的事件列表**/
static public abstract class AbstractEventListener {
public AbstractEventListener() {
/*自动注册到*/
EventDispatcher.addEventListener(this);
}
/**感兴趣的事件列表**/
abstract public List<Class<? extends Event>> interest();
/**处理事件**/
abstract public void onEvent(Event event);
}
LongPollingService
/** * 长轮询服务。负责处理 * * @author Nacos */
@Service
public class LongPollingService extends AbstractEventListener {
@Override
public List<Class<? extends Event>> interest() {
List<Class<? extends Event>> eventTypes = new ArrayList<Class<? extends Event>>();
eventTypes.add(LocalDataChangeEvent.class);
return eventTypes;
}
@Override
public void onEvent(Event event) {
if (isFixedPolling()) {
// ignore
} else {
if (event instanceof LocalDataChangeEvent) {
LocalDataChangeEvent evt = (LocalDataChangeEvent)event;
scheduler.execute(new DataChangeTask(evt.groupKey, evt.isBeta, evt.betaIps));
}
}
}
}
事件分发类
EventDispatcher
public class EventDispatcher {
/**事件与事件监听器的数据中心; 一个事件可以对应着多个监听器**/
static final CopyOnWriteArrayList<Entry> LISTENER_HUB = new CopyOnWriteArrayList<Entry>();
/** * 新增监听器 */
static public void addEventListener(AbstractEventListener listener) {
for (Class<? extends Event> type : listener.interest()) {
getEntry(type).listeners.addIfAbsent(listener);
}
}
/** * 事件源调用的接口动作,告知某个事件发生了 */
static public void fireEvent(Event event) {
if (null == event) {
throw new IllegalArgumentException();
}
for (AbstractEventListener listener : getEntry(event.getClass()).listeners) {
try {
listener.onEvent(event);
} catch (Exception e) {
log.error(e.toString(), e);
}
}
}
事件源
例如当删除配置文件的时候,就需要触发本地数据变更事件,则需要调用
EventDispatcher.fireEvent(new LocalDataChangeEvent(groupKey));
调用了fireEvent之后所有监听这个Event的监听器都将执行listener.onEvent(event);
事件发布与订阅的使用方法有很多,但是基本模式都是一样的—观察者模式;
我们介绍一下其他的用法
Google Guava 中的EventBus
EventBus是Guava的事件处理机制,是设计模式中的观察者模式(生产/消费者编程模型)的优雅实现。对于事件监听和发布订阅模式,EventBus是一个非常优雅和简单解决方案,我们不用创建复杂的类和接口层次结构。
EventBucket
我们自定义一个类EventBucket,来初始化及注册一些监听器(订阅者)
@Component
public class EventBucket {
private static Logger logger = LoggerFactory.getLogger(EventBucket.class);
/**事件总线**/
private static EventBus asyncEventBus = new AsyncEventBus("asyncEventBus", Executors.newFixedThreadPool(5));
private static AtomicBoolean isInit = new AtomicBoolean(false);
private final List<AsyncListener> asyncListenerList;
/**将所有类型为AsyncListener的监听器注入**/
@Autowired
public EventBucket(List<AsyncListener> asyncListenerList) {
this.asyncListenerList = asyncListenerList;
}
@PostConstruct
public synchronized void init() {
//要将所有的事件监听者都在 EventBus中去注册
if (isInit.compareAndSet(false, true)) {
asyncListenerList.forEach(a -> asyncEventBus.register(a));
}
}
/**发送事件**/
public static void post(BaseEvent event) {
try {
asyncEventBus.post(event);
} catch (Throwable e) {
logger.error("EventBucket发送事件出错: " + e.getMessage(), e);
}
}
}
BaseEvent
定义BaseEvent 这个类有个post方法,用来发送事件的;所有的**事件必须继承此类
public class BaseEvent {
public void post(){
EventBucket.post(this);
}
}
AsyncListener
定义一个监听器空接口,所有继承此接口的监听器类都将被注册到EventBus中;
public interface AsyncListener {
}
上面定义好了基本的类,那我们下面测试怎么使用发布以及订阅
首先订阅一个事件 TestEvent
public class TestEvent extends BaseEvent {
private Integer id;
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
}
然后定义一个监听器 TestAsyncListener
@Component
public class TestAsyncListener implements AsyncListener {
@Subscribe
public void testEvent(TestEvent testEvent){
System.out.print("我是TestAsyncListener接收到了TestEvent通知"+testEvent.toString());
}
@Subscribe
public void test2Event(Test2Event test2Event){
System.out.print("我是TestAsyncListener接收到了test2Event通知"+test2Event.toString());
}
}
用注解@Subscribe
就可以直接订阅事件了;
那么接下来开始发送一个事件;我们再SpringBoot启动完之后调用一下发送事件通知
@SpringBootApplication
public class ClientsApplication {
public static void main(String[] args) {
SpringApplication.run(ClientsApplication.class, args);
TestEvent event = new TestEvent();
event.setId(1);
//发布通知
event.post();
}
}
启动完成之后,立马打印了
Spring事件驱动机制
这篇博客写的比较详细可以前往阅读
Spring事件驱动机制
Nacos中使用的监听扩展接口
- SpringApplicationRunListener
- ApplicationListener
SpringApplicationRunListener
SpringApplicationRunListener 接口的作用主要就是在Spring Boot 启动初始化的过程中可以通过SpringApplicationRunListener接口回调来让用户在启动的各个流程中可以加入自己的逻辑。
它也是 观察者模式,Spring为我们提供了这个监听器的扩展接口;它监听的就是SpringBoot启动初始化中下面的各个事件
SpringBoot启动过程的关键事件(按照触发顺序)包括:
1. 开始启动
2. Environment构建完成
3. ApplicationContext构建完成
4. ApplicationContext完成加载
5. ApplicationContext完成刷新并启动
6. 启动完成
7. 启动失败
package org.springframework.boot;
public interface SpringApplicationRunListener {
// 在run()方法开始执行时,该方法就立即被调用,可用于在初始化最早期时做一些工作
void starting();
// 当environment构建完成,ApplicationContext创建之前,该方法被调用
void environmentPrepared(ConfigurableEnvironment environment);
// 当ApplicationContext构建完成时,该方法被调用
void contextPrepared(ConfigurableApplicationContext context);
// 在ApplicationContext完成加载,但没有被刷新前,该方法被调用
void contextLoaded(ConfigurableApplicationContext context);
// 在ApplicationContext刷新并启动后,CommandLineRunners和ApplicationRunner未被调用前,该方法被调用
void started(ConfigurableApplicationContext context);
// 在run()方法执行完成前该方法被调用
void running(ConfigurableApplicationContext context);
// 当应用运行出错时该方法被调用
void failed(ConfigurableApplicationContext context, Throwable exception);
}
StartingSpringApplicationRunListener
- 在这个监听类中,主要是做了一些系统属性的设置;如:
nacos.mode=stand alone / cluster
nacos.function.mode=All/config/naming
nacos.local.ip=InetUtils.getSelfIp()
@Override
public void environmentPrepared(ConfigurableEnvironment environment) {
if (STANDALONE_MODE) {
System.setProperty(MODE_PROPERTY_KEY_STAND_MODE, "stand alone");
} else {
System.setProperty(MODE_PROPERTY_KEY_STAND_MODE, "cluster");
}
if (FUNCTION_MODE == null) {
System.setProperty(MODE_PROPERTY_KEY_FUNCTION_MODE, "All");
} else if(SystemUtils.FUNCTION_MODE_CONFIG.equals(FUNCTION_MODE)){
System.setProperty(MODE_PROPERTY_KEY_FUNCTION_MODE, SystemUtils.FUNCTION_MODE_CONFIG);
} else if(SystemUtils.FUNCTION_MODE_NAMING.equals(FUNCTION_MODE)) {
System.setProperty(MODE_PROPERTY_KEY_FUNCTION_MODE, SystemUtils.FUNCTION_MODE_NAMING);
}
System.setProperty(LOCAL_IP_PROPERTY_KEY, LOCAL_IP);
}
- 还有顺便再启动结束之前,每秒中打印一次日志 Nacos is starting…
ApplicationListener
ApplicationListener 就是spring的监听器,能够用来监听事件,典型的观察者模式
@FunctionalInterface
public interface ApplicationListener<E extends ApplicationEvent> extends EventListener {
/** * Handle an application event. * @param event the event to respond to */
void onApplicationEvent(E event);
}
ApplicationEvent
是事件的抽象类; 具体的事件必须继承这个类;
Nacos中StandaloneProfileApplicationListener
public class StandaloneProfileApplicationListener implements ApplicationListener<ApplicationEnvironmentPreparedEvent>,
PriorityOrdered {
private static final Logger logger = LoggerFactory.getLogger(StandaloneProfileApplicationListener.class);
@Override
public void onApplicationEvent(ApplicationEnvironmentPreparedEvent event) {
ConfigurableEnvironment environment = event.getEnvironment();
if (environment.getProperty(STANDALONE_MODE_PROPERTY_NAME, boolean.class, false)) {
environment.addActiveProfile(STANDALONE_SPRING_PROFILE);
}
if (logger.isInfoEnabled()) {
logger.info("Spring Environment's active profiles : {} in standalone mode : {}",
Arrays.asList(environment.getActiveProfiles()),
STANDALONE_MODE
);
}
}
@Override
public int getOrder() {
return HIGHEST_PRECEDENCE;
}
}
这里的监听器的泛型传的ApplicationEnvironmentPreparedEvent
这个事件是SpringBoot内置的事件;
SpringApplication启动并且Environment首次可用于检查和修改时发布的事件,也就是说通过ApplicationEnvironmentPreparedEvent可以拿到Environment的属性;
在这里这个监听器的作用就是拿到 ConfigurableEnvironment
,然后如果是单机模式standalone
就设置一下ActiveProfile
EnvironmentPostProcessor加载外部配置文件
SpringBoot支持动态的读取文件,留下的扩展接口
org.springframework.boot.env.EnvironmentPostProcessor
。这个接口是spring包下的,使用这个进行配置文件的集中管理,而不需要每个项目都去配置配置文件。
NacosDefaultPropertySourceEnvironmentPostProcessor加载Nacos配置文件
加载这个类是加载core模块下面的META-INF/nacos-default.properties
配置文件的;
public class NacosDefaultPropertySourceEnvironmentPostProcessor implements EnvironmentPostProcessor, Ordered {
@Override
public void postProcessEnvironment(ConfigurableEnvironment environment, SpringApplication application) {
ResourceLoader resourceLoader = getResourceLoader(application);
processPropertySource(environment, resourceLoader);
}
}
Spring Factories SPI扩展机制
Spring Boot中有一种非常解耦的扩展机制:Spring Factories。这种扩展机制实际上是仿照Java中的SPI扩展机制来实现的
简单的总结下java SPI机制的思想。我们系统里抽象的各个模块,往往有很多不同的实现方案,比如日志模块的方案,xml解析模块、jdbc模块的方案等。面向的对象的设计里,我们一般推荐模块之间基于接口编程,模块之间不对实现类进行硬编码。一旦代码里涉及具体的实现类,就违反了可拔插的原则,如果需要替换一种实现,就需要修改代码。为了实现在模块装配的时候能不在程序里动态指明,这就需要一种服务发现机制;
在Dubbo中也定义了SPI机制;
在Spring中也有一种类似与Java SPI的加载机制。它在META-INF/spring.factories文件中配置接口的实现类名称,然后在程序中读取这些配置文件并实例化。
这种自定义的SPI机制是Spring Boot Starter实现的基础。
我们看上面说的Nacos中的几个类,并没有打上@Component等等Spring中的注解,没有这些注解那么他们是怎么被加载到Spring容器中被管理的呢?
我们打开文件 core/resources/META-INF/spring.factories
# ApplicationListener
org.springframework.context.ApplicationListener=\
com.alibaba.nacos.core.listener.StandaloneProfileApplicationListener
# EnvironmentPostProcessor
org.springframework.boot.env.EnvironmentPostProcessor=\
com.alibaba.nacos.core.env.NacosDefaultPropertySourceEnvironmentPostProcessor
# SpringApplicationRunListener
org.springframework.boot.SpringApplicationRunListener=\
com.alibaba.nacos.core.listener.LoggingSpringApplicationRunListener,\
com.alibaba.nacos.core.listener.StartingSpringApplicationRunListener
上面提及到的几个类的全类名都在这个文件中;
- Spring Factories实现原理是什么
spring-core包里定义了SpringFactoriesLoader类,这个类实现了检索META-INF/spring.factories文件,并获取指定接口的配置的功能
具体的实现原理,后面可以再写一篇文章;
边栏推荐
- Taobao, pinduoduo, jd.com, Doudian order & Flag insertion remarks API solution
- Difference between JUnit theories and parameterized tests
- Redis cluster configuration
- Oppo Xiaobu launched Obert, a large pre training model, and promoted to the top of kgclue
- ZABBIX 5.0 - LNMP environment compilation and installation
- ZABBIX agent2 installation
- Simply take stock reading notes (1/8)
- 10 minute fitness method reading notes (1/5)
- PIP command reports an error pip is configured with locations that requires tls/ssl problems
- ZABBIX monitors mongodb templates and configuration operations
猜你喜欢
Oppo Xiaobu launched Obert, a large pre training model, and promoted to the top of kgclue
Pinduoduo flag insertion remarks API
What if wechat is mistakenly sealed? Explain the underlying logic of wechat seal in detail
Ecplise development environment configuration and simple web project construction
10 minute fitness method reading notes (5/5)
I met Tencent in the morning and took out 38K, which showed me the basic smallpox
Taobao order interface | order flag remarks, may be the most stable and easy-to-use interface
Resnet+attention project complete code learning
UNIX socket advanced learning diary - advanced i/o functions
JSON parsing error special character processing (really speechless... Troubleshooting for a long time)
随机推荐
Kotlin process control and circulation
Redis cluster configuration
SAP 自开发记录用户登录日志等信息
GPON other manufacturers' configuration process analysis
Laravel文档阅读笔记-mews/captcha的使用(验证码功能)
Array cyclic shift problem
Redis highly available sentinel cluster
Redis highly available sentinel mechanism
2021-12-22 transaction record
Solution to order timeout unpaid
GNN(pytorch-geometric)
Constructing expression binary tree with prefix expression
JDBC exercise - query data encapsulated into object return & simple login demo
Distributed solution - Comprehensive decryption of distributed task scheduling platform -xxljob
jxl笔记
The relationship between the size change of characteristic graph and various parameters before and after DL convolution operation
PIP command reports an error pip is configured with locations that requires tls/ssl problems
Redis clean cache
Learn the garbage collector of JVM -- a brief introduction to Shenandoah collector
Kotlin流程控制、循环