当前位置:网站首页>Redis 订阅与 Redis Stream
Redis 订阅与 Redis Stream
2022-08-02 01:37:00 【Chon-Wang】
# 前言
一、Redis 发布订阅
Redis发布订阅是一种消息通信模式包含三个主体:发布者(PUB)、订阅者(SUB)、频道(channel)
三者之间的关系简单举例: 我(订阅者) 在
CSDN关注(订阅) 了 CSDN官方博客账号(频道),当 CSDN官方博客(发布者) 发送推文时,我(订阅者) 就可以收到 CSDN官方博客账号 的消息提醒。
Redis客户端可订阅任意数量的频道。消息无法持久化,出现
Redis宕机、网络断开等情况,消息无法保存订阅关系图
一个频道可被多个客户端订阅
消息推送流程图
1.1 常用命令
# 1. 订阅一个或多个符合给定模式的频道
# 格式: 频道名称
psubscribe test* # 订阅以 test 开头的所有频道
psubscribe te?t # 订阅 tezt 或 teat 等 所有频道
psubscribe te[sa]t # 订阅 test 或 teat 频道,其他的不支持
# 2. 退订所有以给定模式订阅的频道
# 格式: punsubscribe 频道名称
punsubscribe channel
# 3. 订阅单个或多个频道
# 格式: subscribe 频道名称1 频道名称2 [频道名称n]
subscribe testChannel1 testChannel2
# 4. 发送消息到指定频道
# 格式: publish 频道名称 消息内容
publish testChannel1 'Hello CSDN'
# 5. 退订指定频道
# 格式: unsubscribe 频道名称
unsubscribe testChannel1
1.2 演示视频
二、Redis Stream
Redis Stream主要用于 消息队列(MQ,Message Queue),是Redis 5.0新增的数据结构。Redis Stream可实现数据持久化和主从复制功能。Redis Stream是一个消息链表,每个消息都有唯一的id和 内容
2.1 消息队列常用命令
常用命令:
xadd、xdel、xlen、xrange、xrevrange、xread、xtrim、xinfo stream、xinfo groups
# 1. 添加消息到队列末尾
# 格式: xadd 队列名 消息ID field1 value1 [field... value...]
# 队列名称不存在则会自动创建
# 消息ID 可自定义,但要保证递增 或 * 代表系统自动生成
xadd testStream * user_id 10 user_name Chon
xadd testStream * user_id 11 user_name Leslie
xadd testStream1 * tag_id 1 tag_name Redis
xadd testStream1 * tag_id 2 tag_name PHP
xadd testStream1 * tag_id 3 tag_name Linux
xadd testStream1 * tag_id 4 tag_name MySQL
xadd testStream1 * tag_id 5 tag_name Vue
# 2. 删除消息 - 根据 消息ID 删除
# 格式: xdel 队列名 消息ID [消息ID...]
xdel testStream 1658995835554-0 # 改成自己的 消息ID
# 3. 获取消息队列总条数
# 格式xlen 队列名
xlen testStream1
# 4. 获取所有队列消息
# 格式: xrange 队列名 开始消息ID值 结束消息ID值 [count 数量]
# 开始值: - 表示最小值
# 结束值: + 表示最大值
xrange testStream - + # 获取所有消息
xrange testStream - + count 3 # 获取 3 条消息
# 5. 获取队列消息 - 反转获取,就是最后的成第一个了,倒数第二个成第二个了,哈哈
# 格式: xrevrange 队列名 结束消息ID值 开始消息ID值 [count 数量]
# 结束值: + 表示最大值
# 开始值: - 表示最小值
xrevrange testStream + - # 获取所有反转后的消息
xrevrange testStream + - count 3 # 获取 3 条反转后的消息
# 6. 获取队列消息 - 以阻塞或非阻塞方式
# 格式: xread [count 数量] [block 阻塞毫秒数] streams 队列名 [队列名...] 开始消息ID值 结束消息ID值
# count 可选
# block 可选,阻塞毫秒数不设置,则默认 非阻塞方式
# 0 0 表示获取全部
# 测试之前,在队列中多添加一些数据
xread count 1 streams testStream testStream1 0 0
xread count 1 block 1000 streams testStream testStream1 0 0 # 阻塞 1000 毫秒
# 7. 裁剪消息 - 限制长度
# 格式: xtrim 队列名 maxlen|minid [=|~] 操作值 [LIMIT count]
# 当使用 maxlen 时,操作值为 消息队列最终保留的数量,先进的先删
# 当使用 minid 时,操作值为 消息ID值,删除小于 消息ID值 的消息
xtrim testStream1 maxlen 3 # 先进先删,只保留 3 个消息内容
xtrim testStream1 minid 1658999744682-0 # 删除 消息ID 小于 1658999744682-0 的消息
# 8. 获取队列信息
# 格式: xinfo stream 队列名
xinfo stream testStream1
# 9. 获取 队列 在 所在组 中的信息
# 格式: xinfo groups 队列名
xinfo groups testStream1
2.2 消费者组常用命令
常用消费者组命令:
xgroup create、xgroup createconsumer、xreadgroup、xgroup setid、xgroup delconsumer、xgroup destroy、xgroup help
# 1. 创建消费者组
# 格式: xgroup create 队列名 分组名 id|$ [mkstream]
# id: 有两种表示:
# 表示获取范围,使用 开始消息ID值 结束消息ID值
# 表示从头获取,使用 0
# $: $ 表示从尾部开始,当前队列中的消息全部忽略
# mkstream: 表示如果消息队列不存在则自动创建
xgroup create testStream Group0 $ mkstream # 创建新 testStream0 消息队列并放入组中
xgroup create testStream1 Group1 0 # 获取全部消息放入组中
# 2. 创建使用者 - 创建 消息队列 在 消费组 中的使用者
# 格式: xgroup createconsumer 队列名 组名 使用者名
xgroup createconsumer testStream1 Group1 Chon
# 3. 获取列表消息 - 从消费者组
# 格式: xreadgroup group 分组名 使用者名 [count 数量] [block 阻塞毫秒数] streams 队列名 >
xreadgroup group Group1 Chon count 10 streams testStream1 >
# 4. 重新获取列表消息 - 从消费者组
# 格式: xgroup setid 队列名 分组名 id
# id: 有两种表示:
# 表示获取起始值,使用 开始消息ID值,大于该值才会获取到
# 表示从头获取,使用 0
# 必须是之前已经存放过的分组
xgroup setid testStream1 Group1 0 # 获取全部消息放入组中
xgroup setid testStream1 Group1 1659003269075-0 # 将 消息id 大于所给值的队列消息放入组中
# 5. 删除使用者 - 删除 消息队列 在 消费组 中的使用者
xgroup delconsumer testStream1 Group1 Chon
# 6. 删除消费者组
# 格式: xgroup destroy 队列名 组名
xgroup destroy testStream1 Group1
# 7. 查看分组命令帮助
xgroup help
边栏推荐
- S/4中究竟有多少个模块,你对这些模块了解多少
- Pytorch seq2seq model architecture to achieve English translation tasks
- Typescript31 - any type
- Anti-oversold and high concurrent deduction scheme for e-commerce inventory system
- Constructor instance method of typescript36-class
- Huawei's 5-year female test engineer resigns: what a painful realization...
- 有效进行自动化测试,这几个软件测试工具一定要收藏好!!!
- 电子制造仓储条码管理系统解决方案
- 飞桨开源社区季度报告来啦,你想知道的都在这里
- C语言实验十 函数(二)
猜你喜欢
随机推荐
Can't connect to MySQL server on 'localhost3306' (10061) Simple and clear solution
求大神解答,这种 sql 应该怎么写?
成都openGauss用户组招募啦!
理解分布式系统中的缓存架构(下)
百度、百图生科 | HelixFold-Single: 使用蛋白质语言模型作为替代进行无MSA蛋白质结构预测
【刷题篇】打家劫舍
微信支付软件架构,这也太牛逼了!
typescript30 - any type
typescript37-class的构造函数实例方法继承(extends)
Kubernetes — Flannel
kubernetes之服务发现
Use flex-wrap to wrap lines in flex layout
【ORB_SLAM2】void Frame::AssignFeaturesToGrid()
记录一次数组转集合出现错误的坑点,尽量使用包装类型数组进行转换
NFT到底有哪些实际用途?
信息收集之cms指纹识别
Flink_CDC construction and simple use
信息化和数字化的本质区别是什么?
3 Month Tester Readme: 4 Important Skills That Impacted My Career
Redis cluster mode











