当前位置:网站首页>.Net之延迟队列
.Net之延迟队列
2022-07-05 09:36:00 【dotNET跨平台】
介绍
具有队列的特性,再给它附加一个延迟消费队列消息的功能,也就是说可以指定队列中的消息在哪个时间点被消费。
使用场景
延迟队列在项目中的应用还是比较多的,尤其像电商类平台:
订单成功后,在30分钟内没有支付,自动取消订单
外卖平台发送订餐通知,下单成功后60s给用户推送短信。
如果订单一直处于某一个未完结状态时,及时处理关单,并退还库存
淘宝新建商户一个月内还没上传商品信息,将冻结商铺等
该介绍来自其他文章
方案
下面的例子没有进行封装,所以代码仅供参考
Redis过期事件
注意:
不保证在设定的过期时间立即删除并发送通知,数据量大的时候会延迟比较大
不保证一定送达
发送即忘策略,不包含持久化
但是比如有些场景,对这个时间不是那么看重,并且有其他措施双层保障,该实现方案是比较简单。
redis自2.8.0之后版本提供Keyspace Notifications功能,允许客户订阅Pub / Sub频道,以便以某种方式接收影响Redis数据集事件。
配置
需要修改配置启用过期事件,比如在windows客户端中,需要修改redis.windows.conf文件,在linux中需要修改redis.conf,修改内容是:

-- 取消注释
notify-keyspace-events Ex
-- 注释
#notify-keyspace-events ""
然后重新启动服务器,比如windows
.\redis-server.exe .\redis.windows.conf
或者linux中使用docker-compose重新部署redis
redis:
container_name: redis
image: redis
hostname: redis
restart: always
ports:
- "6379:6379"
volumes:
- $PWD/redis/redis.conf:/etc/redis.conf
- /root/common-docker-compose/redis/data:/data
command:
/bin/bash -c "redis-server /etc/redis.conf" #启动执行指定的redis.conf文件
然后使用客户端订阅事件
-- windows
.\redis-cli
-- linux
docker exec -it 容器标识 redis-cli
psubscribe [email protected]__:expired
控制台订阅
使用StackExchange.Redis组件订阅过期事件
var connectionMultiplexer = ConnectionMultiplexer.Connect(_redisConnection);
var db = connectionMultiplexer.GetDatabase(0);
db.StringSet("orderno:123456", "订单创建", TimeSpan.FromSeconds(10));
Console.WriteLine("开始订阅");
var subscriber = connectionMultiplexer.GetSubscriber();
//订阅库0的过期通知事件
subscriber.Subscribe("[email protected]__:expired", (channel, key) =>
{
Console.WriteLine($"key过期 channel:{channel} key:{key}");
});
Console.ReadLine();
输出结果:
key过期 channel:[email protected]:expired key:orderno:123456
如果启动多个客户端监听,那么多个客户端都可以收到过期事件。
WebApi中订阅
创建RedisListenService继承自:BackgroundService
public class RedisListenService : BackgroundService
{
private readonly ISubscriber _subscriber;
public RedisListenService(IServiceScopeFactory serviceScopeFactory)
{
using var scope = serviceScopeFactory.CreateScope();
var configuration = scope.ServiceProvider.GetRequiredService<IConfiguration>();
var connectionMultiplexer = ConnectionMultiplexer.Connect(configuration["redis"]);
var db = connectionMultiplexer.GetDatabase(0);
_subscriber = connectionMultiplexer.GetSubscriber();
}
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
//订阅库0的过期通知事件
_subscriber.Subscribe("[email protected]__:expired", (channel, key) =>
{
Console.WriteLine($"key过期 channel:{channel} key:{key}");
});
return Task.CompletedTask;
}
}
注册该后台服务
services.AddHostedService<RedisListenService>();
启用项目,给redis指定库设置值,等过期后会接收到过期通知事件。
RabbitMq延迟队列
版本信息 Rabbitmq版本:3.10.5 Erlang版本:24.3.4.2
要使用rabbitmq做延迟是需要安装插件(rabbitmq_delayed_message_exchange)的
插件介绍:https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq
下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
将下载好的插件(d:/Download/rabbitmq_delayed_message_exchange-3.10.2.ez)映射到容器的plugins目录下:
docker run -d --name myrabbit -p 9005:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_VHOST=customer -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 -v d:/Download/rabbitmq_delayed_message_exchange-3.10.2.ez:/plugins/rabbitmq_delayed_message_exchange-3.10.2.ez rabbitmq:3-management-alpine
进入容器
docker exec -it 容器名称/标识 bash
启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
查看是否启用
rabbitmq-plugins list
[E*]和[e*]表示启用,然后重启服务
rabbitmq-server restart
然后在管理界面添加交换机都可以看到

生产消息
发送的消息类型是:x-delayed-message
[HttpGet("send/delay")]
public string SendDelayedMessage()
{
var factory = new ConnectionFactory()
{
HostName = "localhost",//IP地址
Port = 5672,//端口号
UserName = "admin",//用户账号
Password = "123456",//用户密码
VirtualHost = "customer"
};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
var exchangeName = "delay-exchange";
var routingkey = "delay.delay";
var queueName = "delay_queueName";
//设置Exchange队列类型
var argMaps = new Dictionary<string, object>()
{
{"x-delayed-type", "topic"}
};
//设置当前消息为延时队列
channel.ExchangeDeclare(exchange: exchangeName, type: "x-delayed-message", true, false, argMaps);
channel.QueueDeclare(queueName, true, false, false, argMaps);
channel.QueueBind(queueName, exchangeName, routingkey);
var time = 1000 * 5;
var message = $"发送时间为 {DateTime.Now:yyyy-MM-dd HH:mm:ss} 延时时间为:{time}";
var body = Encoding.UTF8.GetBytes(message);
var props = channel.CreateBasicProperties();
//设置消息的过期时间
props.Headers = new Dictionary<string, object>()
{
{ "x-delay", time }
};
channel.BasicPublish(exchange: exchangeName, routingKey: routingkey, basicProperties: props, body: body);
Console.WriteLine("成功发送消息:" + message);
return "success";
}
消费消息
消费消息我是弄了一个后台任务(RabbitmqDelayedHostService)在处理
public class RabbitmqDelayedHostService : BackgroundService
{
private readonly IModel _channel;
private readonly IConnection _connection;
public RabbitmqDelayedHostService()
{
var connFactory = new ConnectionFactory//创建连接工厂对象
{
HostName = "localhost",//IP地址
Port = 5672,//端口号
UserName = "admin",//用户账号
Password = "123456",//用户密码
VirtualHost = "customer"
};
_connection = connFactory.CreateConnection();
_channel = _connection.CreateModel();
//交换机名称
var exchangeName = "exchangeDelayed";
var queueName = "delay_queueName";
var routingkey = "delay.delay";
var argMaps = new Dictionary<string, object>()
{
{"x-delayed-type", "topic"}
};
_channel.ExchangeDeclare(exchange: exchangeName, type: "x-delayed-message", true, false, argMaps);
_channel.QueueDeclare(queueName, true, false, false, argMaps);
_channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingkey);
//声明为手动确认
_channel.BasicQos(0, 1, false);
}
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
var queueName = "delay_queueName";
var consumer = new EventingBasicConsumer(_channel);
consumer.Received += (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
var routingKey = ea.RoutingKey;
Console.WriteLine($"接受到消息的时间为 {DateTime.Now:yyyy-MM-dd HH:mm:ss},routingKey:{routingKey} message:{message} ");
//手动确认
_channel.BasicAck(ea.DeliveryTag, true);
};
_channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
return Task.CompletedTask;
}
public override void Dispose()
{
_connection.Dispose();
_channel.Dispose();
base.Dispose();
}
}
注册该后台任务
services.AddHostedService<RabbitmqDelayedHostService>();
输出结果
成功发送消息:发送时间为 2022-07-02 18:54:22 延时时间为:5000
成功发送消息:发送时间为 2022-07-02 18:54:22 延时时间为:5000
成功发送消息:发送时间为 2022-07-02 18:54:22 延时时间为:5000
成功发送消息:发送时间为 2022-07-02 18:54:23 延时时间为:5000
成功发送消息:发送时间为 2022-07-02 18:54:23 延时时间为:5000
成功发送消息:发送时间为 2022-07-02 18:54:23 延时时间为:5000
接受到消息的时间为 2022-07-02 18:54:27,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:22 延时时间为:5000
接受到消息的时间为 2022-07-02 18:54:27,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:22 延时时间为:5000
接受到消息的时间为 2022-07-02 18:54:27,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:22 延时时间为:5000
接受到消息的时间为 2022-07-02 18:54:28,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:23 延时时间为:5000
接受到消息的时间为 2022-07-02 18:54:28,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:23 延时时间为:5000
接受到消息的时间为 2022-07-02 18:54:28,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:23 延时时间为:5000
其他方案
Hangfire延迟队列
BackgroundJob.Schedule(
() => Console.WriteLine("Delayed!"),
TimeSpan.FromDays(7));
时间轮
Redisson DelayQueue
计时管理器
边栏推荐
- 单片机原理与接口技术(ESP8266/ESP32)机器人类草稿
- 7 月 2 日邀你来TD Hero 线上发布会
- Flutter development: use safearea
- Tdengine can read and write through dataX, a data synchronization tool
- Develop and implement movie recommendation applet based on wechat cloud
- 基于模板配置的数据可视化平台
- 22-07-04 Xi'an Shanghao housing project experience summary (01)
- Oracle combines multiple rows of data into one row of data
- 【el-table如何禁用】
- How to improve the operation efficiency of intra city distribution
猜你喜欢
Idea debugs com intellij. rt.debugger. agent. Captureagent, which makes debugging impossible
Why does everyone want to do e-commerce? How much do you know about the advantages of online shopping malls?
OpenGL - Lighting
Tdengine can read and write through dataX, a data synchronization tool
Android 隐私沙盒开发者预览版 3: 隐私安全和个性化体验全都要
SMT32H7系列DMA和DMAMUX的一点理解
How to correctly evaluate video image quality
What about wechat mall? 5 tips to clear your mind
22-07-04 Xi'an Shanghao housing project experience summary (01)
[technical live broadcast] how to rewrite tdengine code from 0 to 1 with vscode
随机推荐
High performance spark_ Transformation performance
mysql安装配置以及创建数据库和表
Solve liquibase – waiting for changelog lock Cause database deadlock
SMT32H7系列DMA和DMAMUX的一点理解
Common fault analysis and Countermeasures of using MySQL in go language
Node の MongoDB Driver
【OpenCV 例程200篇】219. 添加数字水印(盲水印)
Community group buying has triggered heated discussion. How does this model work?
Tdengine can read and write through dataX, a data synchronization tool
STM32 simple multi-level menu (array table lookup method)
Android SQLite database encryption
Lepton 无损压缩原理及性能分析
[two objects merged into one object]
Dry goods sorting! How about the development trend of ERP in the manufacturing industry? It's enough to read this article
A keepalived high availability accident made me learn it again
Data visualization platform based on template configuration
Understand the window query function of tdengine in one article
Principle and performance analysis of lepton lossless compression
OpenGL - Model Loading
Why don't you recommend using products like mongodb to replace time series databases?