当前位置:网站首页>. Net delay queue
. Net delay queue
2022-07-05 09:57:00 【Dotnet cross platform】
Introduce
With the characteristics of queues , Add a function of delaying consumption queue message to it , That is, you can specify the point in time at which messages in the queue are consumed .
Use scenarios
There are many applications of delay queue in projects , Especially like e-commerce platforms :
After the order is successful , stay 30 No payment in minutes , Cancel the order automatically
The takeout platform sends out the order notice , After placing the order successfully 60s Push SMS to users .
If the order has been in an open status , Deal with the customs bill in time , And return the inventory
Taobao new merchants have not uploaded product information within a month , Will freeze shops, etc
This introduction comes from other articles
programme
The following example is not encapsulated , So the code is for reference only
Redis Overdue Events
Be careful :
It is not guaranteed to delete and send a notice immediately within the set expiration time , When there is a large amount of data, the delay is relatively large
Delivery is not guaranteed
Forget when you send , Does not include persistence
But for example, some scenes , I don't pay much attention to this time , And there are other measures to ensure two levels , The implementation scheme is relatively simple .
redis since 2.8.0 Later versions provide Keyspace Notifications function , Allow customers to subscribe to Pub / Sub channel , In order to receive influence in some way Redis Dataset events .
To configure
You need to modify the configuration to enable expiration events , For example windows The client , Need modification redis.windows.conf file , stay linux Need to modify redis.conf, The modified content is :

-- uncomment
notify-keyspace-events Ex
-- notes
#notify-keyspace-events ""Then restart the server , such as windows
.\redis-server.exe .\redis.windows.confperhaps linux Use in docker-compose Redeployment redis
redis:
container_name: redis
image: redis
hostname: redis
restart: always
ports:
- "6379:6379"
volumes:
- $PWD/redis/redis.conf:/etc/redis.conf
- /root/common-docker-compose/redis/data:/data
command:
/bin/bash -c "redis-server /etc/redis.conf" # Start and execute the specified redis.conf file Then use the client to subscribe to events
-- windows
.\redis-cli
-- linux
docker exec -it Container identification redis-cli
psubscribe [email protected]__:expiredConsole subscription
Use StackExchange.Redis Component subscription expiration event
var connectionMultiplexer = ConnectionMultiplexer.Connect(_redisConnection);
var db = connectionMultiplexer.GetDatabase(0);
db.StringSet("orderno:123456", " Order creation ", TimeSpan.FromSeconds(10));
Console.WriteLine(" Start subscription ");
var subscriber = connectionMultiplexer.GetSubscriber();
// Subscription library 0 Expired notification event
subscriber.Subscribe("[email protected]__:expired", (channel, key) =>
{
Console.WriteLine($"key Be overdue channel:{channel} key:{key}");
});
Console.ReadLine();Output results :
key Be overdue channel:[email protected]:expired key:orderno:123456
If you start multiple clients to listen , So many clients can receive expiration events .
WebApi Subscription in
establish RedisListenService Inherited from :BackgroundService
public class RedisListenService : BackgroundService
{
private readonly ISubscriber _subscriber;
public RedisListenService(IServiceScopeFactory serviceScopeFactory)
{
using var scope = serviceScopeFactory.CreateScope();
var configuration = scope.ServiceProvider.GetRequiredService<IConfiguration>();
var connectionMultiplexer = ConnectionMultiplexer.Connect(configuration["redis"]);
var db = connectionMultiplexer.GetDatabase(0);
_subscriber = connectionMultiplexer.GetSubscriber();
}
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
// Subscription library 0 Expired notification event
_subscriber.Subscribe("[email protected]__:expired", (channel, key) =>
{
Console.WriteLine($"key Be overdue channel:{channel} key:{key}");
});
return Task.CompletedTask;
}
}Register the background service
services.AddHostedService<RedisListenService>();Enable project , to redis Specify library settings , After expiration, you will receive an expiration notification event .
RabbitMq Delay queue
Version information Rabbitmq edition :3.10.5 Erlang edition :24.3.4.2
To use rabbitmq To delay, you need to install plug-ins (rabbitmq_delayed_message_exchange) Of
The plugin is introduced :https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq
Download address :https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
The downloaded plug-in (d:/Download/rabbitmq_delayed_message_exchange-3.10.2.ez) Mapped to container plugins Under the table of contents :
docker run -d --name myrabbit -p 9005:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_VHOST=customer -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 -v d:/Download/rabbitmq_delayed_message_exchange-3.10.2.ez:/plugins/rabbitmq_delayed_message_exchange-3.10.2.ez rabbitmq:3-management-alpineInto the container
docker exec -it Container name / identification bashTo enable the plugin
rabbitmq-plugins enable rabbitmq_delayed_message_exchangeSee if... Is enabled
rabbitmq-plugins list[E*] and [e*] Means to enable , Then restart the service
rabbitmq-server restartThen add switches in the management interface, and you can see

Production news
The type of message sent is :x-delayed-message
[HttpGet("send/delay")]
public string SendDelayedMessage()
{
var factory = new ConnectionFactory()
{
HostName = "localhost",//IP Address
Port = 5672,// Port number
UserName = "admin",// The user account
Password = "123456",// User password
VirtualHost = "customer"
};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
var exchangeName = "delay-exchange";
var routingkey = "delay.delay";
var queueName = "delay_queueName";
// Set up Exchange Queue type
var argMaps = new Dictionary<string, object>()
{
{"x-delayed-type", "topic"}
};
// Set the current message as delay queue
channel.ExchangeDeclare(exchange: exchangeName, type: "x-delayed-message", true, false, argMaps);
channel.QueueDeclare(queueName, true, false, false, argMaps);
channel.QueueBind(queueName, exchangeName, routingkey);
var time = 1000 * 5;
var message = $" The sending time is {DateTime.Now:yyyy-MM-dd HH:mm:ss} The delay time is :{time}";
var body = Encoding.UTF8.GetBytes(message);
var props = channel.CreateBasicProperties();
// Set the expiration time of the message
props.Headers = new Dictionary<string, object>()
{
{ "x-delay", time }
};
channel.BasicPublish(exchange: exchangeName, routingKey: routingkey, basicProperties: props, body: body);
Console.WriteLine(" Message sent successfully :" + message);
return "success";
}News consumption
Consumption news I got a background task (RabbitmqDelayedHostService) Processing
public class RabbitmqDelayedHostService : BackgroundService
{
private readonly IModel _channel;
private readonly IConnection _connection;
public RabbitmqDelayedHostService()
{
var connFactory = new ConnectionFactory// Create a connection factory object
{
HostName = "localhost",//IP Address
Port = 5672,// Port number
UserName = "admin",// The user account
Password = "123456",// User password
VirtualHost = "customer"
};
_connection = connFactory.CreateConnection();
_channel = _connection.CreateModel();
// Switch name
var exchangeName = "exchangeDelayed";
var queueName = "delay_queueName";
var routingkey = "delay.delay";
var argMaps = new Dictionary<string, object>()
{
{"x-delayed-type", "topic"}
};
_channel.ExchangeDeclare(exchange: exchangeName, type: "x-delayed-message", true, false, argMaps);
_channel.QueueDeclare(queueName, true, false, false, argMaps);
_channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingkey);
// Declare as manual confirmation
_channel.BasicQos(0, 1, false);
}
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
var queueName = "delay_queueName";
var consumer = new EventingBasicConsumer(_channel);
consumer.Received += (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
var routingKey = ea.RoutingKey;
Console.WriteLine($" The message was received at {DateTime.Now:yyyy-MM-dd HH:mm:ss},routingKey:{routingKey} message:{message} ");
// Manual validation
_channel.BasicAck(ea.DeliveryTag, true);
};
_channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
return Task.CompletedTask;
}
public override void Dispose()
{
_connection.Dispose();
_channel.Dispose();
base.Dispose();
}
}Register the background task
services.AddHostedService<RabbitmqDelayedHostService>();Output results
Message sent successfully : The sending time is 2022-07-02 18:54:22 The delay time is :5000
Message sent successfully : The sending time is 2022-07-02 18:54:22 The delay time is :5000
Message sent successfully : The sending time is 2022-07-02 18:54:22 The delay time is :5000
Message sent successfully : The sending time is 2022-07-02 18:54:23 The delay time is :5000
Message sent successfully : The sending time is 2022-07-02 18:54:23 The delay time is :5000
Message sent successfully : The sending time is 2022-07-02 18:54:23 The delay time is :5000
The message was received at 2022-07-02 18:54:27,routingKey:delay.delay message: The sending time is 2022-07-02 18:54:22 The delay time is :5000
The message was received at 2022-07-02 18:54:27,routingKey:delay.delay message: The sending time is 2022-07-02 18:54:22 The delay time is :5000
The message was received at 2022-07-02 18:54:27,routingKey:delay.delay message: The sending time is 2022-07-02 18:54:22 The delay time is :5000
The message was received at 2022-07-02 18:54:28,routingKey:delay.delay message: The sending time is 2022-07-02 18:54:23 The delay time is :5000
The message was received at 2022-07-02 18:54:28,routingKey:delay.delay message: The sending time is 2022-07-02 18:54:23 The delay time is :5000
The message was received at 2022-07-02 18:54:28,routingKey:delay.delay message: The sending time is 2022-07-02 18:54:23 The delay time is :5000
Other options
Hangfire Delay queue
BackgroundJob.Schedule(
() => Console.WriteLine("Delayed!"),
TimeSpan.FromDays(7));Time wheel
Redisson DelayQueue
Time manager
边栏推荐
- About getfragmentmanager () and getchildfragmentmanager ()
- Theme. AppCompat. Light. Darkactionbar not found
- 初识结构体
- 正式上架!TDengine 插件入驻 Grafana 官网
- 宝塔面板MySQL无法启动
- TDengine 已经支持工业英特尔 边缘洞见软件包
- Solve liquibase – waiting for changelog lock Cause database deadlock
- 解决Navicat激活、注册时候出现No All Pattern Found的问题
- tongweb设置gzip
- MySQL字符类型学习笔记
猜你喜欢

Officially launched! Tdengine plug-in enters the official website of grafana

TDengine 已经支持工业英特尔 边缘洞见软件包

What are the advantages of the live teaching system to improve learning quickly?

Community group buying exploded overnight. How should this new model of e-commerce operate?

How to implement complex SQL such as distributed database sub query and join?

初识结构体

Single chip microcomputer principle and Interface Technology (esp8266/esp32) machine human draft

TDengine ×英特尔边缘洞见软件包 加速传统行业的数字化转型

.Net之延迟队列

Node red series (29): use slider and chart nodes to realize double broken line time series diagram
随机推荐
Vs code problem: the length of long lines can be configured through "editor.maxtokenizationlinelength"
Roll up, break 35 - year - old Anxiety, animation Demonstration CPU recording Function call Process
LeetCode 556. Next bigger element III
【对象数组a与对象数组b取出id不同元素赋值给新的数组】
QT timer realizes dynamic display of pictures
[object array A and object array B take out different elements of ID and assign them to the new array]
Mobile heterogeneous computing technology GPU OpenCL programming (Advanced)
idea用debug调试出现com.intellij.rt.debugger.agent.CaptureAgent,导致无法进行调试
使用el-upload封装得组件怎么清空已上传附件
[app packaging error] to proceed, either fix the issues identified by lint, or modify your build script as follow
The writing speed is increased by dozens of times, and the application of tdengine in tostar intelligent factory solution
Application of data modeling based on wide table
卷起來,突破35歲焦慮,動畫演示CPU記錄函數調用過程
Why does everyone want to do e-commerce? How much do you know about the advantages of online shopping malls?
Dry goods sorting! How about the development trend of ERP in the manufacturing industry? It's enough to read this article
从“化学家”到开发者,从甲骨文到 TDengine,我人生的两次重要抉择
[NTIRE 2022]Residual Local Feature Network for Efficient Super-Resolution
Community group buying exploded overnight. How should this new model of e-commerce operate?
[team PK competition] the task of this week has been opened | question answering challenge to consolidate the knowledge of commodity details
Three-level distribution is becoming more and more popular. How should businesses choose the appropriate three-level distribution system?