Introduction
A delay queue adds delayed consumption to an ordinary queue: you can specify the point in time at which a message in the queue is consumed.
Use cases
Delay queues are used in many projects, especially on e-commerce platforms:
- An order that has not been paid within 30 minutes of being placed is cancelled automatically.
- A takeout platform sends an order notice: 60s after the order is placed, an SMS is pushed to the user.
- If an order stays in an unsettled state, it is closed in time and the inventory is returned.
- New Taobao merchants who have not uploaded product information within a month have their shops frozen.
The introduction above is taken from other articles.
Solutions
The following examples are not encapsulated, so the code is for reference only.
Redis expiration events
Note:
Redis does not guarantee that a key is deleted and the notification sent exactly at the configured expiration time. With a large amount of data the delay can be considerable.
Delivery is not guaranteed.
Notifications are fire-and-forget and are not persisted.
For scenarios that are not strict about timing and have another mechanism as a second layer of protection, this approach is still attractive because it is simple to implement.
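Because a notification can be late or lost, the second layer of protection mentioned above is often a periodic compensating scan. The following is only a minimal sketch under assumptions: the Order record, the OrderCompensation class and the in-memory collection are hypothetical stand-ins for a real order table and do not come from the original article.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical order model, for illustration only.
public record Order(string OrderNo, DateTime CreatedAtUtc, bool Paid);

public static class OrderCompensation
{
    // Returns unpaid orders whose payment window has already elapsed.
    // A scheduled job could call this to catch orders whose Redis
    // expiration notification was delayed or never delivered.
    public static List<Order> FindOverdue(
        IEnumerable<Order> orders, DateTime nowUtc, TimeSpan paymentWindow)
    {
        return orders
            .Where(o => !o.Paid && nowUtc - o.CreatedAtUtc >= paymentWindow)
            .ToList();
    }
}
```

A job that runs every few minutes could pass the result to the same cancellation logic that the expiration handler uses, so a missed event only postpones the cancellation.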
Since version 2.8.0, Redis provides the Keyspace Notifications feature, which lets clients subscribe to Pub/Sub channels in order to receive events that affect the Redis dataset in some way.
Configuration
Expiration events have to be enabled in the configuration. On Windows, edit the redis.windows.conf file; on Linux, edit redis.conf. The changes are:
-- uncomment this line (E = keyevent events, x = expired events)
notify-keyspace-events Ex
-- comment out this line
#notify-keyspace-events ""
Then restart the server, for example on Windows:
.\redis-server.exe .\redis.windows.conf
or, on Linux, redeploy Redis with docker-compose:
redis:
  container_name: redis
  image: redis
  hostname: redis
  restart: always
  ports:
    - "6379:6379"
  volumes:
    - $PWD/redis/redis.conf:/etc/redis.conf
    - /root/common-docker-compose/redis/data:/data
  command:
    /bin/bash -c "redis-server /etc/redis.conf" # start Redis with the specified redis.conf file
Then subscribe to the event from a client:
-- windows
.\redis-cli
-- linux
docker exec -it <container-id> redis-cli
psubscribe __keyevent@0__:expired
Console subscription
Subscribe to the expiration event with the StackExchange.Redis component:
var connectionMultiplexer = ConnectionMultiplexer.Connect(_redisConnection);
var db = connectionMultiplexer.GetDatabase(0);
// Write a key that expires after 10 seconds
db.StringSet("orderno:123456", "order created", TimeSpan.FromSeconds(10));
Console.WriteLine("Subscription started");
var subscriber = connectionMultiplexer.GetSubscriber();
// Subscribe to expired-key notifications for database 0
subscriber.Subscribe("__keyevent@0__:expired", (channel, key) =>
{
    Console.WriteLine($"key expired channel:{channel} key:{key}");
});
Console.ReadLine();
Output:
key expired channel:__keyevent@0__:expired key:orderno:123456
If several clients are listening, every one of them receives the expiration event.
Subscribing in a Web API
Create RedisListenService, which inherits from BackgroundService:
public class RedisListenService : BackgroundService
{
private readonly ISubscriber _subscriber;
public RedisListenService(IServiceScopeFactory serviceScopeFactory)
{
using var scope = serviceScopeFactory.CreateScope();
var configuration = scope.ServiceProvider.GetRequiredService<IConfiguration>();
var connectionMultiplexer = ConnectionMultiplexer.Connect(configuration["redis"]);
var db = connectionMultiplexer.GetDatabase(0);
_subscriber = connectionMultiplexer.GetSubscriber();
}
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
// Subscribe to expired-key notifications for database 0
_subscriber.Subscribe("__keyevent@0__:expired", (channel, key) =>
{
Console.WriteLine($"key expired channel:{channel} key:{key}");
});
return Task.CompletedTask;
}
}
Register the background service:
services.AddHostedService<RedisListenService>();
Start the project and set a key with an expiry in the subscribed database (database 0). When the key expires, the service receives the expiration notification event.
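The expired event carries only the key name. By the time it arrives the value has already been deleted, so anything the handler needs must be encoded in the key or looked up elsewhere. A minimal sketch, assuming keys follow the orderno:<number> shape used in the console example; the ExpiredKeyParser helper is hypothetical and not part of StackExchange.Redis.

```csharp
using System;

public static class ExpiredKeyParser
{
    private const string Prefix = "orderno:";

    // Extracts the order number from an expired key such as "orderno:123456".
    // Returns false for keys that do not carry the prefix, so unrelated
    // expirations in the same database can be ignored.
    public static bool TryGetOrderNo(string expiredKey, out string orderNo)
    {
        orderNo = string.Empty;
        if (string.IsNullOrEmpty(expiredKey) ||
            !expiredKey.StartsWith(Prefix, StringComparison.Ordinal) ||
            expiredKey.Length == Prefix.Length)
        {
            return false;
        }

        orderNo = expiredKey.Substring(Prefix.Length);
        return true;
    }
}
```

Inside the subscription callback, the key can be converted with key.ToString() and passed to TryGetOrderNo before any order handling runs.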
RabbitMQ delay queue
Version information: RabbitMQ 3.10.5, Erlang 24.3.4.2
To implement delays with RabbitMQ you need to install the rabbitmq_delayed_message_exchange plugin.
Plugin introduction: https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq
Download address: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
Mount the downloaded plugin (d:/Download/rabbitmq_delayed_message_exchange-3.10.2.ez) into the container's plugins directory:
docker run -d --name myrabbit -p 9005:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_VHOST=customer -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 -v d:/Download/rabbitmq_delayed_message_exchange-3.10.2.ez:/plugins/rabbitmq_delayed_message_exchange-3.10.2.ez rabbitmq:3-management-alpine
Enter the container:
docker exec -it <container-name-or-id> bash
Enable the plugin:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Check whether it is enabled:
rabbitmq-plugins list
[E] and [e] mean the plugin is enabled. Then leave the container and restart it:
docker restart myrabbit
After that, the x-delayed-message type is available when you add an exchange in the management interface.
Producing messages
Messages are published to an exchange of type x-delayed-message:
[HttpGet("send/delay")]
public string SendDelayedMessage()
{
var factory = new ConnectionFactory()
{
HostName = "localhost", // host name or IP address
Port = 5672, // port
UserName = "admin", // user name
Password = "123456", // password
VirtualHost = "customer"
};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
var exchangeName = "delay-exchange";
var routingkey = "delay.delay";
var queueName = "delay_queueName";
// Routing behaviour the delayed exchange applies once the delay has elapsed
var argMaps = new Dictionary<string, object>()
{
{"x-delayed-type", "topic"}
};
// Declare the delayed exchange, then the queue and its binding
channel.ExchangeDeclare(exchange: exchangeName, type: "x-delayed-message", true, false, argMaps);
channel.QueueDeclare(queueName, true, false, false, argMaps);
channel.QueueBind(queueName, exchangeName, routingkey);
var time = 1000 * 5;
var message = $" The sending time is {DateTime.Now:yyyy-MM-dd HH:mm:ss} The delay time is :{time}";
var body = Encoding.UTF8.GetBytes(message);
var props = channel.CreateBasicProperties();
// Delay in milliseconds, passed to the plugin in the x-delay header
props.Headers = new Dictionary<string, object>()
{
{ "x-delay", time }
};
channel.BasicPublish(exchange: exchangeName, routingKey: routingkey, basicProperties: props, body: body);
Console.WriteLine(" Message sent successfully :" + message);
return "success";
}
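The x-delay header is a delay in milliseconds rather than an absolute time, so a due time has to be converted before publishing. The following is a minimal sketch; the DelayCalculator helper and its maxDelayMs cap are assumptions chosen for illustration and are not part of the plugin or of the RabbitMQ client.

```csharp
using System;

public static class DelayCalculator
{
    // Converts an absolute due time into a millisecond delay suitable for
    // the "x-delay" header. Due times in the past yield 0 (no delay);
    // values above maxDelayMs are clamped to the cap chosen by the caller.
    public static long ToDelayMilliseconds(
        DateTimeOffset dueAt, DateTimeOffset now, long maxDelayMs)
    {
        if (maxDelayMs < 0) throw new ArgumentOutOfRangeException(nameof(maxDelayMs));

        var ms = (long)(dueAt - now).TotalMilliseconds;
        if (ms < 0) return 0;
        return Math.Min(ms, maxDelayMs);
    }
}
```

For the 30-minute payment window from the use cases, the result of ToDelayMilliseconds(createdAt.AddMinutes(30), DateTimeOffset.UtcNow, cap) would be placed in props.Headers under "x-delay", where createdAt and cap are supplied by the caller.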
Consuming messages
Messages are consumed by a background task (RabbitmqDelayedHostService):
public class RabbitmqDelayedHostService : BackgroundService
{
private readonly IModel _channel;
private readonly IConnection _connection;
public RabbitmqDelayedHostService()
{
var connFactory = new ConnectionFactory // connection factory
{
HostName = "localhost", // host name or IP address
Port = 5672, // port
UserName = "admin", // user name
Password = "123456", // password
VirtualHost = "customer"
};
_connection = connFactory.CreateConnection();
_channel = _connection.CreateModel();
// Exchange name (the same exchange the producer publishes to)
var exchangeName = "delay-exchange";
var queueName = "delay_queueName";
var routingkey = "delay.delay";
var argMaps = new Dictionary<string, object>()
{
{"x-delayed-type", "topic"}
};
_channel.ExchangeDeclare(exchange: exchangeName, type: "x-delayed-message", true, false, argMaps);
_channel.QueueDeclare(queueName, true, false, false, argMaps);
_channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingkey);
// Prefetch one message at a time; acknowledgements are sent manually in the consumer
_channel.BasicQos(0, 1, false);
}
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
var queueName = "delay_queueName";
var consumer = new EventingBasicConsumer(_channel);
consumer.Received += (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
var routingKey = ea.RoutingKey;
Console.WriteLine($" The message was received at {DateTime.Now:yyyy-MM-dd HH:mm:ss},routingKey:{routingKey} message:{message} ");
// Manually acknowledge this message only
_channel.BasicAck(ea.DeliveryTag, false);
};
_channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
return Task.CompletedTask;
}
public override void Dispose()
{
// Dispose the channel before the connection that owns it
_channel.Dispose();
_connection.Dispose();
base.Dispose();
}
}
Register the background task
services.AddHostedService<RabbitmqDelayedHostService>();
Output results
Message sent successfully : The sending time is 2022-07-02 18:54:22 The delay time is :5000
Message sent successfully : The sending time is 2022-07-02 18:54:22 The delay time is :5000
Message sent successfully : The sending time is 2022-07-02 18:54:22 The delay time is :5000
Message sent successfully : The sending time is 2022-07-02 18:54:23 The delay time is :5000
Message sent successfully : The sending time is 2022-07-02 18:54:23 The delay time is :5000
Message sent successfully : The sending time is 2022-07-02 18:54:23 The delay time is :5000
The message was received at 2022-07-02 18:54:27,routingKey:delay.delay message: The sending time is 2022-07-02 18:54:22 The delay time is :5000
The message was received at 2022-07-02 18:54:27,routingKey:delay.delay message: The sending time is 2022-07-02 18:54:22 The delay time is :5000
The message was received at 2022-07-02 18:54:27,routingKey:delay.delay message: The sending time is 2022-07-02 18:54:22 The delay time is :5000
The message was received at 2022-07-02 18:54:28,routingKey:delay.delay message: The sending time is 2022-07-02 18:54:23 The delay time is :5000
The message was received at 2022-07-02 18:54:28,routingKey:delay.delay message: The sending time is 2022-07-02 18:54:23 The delay time is :5000
The message was received at 2022-07-02 18:54:28,routingKey:delay.delay message: The sending time is 2022-07-02 18:54:23 The delay time is :5000
Other options
- Hangfire delayed jobs
BackgroundJob.Schedule(
() => Console.WriteLine("Delayed!"),
TimeSpan.FromDays(7));
- Timing wheel
- Redisson DelayQueue
- Time manager
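Of the options listed, the timing wheel is the easiest to show in a few lines. The following is a minimal single-level sketch under assumptions: the TimingWheel class and its members are hypothetical, it is driven by explicit Tick() calls instead of a real timer so that its behaviour is deterministic, and it is neither thread-safe nor persistent.

```csharp
using System;
using System.Collections.Generic;

// Single-level timing wheel advanced by explicit Tick() calls.
public sealed class TimingWheel
{
    private readonly List<(int Rounds, Action Job)>[] _slots;
    private int _cursor;

    public TimingWheel(int slotCount)
    {
        if (slotCount <= 0) throw new ArgumentOutOfRangeException(nameof(slotCount));
        _slots = new List<(int Rounds, Action Job)>[slotCount];
        for (var i = 0; i < slotCount; i++) _slots[i] = new List<(int Rounds, Action Job)>();
    }

    // Schedules job to run on the delayTicks-th call to Tick() from now.
    public void Schedule(int delayTicks, Action job)
    {
        if (delayTicks < 1) throw new ArgumentOutOfRangeException(nameof(delayTicks));
        var slot = (_cursor + delayTicks) % _slots.Length;
        var rounds = (delayTicks - 1) / _slots.Length; // full laps to skip first
        _slots[slot].Add((rounds, job));
    }

    // Advances one slot and runs every job in it whose lap counter is zero.
    public void Tick()
    {
        _cursor = (_cursor + 1) % _slots.Length;
        var bucket = _slots[_cursor];
        var due = new List<Action>();
        for (var i = bucket.Count - 1; i >= 0; i--)
        {
            var (rounds, job) = bucket[i];
            if (rounds == 0)
            {
                due.Add(job);
                bucket.RemoveAt(i);
            }
            else
            {
                bucket[i] = (rounds - 1, job);
            }
        }

        due.Reverse(); // restore scheduling order within the slot
        foreach (var job in due) job();
    }
}
```

In a real service Tick() would be called from a timer at a fixed interval, and the slot count multiplied by that interval gives the span one lap of the wheel covers.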