当前位置:网站首页>.Net之延迟队列
.Net之延迟队列
2022-07-05 09:36:00 【dotNET跨平台】
介绍
具有队列的特性,再给它附加一个延迟消费队列消息的功能,也就是说可以指定队列中的消息在哪个时间点被消费。
使用场景
延迟队列在项目中的应用还是比较多的,尤其像电商类平台:
订单成功后,在30分钟内没有支付,自动取消订单
外卖平台发送订餐通知,下单成功后60s给用户推送短信。
如果订单一直处于某一个未完结状态时,及时处理关单,并退还库存
淘宝新建商户一个月内还没上传商品信息,将冻结商铺等
该介绍来自其他文章
方案
下面的例子没有进行封装,所以代码仅供参考
Redis过期事件
注意:
不保证在设定的过期时间立即删除并发送通知,数据量大的时候会延迟比较大
不保证一定送达
发送即忘策略,不包含持久化
但是比如有些场景,对这个时间不是那么看重,并且有其他措施双层保障,该实现方案是比较简单。
redis自2.8.0之后版本提供Keyspace Notifications功能,允许客户订阅Pub / Sub频道,以便以某种方式接收影响Redis数据集事件。
配置
需要修改配置启用过期事件,比如在windows客户端中,需要修改redis.windows.conf文件,在linux中需要修改redis.conf,修改内容是:
-- 取消注释
notify-keyspace-events Ex
-- 注释
#notify-keyspace-events ""
然后重新启动服务器,比如windows
.\redis-server.exe .\redis.windows.conf
或者linux中使用docker-compose重新部署redis
redis:
container_name: redis
image: redis
hostname: redis
restart: always
ports:
- "6379:6379"
volumes:
- $PWD/redis/redis.conf:/etc/redis.conf
- /root/common-docker-compose/redis/data:/data
command:
/bin/bash -c "redis-server /etc/redis.conf" #启动执行指定的redis.conf文件
然后使用客户端订阅事件
-- windows
.\redis-cli
-- linux
docker exec -it 容器标识 redis-cli
psubscribe [email protected]__:expired
控制台订阅
使用StackExchange.Redis组件订阅过期事件
var connectionMultiplexer = ConnectionMultiplexer.Connect(_redisConnection);
var db = connectionMultiplexer.GetDatabase(0);
db.StringSet("orderno:123456", "订单创建", TimeSpan.FromSeconds(10));
Console.WriteLine("开始订阅");
var subscriber = connectionMultiplexer.GetSubscriber();
//订阅库0的过期通知事件
subscriber.Subscribe("[email protected]__:expired", (channel, key) =>
{
Console.WriteLine($"key过期 channel:{channel} key:{key}");
});
Console.ReadLine();
输出结果:
key过期 channel:[email protected]:expired key:orderno:123456
如果启动多个客户端监听,那么多个客户端都可以收到过期事件。
WebApi中订阅
创建RedisListenService继承自:BackgroundService
public class RedisListenService : BackgroundService
{
private readonly ISubscriber _subscriber;
public RedisListenService(IServiceScopeFactory serviceScopeFactory)
{
using var scope = serviceScopeFactory.CreateScope();
var configuration = scope.ServiceProvider.GetRequiredService<IConfiguration>();
var connectionMultiplexer = ConnectionMultiplexer.Connect(configuration["redis"]);
var db = connectionMultiplexer.GetDatabase(0);
_subscriber = connectionMultiplexer.GetSubscriber();
}
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
//订阅库0的过期通知事件
_subscriber.Subscribe("[email protected]__:expired", (channel, key) =>
{
Console.WriteLine($"key过期 channel:{channel} key:{key}");
});
return Task.CompletedTask;
}
}
注册该后台服务
services.AddHostedService<RedisListenService>();
启用项目,给redis指定库设置值,等过期后会接收到过期通知事件。
RabbitMq延迟队列
版本信息 Rabbitmq版本:3.10.5 Erlang版本:24.3.4.2
要使用rabbitmq做延迟是需要安装插件(rabbitmq_delayed_message_exchange)的
插件介绍:https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq
下载地址:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
将下载好的插件(d:/Download/rabbitmq_delayed_message_exchange-3.10.2.ez)映射到容器的plugins目录下:
docker run -d --name myrabbit -p 9005:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_VHOST=customer -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=123456 -v d:/Download/rabbitmq_delayed_message_exchange-3.10.2.ez:/plugins/rabbitmq_delayed_message_exchange-3.10.2.ez rabbitmq:3-management-alpine
进入容器
docker exec -it 容器名称/标识 bash
启用插件
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
查看是否启用
rabbitmq-plugins list
[E*]和[e*]表示启用,然后重启服务
rabbitmq-server restart
然后在管理界面添加交换机都可以看到
生产消息
发送的消息类型是:x-delayed-message
[HttpGet("send/delay")]
public string SendDelayedMessage()
{
var factory = new ConnectionFactory()
{
HostName = "localhost",//IP地址
Port = 5672,//端口号
UserName = "admin",//用户账号
Password = "123456",//用户密码
VirtualHost = "customer"
};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
var exchangeName = "delay-exchange";
var routingkey = "delay.delay";
var queueName = "delay_queueName";
//设置Exchange队列类型
var argMaps = new Dictionary<string, object>()
{
{"x-delayed-type", "topic"}
};
//设置当前消息为延时队列
channel.ExchangeDeclare(exchange: exchangeName, type: "x-delayed-message", true, false, argMaps);
channel.QueueDeclare(queueName, true, false, false, argMaps);
channel.QueueBind(queueName, exchangeName, routingkey);
var time = 1000 * 5;
var message = $"发送时间为 {DateTime.Now:yyyy-MM-dd HH:mm:ss} 延时时间为:{time}";
var body = Encoding.UTF8.GetBytes(message);
var props = channel.CreateBasicProperties();
//设置消息的过期时间
props.Headers = new Dictionary<string, object>()
{
{ "x-delay", time }
};
channel.BasicPublish(exchange: exchangeName, routingKey: routingkey, basicProperties: props, body: body);
Console.WriteLine("成功发送消息:" + message);
return "success";
}
消费消息
消费消息我是弄了一个后台任务(RabbitmqDelayedHostService)在处理
public class RabbitmqDelayedHostService : BackgroundService
{
private readonly IModel _channel;
private readonly IConnection _connection;
public RabbitmqDelayedHostService()
{
var connFactory = new ConnectionFactory//创建连接工厂对象
{
HostName = "localhost",//IP地址
Port = 5672,//端口号
UserName = "admin",//用户账号
Password = "123456",//用户密码
VirtualHost = "customer"
};
_connection = connFactory.CreateConnection();
_channel = _connection.CreateModel();
//交换机名称
var exchangeName = "exchangeDelayed";
var queueName = "delay_queueName";
var routingkey = "delay.delay";
var argMaps = new Dictionary<string, object>()
{
{"x-delayed-type", "topic"}
};
_channel.ExchangeDeclare(exchange: exchangeName, type: "x-delayed-message", true, false, argMaps);
_channel.QueueDeclare(queueName, true, false, false, argMaps);
_channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingkey);
//声明为手动确认
_channel.BasicQos(0, 1, false);
}
protected override Task ExecuteAsync(CancellationToken stoppingToken)
{
var queueName = "delay_queueName";
var consumer = new EventingBasicConsumer(_channel);
consumer.Received += (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
var routingKey = ea.RoutingKey;
Console.WriteLine($"接受到消息的时间为 {DateTime.Now:yyyy-MM-dd HH:mm:ss},routingKey:{routingKey} message:{message} ");
//手动确认
_channel.BasicAck(ea.DeliveryTag, true);
};
_channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);
return Task.CompletedTask;
}
public override void Dispose()
{
_connection.Dispose();
_channel.Dispose();
base.Dispose();
}
}
注册该后台任务
services.AddHostedService<RabbitmqDelayedHostService>();
输出结果
成功发送消息:发送时间为 2022-07-02 18:54:22 延时时间为:5000
成功发送消息:发送时间为 2022-07-02 18:54:22 延时时间为:5000
成功发送消息:发送时间为 2022-07-02 18:54:22 延时时间为:5000
成功发送消息:发送时间为 2022-07-02 18:54:23 延时时间为:5000
成功发送消息:发送时间为 2022-07-02 18:54:23 延时时间为:5000
成功发送消息:发送时间为 2022-07-02 18:54:23 延时时间为:5000
接受到消息的时间为 2022-07-02 18:54:27,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:22 延时时间为:5000
接受到消息的时间为 2022-07-02 18:54:27,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:22 延时时间为:5000
接受到消息的时间为 2022-07-02 18:54:27,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:22 延时时间为:5000
接受到消息的时间为 2022-07-02 18:54:28,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:23 延时时间为:5000
接受到消息的时间为 2022-07-02 18:54:28,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:23 延时时间为:5000
接受到消息的时间为 2022-07-02 18:54:28,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:23 延时时间为:5000
其他方案
Hangfire延迟队列
BackgroundJob.Schedule(
() => Console.WriteLine("Delayed!"),
TimeSpan.FromDays(7));
时间轮
Redisson DelayQueue
计时管理器
边栏推荐
- Flutter development: a way to solve the problem of blank space on the top of listview
- 一文读懂TDengine的窗口查询功能
- [object array A and object array B take out different elements of ID and assign them to the new array]
- 盗版DALL·E成梗图之王?日产5万张图像,挤爆抱抱脸服务器,OpenAI勒令改名
- Unity skframework framework (XXIII), minimap small map tool
- SQL learning group by multi table grouping scenario
- On July 2, I invite you to TD Hero online press conference
- uni-app---uni.navigateTo跳转传参使用
- Gradientdrawable get a single color
- Viewpager pageradapter notifydatasetchanged invalid problem
猜你喜欢
SMT32H7系列DMA和DMAMUX的一点理解
Tdengine connector goes online Google Data Studio app store
LeetCode 31. Next spread
H. 265 introduction to coding principles
百度智能小程序巡检调度方案演进之路
Baidu app's continuous integration practice based on pipeline as code
Idea debugs com intellij. rt.debugger. agent. Captureagent, which makes debugging impossible
Unity skframework framework (XXIII), minimap small map tool
Dry goods sorting! How about the development trend of ERP in the manufacturing industry? It's enough to read this article
Android 隐私沙盒开发者预览版 3: 隐私安全和个性化体验全都要
随机推荐
tongweb设置gzip
[how to disable El table]
Tdengine connector goes online Google Data Studio app store
Develop and implement movie recommendation applet based on wechat cloud
A keepalived high availability accident made me learn it again
[team PK competition] the task of this week has been opened | question answering challenge to consolidate the knowledge of commodity details
Solve liquibase – waiting for changelog lock Cause database deadlock
7 月 2 日邀你来TD Hero 线上发布会
From "chemist" to developer, from Oracle to tdengine, two important choices in my life
使用el-upload封装得组件怎么清空已上传附件
【对象数组的排序】
Android SQLite database encryption
Mobile heterogeneous computing technology GPU OpenCL programming (Advanced)
Vs code problem: the length of long lines can be configured through "editor.maxtokenizationlinelength"
Node-RED系列(二九):使用slider与chart节点来实现双折线时间序列图
idea用debug调试出现com.intellij.rt.debugger.agent.CaptureAgent,导致无法进行调试
写入速度提升数十倍,TDengine 在拓斯达智能工厂解决方案上的应用
Community group buying has triggered heated discussion. How does this model work?
Uncover the practice of Baidu intelligent testing in the field of automatic test execution
Are databases more popular as they get older?