当前位置:网站首页>C#事件总线
C#事件总线
2022-06-11 12:09:00 【三岁Funny】
C#事件总线
实现事件总线
通用的发布订阅模式不是我们的目的,我们的目的是一个集中式的事件处理机制,且各个模块之间相互不产生依赖。那我们如何做到呢?同样我们还是一步一步的进行分析改造。
分析问题
思考一下,每次为了实现这个模式,都要完成以下三步:
Publish 发布
Subscribe 订阅
Unsubscribe 取消订阅
这对应着发布订阅模式的基本概念,不过对于事件总线的接口添加了许多约束:
发布的内容(消息)必须是IntegrationEvent及其子类
订阅事件必须指明要订阅事件的类型,并附带处理器类型
处理器必须是IIntegrationEventHandler的实现类
Ok,看到这里先不要管Dynamic相关的方法,然后记住这个两个关键点:
事件必须继承IntegrationEvent
处理器必须实现IIntegrationEventHandler且T是IntegrationEvent子类
另外,看下 IntegrationEvent有什么
public class IntegrationEvent{
public IntegrationEvent() {
Id = Guid.NewGuid();
CreationDate = DateTime.UtcNow;
} public Guid Id {
get; }
public DateTime CreationDate {
get; }
}
IEventBusSubscriptionsManager是什么
public interface IEventBusSubscriptionsManager{
bool IsEmpty {
get; }
这个接口看起来稍显复杂些,我们来简化下看看:
public interface IEventBusSubscriptionsManager{
void AddSubscription<T, TH>()
void RemoveSubscription<T, TH>()
IEnumerable<SubscriptionInfo> GetHandlersForEvent<T>()
}
最终,这三个方法就是我们要关注的,添加订阅、移除订阅、获取指定事件的订阅信息。
SubscriptionInfo是什么?
public bool IsDynamic {
get; }public Type HandlerType{
get; }
SubscriptionInfo中只有两个信息,这是不是一个Dynamic类型的Event以及这个Event所对应的处理器的类型。
这是你可能会有另一个疑问:
这个和IEventBus有什么关系?
IEventBusSubscriptionsManager含有更多功能:查看是否有订阅,获取事件的Type,获取事件的处理器等等
IEventBusSubscriptionsManager由IEventBus使用,在RabbitMq和ServiceBus的实现中,都使用Manager去存储事件的信息,例如下面的代码:
public void Subscribe<T, TH>() where T : IntegrationEvent where TH : IIntegrationEventHandler<T>
{
// 查询事件的全名
var eventName = _subsManager.GetEventKey<T>(); //向mq添加注册
DoInternalSubscription(eventName); // 向manager添加订阅
_subsManager.AddSubscription<T, TH>();
}
private void DoInternalSubscription(string eventName){
var containsKey = _subsManager.HasSubscriptionsForEvent(eventName);
if (!containsKey)
{
if (!_persistentConnection.IsConnected)
{
_persistentConnection.TryConnect();
}
using (var channel = _persistentConnection.CreateModel())
{
channel.QueueBind(queue: _queueName,
exchange: BROKER_NAME,
routingKey: eventName);
}
}
}
查询事件的名字是manager做的,订阅的时候是先向mq添加订阅,之后又加到manager中,manager管理着订阅的基本信息。
另外一个重要功能是获取事件的处理器信息,在rabbit mq的实现中,ProcessEvent方法中用manager获取了事件的处理器,再用依赖注入获得处理器的实例,反射调用Handle方法处理事件信息:
private async Task ProcessEvent(string eventName, string message)
{
// 从manager查询信息
if (_subsManager.HasSubscriptionsForEvent(eventName))
{
using (var scope = _autofac.BeginLifetimeScope(AUTOFAC_SCOPE_NAME))
{
// 从manager获取处理器
var subscriptions = _subsManager.GetHandlersForEvent(eventName);
foreach (var subscription in subscriptions)
{
// Di + 反射调用,处理事件(两个都是,只是针对是否是dynamic做了不同的处理)
if (subscription.IsDynamic)
{
var handler = scope.ResolveOptional(subscription.HandlerType) as IDynamicIntegrationEventHandler;
dynamic eventData = JObject.Parse(message);
await handler.Handle(eventData);
}
else
{
var eventType = _subsManager.GetEventTypeByName(eventName);
var integrationEvent = JsonConvert.DeserializeObject(message, eventType);
var handler = scope.ResolveOptional(subscription.HandlerType);
var concreteType = typeof(IIntegrationEventHandler<>).MakeGenericType(eventType);
await (Task)concreteType.GetMethod("Handle").Invoke(handler, new object[] {
integrationEvent });
}
}
}
}
}
IEventBusSubscriptionsManager的默认实现
在eShop中只有一个实现就是InMemoryEventBusSubscriptionsManager类
这个类中有两个重要的字段
private readonly Dictionary<string, List<SubscriptionInfo>> _handlers;
private readonly List<Type> _eventTypes;
他们分别存储了事件列表和事件处理器信息词典
接下来就是实现一个
基于内存的事件总线
我们要做什么呢?IEventBusSubscriptionsManager 已经有了InMemory的实现了,我们可以直接拿来用,所以我们只需要自己实现一个EventBus就好了
先贴出代码
public class InMemoryEventBus : IEventBus
{
private readonly IServiceProvider _provider;
private readonly ILogger<InMemoryEventBus> _logger;
private readonly ISubscriptionsManager _manager;
private readonly IList<IntegrationEvent> _events;
public InMemoryEventBus(
IServiceProvider provider,
ILogger<InMemoryEventBus> logger,
ISubscriptionsManager manager)
{
_provider = provider;
_logger = logger;
_manager = manager;
}
public void Publish(IntegrationEvent e)
{
var eventType = e.GetType();
var handlers = _manager.GetHandlersForEvent(eventType.FullName);
foreach (var handlerInfo in handlers)
{
var handler = _provider.GetService(handlerInfo.HandlerType);
var method = handlerInfo.HandlerType.GetMethod("Handle");
method.Invoke(handler, new object[] {
e });
}
}
public void Subscribe<T, TH>()
where T : IntegrationEvent
where TH : IIntegrationEventHandler<T>
{
_manager.AddSubscription<T, TH>();
}
public void SubscribeDynamic<TH>(string eventName) where TH : IDynamicIntegrationEventHandler
{
throw new NotImplementedException();
}
public void Unsubscribe<T, TH>()
where T : IntegrationEvent
where TH : IIntegrationEventHandler<T>
{
_manager.RemoveSubscription<T, TH>();
}
public void UnsubscribeDynamic<TH>(string eventName) where TH : IDynamicIntegrationEventHandler
{
throw new NotImplementedException();
}
}
首先构造函数中声明我们要使用的东西:
public InMemoryEventBus(
IServiceProvider provider,
ILogger<InMemoryEventBus> logger,
ISubscriptionsManager manager)
{
_provider = provider;
_logger = logger;
_manager = manager;
}
这里要注意的就是IServiceProvider provider这是 DI容器,当我们在切实处理事件的时候我们选择从DI获取处理器的实例,而不是反射创建,这要做的好处在于,处理器可以依赖于其它东西,并且可以是单例的
public void Subscribe<T, TH>()
where T : IntegrationEvent
where TH : IIntegrationEventHandler<T>
{
_manager.AddSubscription<T, TH>();
}
public void Unsubscribe<T, TH>()
where T : IntegrationEvent
where TH : IIntegrationEventHandler<T>
{
_manager.RemoveSubscription<T, TH>();
}
订阅和取消订阅很简单,因为我们是InMemory的所以只调用了manager的方法。
接下来就是最重要的Publish方法,实现Publish有两种方式:
使用额外的线程和Queue让发布和处理异步
为了简单起见,我们先写个简单易懂的同步的
public void Publish(IntegrationEvent e)
{
// 首先要拿到集成事件的Type信息
var eventType = e.GetType();
// 获取属于这个事件的处理器列表,可能有很多,注意获得的是SubscriptionInfo
var handlers = _manager.GetHandlersForEvent(eventType.FullName);
// 不解释循环
foreach (var handlerInfo in handlers)
{
// 从DI中获取类型的实例
var handler = _provider.GetService(handlerInfo.HandlerType);
// 拿到Handle方法
var method = handlerInfo.HandlerType.GetMethod("Handle");
// 调用方法
method.Invoke(handler, new object[] {
e });
}
}
OK,我们的InMemoryEventBus就写好了!
要实践这个InMemoryEventBus,那么还需要一个IntegrationEvent的子类,和一个IIntegrationEventHandler的实现类,这些都不难,例如我们做一个添加用户的事件,A在添加用户后,发起一个事件并将新用户的名字作为事件数据,B去订阅事件,并在自己的处理器中处理名字信息。
思路是这样的:
写一个 AddUserEvent:IntegrationEvent,里面有一个UserId和一个UserName。
写一个AddUserEventHandler:IIntegrationEventHandler<AddUserEvent>,在Handle方法中输出UserId和Name到日志。
注册DI,你要注册下面这些服务:
IEventBus=>InMemoryEventBus
ISubscriptionsManager=>InMemorySubscriptionsManager
AddUserEventHandler=>AddUserEventHandler
在Startup中为刚刚写的事件和处理器添加订阅(在这里已经可以获取到IEventBus实例了)
写一个Api接口或是什么,调用IEventBus的Publish方法,new 一个新的AddUserEvent作为参数传进去。
OK!到这里一个切实可用的InMemoryEventBus就可以使用了。
- 如果你觉得这篇博客有用,那就点击右下角的【推荐】支持一下我吧: 你还可以【关注我】,我会经常更新博客的 ;
边栏推荐
- Solving the problem of data garbled in sqlserver connection database (Chinese table)
- Some common websites
- InputStream读取文件OutputStream创建文件
- Linux changes the MySQL password after forgetting it
- Shut down THP of Splunk health check
- Wechat web developers, how to learn web development
- 记一次 mysql 主从不同步问题排查
- Deep learning and CV tutorial (14) | image segmentation (FCN, segnet, u-net, pspnet, deeplab, refinenet)
- The wonderful use of XOR (C language)
- (recommended) how many splunks are appropriate? Search head
猜你喜欢

Installation and use of saltstack

数据如何在 Splunk 中老化?

Splunk 健康检查之关闭THP

flink Spark 和 Flink对比

Flink data flow graph, parallelism, operator chain, jobgraph and executiongraph, task and task slot
![Harmonyos application development -- General app interface framework appgeneralframework[app general framework][api v6]](/img/b6/d1d7d0e670af9505a4fabee76211c6.jpg)
Harmonyos application development -- General app interface framework appgeneralframework[app general framework][api v6]

Linux忘记MySQL密码后修改密码

你管这破玩意儿叫 MQ?

flink 控制窗口行为(触发器、移除器、允许延迟、将迟到的数据放入侧输出流)

Solving the problem of data garbled in sqlserver connection database (Chinese table)
随机推荐
Flick controls window behavior (trigger, remover, allow delay, put late data into side output stream)
Sulley fuzzer learning
Adjust the array order so that odd numbers precede even numbers (C language)
1、线程基础知识
12. AQS of abstractqueuedsynchronizer
Error occurred when MySQL imported the database data in pagoda as 0000-00-00 and enum as null
7、CAS
mysql 导入宝塔中数据库data为0000-00-00,enum为null出错
记一次codis内存清理
(recommended) how many splunks are appropriate? Search head
Wechat web developers, how to learn web development
Wireshark packet capturing and debugging RTSP
How can physical stores break through operational difficulties? Take a look at the store operation guide of this physical store applet
Android 11+ configuring sqlserver2014+
leetcode-59. Spiral matrix II JS
纯数据业务的机器打电话进来时回落到了2G/3G
JVM optimization
mysql的主从复制
When I saw the sudden death of a 28 year old employee, I wanted to moisten
你管这破玩意儿叫 MQ?