当前位置:网站首页>Redis消息队列
Redis消息队列
2022-07-06 05:31:00 【知知之之】
List 实现消息队列
Redis 列表是简单的字符串列表,按照插入顺序排序。你可以添加一个元素到列表的头部(左边)或者尾部(右边)。
所以常用来做异步队列使用。将需要延后处理的任务结构体序列化成字符串塞进 Redis 的列表,另一个线程从这个列表中轮询数据进行处理。
即时消费问题
通过 LPUSH,RPOP 这样的方式,会存在一个性能风险点,就是消费者如果想要及时的处理数据,就要在程序中写个类似 while(true) 这样的逻辑,不停的去调用 RPOP 或 LPOP 命令,这就会给消费者程序带来些不必要的性能损失。
所以,Redis 还提供了 BLPOP、BRPOP 这种阻塞式读取的命令(带 B-Bloking的都是阻塞式),客户端在没有读到队列数据时,自动阻塞,直到有新的数据写入队列,再开始读取新数据。这种方式就节省了不必要的 CPU 开销。
- LPUSH、BRPOP 左进右阻塞出
- RPUSH、BLPOP 右进左阻塞出
如果将超时时间设置为 0 时,即可无限等待,直到弹出消息
因为 Redis 单线程的特点,所以在消费数据时,同一个消息会不会同时被多个 consumer 消费掉,但是需要我们考虑消费不成功的情况。
ack 机制
List 队列中的消息一经发送出去,便从队列里删除。如果由于网络原因消费者没有收到消息,或者消费者在处理这条消息的过程中崩溃了,就再也无法还原出这条消息。究其原因,就是缺少消息确认机制。
为了保证消息的可靠性,消息队列都会有完善的消息确认机制(Acknowledge),即消费者向队列报告消息已收到或已处理的机制。
RPOPLPUSH、BRPOPLPUSH (阻塞)从一个 list 中获取消息的同时把这条消息复制到另一个 list 里(可以当做备份),而且这个过程是原子的。
这样我们就可以在业务流程安全结束后,再删除队列元素,实现消息确认机制。
订阅与发布实现消息队列
"发布/订阅"模式包含两种角色,分别是发布者和订阅者。订阅者可以订阅一个或者多个频道(channel),而发布者可以向指定的频道(channel)发送消息,所有订阅此频道的订阅者都会收到此消息。
Redis 通过 PUBLISH 、 SUBSCRIBE 等命令实现了订阅与发布,Redis 发布订阅 (pub/sub) 有个缺点就是消息无法持久化,如果出现网络断开、Redis 宕机等,消息就会被丢弃。而且也没有 Ack 机制来保证数据的可靠性,假设一个消费者都没有,那消息就直接被丢弃了。
Streams 实现消息队列
Redis 5.0 版本新增了一个更强大的数据结构——Stream。它提供了消息的持久化和主备复制功能,可以让任何客户端访问任何时刻的数据,并且能记住每一个客户端的访问位置,还能保证消息不丢失。
它就像是个仅追加内容的消息链表,把所有加入的消息都串起来,每个消息都有一个唯一的 ID 和对应的内容。而且消息是持久化的。
每个 Stream 都有唯一的名称,它就是 Redis 的 key,在我们首次使用 xadd 指令追加消息时自动创建。
Redis Stream 借鉴了很多 Kafka 的设计。
- Consumer Group:有了消费组的概念,每个消费组状态独立,互不影响,一个消费组可以有多个消费者
- last_delivered_id :每个消费组会有个游标 last_delivered_id 在数组之上往前移动,表示当前消费组已经消费到哪条消息了
- pending_ids :消费者的状态变量,作用是维护消费者的未确认的 id。pending_ids 记录了当前已经被客户端读取的消息,但是还没有 ack。如果客户端没有 ack,这个变量里面的消息 ID 会越来越多,一旦某个消息被 ack,它就开始减少。这个 pending_ids 变量在 Redis 官方被称之为 PEL,也就是 Pending Entries List,这是一个很核心的数据结构,它用来确保客户端至少消费了消息一次,而不会在网络传输的中途丢失了没处理。
Stream 不像 Kafak 那样有分区的概念,如果想实现类似分区的功能,就要在客户端使用一定的策略将消息写到不同的 Stream。
- xgroup create:创建消费者组
- xgreadgroup:读取消费组中的消息
- xack:ack 掉指定消息
按消费组消费
Stream 提供了 xreadgroup 指令可以进行消费组的组内消费,需要提供消费组名称、消费者名称和起始消息 ID。它同 xread 一样,也可以阻塞等待新消息。读到新消息后,对应的消息 ID 就会进入消费者的 PEL(正在处理的消息) 结构里,客户端处理完毕后使用 xack 指令通知服务器,本条消息已经处理完毕,该消息 ID 就会从 PEL 中移除。
Stream 还是不能当做主流的 MQ 来使用的,而且使用案例也比较少,慎用。
边栏推荐
- Web Security (V) what is a session? Why do I need a session?
- The ECU of 21 Audi q5l 45tfsi brushes is upgraded to master special adjustment, and the horsepower is safely and stably increased to 305 horsepower
- 02. Develop data storage of blog project
- Unity gets the width and height of Sprite
- Summary of deep learning tuning tricks
- 04. 项目博客之日志
- HAC集群修改管理员用户密码
- How to use PHP string query function
- 05. 博客项目之安全
- Huawei equipment is configured with OSPF and BFD linkage
猜你喜欢
【OSPF 和 ISIS 在多路访问网络中对掩码的要求】
Easy to understand IIC protocol explanation
图数据库ONgDB Release v-1.0.3
The ECU of 21 Audi q5l 45tfsi brushes is upgraded to master special adjustment, and the horsepower is safely and stably increased to 305 horsepower
Steady, 35K, byte business data analysis post
Implementing fuzzy query with dataframe
Codeforces Round #804 (Div. 2) Editorial(A-B)
The ECU of 21 Audi q5l 45tfsi brushes is upgraded to master special adjustment, and the horsepower is safely and stably increased to 305 horsepower
Unity Vector3. Use and calculation principle of reflect
59. Spiral matrix
随机推荐
Fluent implements a loadingbutton with loading animation
指針經典筆試題
初识CDN
[mask requirements of OSPF and Isis in multi access network]
26file filter anonymous inner class and lambda optimization
UCF (summer team competition II)
应用安全系列之三十七:日志注入
Knowledge points of circular structure
In 2022, we must enter the big factory as soon as possible
js Array 列表 实战使用总结
jdbc使用call调用存储过程报错
【华为机试真题详解】统计射击比赛成绩
HAC cluster modifying administrator user password
02. Develop data storage of blog project
Codeforces Round #804 (Div. 2) Editorial(A-B)
[QNX hypervisor 2.2 user manual]6.3.3 using shared memory (shmem) virtual devices
nacos-高可用seata之TC搭建(02)
HAC集群修改管理员用户密码
Ora-01779: the column corresponding to the non key value saving table cannot be modified
03. Login of development blog project