当前位置:网站首页>C#&. Net to implement a distributed event bus from 0 (1)
C#&. Net to implement a distributed event bus from 0 (1)
2022-06-21 13:42:00 【Guo Mahua】
The event bus can be viewed from the global perspective of the system , Provide a set of publish and subscribe mechanism . Have used ABP Framework development , Or studied DDD, Or understand Rebus Classmate , You should be aware of the many benefits of applying an event bus , And it can bring us the improvement of program design thought .
Source code &Nuget
I learned some open source component libraries , And from their own experience , from 0 A design based on RabbitMq Distributed event bus , You can come to my Azure Devops Warehouse View source code .
And I published it as Nuget package , It can be installed and used by everyone
Install-Package MaH.EventBus -Version 0.7.0Let's start with a simple application :
// Define an order update event
namespace MaH.SimpleLocalEventBus.Console
{
public class UpdateOrderEvent
{
public DateTime Timestamp { get; set; }
public UpdateOrderEvent()
{
Timestamp = DateTime.Now;
}
}
}// Create an order update event handler , Inherit :IEventHandler<UpdateOrderEvent>
namespace MaH.SimpleLocalEventBus.Console
{
public class UpdateOrderHandler : IEventHandler<UpdateOrderEvent>
{
public Task InvokeAsync(UpdateOrderEvent eventData)
{
System.Console.WriteLine($"Update-{eventData.Timestamp}");
return Task.CompletedTask;
}
}
}
using System.Threading.Tasks;
// stay EventBus Register handlers in , When using eventBus.PublishAsync() After publishing an event , The handler will be automatically triggered according to the event type
namespace MaH.SimpleLocalEventBus.Console
{
class Program
{
static async Task Main(string[] args)
{
ILocalEventBus eventBus = new LocalEventBus();
UpdateOrderHandler updateOrderHandler = new UpdateOrderHandler();
eventBus.Subscribe(updateOrderHandler);
await eventBus.PublishAsync(new UpdateOrderEvent());
System.Console.ReadKey();
}
}
}
Local event bus
The above example shows a simple application of the local event bus , The implementation method is also very basic , You can go to MaH.EventBus Of Basic-1.0 Branch to view the specific implementation .
public class LocalEventBus : ILocalEventBus
{
private ConcurrentDictionary<Type, List<IHandler>> handlers;
public LocalEventBus()
{
handlers = new ConcurrentDictionary<Type, List<IHandler>>();
}
public async Task PublishAsync<TEvent>(TEvent eventData) where TEvent : class
{
await TriggerEventAsync(GetOrAddHandlers(typeof(TEvent)), eventData);
}
public void Subscribe<TEvent>(IEventHandler<TEvent> handler) where TEvent : class
{
GetOrAddHandlers(typeof(TEvent)).Add(handler);
}
}
We can notice that , The implementation here depends on the registration of specific Handler Instance object , If we want to control handler Life cycle of , Or say , We hope handler It can be downloaded from IOC Get... In the container , What should be done ?
The answer is to introduce the factory model , Use the factory method to obtain handler.
from IOC Get... In the container Handler
We can switch the branch to MaH.EventBus Of Basic-2.0 To see the implementation :
// LocalEventBus No longer save directly handler References to , Instead, save the corresponding IEventHandlerFactory
public class LocalEventBus : ILocalEventBus
{
private ConcurrentDictionary<Type, List<IEventHandlerFactory>> handlers;
public LocalEventBus()
{
handlers = new ConcurrentDictionary<Type, List<IEventHandlerFactory>>();
}
public void Subscribe(Type eventType, IEventHandlerFactory handlerFactory)
{
GetOrAddHandlers(eventType).Add(handlerFactory);
}
}namespace MaH.SimpleLocalEventBus
{
public interface IEventHandlerFactory
{
IHandler GetHandler();
}
}I made two IEventHandlerFactory The implementation of the , One is SingletonEventHandlerFactory , Direct maintenance for handler Instance reference :
public class SingletonEventHandlerFactory : IEventHandlerFactory
{
private readonly IHandler _handlerInstance;
public SingletonEventHandlerFactory(IHandler handlerInstance)
{
_handlerInstance = handlerInstance;
}
public IHandler GetHandler() => _handlerInstance;
}The other is based on IOC Containers , preservation handler type And for IServiceScopeFactory References to :
public class DefaultEventHandlerFactory : IEventHandlerFactory
{
private readonly Type _handlerType;
private readonly IServiceScopeFactory _serviceScopeFactory;
public DefaultEventHandlerFactory(Type handlerType, IServiceScopeFactory serviceScopeFactory)
{
_handlerType = handlerType;
_serviceScopeFactory = serviceScopeFactory;
}
public IHandler GetHandler()
{
var scope = _serviceScopeFactory.CreateScope();
return (IHandler) scope.ServiceProvider.GetRequiredService(_handlerType);
}
}such , We can use it IOC technology .NET The event bus is used in the project ! Of course , A good event bus framework should also provide an automatic subscription mechanism , There is no implementation here .
public void ConfigureServices(IServiceCollection services)
{
services.AddTransient<WeatherForecastHandler>();
services.AddSingleton<ILocalEventBus>(sp =>
{
var eventBus = new LocalEventBus();
eventBus.Subscribe(typeof(WeatherUpdateEvent), new DefaultEventHandlerFactory(typeof(WeatherForecastHandler), sp.GetRequiredService<IServiceScopeFactory>()));
return eventBus;
});
}Distributed event bus
Simply implementing a local event bus is not enough , Because in microservice design , It often involves communication between multiple systems , For example, the payment system issues an order payment notice , SMS sites need to send payment success messages according to the order information , wait .
therefore , We need to further upgrade the current event bus design , With the help of RabbitMQ, To implement a distributed event bus . Of course , You can also use Kafka, Database etc. .
边栏推荐
- 如何编写测试用例
- 【深入理解TcaplusDB技术】TcaplusDB构造数据
- Chapter IX Cisco ASA application nat
- MySQL - table join and join
- [deeply understand tcapulusdb technology] tmonitor background one click installation
- Using slurm cluster computing node debugger in vscode
- seaborn绘图风格的设置
- Tami dog sharing: the way of property right transaction and the significance of data-based property right transaction market
- Is it safe to open a securities account by downloading the app of qiniu business school? Is there a risk?
- Deep understanding of convolution in convolution neural network
猜你喜欢

MySQL - user management

SCCM基于已安装的 APP创建客户端集合并定期推送应用更新

居家辦公初體驗之新得分享| 社區征文

Explanation of vim, makefile and GDB tools

如何编写测试用例

Application configuration management, basic principle analysis
![[deeply understand tcapulusdb technology] tcapulusdb import data](/img/bd/999a0d2020f68b3bcee6b617328dfc.png)
[deeply understand tcapulusdb technology] tcapulusdb import data

MySQL - table constraints

Visualization strategy of Seaborn data overall distribution

5000 word analysis: the way of container security attack and defense in actual combat scenarios
随机推荐
Generalized Focal Loss: Learning Qualified and Distributed Bounding Boxes for Dense Object Detection
Are you still using generator to generate crud code of XXX management system? Let's see what I wrote
Babbitt yuancosmos daily must read: wechat may ban a official account for the first time on the grounds of "involving secondary transactions in digital collections", and the new regulations of the pla
居家辦公初體驗之新得分享| 社區征文
微证券开户正规安全吗,怎么操作开户?
Qinglong panel, JD timed task library, script library
Lamp Architecture 3 -- compilation and use of PHP source code
Lamp architecture 5 - MySQL Cluster and master-slave structure
Please, don't use pessimistic locks in high concurrency scenarios!
8 most common SQL error usages
MySQL - view properties
1. memory partition model
MySQL - data type
How to use search engine?
2. data type
实践 DevOps 时,可能面临的六大挑战
[in depth understanding of tcapulusdb technology] tcapulusdb construction data
Collection reference type in JS
【深入理解TcaplusDB技术】TcaplusDB业务数据备份
Map collection traversal, adding, replacing and deleting elements