.NET delay queue
2022-07-05 09:57:00 【Dotnet cross platform】
Introduction
A delay queue is an ordinary queue with delayed consumption added: you can specify the point in time at which each message in the queue is consumed.
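To make the idea concrete, here is a minimal in-memory sketch of a delay queue. It is not one of the solutions covered in this article: the class name InMemoryDelayQueue and its methods are hypothetical, and it assumes .NET 6 or later for PriorityQueue. Items are ordered by their due time, and a consumer only receives an item once that time has passed.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical in-memory delay queue (illustration only, nothing is persisted).
public sealed class InMemoryDelayQueue<T>
{
    // PriorityQueue (.NET 6+) keeps the item with the earliest due time at the front.
    private readonly PriorityQueue<T, DateTime> _items = new();
    private readonly object _gate = new();

    // Add an item that must not be consumed before dueAtUtc.
    public void Enqueue(T item, DateTime dueAtUtc)
    {
        lock (_gate)
        {
            _items.Enqueue(item, dueAtUtc);
        }
    }

    // Returns true and the next item only if its due time is at or before nowUtc.
    public bool TryDequeueDue(DateTime nowUtc, out T item)
    {
        lock (_gate)
        {
            if (_items.TryPeek(out _, out var dueAtUtc) && dueAtUtc <= nowUtc)
            {
                item = _items.Dequeue();
                return true;
            }
        }

        item = default!;
        return false;
    }
}
```

A consumer would typically call TryDequeueDue(DateTime.UtcNow, out var item) in a loop or on a timer. Passing the current time as a parameter keeps the sketch easy to test.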
Use cases
Delay queues appear in many projects, especially on e-commerce platforms:
If an order is not paid within 30 minutes of being placed, cancel it automatically.
A takeout platform pushes an SMS notification to the user 60 seconds after an order is placed successfully.
If an order stays open (unsettled) for too long, close it promptly and return the inventory.
A new Taobao merchant who has not uploaded product information within a month has the shop frozen, and so on.
This introduction comes from other articles.
Solutions
The examples below are not encapsulated, so the code is for reference only.
Redis expiration events
Note:
A key is not guaranteed to be deleted, and its notification sent, exactly at the configured expiration time; with a large amount of data the delay can be considerable.
Delivery is not guaranteed.
Notifications are fire-and-forget and are not persisted.
For scenarios that are not strict about timing and that have other safeguards as a second line of defense, however, this solution is relatively simple to implement.
Since version 2.8.0, Redis has provided the Keyspace Notifications feature, which lets clients subscribe to Pub/Sub channels in order to receive events that affect the Redis data set in some way.
Configuration
Expiration events must be enabled in the configuration. On Windows, edit redis.windows.conf; on Linux, edit redis.conf. The changes are:
-- uncomment this line
notify-keyspace-events Ex
-- comment out this line
#notify-keyspace-events ""
Then restart the server, for example on Windows:
.\redis-server.exe .\redis.windows.conf
Or, on Linux, redeploy Redis with docker-compose:
redis:
  container_name: redis
  image: redis
  hostname: redis
  restart: always
  ports:
    - "6379:6379"
  volumes:
    - $PWD/redis/redis.conf:/etc/redis.conf
    - /root/common-docker-compose/redis/data:/data
  command:
    /bin/bash -c "redis-server /etc/redis.conf" # Start the server with the specified redis.conf file
Then subscribe to the events with the client:
-- windows
.\redis-cli
-- linux
docker exec -it <container-id> redis-cli
psubscribe __keyevent@0__:expired
Console subscription
Subscribe to expiration events with the StackExchange.Redis component:
var connectionMultiplexer = ConnectionMultiplexer.Connect(_redisConnection);
var db = connectionMultiplexer.GetDatabase(0);
// Write a key that expires after 10 seconds
db.StringSet("orderno:123456", "Order created", TimeSpan.FromSeconds(10));
Console.WriteLine("Start subscription");
var subscriber = connectionMultiplexer.GetSubscriber();
// Subscribe to expired-key notifications for database 0
subscriber.Subscribe("__keyevent@0__:expired", (channel, key) =>
{
    Console.WriteLine($"Key expired. channel:{channel} key:{key}");
});
Console.ReadLine();
Output:
Key expired. channel:__keyevent@0__:expired key:orderno:123456
If several clients are listening at the same time, every one of them receives the expiration event.
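Because every listener receives the same event, a handler such as "cancel the order" could run once per instance. One possible way around this, sketched below under stated assumptions, is to let each instance try to claim the key first and handle it only if the claim succeeds. The class ExpiredKeyHandler, the key prefix filter, and the claim delegate are hypothetical names and are not part of the original example. With StackExchange.Redis the claim could be a SET with NX and an expiry, for example db.StringSet("claimed:" + key, "1", TimeSpan.FromMinutes(1), When.NotExists); the "claimed:" prefix and the one-minute window are assumptions.

```csharp
using System;

// Hypothetical wrapper: runs the handler only in the process that wins the claim for a key.
public sealed class ExpiredKeyHandler
{
    private readonly string _keyPrefix;
    private readonly Func<string, bool> _tryClaim;
    private readonly Action<string> _handle;

    // tryClaim must return true for exactly one caller per key.
    // Assumption: with StackExchange.Redis this could be
    //   key => db.StringSet("claimed:" + key, "1", TimeSpan.FromMinutes(1), When.NotExists)
    public ExpiredKeyHandler(string keyPrefix, Func<string, bool> tryClaim, Action<string> handle)
    {
        _keyPrefix = keyPrefix;
        _tryClaim = tryClaim;
        _handle = handle;
    }

    // Call this from the Subscribe callback with the expired key.
    // Returns true if this process handled the key.
    public bool OnExpired(string expiredKey)
    {
        // Ignore unrelated keys, including the claim keys themselves when they expire later.
        if (!expiredKey.StartsWith(_keyPrefix, StringComparison.Ordinal))
        {
            return false;
        }

        // Another instance already claimed this key.
        if (!_tryClaim(expiredKey))
        {
            return false;
        }

        _handle(expiredKey);
        return true;
    }
}
```

The prefix filter matters here because the claim keys expire too and would otherwise raise events that reach the same handler. This sketch does not address lost notifications; it only avoids duplicate handling.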
Subscribing in a Web API
Create RedisListenService, which inherits from BackgroundService:
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using StackExchange.Redis;

public class RedisListenService : BackgroundService
{
    private readonly ISubscriber _subscriber;

    public RedisListenService(IServiceScopeFactory serviceScopeFactory)
    {
        using var scope = serviceScopeFactory.CreateScope();
        var configuration = scope.ServiceProvider.GetRequiredService<IConfiguration>();
        // "redis" is the configuration key that holds the connection string
        var connectionMultiplexer = ConnectionMultiplexer.Connect(configuration["redis"]);
        _subscriber = connectionMultiplexer.GetSubscriber();
    }

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Subscribe to expired-key notifications for database 0
        _subscriber.Subscribe("__keyevent@0__:expired", (channel, key) =>
        {
            Console.WriteLine($"Key expired. channel:{channel} key:{key}");
        });
        return Task.CompletedTask;
    }
}
Register the background service
services.AddHostedService<RedisListenService>();
Start the project and set a key with an expiry in the subscribed Redis database (database 0 here). When the key expires, the service receives the expiration notification.
RabbitMQ delay queue
Version information: RabbitMQ 3.10.5, Erlang 24.3.4.2
Delaying messages with RabbitMQ requires the rabbitmq_delayed_message_exchange plugin.
Plugin introduction: https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq
Download: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
Mount the downloaded plugin (d:/Download/rabbitmq_delayed_message_exchange-3.10.2.ez) into the container's plugins directory:
docker run -d --name myrabbit -p 9005:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_VHOST=customer -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 -v d:/Download/rabbitmq_delayed_message_exchange-3.10.2.ez:/plugins/rabbitmq_delayed_message_exchange-3.10.2.ez rabbitmq:3-management-alpine
Enter the container:
docker exec -it <container-name-or-id> bash
Enable the plugin:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Check whether it is enabled:
rabbitmq-plugins list
[E*] and [e*] mean the plugin is enabled. Then restart the service:
rabbitmq-server restart
After that, the new exchange type is visible when you add an exchange in the management UI.
Producing messages
Messages are published to an exchange of type x-delayed-message:
[HttpGet("send/delay")]
public string SendDelayedMessage()
{
    var factory = new ConnectionFactory()
    {
        HostName = "localhost", // IP address
        Port = 5672,            // Port number
        UserName = "admin",     // User account
        Password = "123456",    // User password
        VirtualHost = "customer"
    };
    using var connection = factory.CreateConnection();
    using var channel = connection.CreateModel();
    var exchangeName = "delay-exchange";
    var routingkey = "delay.delay";
    var queueName = "delay_queueName";
    // Routing behaviour the delayed exchange uses once the delay has elapsed
    var argMaps = new Dictionary<string, object>()
    {
        {"x-delayed-type", "topic"}
    };
    // Declare the exchange as a delayed-message exchange (durable, not auto-deleted)
    channel.ExchangeDeclare(exchange: exchangeName, type: "x-delayed-message", true, false, argMaps);
    channel.QueueDeclare(queueName, true, false, false, argMaps);
    channel.QueueBind(queueName, exchangeName, routingkey);
    // Delay in milliseconds
    var time = 1000 * 5;
    var message = $" The sending time is {DateTime.Now:yyyy-MM-dd HH:mm:ss} The delay time is :{time}";
    var body = Encoding.UTF8.GetBytes(message);
    var props = channel.CreateBasicProperties();
    // Set the delay of this message through the x-delay header
    props.Headers = new Dictionary<string, object>()
    {
        { "x-delay", time }
    };
    channel.BasicPublish(exchange: exchangeName, routingKey: routingkey, basicProperties: props, body: body);
    Console.WriteLine(" Message sent successfully :" + message);
    return "success";
}
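The producer above hard-codes the delay as an integer number of milliseconds. A small helper can build the headers from a TimeSpan and reject values the plugin would not accept. This is a sketch: DelayHeaders and Create are hypothetical names, and the upper bound of 2^32 - 1 milliseconds is taken from the plugin's documentation as I understand it, so it is worth checking against the plugin version in use.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper that builds the headers for a delayed message.
public static class DelayHeaders
{
    // Assumed upper bound for x-delay: (2^32 - 1) milliseconds, roughly 49 days.
    public const long MaxDelayMilliseconds = 4294967295L;

    public static Dictionary<string, object> Create(TimeSpan delay)
    {
        // x-delay is expressed in whole milliseconds.
        var milliseconds = (long)delay.TotalMilliseconds;
        if (milliseconds < 0 || milliseconds > MaxDelayMilliseconds)
        {
            throw new ArgumentOutOfRangeException(
                nameof(delay), "Delay must be between 0 and 2^32 - 1 milliseconds.");
        }

        return new Dictionary<string, object> { { "x-delay", milliseconds } };
    }
}
```

In the producer this would replace the hand-written dictionary, for example props.Headers = DelayHeaders.Create(TimeSpan.FromSeconds(5));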
Consuming messages
Messages are consumed by a background task (RabbitmqDelayedHostService):
public class RabbitmqDelayedHostService : BackgroundService
{
    private readonly IModel _channel;
    private readonly IConnection _connection;

    public RabbitmqDelayedHostService()
    {
        var connFactory = new ConnectionFactory // Create a connection factory object
        {
            HostName = "localhost", // IP address
            Port = 5672,            // Port number
            UserName = "admin",     // User account
            Password = "123456",    // User password
            VirtualHost = "customer"
        };
        _connection = connFactory.CreateConnection();
        _channel = _connection.CreateModel();
        // Exchange name (the same exchange the producer publishes to)
        var exchangeName = "delay-exchange";
        var queueName = "delay_queueName";
        var routingkey = "delay.delay";
        var argMaps = new Dictionary<string, object>()
        {
            {"x-delayed-type", "topic"}
        };
        _channel.ExchangeDeclare(exchange: exchangeName, type: "x-delayed-message", true, false, argMaps);
        _channel.QueueDeclare(queueName, true, false, false, argMaps);
        _channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingkey);
        // Deliver at most one unacknowledged message at a time (used with manual acknowledgement)
        _channel.BasicQos(0, 1, false);
    }

    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var queueName = "delay_queueName";
        var consumer = new EventingBasicConsumer(_channel);
        consumer.Received += (model, ea) =>
        {
            var message = Encoding.UTF8.GetString(ea.Body.ToArray());
            var routingKey = ea.RoutingKey;
            Console.WriteLine($" The message was received at {DateTime.Now:yyyy-MM-dd HH:mm:ss},routingKey:{routingKey} message:{message} ");
            // Manually acknowledge this message only
            _channel.BasicAck(ea.DeliveryTag, false);
        };
        // autoAck: false turns on manual acknowledgement
        _channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
        return Task.CompletedTask;
    }

    public override void Dispose()
    {
        // Dispose the channel before the connection that owns it
        _channel.Dispose();
        _connection.Dispose();
        base.Dispose();
    }
}
Register the background task
services.AddHostedService<RabbitmqDelayedHostService>();
Output results
Message sent successfully : The sending time is 2022-07-02 18:54:22 The delay time is :5000
Message sent successfully : The sending time is 2022-07-02 18:54:22 The delay time is :5000
Message sent successfully : The sending time is 2022-07-02 18:54:22 The delay time is :5000
Message sent successfully : The sending time is 2022-07-02 18:54:23 The delay time is :5000
Message sent successfully : The sending time is 2022-07-02 18:54:23 The delay time is :5000
Message sent successfully : The sending time is 2022-07-02 18:54:23 The delay time is :5000
The message was received at 2022-07-02 18:54:27,routingKey:delay.delay message: The sending time is 2022-07-02 18:54:22 The delay time is :5000
The message was received at 2022-07-02 18:54:27,routingKey:delay.delay message: The sending time is 2022-07-02 18:54:22 The delay time is :5000
The message was received at 2022-07-02 18:54:27,routingKey:delay.delay message: The sending time is 2022-07-02 18:54:22 The delay time is :5000
The message was received at 2022-07-02 18:54:28,routingKey:delay.delay message: The sending time is 2022-07-02 18:54:23 The delay time is :5000
The message was received at 2022-07-02 18:54:28,routingKey:delay.delay message: The sending time is 2022-07-02 18:54:23 The delay time is :5000
The message was received at 2022-07-02 18:54:28,routingKey:delay.delay message: The sending time is 2022-07-02 18:54:23 The delay time is :5000
Other options
Hangfire Delay queue
BackgroundJob.Schedule(
    () => Console.WriteLine("Delayed!"),
    TimeSpan.FromDays(7));
Time wheel
Redisson DelayQueue
Time manager
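The article only names the time wheel, so the following is a rough, single-level sketch of the idea rather than any particular library's implementation. SimpleTimeWheel, Schedule and Tick are hypothetical names. The wheel is a ring of slots; each task is placed in the slot the pointer will reach after its delay, along with the number of full turns it still has to wait. Something external, such as a timer, is assumed to call Tick at a fixed interval.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical single-level time wheel (illustration only, not thread-safe).
public sealed class SimpleTimeWheel
{
    private readonly List<(int Rounds, Action Task)>[] _slots;
    private int _current; // index of the slot processed by the most recent tick

    public SimpleTimeWheel(int slotCount)
    {
        if (slotCount <= 0) throw new ArgumentOutOfRangeException(nameof(slotCount));
        _slots = new List<(int Rounds, Action Task)>[slotCount];
        for (var i = 0; i < slotCount; i++)
        {
            _slots[i] = new List<(int Rounds, Action Task)>();
        }
    }

    // Schedule a task to run on the delayTicks-th call to Tick from now (delayTicks >= 1).
    public void Schedule(int delayTicks, Action task)
    {
        if (delayTicks < 1) throw new ArgumentOutOfRangeException(nameof(delayTicks));
        var slot = (_current + delayTicks) % _slots.Length;
        var rounds = (delayTicks - 1) / _slots.Length; // full turns to wait before running
        _slots[slot].Add((rounds, task));
    }

    // Advance the pointer by one slot and run every task that is due there.
    public void Tick()
    {
        _current = (_current + 1) % _slots.Length;
        var bucket = _slots[_current];
        var due = new List<Action>();
        for (var i = bucket.Count - 1; i >= 0; i--)
        {
            var (rounds, task) = bucket[i];
            if (rounds == 0)
            {
                due.Add(task);
                bucket.RemoveAt(i);
            }
            else
            {
                bucket[i] = (rounds - 1, task);
            }
        }

        // Run the tasks after the bucket has been updated, so a task may reschedule itself.
        foreach (var action in due)
        {
            action();
        }
    }
}
```

With one tick per second and 60 slots, Schedule(1800, ...) would correspond to the 30-minute order timeout from the use cases. Tasks live only in memory, so they are lost if the process restarts.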