.NET delay queue
2022-07-05 09:57:00 【Dotnet cross platform】
Introduction
A delay queue builds on an ordinary queue by adding delayed consumption: you can specify the point in time at which a message in the queue is consumed.
Use cases
Delay queues have many uses in projects, especially on e-commerce platforms:
If an order is not paid within 30 minutes of being placed, cancel it automatically.
A food-delivery platform sends an order notification: push an SMS to the user 60 s after the order is placed successfully.
If an order remains in an unfinished state, close it promptly and return the inventory.
If a new Taobao merchant has not uploaded product information within a month, freeze the shop, and so on.
The introduction above is adapted from other articles.
Approaches
The following examples are not encapsulated, so the code is for reference only.
Redis expiration events
Note:
Redis does not guarantee that a key is deleted and the notification sent exactly at the configured expiration time; with a large amount of data the delay can be significant.
Delivery is not guaranteed.
Notifications are fire-and-forget and are not persisted.
For scenarios that are not sensitive to timing and that have other measures as a second layer of protection, however, this approach is relatively simple to implement.
Since version 2.8.0, Redis has provided Keyspace Notifications, which let clients subscribe to Pub/Sub channels in order to receive events that affect the Redis data set in some way.
Configuration
Expiration events must be enabled in the configuration. On Windows, edit the redis.windows.conf file; on Linux, edit redis.conf. The change is:
-- uncomment this line
notify-keyspace-events Ex
-- comment out this line
#notify-keyspace-events ""
Then restart the server, for example on Windows:
.\redis-server.exe .\redis.windows.conf
Or, on Linux, redeploy Redis with docker-compose:
redis:
  container_name: redis
  image: redis
  hostname: redis
  restart: always
  ports:
    - "6379:6379"
  volumes:
    - $PWD/redis/redis.conf:/etc/redis.conf
    - /root/common-docker-compose/redis/data:/data
  command:
    /bin/bash -c "redis-server /etc/redis.conf" # start Redis with the specified redis.conf file
Then use the client to subscribe to events
-- windows
.\redis-cli
-- linux
docker exec -it <container-id> redis-cli
psubscribe __keyevent@0__:expired
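The channel name above follows the Redis keyevent pattern `__keyevent@<db>__:<event>`, where `<db>` is the database index. A minimal sketch of a helper that builds the name, so the database index is not hard-coded in several places; `KeyspaceChannels` and `KeyEvent` are hypothetical names introduced here for illustration only.

```csharp
using System;

// Hypothetical helper, not part of StackExchange.Redis.
public static class KeyspaceChannels
{
    // Builds the keyevent channel name for a database index and an event name,
    // for example database 0 and the "expired" event.
    public static string KeyEvent(int database, string eventName)
    {
        if (database < 0)
            throw new ArgumentOutOfRangeException(nameof(database));
        if (string.IsNullOrWhiteSpace(eventName))
            throw new ArgumentException("Event name is required.", nameof(eventName));

        return $"__keyevent@{database}__:{eventName}";
    }
}
```

The result of `KeyspaceChannels.KeyEvent(0, "expired")` can be passed to `psubscribe` or to a subscriber in code.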
Subscribing from a console application
Use the StackExchange.Redis package to subscribe to the expiration event:
var connectionMultiplexer = ConnectionMultiplexer.Connect(_redisConnection);
var db = connectionMultiplexer.GetDatabase(0);
// Write a key that expires after 10 seconds
db.StringSet("orderno:123456", "Order created", TimeSpan.FromSeconds(10));
Console.WriteLine("Start subscription");
var subscriber = connectionMultiplexer.GetSubscriber();
// Subscribe to expiration notifications for database 0
subscriber.Subscribe("__keyevent@0__:expired", (channel, key) =>
{
    Console.WriteLine($"Key expired channel:{channel} key:{key}");
});
Console.ReadLine();
Output:
Key expired channel:__keyevent@0__:expired key:orderno:123456
If you start multiple clients that listen on the channel, every one of them receives the expiration event.
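Because every listening client receives the same event, the business action (for example, cancelling an order) may run more than once unless the clients coordinate. One possible approach is to let each client try to claim the key first and continue only if the claim succeeds. The sketch below shows the idea with an in-memory stand-in; `IClaimStore`, `InMemoryClaimStore` and `ExpiredKeyHandler` are hypothetical names, and the in-memory store only deduplicates inside one process. Across processes, the claim would need a shared store, for instance Redis itself via `StringSet` with `When.NotExists`.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical abstraction: returns true only for the first caller that claims a key.
public interface IClaimStore
{
    bool TryClaim(string claimKey, TimeSpan ttl);
}

// In-memory stand-in so the sketch runs without Redis.
// It ignores the ttl and is only valid inside a single process.
public sealed class InMemoryClaimStore : IClaimStore
{
    private readonly ConcurrentDictionary<string, byte> _claims =
        new ConcurrentDictionary<string, byte>();

    public bool TryClaim(string claimKey, TimeSpan ttl) => _claims.TryAdd(claimKey, 0);
}

public sealed class ExpiredKeyHandler
{
    private readonly IClaimStore _claims;
    private readonly Action<string> _process;

    public ExpiredKeyHandler(IClaimStore claims, Action<string> process)
    {
        _claims = claims ?? throw new ArgumentNullException(nameof(claims));
        _process = process ?? throw new ArgumentNullException(nameof(process));
    }

    // Returns true if this caller won the claim and ran the business action.
    public bool Handle(string expiredKey)
    {
        if (!_claims.TryClaim("claim:" + expiredKey, TimeSpan.FromMinutes(1)))
            return false; // another subscriber already handled this key

        _process(expiredKey);
        return true;
    }
}
```

If the claim keys live in Redis with a TTL, they expire too and raise their own expired events, so the subscriber should filter by key prefix or keep the claims in a different database.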
Subscribing in a Web API
Create a RedisListenService class that inherits from BackgroundService:
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using StackExchange.Redis;

public class RedisListenService : BackgroundService
{
    private readonly ISubscriber _subscriber;
    public RedisListenService(IServiceScopeFactory serviceScopeFactory)
    {
        // The scope is only needed to read the configuration
        using var scope = serviceScopeFactory.CreateScope();
        var configuration = scope.ServiceProvider.GetRequiredService<IConfiguration>();
        var connectionMultiplexer = ConnectionMultiplexer.Connect(configuration["redis"]);
        _subscriber = connectionMultiplexer.GetSubscriber();
    }
    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        // Subscribe to expiration notifications for database 0
        _subscriber.Subscribe("__keyevent@0__:expired", (channel, key) =>
        {
            Console.WriteLine($"Key expired channel:{channel} key:{key}");
        });
        return Task.CompletedTask;
    }
}
Register the background service
services.AddHostedService<RedisListenService>();
Start the project and set a key with an expiration time in the subscribed Redis database; once the key expires, the service receives the expiration notification event.
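The expired event carries only the key name; the value has already been deleted by the time the event arrives. Whatever the handler needs therefore has to be encoded in the key, as in the `orderno:123456` example earlier. A minimal sketch of pulling the order number back out of the key; `ExpiredKeyParser` and `TryGetOrderNo` are hypothetical names, and the `orderno:` prefix is taken from that example.

```csharp
using System;

// Hypothetical helper for keys of the form "orderno:<number>".
public static class ExpiredKeyParser
{
    private const string OrderPrefix = "orderno:";

    // Returns true and the order number when the key uses the expected prefix;
    // returns false for unrelated keys so the handler can ignore them.
    public static bool TryGetOrderNo(string expiredKey, out string orderNo)
    {
        orderNo = string.Empty;
        if (expiredKey is null ||
            !expiredKey.StartsWith(OrderPrefix, StringComparison.Ordinal))
        {
            return false;
        }

        orderNo = expiredKey.Substring(OrderPrefix.Length);
        return orderNo.Length > 0;
    }
}
```

Inside the subscription callback, the handler would call `TryGetOrderNo(key.ToString(), out var orderNo)` and skip the event when it returns false.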
RabbitMQ delay queue
Version information: RabbitMQ 3.10.5, Erlang 24.3.4.2
To delay messages with RabbitMQ, you need to install the rabbitmq_delayed_message_exchange plugin.
Plugin introduction: https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq
Download: https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
Mount the downloaded plugin (d:/Download/rabbitmq_delayed_message_exchange-3.10.2.ez) into the container's plugins directory:
docker run -d --name myrabbit -p 9005:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_VHOST=customer -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 -v d:/Download/rabbitmq_delayed_message_exchange-3.10.2.ez:/plugins/rabbitmq_delayed_message_exchange-3.10.2.ez rabbitmq:3-management-alpine
Enter the container:
docker exec -it <container-name-or-id> bash
Enable the plugin:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Check whether it is enabled:
rabbitmq-plugins list
[E*] means the plugin is explicitly enabled and running, and [e*] means it is implicitly enabled and running. Then restart the service:
rabbitmq-server restart
When you then add an exchange in the management UI, the x-delayed-message type is available.
Producing messages
The exchange that messages are published to has the type x-delayed-message:
[HttpGet("send/delay")]
public string SendDelayedMessage()
{
    var factory = new ConnectionFactory()
    {
        HostName = "localhost", // IP address
        Port = 5672, // port number
        UserName = "admin", // user name
        Password = "123456", // password
        VirtualHost = "customer"
    };
    using var connection = factory.CreateConnection();
    using var channel = connection.CreateModel();
    var exchangeName = "delay-exchange";
    var routingkey = "delay.delay";
    var queueName = "delay_queueName";
    // x-delayed-type: the exchange type used for routing once the delay has elapsed
    var argMaps = new Dictionary<string, object>()
    {
        {"x-delayed-type", "topic"}
    };
    // Declare the exchange with the plugin's x-delayed-message type
    channel.ExchangeDeclare(exchange: exchangeName, type: "x-delayed-message", true, false, argMaps);
    // The queue arguments must match the consumer's declaration of the same queue
    channel.QueueDeclare(queueName, true, false, false, argMaps);
    channel.QueueBind(queueName, exchangeName, routingkey);
    var time = 1000 * 5; // delay in milliseconds
    var message = $" The sending time is {DateTime.Now:yyyy-MM-dd HH:mm:ss} The delay time is :{time}";
    var body = Encoding.UTF8.GetBytes(message);
    var props = channel.CreateBasicProperties();
    // Set the delay through the x-delay header
    props.Headers = new Dictionary<string, object>()
    {
        { "x-delay", time }
    };
    channel.BasicPublish(exchange: exchangeName, routingKey: routingkey, basicProperties: props, body: body);
    Console.WriteLine(" Message sent successfully :" + message);
    return "success";
}
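The example above uses a fixed delay of 5000 ms. In the use cases from the introduction, the delay is usually derived from a deadline, such as "30 minutes after the order was created". A minimal sketch of that calculation follows; `DelayCalculator` and `ToDelayMilliseconds` are hypothetical names, and the upper bound of (2^32)-1 ms is an assumption based on the plugin's README and should be checked against the installed plugin version.

```csharp
using System;

// Hypothetical helper that turns a due time into a value for the x-delay header.
public static class DelayCalculator
{
    // Assumed maximum delay supported by the plugin, in milliseconds.
    public const long MaxDelayMs = 4294967295L;

    public static long ToDelayMilliseconds(DateTimeOffset now, DateTimeOffset dueAt)
    {
        var ms = (long)(dueAt - now).TotalMilliseconds;
        if (ms < 0)
            return 0; // already due: deliver without delay
        if (ms > MaxDelayMs)
            throw new ArgumentOutOfRangeException(nameof(dueAt), "Delay exceeds the assumed plugin limit.");
        return ms;
    }
}
```

The returned value would go into the `x-delay` header in place of the fixed `time` variable, for example `ToDelayMilliseconds(DateTimeOffset.Now, createdAt.AddMinutes(30))` for a hypothetical `createdAt` order timestamp.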
Consuming messages
Messages are consumed by a background task (RabbitmqDelayedHostService):
using System.Text;
using Microsoft.Extensions.Hosting;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

public class RabbitmqDelayedHostService : BackgroundService
{
    private readonly IModel _channel;
    private readonly IConnection _connection;
    public RabbitmqDelayedHostService()
    {
        var connFactory = new ConnectionFactory // create the connection factory
        {
            HostName = "localhost", // IP address
            Port = 5672, // port number
            UserName = "admin", // user name
            Password = "123456", // password
            VirtualHost = "customer"
        };
        _connection = connFactory.CreateConnection();
        _channel = _connection.CreateModel();
        // Exchange name (same exchange the producer publishes to)
        var exchangeName = "delay-exchange";
        var queueName = "delay_queueName";
        var routingkey = "delay.delay";
        var argMaps = new Dictionary<string, object>()
        {
            {"x-delayed-type", "topic"}
        };
        _channel.ExchangeDeclare(exchange: exchangeName, type: "x-delayed-message", true, false, argMaps);
        _channel.QueueDeclare(queueName, true, false, false, argMaps);
        _channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingkey);
        // Deliver at most one unacknowledged message at a time to this consumer
        _channel.BasicQos(0, 1, false);
    }
    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        var queueName = "delay_queueName";
        var consumer = new EventingBasicConsumer(_channel);
        consumer.Received += (model, ea) =>
        {
            var message = Encoding.UTF8.GetString(ea.Body.ToArray());
            var routingKey = ea.RoutingKey;
            Console.WriteLine($" The message was received at {DateTime.Now:yyyy-MM-dd HH:mm:ss},routingKey:{routingKey} message:{message} ");
            // Acknowledge this message only
            _channel.BasicAck(ea.DeliveryTag, false);
        };
        // autoAck: false means messages are acknowledged manually
        _channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
        return Task.CompletedTask;
    }
    public override void Dispose()
    {
        // Dispose the channel before the connection that owns it
        _channel.Dispose();
        _connection.Dispose();
        base.Dispose();
    }
}
Register the background task
services.AddHostedService<RabbitmqDelayedHostService>();
Output results
Message sent successfully : The sending time is 2022-07-02 18:54:22 The delay time is :5000
Message sent successfully : The sending time is 2022-07-02 18:54:22 The delay time is :5000
Message sent successfully : The sending time is 2022-07-02 18:54:22 The delay time is :5000
Message sent successfully : The sending time is 2022-07-02 18:54:23 The delay time is :5000
Message sent successfully : The sending time is 2022-07-02 18:54:23 The delay time is :5000
Message sent successfully : The sending time is 2022-07-02 18:54:23 The delay time is :5000
The message was received at 2022-07-02 18:54:27,routingKey:delay.delay message: The sending time is 2022-07-02 18:54:22 The delay time is :5000
The message was received at 2022-07-02 18:54:27,routingKey:delay.delay message: The sending time is 2022-07-02 18:54:22 The delay time is :5000
The message was received at 2022-07-02 18:54:27,routingKey:delay.delay message: The sending time is 2022-07-02 18:54:22 The delay time is :5000
The message was received at 2022-07-02 18:54:28,routingKey:delay.delay message: The sending time is 2022-07-02 18:54:23 The delay time is :5000
The message was received at 2022-07-02 18:54:28,routingKey:delay.delay message: The sending time is 2022-07-02 18:54:23 The delay time is :5000
The message was received at 2022-07-02 18:54:28,routingKey:delay.delay message: The sending time is 2022-07-02 18:54:23 The delay time is :5000
Other options
Hangfire delayed jobs
BackgroundJob.Schedule(
() => Console.WriteLine("Delayed!"),
TimeSpan.FromDays(7));
Timing wheel
Redisson DelayQueue
Time manager
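Of the options listed above, the timing wheel is an in-process data structure rather than a product, so it can be illustrated in a few lines. The sketch below is a simplified, single-threaded wheel written for this article, not code from any particular library: `SimpleTimingWheel` is a hypothetical name, and the caller is assumed to invoke `Tick()` at a fixed interval (for example once per second from a timer).

```csharp
using System;
using System.Collections.Generic;

// Simplified single-threaded timing wheel for illustration only.
public sealed class SimpleTimingWheel
{
    private readonly List<(int Rounds, Action Callback)>[] _slots;
    private int _cursor;

    public SimpleTimingWheel(int slotCount)
    {
        if (slotCount <= 0) throw new ArgumentOutOfRangeException(nameof(slotCount));
        _slots = new List<(int Rounds, Action Callback)>[slotCount];
        for (var i = 0; i < slotCount; i++)
            _slots[i] = new List<(int Rounds, Action Callback)>();
    }

    // Schedules the callback to run on the delayTicks-th call to Tick() from now (minimum 1).
    public void Schedule(int delayTicks, Action callback)
    {
        if (callback is null) throw new ArgumentNullException(nameof(callback));
        if (delayTicks < 1) delayTicks = 1;

        var slot = (_cursor + delayTicks) % _slots.Length;
        // Number of full revolutions to wait before the slot is due.
        var rounds = (delayTicks - 1) / _slots.Length;
        _slots[slot].Add((rounds, callback));
    }

    // Advances the wheel by one slot and runs every callback that is due in it.
    public void Tick()
    {
        _cursor = (_cursor + 1) % _slots.Length;

        var due = new List<Action>();
        var remaining = new List<(int Rounds, Action Callback)>();
        foreach (var (rounds, callback) in _slots[_cursor])
        {
            if (rounds == 0) due.Add(callback);
            else remaining.Add((rounds - 1, callback));
        }
        _slots[_cursor] = remaining;

        foreach (var callback in due) callback();
    }
}
```

Scheduling and firing cost does not depend on the total number of pending tasks, only on the tasks in the current slot. Unlike the Redis and RabbitMQ approaches, pending tasks live in memory and are lost when the process restarts.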