当前位置:网站首页>C#&. Net to implement a distributed event bus from 0 (1)

C#&. Net to implement a distributed event bus from 0 (1)

2022-06-21 13:42:00 Guo Mahua

The event bus can be viewed from the global perspective of the system , Provide a set of publish and subscribe mechanism . Have used ABP Framework development , Or studied DDD, Or understand Rebus Classmate , You should be aware of the many benefits of applying an event bus , And it can bring us the improvement of program design thought .

Source code &Nuget

I learned some open source component libraries , And from their own experience , from 0 A design based on RabbitMq Distributed event bus , You can come to my Azure Devops Warehouse View source code .

And I published it as Nuget package , It can be installed and used by everyone

Install-Package MaH.EventBus -Version 0.7.0

Let's start with a simple application :

// Define an order update event 
namespace MaH.SimpleLocalEventBus.Console
{
    public class UpdateOrderEvent
    {
        public DateTime Timestamp { get; set; }

        public UpdateOrderEvent()
        {
            Timestamp = DateTime.Now;
        }
    }
}
// Create an order update event handler ,  Inherit :IEventHandler<UpdateOrderEvent>
namespace MaH.SimpleLocalEventBus.Console
{
    public class UpdateOrderHandler : IEventHandler<UpdateOrderEvent>
    {
        public Task InvokeAsync(UpdateOrderEvent eventData)
        {
            System.Console.WriteLine($"Update-{eventData.Timestamp}");

            return Task.CompletedTask;
        }
    }
}
using System.Threading.Tasks;
// stay EventBus Register handlers in , When using eventBus.PublishAsync() After publishing an event , The handler will be automatically triggered according to the event type 
namespace MaH.SimpleLocalEventBus.Console
{
    class Program
    {
        static async Task Main(string[] args)
        {
            ILocalEventBus eventBus = new LocalEventBus();

            UpdateOrderHandler updateOrderHandler = new UpdateOrderHandler();

            eventBus.Subscribe(updateOrderHandler);

            await eventBus.PublishAsync(new UpdateOrderEvent());

            System.Console.ReadKey();
        }
    }
}

Local event bus

The above example shows a simple application of the local event bus , The implementation method is also very basic , You can go to MaH.EventBus Of Basic-1.0  Branch to view the specific implementation .

 public class LocalEventBus : ILocalEventBus
    {
        private ConcurrentDictionary<Type, List<IHandler>> handlers;

        public LocalEventBus()
        {
            handlers = new ConcurrentDictionary<Type, List<IHandler>>();
        }
        public async Task PublishAsync<TEvent>(TEvent eventData) where TEvent : class
        {
            await TriggerEventAsync(GetOrAddHandlers(typeof(TEvent)), eventData);
        }

        public void Subscribe<TEvent>(IEventHandler<TEvent> handler) where TEvent : class
        {
            GetOrAddHandlers(typeof(TEvent)).Add(handler);
        }
}

 

We can notice that , The implementation here depends on the registration of specific Handler Instance object , If we want to control handler Life cycle of , Or say , We hope handler It can be downloaded from IOC Get... In the container , What should be done ?

The answer is to introduce the factory model , Use the factory method to obtain handler.

from IOC Get... In the container Handler

We can switch the branch to  MaH.EventBus Of Basic-2.0 To see the implementation :

// LocalEventBus No longer save directly handler References to , Instead, save the corresponding IEventHandlerFactory
public class LocalEventBus : ILocalEventBus
    {
        private ConcurrentDictionary<Type, List<IEventHandlerFactory>> handlers;

        public LocalEventBus()
        {
            handlers = new ConcurrentDictionary<Type, List<IEventHandlerFactory>>();
        }
        
        public void Subscribe(Type eventType, IEventHandlerFactory handlerFactory)
        {
            GetOrAddHandlers(eventType).Add(handlerFactory);
        }
    }
namespace MaH.SimpleLocalEventBus
{
    public interface IEventHandlerFactory
    {
        IHandler GetHandler();
    }
}

I made two IEventHandlerFactory The implementation of the , One is SingletonEventHandlerFactory , Direct maintenance for handler Instance reference :

 public class SingletonEventHandlerFactory : IEventHandlerFactory
    {
        private readonly IHandler _handlerInstance;

        public SingletonEventHandlerFactory(IHandler handlerInstance)
        {
            _handlerInstance = handlerInstance;
        }

        public IHandler GetHandler() => _handlerInstance;
    }

The other is based on IOC Containers , preservation handler type And for IServiceScopeFactory References to :

public class DefaultEventHandlerFactory : IEventHandlerFactory
    {
        private readonly Type _handlerType;

        private readonly IServiceScopeFactory _serviceScopeFactory;

        public DefaultEventHandlerFactory(Type handlerType, IServiceScopeFactory serviceScopeFactory)
        {
            _handlerType = handlerType;
            _serviceScopeFactory = serviceScopeFactory;
        }

        public IHandler GetHandler()
        {
            var scope = _serviceScopeFactory.CreateScope();
            return (IHandler) scope.ServiceProvider.GetRequiredService(_handlerType);
        }
    }

such , We can use it IOC technology .NET The event bus is used in the project ! Of course , A good event bus framework should also provide an automatic subscription mechanism , There is no implementation here .

 public void ConfigureServices(IServiceCollection services)
        {
            services.AddTransient<WeatherForecastHandler>();
            services.AddSingleton<ILocalEventBus>(sp =>
            {
                var eventBus = new LocalEventBus();
                eventBus.Subscribe(typeof(WeatherUpdateEvent), new DefaultEventHandlerFactory(typeof(WeatherForecastHandler), sp.GetRequiredService<IServiceScopeFactory>()));
                return eventBus;
            });
        }

Distributed event bus

Simply implementing a local event bus is not enough , Because in microservice design , It often involves communication between multiple systems , For example, the payment system issues an order payment notice , SMS sites need to send payment success messages according to the order information , wait .

therefore , We need to further upgrade the current event bus design , With the help of RabbitMQ, To implement a distributed event bus . Of course , You can also use Kafka, Database etc. .

原网站

版权声明
本文为[Guo Mahua]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/02/202202221433592280.html