Introduce

With the characteristics of queues , Add a function of delaying consumption queue message to it , That is, you can specify the point in time at which messages in the queue are consumed .

Use scenarios

The application of delay queue in the project is still more , Especially like e-commerce platforms :

  1. After the order is successful , stay 30 No payment in minutes , Cancel the order automatically
  2. The takeout platform sends out the order notice , After placing the order successfully 60s Push SMS to users .
  3. If the order has been in an open status , Deal with the customs bill in time , And return the inventory
  4. Taobao new merchants have not uploaded product information within a month , Will freeze shops, etc

This introduction comes from other articles

programme

The following example is not encapsulated , So the code is for reference only

Redis Overdue Events

Be careful :

It is not guaranteed to delete and send a notice immediately within the set expiration time , When there is a large amount of data, the delay is relatively large

Delivery is not guaranteed

Forget when you send , Does not include persistence

But for example, some scenes , I don't pay much attention to this time , And there are other measures to ensure two levels , The implementation scheme is relatively simple .

redis since 2.8.0 Later versions provide Keyspace Notifications function , Allow customers to subscribe to Pub / Sub channel , In order to receive influence in some way Redis Dataset events .

To configure

You need to modify the configuration to enable expiration events , For example windows The client , Need modification redis.windows.conf file , stay linux Need to modify redis.conf, The modified content is :

--  uncomment 
notify-keyspace-events Ex -- notes
#notify-keyspace-events ""

Then restart the server , such as windows

 .\redis-server.exe  .\redis.windows.conf

perhaps linux Use in docker-compose Redeployment redis

  redis:
container_name: redis
image: redis
hostname: redis
restart: always
ports:
- "6379:6379"
volumes:
- $PWD/redis/redis.conf:/etc/redis.conf
- /root/common-docker-compose/redis/data:/data
command:
/bin/bash -c "redis-server /etc/redis.conf" # Start and execute the specified redis.conf file

Then use the client to subscribe to events

-- windows
.\redis-cli -- linux
docker exec -it Container identification redis-cli psubscribe [email protected]__:expired

Console subscription

Use StackExchange.Redis Component subscription expiration event

var connectionMultiplexer = ConnectionMultiplexer.Connect(_redisConnection);
var db = connectionMultiplexer.GetDatabase(0); db.StringSet("orderno:123456", " Order creation ", TimeSpan.FromSeconds(10));
Console.WriteLine(" Start subscription "); var subscriber = connectionMultiplexer.GetSubscriber(); // Subscription library 0 Expired notification event
subscriber.Subscribe("[email protected]__:expired", (channel, key) =>
{
Console.WriteLine($"key Be overdue channel:{channel} key:{key}");
}); Console.ReadLine();

Output results :

key Be overdue channel:[email protected]:expired key:orderno:123456

If you start multiple clients to listen , So many clients can receive expiration events .

WebApi Subscription in

establish RedisListenService Inherited from :BackgroundService

public class RedisListenService : BackgroundService
{
private readonly ISubscriber _subscriber; public RedisListenService(IServiceScopeFactory serviceScopeFactory)
{
using var scope = serviceScopeFactory.CreateScope();
var configuration = scope.ServiceProvider.GetRequiredService<IConfiguration>(); var connectionMultiplexer = ConnectionMultiplexer.Connect(configuration["redis"]);
var db = connectionMultiplexer.GetDatabase(0);
_subscriber = connectionMultiplexer.GetSubscriber();
} protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
// Subscription library 0 Expired notification event
_subscriber.Subscribe("[email protected]__:expired", (channel, key) =>
{
Console.WriteLine($"key Be overdue channel:{channel} key:{key}");
}); return Task.CompletedTask;
}
}

Register the background service

services.AddHostedService<RedisListenService>();

Enable project , to redis Specify library settings , After expiration, you will receive an expiration notification event .

RabbitMq Delay queue

Version information Rabbitmq edition :3.10.5 Erlang edition :24.3.4.2

To use rabbitmq To delay, you need to install plug-ins (rabbitmq_delayed_message_exchange) Of

The plugin is introduced :https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq

Download address :https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

The downloaded plug-in (d:/Download/rabbitmq_delayed_message_exchange-3.10.2.ez) Mapped to container plugins Under the table of contents :

docker run -d --name myrabbit -p 9005:15672 -p 5672:5672  -e RABBITMQ_DEFAULT_VHOST=customer -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 -v d:/Download/rabbitmq_delayed_message_exchange-3.10.2.ez:/plugins/rabbitmq_delayed_message_exchange-3.10.2.ez  rabbitmq:3-management-alpine

Into the container

docker exec -it  Container name / identification  bash

To enable the plugin

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

See if... Is enabled

rabbitmq-plugins list

[E] and [e] Means to enable , Then restart the service

rabbitmq-server restart

Then add switches in the management interface, and you can see

Production news

The type of message sent is :x-delayed-message

[HttpGet("send/delay")]
public string SendDelayedMessage()
{
var factory = new ConnectionFactory()
{
HostName = "localhost",//IP Address
Port = 5672,// Port number
UserName = "admin",// The user account
Password = "123456",// User password
VirtualHost = "customer"
};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel(); var exchangeName = "delay-exchange";
var routingkey = "delay.delay";
var queueName = "delay_queueName"; // Set up Exchange Queue type
var argMaps = new Dictionary<string, object>()
{
{"x-delayed-type", "topic"}
};
// Set the current message as delay queue
channel.ExchangeDeclare(exchange: exchangeName, type: "x-delayed-message", true, false, argMaps);
channel.QueueDeclare(queueName, true, false, false, argMaps);
channel.QueueBind(queueName, exchangeName, routingkey); var time = 1000 * 5;
var message = $" The sending time is {DateTime.Now:yyyy-MM-dd HH:mm:ss} The delay time is :{time}";
var body = Encoding.UTF8.GetBytes(message);
var props = channel.CreateBasicProperties();
// Set the expiration time of the message
props.Headers = new Dictionary<string, object>()
{
{ "x-delay", time }
};
channel.BasicPublish(exchange: exchangeName, routingKey: routingkey, basicProperties: props, body: body);
Console.WriteLine(" Message sent successfully :" + message); return "success";
}

News consumption

Consumption news I got a background task (RabbitmqDelayedHostService) Processing

public class RabbitmqDelayedHostService : BackgroundService
{
private readonly IModel _channel;
private readonly IConnection _connection; public RabbitmqDelayedHostService()
{
var connFactory = new ConnectionFactory// Create a connection factory object
{
HostName = "localhost",//IP Address
Port = 5672,// Port number
UserName = "admin",// The user account
Password = "123456",// User password
VirtualHost = "customer"
};
_connection = connFactory.CreateConnection();
_channel = _connection.CreateModel(); // Switch name
var exchangeName = "exchangeDelayed";
var queueName = "delay_queueName";
var routingkey = "delay.delay";
var argMaps = new Dictionary<string, object>()
{
{"x-delayed-type", "topic"}
};
_channel.ExchangeDeclare(exchange: exchangeName, type: "x-delayed-message", true, false, argMaps);
_channel.QueueDeclare(queueName, true, false, false, argMaps);
_channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingkey);
// Declare as manual confirmation
_channel.BasicQos(0, 1, false);
} protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
var queueName = "delay_queueName"; var consumer = new EventingBasicConsumer(_channel);
consumer.Received += (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
var routingKey = ea.RoutingKey;
Console.WriteLine($" The message was received at {DateTime.Now:yyyy-MM-dd HH:mm:ss},routingKey:{routingKey} message:{message} "); // Manual validation
_channel.BasicAck(ea.DeliveryTag, true);
};
_channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer); return Task.CompletedTask;
} public override void Dispose()
{
_connection.Dispose();
_channel.Dispose();
base.Dispose();
}
}

Register the background task

services.AddHostedService<RabbitmqDelayedHostService>();

Output results

Message sent successfully : The sending time is 2022-07-02 18:54:22 The delay time is :5000

Message sent successfully : The sending time is 2022-07-02 18:54:22 The delay time is :5000

Message sent successfully : The sending time is 2022-07-02 18:54:22 The delay time is :5000

Message sent successfully : The sending time is 2022-07-02 18:54:23 The delay time is :5000

Message sent successfully : The sending time is 2022-07-02 18:54:23 The delay time is :5000

Message sent successfully : The sending time is 2022-07-02 18:54:23 The delay time is :5000

The message was received at 2022-07-02 18:54:27,routingKey:delay.delay message: The sending time is 2022-07-02 18:54:22 The delay time is :5000

The message was received at 2022-07-02 18:54:27,routingKey:delay.delay message: The sending time is 2022-07-02 18:54:22 The delay time is :5000

The message was received at 2022-07-02 18:54:27,routingKey:delay.delay message: The sending time is 2022-07-02 18:54:22 The delay time is :5000

The message was received at 2022-07-02 18:54:28,routingKey:delay.delay message: The sending time is 2022-07-02 18:54:23 The delay time is :5000

The message was received at 2022-07-02 18:54:28,routingKey:delay.delay message: The sending time is 2022-07-02 18:54:23 The delay time is :5000

The message was received at 2022-07-02 18:54:28,routingKey:delay.delay message: The sending time is 2022-07-02 18:54:23 The delay time is :5000

Other options

  • Hangfire Delay queue
BackgroundJob.Schedule(
() => Console.WriteLine("Delayed!"),
TimeSpan.FromDays(7));
  • Time wheel
  • Redisson DelayQueue
  • Time manager

.Net More articles about delay queues

  1. C# Realization rabbitmq Delay queue function

    Recent research rabbitmq, There is such a scenario in the project : When the user has to pay for the order , If exceeded 30 Minutes unpaid , Will close the order . Of course, we can do a timed task , Scan the unpaid orders every time , If the order exceeds the payment time, it will be closed ...

  2. Java Delay queue usage

    Delay queue , First, he is a queue , So the second function is delay , This is delay alignment , The function is to put the task in the delay pair , Only when the delay time is reached can the task be obtained from the delay pair column, otherwise it cannot be obtained …… There are many application scenarios , Such as delay 1 Minutes short ...

  3. Spring Boot( fourteen )RabbitMQ Delay queue

    One . Preface Delay queue usage scenarios :1. Orders not paid on time ,30 Cancel the order after minutes :2. Give intervals to less active users N Push the message in a few days , Increase activity :3. too 1 Minutes for new members , Send registration email, etc . Implementation of delay queue ...

  4. Use netty HashedWheelTimer Build a simple delay queue

    background There is a business in the recent project , You need to process the business after the new tasks are due . Use timed tasks to scan expiration time , Waste resources , And not in real time . Only delay queue processing can be used . DelayQueue The first thought is java Built in delay queue del ...

  5. php Use redis Ordered set of zset Implement delay queue

    Delay queue is a message queue with delay function , Compared to the normal queue , It can consume messages at a specified time . Application scenario of delay queue : 1. New user registration ,10 Minutes later, send email or in station message . 2. After the user orders ,30 Minutes unpaid , Order auto void . I ...

  6. C# RabbitMQ Delay queue function real combat project drill

    One . Demand background When the user places an order on the mall, the payment is made , Let's assume that if 8 Hours without payment , Then the background will automatically change the status of the transaction to order close cancel , At the same time, send an email reminder to the user . So how can our application achieve such a requirement scenario ? Before ...

  7. rabbitmq Delay queue demo

    1. demo Detailed explanation 1.1 Engineering structure : 1.2 pom Definition jar The version the package depends on . The version is very important ,rabbit rely on spring, The two have to be consistent , Otherwise, the report will be wrong : <properties> <sp ...

  8. PHP Delayed order processing : Delay queue ( Not identified )

    PHP Delayed order processing : Delay queue : https://github.com/chenlinzhong/php-delayqueue

  9. Spring RabbitMQ Delay queue

    One . explain Delay message sending may be used in actual business scenarios , For example, the retransmission mechanism when an asynchronous callback fails . RabbitMQ It does not have the function of delaying message queue , But it can go through rabbitmq-delayed-message-excha ...

  10. JUC—— Delay queue

    The biggest feature of the so-called delay queue is that it can automatically leave through the queue , for example : Now some objects are temporarily saved , But it is possible that the collection object is a public object , If some data in it is not in use, we hope it can disappear automatically after the specified time ...

Random recommendation

  1. walk around by The provided App differs from another App with the same version and product ID classification : Sharepoint 2015-07-05 08:14 4 Human reading Comment on (0) Collection

    'm currently developing a SharePoint 2013 application. After a few deployments via Visual Studio, I ...

  2. HDMI Interface and protocol

    Deepen understanding HDMI Interface One .HDMI The working principle of the interface is HDMI Schematic diagram of interface architecture . You can see from the signal source on the left ,HDMI The source of the interface can be any support HDMI Output device , And the access end can also be any with HDMI transport Enter into ...

  3. redis The publish and subscribe model of

    Summary redis Each server Each instance maintains a file that stores the state of the server redisServer structure struct redisServer {     /* Pubsub */     //  Dictionaries , Key for channel , ...

  4. PrintWriter out = response.getWriter() Output Chinese garbled problem

    HttpServletResponse response = ServletActionContext.getResponse();        response.setCharacterEncod ...

  5. see sql Statement execution time / test sql Sentence performance

    The person who wrote the program , It's often necessary to analyze what's written SQL Whether the statement has been optimized , How fast is the response time of the server , It's needed at this time SQL Of STATISTICS Status values to see . By setting STATISTICS We can see the execution SQL At the time of the ...

  6. C# Realization WebSocket signal communication

    This example can be found in web Test on the web side , Let's go straight to the code . In the first NuGet Import “Fleck” package , Need to be .NET Framework 4.5 And above . using System; using System.Co ...

  7. Use spark Integrate kudu do DDL

    spark Yes kudu The creation of a table Definition kudu Your table needs to be divided into 5 A step : 1: Provide table name 2: Provide schema 3: Provide primary key 4: Define important options : for example : Define the partition schema 5: call create Table a ...

  8. 【Linux】【Selenium】 install Chrome and ChromeDriver Configuration of

    from :https://www.cnblogs.com/longronglang/p/8078898.html 1. install chrome sudo apt-get install libxss1 libap ...

  9. front end HTML form Form labels input label type attribute checkbox Checkbox

    Checkbox <!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8 ...

  10. leetcode50

    public class Solution { public double MyPow(double x, int n) { return Math.Pow(x, (double)n); } }