当前位置:网站首页>. Net delay queue

. Net delay queue

2022-07-04 13:49:00 AZRNG

Introduce

With the characteristics of queues , Add a function of delaying consumption queue message to it , That is, you can specify the point in time at which messages in the queue are consumed .

Use scenarios

The application of delay queue in the project is still more , Especially like e-commerce platforms :

  1. After the order is successful , stay 30 No payment in minutes , Cancel the order automatically
  2. The takeout platform sends out the order notice , After placing the order successfully 60s Push SMS to users .
  3. If the order has been in an open status , Deal with the customs bill in time , And return the inventory
  4. Taobao new merchants have not uploaded product information within a month , Will freeze shops, etc

This introduction comes from other articles

programme

The following example is not encapsulated , So the code is for reference only

Redis Overdue Events

Be careful :

It is not guaranteed to delete and send a notice immediately within the set expiration time , When there is a large amount of data, the delay is relatively large

Delivery is not guaranteed

Forget when you send , Does not include persistence

But for example, some scenes , I don't pay much attention to this time , And there are other measures to ensure two levels , The implementation scheme is relatively simple .

redis since 2.8.0 Later versions provide Keyspace Notifications function , Allow customers to subscribe to Pub / Sub channel , In order to receive influence in some way Redis Dataset events .

To configure

You need to modify the configuration to enable expiration events , For example windows The client , Need modification redis.windows.conf file , stay linux Need to modify redis.conf, The modified content is :

img

--  uncomment 
notify-keyspace-events Ex

--  notes 
#notify-keyspace-events ""

Then restart the server , such as windows

 .\redis-server.exe  .\redis.windows.conf

perhaps linux Use in docker-compose Redeployment redis

  redis:
    container_name: redis
    image: redis
    hostname: redis
    restart: always
    ports: 
      - "6379:6379"
    volumes: 
      - $PWD/redis/redis.conf:/etc/redis.conf
      - /root/common-docker-compose/redis/data:/data
    command: 
      /bin/bash -c "redis-server /etc/redis.conf" # Start and execute the specified redis.conf file 

Then use the client to subscribe to events

-- windows
.\redis-cli
 
-- linux
docker exec -it  Container identification  redis-cli
 
psubscribe [email protected]__:expired

Console subscription

Use StackExchange.Redis Component subscription expiration event

var connectionMultiplexer = ConnectionMultiplexer.Connect(_redisConnection);
var db = connectionMultiplexer.GetDatabase(0);

db.StringSet("orderno:123456", " Order creation ", TimeSpan.FromSeconds(10));
Console.WriteLine(" Start subscription ");

var subscriber = connectionMultiplexer.GetSubscriber();

// Subscription library 0 Expired notification event 
subscriber.Subscribe("[email protected]__:expired", (channel, key) =>
{
    Console.WriteLine($"key Be overdue  channel:{channel} key:{key}");
});

Console.ReadLine();

Output results :

key Be overdue channel:[email protected]:expired key:orderno:123456

If you start multiple clients to listen , So many clients can receive expiration events .

WebApi Subscription in

establish RedisListenService Inherited from :BackgroundService

public class RedisListenService : BackgroundService
{
    private readonly ISubscriber _subscriber;

    public RedisListenService(IServiceScopeFactory serviceScopeFactory)
    {
        using var scope = serviceScopeFactory.CreateScope();
        var configuration = scope.ServiceProvider.GetRequiredService<IConfiguration>();

        var connectionMultiplexer = ConnectionMultiplexer.Connect(configuration["redis"]);
        var db = connectionMultiplexer.GetDatabase(0);
        _subscriber = connectionMultiplexer.GetSubscriber();
    }

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Subscription library 0 Expired notification event 
        _subscriber.Subscribe("[email protected]__:expired", (channel, key) =>
        {
            Console.WriteLine($"key Be overdue  channel:{channel} key:{key}");
        });

        return Task.CompletedTask;
    }
}

Register the background service

services.AddHostedService<RedisListenService>();

Enable project , to redis Specify library settings , After expiration, you will receive an expiration notification event .

RabbitMq Delay queue

Version information Rabbitmq edition :3.10.5 Erlang edition :24.3.4.2

To use rabbitmq To delay, you need to install plug-ins (rabbitmq_delayed_message_exchange) Of

The plugin is introduced :https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq

Download address :https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

The downloaded plug-in (d:/Download/rabbitmq_delayed_message_exchange-3.10.2.ez) Mapped to container plugins Under the table of contents :

docker run -d --name myrabbit -p 9005:15672 -p 5672:5672  -e RABBITMQ_DEFAULT_VHOST=customer -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 -v d:/Download/rabbitmq_delayed_message_exchange-3.10.2.ez:/plugins/rabbitmq_delayed_message_exchange-3.10.2.ez  rabbitmq:3-management-alpine

Into the container

docker exec -it  Container name / identification  bash

To enable the plugin

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

See if... Is enabled

rabbitmq-plugins list

[E] and [e] Means to enable , Then restart the service

rabbitmq-server restart

Then add switches in the management interface, and you can see

img

Production news

The type of message sent is :x-delayed-message

[HttpGet("send/delay")]
public string SendDelayedMessage()
{
    var factory = new ConnectionFactory()
    {
        HostName = "localhost",//IP Address 
        Port = 5672,// Port number 
        UserName = "admin",// The user account 
        Password = "123456",// User password 
        VirtualHost = "customer"
    };
    using var connection = factory.CreateConnection();
    using var channel = connection.CreateModel();

    var exchangeName = "delay-exchange";
    var routingkey = "delay.delay";
    var queueName = "delay_queueName";

    // Set up Exchange Queue type 
    var argMaps = new Dictionary<string, object>()
    {
        {"x-delayed-type", "topic"}
    };
    // Set the current message as delay queue 
    channel.ExchangeDeclare(exchange: exchangeName, type: "x-delayed-message", true, false, argMaps);
    channel.QueueDeclare(queueName, true, false, false, argMaps);
    channel.QueueBind(queueName, exchangeName, routingkey);

    var time = 1000 * 5;
    var message = $" The sending time is  {DateTime.Now:yyyy-MM-dd HH:mm:ss}  The delay time is :{time}";
    var body = Encoding.UTF8.GetBytes(message);
    var props = channel.CreateBasicProperties();
    // Set the expiration time of the message 
    props.Headers = new Dictionary<string, object>()
            {
                {  "x-delay", time }
            };
    channel.BasicPublish(exchange: exchangeName, routingKey: routingkey, basicProperties: props, body: body);
    Console.WriteLine(" Message sent successfully :" + message);

    return "success";
}

News consumption

Consumption news I got a background task (RabbitmqDelayedHostService) Processing

public class RabbitmqDelayedHostService : BackgroundService
{
    private readonly IModel _channel;
    private readonly IConnection _connection;

    public RabbitmqDelayedHostService()
    {
        var connFactory = new ConnectionFactory// Create a connection factory object 
        {
            HostName = "localhost",//IP Address 
            Port = 5672,// Port number 
            UserName = "admin",// The user account 
            Password = "123456",// User password 
            VirtualHost = "customer"
        };
        _connection = connFactory.CreateConnection();
        _channel = _connection.CreateModel();

        // Switch name 
        var exchangeName = "exchangeDelayed";
        var queueName = "delay_queueName";
        var routingkey = "delay.delay";
        var argMaps = new Dictionary<string, object>()
        {
            {"x-delayed-type", "topic"}
        };
        _channel.ExchangeDeclare(exchange: exchangeName, type: "x-delayed-message", true, false, argMaps);
        _channel.QueueDeclare(queueName, true, false, false, argMaps);
        _channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingkey);
        // Declare as manual confirmation 
        _channel.BasicQos(0, 1, false);
    }

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var queueName = "delay_queueName";

        var consumer = new EventingBasicConsumer(_channel);
        consumer.Received += (model, ea) =>
        {
            var message = Encoding.UTF8.GetString(ea.Body.ToArray());
            var routingKey = ea.RoutingKey;
            Console.WriteLine($" The message was received at  {DateTime.Now:yyyy-MM-dd HH:mm:ss},routingKey:{routingKey} message:{message} ");

            // Manual validation 
            _channel.BasicAck(ea.DeliveryTag, true);
        };
        _channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);

        return Task.CompletedTask;
    }

    public override void Dispose()
    {
        _connection.Dispose();
        _channel.Dispose();
        base.Dispose();
    }
}

Register the background task

services.AddHostedService<RabbitmqDelayedHostService>();

Output results

Message sent successfully : The sending time is 2022-07-02 18:54:22 The delay time is :5000

Message sent successfully : The sending time is 2022-07-02 18:54:22 The delay time is :5000

Message sent successfully : The sending time is 2022-07-02 18:54:22 The delay time is :5000

Message sent successfully : The sending time is 2022-07-02 18:54:23 The delay time is :5000

Message sent successfully : The sending time is 2022-07-02 18:54:23 The delay time is :5000

Message sent successfully : The sending time is 2022-07-02 18:54:23 The delay time is :5000

The message was received at 2022-07-02 18:54:27,routingKey:delay.delay message: The sending time is 2022-07-02 18:54:22 The delay time is :5000

The message was received at 2022-07-02 18:54:27,routingKey:delay.delay message: The sending time is 2022-07-02 18:54:22 The delay time is :5000

The message was received at 2022-07-02 18:54:27,routingKey:delay.delay message: The sending time is 2022-07-02 18:54:22 The delay time is :5000

The message was received at 2022-07-02 18:54:28,routingKey:delay.delay message: The sending time is 2022-07-02 18:54:23 The delay time is :5000

The message was received at 2022-07-02 18:54:28,routingKey:delay.delay message: The sending time is 2022-07-02 18:54:23 The delay time is :5000

The message was received at 2022-07-02 18:54:28,routingKey:delay.delay message: The sending time is 2022-07-02 18:54:23 The delay time is :5000

Other options

  • Hangfire Delay queue
BackgroundJob.Schedule(
  () => Console.WriteLine("Delayed!"),
   TimeSpan.FromDays(7));
  • Time wheel
  • Redisson DelayQueue
  • Time manager
原网站

版权声明
本文为[AZRNG]所创,转载请带上原文链接,感谢
https://yzsam.com/2022/185/202207041235008712.html