当前位置:网站首页>.Net之延迟队列
.Net之延迟队列
2022-07-05 09:36:00 【dotNET跨平台】
介绍
具有队列的特性,再给它附加一个延迟消费队列消息的功能,也就是说可以指定队列中的消息在哪个时间点被消费。
使用场景
延迟队列在项目中的应用还是比较多的,尤其像电商类平台:
订单成功后,在30分钟内没有支付,自动取消订单
外卖平台发送订餐通知,下单成功后60s给用户推送短信。
如果订单一直处于某一个未完结状态时,及时处理关单,并退还库存
淘宝新建商户一个月内还没上传商品信息,将冻结商铺等
该介绍来自其他文章
方案
下面的例子没有进行封装,所以代码仅供参考
Redis过期事件
注意:
不保证在设定的过期时间立即删除并发送通知,数据量大的时候会延迟比较大
不保证一定送达
发送即忘策略,不包含持久化
但是比如有些场景,对这个时间不是那么看重,并且有其他措施双层保障,该实现方案是比较简单。
redis自2.8.0之后版本提供Keyspace Notifications功能,允许客户订阅Pub / Sub频道,以便以某种方式接收影响Redis数据集事件。
配置
需要修改配置启用过期事件,比如在windows客户端中,需要修改redis.windows.conf文件,在linux中需要修改redis.conf,修改内容是:

-- 取消注释
notify-keyspace-events Ex
-- 注释
#notify-keyspace-events ""然后重新启动服务器,比如windows
.\redis-server.exe .\redis.windows.conf或者linux中使用docker-compose重新部署redis
redis:
container_name: redis
image: redis
hostname: redis
restart: always
ports:
- "6379:6379"
volumes:
- $PWD/redis/redis.conf:/etc/redis.conf
- /root/common-docker-compose/redis/data:/data
command:
/bin/bash -c "redis-server /etc/redis.conf" #启动执行指定的redis.conf文件然后使用客户端订阅事件
-- windows
.\redis-cli
-- linux
docker exec -it 容器标识 redis-cli
psubscribe [email protected]__:expired控制台订阅
使用StackExchange.Redis组件订阅过期事件
var connectionMultiplexer = ConnectionMultiplexer.Connect(_redisConnection);
var db = connectionMultiplexer.GetDatabase(0);
db.StringSet("orderno:123456", "订单创建", TimeSpan.FromSeconds(10));
Console.WriteLine("开始订阅");
var subscriber = connectionMultiplexer.GetSubscriber();
//订阅库0的过期通知事件
subscriber.Subscribe("[email protected]__:expired", (channel, key) =>
{
Console.WriteLine($"key过期 channel:{channel} key:{key}");
});
Console.ReadLine();输出结果:
key过期 channel:[email protected]:expired key:orderno:123456
如果启动多个客户端监听,那么多个客户端都可以收到过期事件。
WebApi中订阅
创建RedisListenService继承自:BackgroundService
public class RedisListenService : BackgroundService
{
private readonly ISubscriber _subscriber;
public RedisListenService(IServiceScopeFactory serviceScopeFactory)
{
using var scope = serviceScopeFactory.CreateScope();
var configuration = scope.ServiceProvider.GetRequiredService<IConfiguration>();
var connectionMultiplexer = ConnectionMultiplexer.Connect(configuration["redis"]);
var db = connectionMultiplexer.GetDatabase(0);
_subscriber = connectionMultiplexer.GetSubscriber();
}
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
//订阅库0的过期通知事件
_subscriber.Subscribe("[email protected]__:expired", (channel, key) =>
{
Console.WriteLine($"key过期 channel:{channel} key:{key}");
});
return Task.CompletedTask;
}
}注册该后台服务
services.AddHostedService<RedisListenService>();启用项目,给redis指定库设置值,等过期后会接收到过期通知事件。
RabbitMq延迟队列
版本信息 Rabbitmq版本:3.10.5 Erlang版本:24.3.4.2
要使用rabbitmq做延迟是需要安装插件(rabbitmq_delayed_message_exchange)的
插件介绍:https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq
下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
将下载好的插件(d:/Download/rabbitmq_delayed_message_exchange-3.10.2.ez)映射到容器的plugins目录下:
docker run -d --name myrabbit -p 9005:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_VHOST=customer -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 -v d:/Download/rabbitmq_delayed_message_exchange-3.10.2.ez:/plugins/rabbitmq_delayed_message_exchange-3.10.2.ez rabbitmq:3-management-alpine进入容器
docker exec -it 容器名称/标识 bash启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange查看是否启用
rabbitmq-plugins list[E*]和[e*]表示启用,然后重启服务
rabbitmq-server restart然后在管理界面添加交换机都可以看到

生产消息
发送的消息类型是:x-delayed-message
[HttpGet("send/delay")]
public string SendDelayedMessage()
{
var factory = new ConnectionFactory()
{
HostName = "localhost",//IP地址
Port = 5672,//端口号
UserName = "admin",//用户账号
Password = "123456",//用户密码
VirtualHost = "customer"
};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
var exchangeName = "delay-exchange";
var routingkey = "delay.delay";
var queueName = "delay_queueName";
//设置Exchange队列类型
var argMaps = new Dictionary<string, object>()
{
{"x-delayed-type", "topic"}
};
//设置当前消息为延时队列
channel.ExchangeDeclare(exchange: exchangeName, type: "x-delayed-message", true, false, argMaps);
channel.QueueDeclare(queueName, true, false, false, argMaps);
channel.QueueBind(queueName, exchangeName, routingkey);
var time = 1000 * 5;
var message = $"发送时间为 {DateTime.Now:yyyy-MM-dd HH:mm:ss} 延时时间为:{time}";
var body = Encoding.UTF8.GetBytes(message);
var props = channel.CreateBasicProperties();
//设置消息的过期时间
props.Headers = new Dictionary<string, object>()
{
{ "x-delay", time }
};
channel.BasicPublish(exchange: exchangeName, routingKey: routingkey, basicProperties: props, body: body);
Console.WriteLine("成功发送消息:" + message);
return "success";
}消费消息
消费消息我是弄了一个后台任务(RabbitmqDelayedHostService)在处理
public class RabbitmqDelayedHostService : BackgroundService
{
private readonly IModel _channel;
private readonly IConnection _connection;
public RabbitmqDelayedHostService()
{
var connFactory = new ConnectionFactory//创建连接工厂对象
{
HostName = "localhost",//IP地址
Port = 5672,//端口号
UserName = "admin",//用户账号
Password = "123456",//用户密码
VirtualHost = "customer"
};
_connection = connFactory.CreateConnection();
_channel = _connection.CreateModel();
//交换机名称
var exchangeName = "exchangeDelayed";
var queueName = "delay_queueName";
var routingkey = "delay.delay";
var argMaps = new Dictionary<string, object>()
{
{"x-delayed-type", "topic"}
};
_channel.ExchangeDeclare(exchange: exchangeName, type: "x-delayed-message", true, false, argMaps);
_channel.QueueDeclare(queueName, true, false, false, argMaps);
_channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingkey);
//声明为手动确认
_channel.BasicQos(0, 1, false);
}
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
var queueName = "delay_queueName";
var consumer = new EventingBasicConsumer(_channel);
consumer.Received += (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
var routingKey = ea.RoutingKey;
Console.WriteLine($"接受到消息的时间为 {DateTime.Now:yyyy-MM-dd HH:mm:ss},routingKey:{routingKey} message:{message} ");
//手动确认
_channel.BasicAck(ea.DeliveryTag, true);
};
_channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
return Task.CompletedTask;
}
public override void Dispose()
{
_connection.Dispose();
_channel.Dispose();
base.Dispose();
}
}注册该后台任务
services.AddHostedService<RabbitmqDelayedHostService>();输出结果
成功发送消息:发送时间为 2022-07-02 18:54:22 延时时间为:5000
成功发送消息:发送时间为 2022-07-02 18:54:22 延时时间为:5000
成功发送消息:发送时间为 2022-07-02 18:54:22 延时时间为:5000
成功发送消息:发送时间为 2022-07-02 18:54:23 延时时间为:5000
成功发送消息:发送时间为 2022-07-02 18:54:23 延时时间为:5000
成功发送消息:发送时间为 2022-07-02 18:54:23 延时时间为:5000
接受到消息的时间为 2022-07-02 18:54:27,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:22 延时时间为:5000
接受到消息的时间为 2022-07-02 18:54:27,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:22 延时时间为:5000
接受到消息的时间为 2022-07-02 18:54:27,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:22 延时时间为:5000
接受到消息的时间为 2022-07-02 18:54:28,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:23 延时时间为:5000
接受到消息的时间为 2022-07-02 18:54:28,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:23 延时时间为:5000
接受到消息的时间为 2022-07-02 18:54:28,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:23 延时时间为:5000
其他方案
Hangfire延迟队列
BackgroundJob.Schedule(
() => Console.WriteLine("Delayed!"),
TimeSpan.FromDays(7));时间轮
Redisson DelayQueue
计时管理器
边栏推荐
- idea用debug调试出现com.intellij.rt.debugger.agent.CaptureAgent,导致无法进行调试
- ThreadLocal source code learning
- Cross process communication Aidl
- cent7安装Oracle数据库报错
- [hungry dynamic table]
- Flutter development: a way to solve the problem of blank space on the top of listview
- How to correctly evaluate video image quality
- TDengine 连接器上线 Google Data Studio 应用商店
- Viewpager pageradapter notifydatasetchanged invalid problem
- H. 265 introduction to coding principles
猜你喜欢

Three-level distribution is becoming more and more popular. How should businesses choose the appropriate three-level distribution system?
![[listening for an attribute in the array]](/img/1f/96eb85ee0af83d601918bcd04e405e.png)
[listening for an attribute in the array]

分布式数据库下子查询和 Join 等复杂 SQL 如何实现?

写入速度提升数十倍,TDengine 在拓斯达智能工厂解决方案上的应用

7 月 2 日邀你来TD Hero 线上发布会

idea用debug调试出现com.intellij.rt.debugger.agent.CaptureAgent,导致无法进行调试

oracle 多行数据合并成一行数据

百度智能小程序巡檢調度方案演進之路

What should we pay attention to when entering the community e-commerce business?

Dry goods sorting! How about the development trend of ERP in the manufacturing industry? It's enough to read this article
随机推荐
Data visualization platform based on template configuration
Unity SKFramework框架(二十二)、Runtime Console 运行时调试工具
【sourceTree配置SSH及使用】
Flutter development: use safearea
Online chain offline integrated chain store e-commerce solution
What about wechat mall? 5 tips to clear your mind
为什么不建议你用 MongoDB 这类产品替代时序数据库?
Fluent development: setting method of left and right alignment of child controls in row
Cross process communication Aidl
百度智能小程序巡检调度方案演进之路
干货整理!ERP在制造业的发展趋势如何,看这一篇就够了
Gradientdrawable get a single color
SMT32H7系列DMA和DMAMUX的一点理解
TDengine可通过数据同步工具 DataX读写
idea用debug调试出现com.intellij.rt.debugger.agent.CaptureAgent,导致无法进行调试
Principle and performance analysis of lepton lossless compression
oracle 多行数据合并成一行数据
一文读懂TDengine的窗口查询功能
Android 隐私沙盒开发者预览版 3: 隐私安全和个性化体验全都要
About getfragmentmanager () and getchildfragmentmanager ()