当前位置:网站首页>Redis 订阅与 Redis Stream
Redis 订阅与 Redis Stream
2022-08-02 01:37:00 【Chon-Wang】
# 前言
一、Redis 发布订阅
Redis
发布订阅是一种消息通信模式包含三个主体:发布者(PUB)、订阅者(SUB)、频道(channel)
三者之间的关系简单举例: 我(订阅者) 在
CSDN
关注(订阅) 了 CSDN官方博客账号(频道),当 CSDN官方博客(发布者) 发送推文时,我(订阅者) 就可以收到 CSDN官方博客账号 的消息提醒。
Redis
客户端可订阅任意数量的频道。消息无法持久化,出现
Redis
宕机、网络断开等情况,消息无法保存订阅关系图
一个频道可被多个客户端订阅
消息推送流程图
1.1 常用命令
# 1. 订阅一个或多个符合给定模式的频道
# 格式: 频道名称
psubscribe test* # 订阅以 test 开头的所有频道
psubscribe te?t # 订阅 tezt 或 teat 等 所有频道
psubscribe te[sa]t # 订阅 test 或 teat 频道,其他的不支持
# 2. 退订所有以给定模式订阅的频道
# 格式: punsubscribe 频道名称
punsubscribe channel
# 3. 订阅单个或多个频道
# 格式: subscribe 频道名称1 频道名称2 [频道名称n]
subscribe testChannel1 testChannel2
# 4. 发送消息到指定频道
# 格式: publish 频道名称 消息内容
publish testChannel1 'Hello CSDN'
# 5. 退订指定频道
# 格式: unsubscribe 频道名称
unsubscribe testChannel1
1.2 演示视频
二、Redis Stream
Redis Stream
主要用于 消息队列(MQ,Message Queue),是Redis 5.0
新增的数据结构。Redis Stream
可实现数据持久化和主从复制功能。Redis Stream
是一个消息链表,每个消息都有唯一的id
和 内容
2.1 消息队列常用命令
常用命令:
xadd
、xdel
、xlen
、xrange
、xrevrange
、xread
、xtrim
、xinfo stream
、xinfo groups
# 1. 添加消息到队列末尾
# 格式: xadd 队列名 消息ID field1 value1 [field... value...]
# 队列名称不存在则会自动创建
# 消息ID 可自定义,但要保证递增 或 * 代表系统自动生成
xadd testStream * user_id 10 user_name Chon
xadd testStream * user_id 11 user_name Leslie
xadd testStream1 * tag_id 1 tag_name Redis
xadd testStream1 * tag_id 2 tag_name PHP
xadd testStream1 * tag_id 3 tag_name Linux
xadd testStream1 * tag_id 4 tag_name MySQL
xadd testStream1 * tag_id 5 tag_name Vue
# 2. 删除消息 - 根据 消息ID 删除
# 格式: xdel 队列名 消息ID [消息ID...]
xdel testStream 1658995835554-0 # 改成自己的 消息ID
# 3. 获取消息队列总条数
# 格式xlen 队列名
xlen testStream1
# 4. 获取所有队列消息
# 格式: xrange 队列名 开始消息ID值 结束消息ID值 [count 数量]
# 开始值: - 表示最小值
# 结束值: + 表示最大值
xrange testStream - + # 获取所有消息
xrange testStream - + count 3 # 获取 3 条消息
# 5. 获取队列消息 - 反转获取,就是最后的成第一个了,倒数第二个成第二个了,哈哈
# 格式: xrevrange 队列名 结束消息ID值 开始消息ID值 [count 数量]
# 结束值: + 表示最大值
# 开始值: - 表示最小值
xrevrange testStream + - # 获取所有反转后的消息
xrevrange testStream + - count 3 # 获取 3 条反转后的消息
# 6. 获取队列消息 - 以阻塞或非阻塞方式
# 格式: xread [count 数量] [block 阻塞毫秒数] streams 队列名 [队列名...] 开始消息ID值 结束消息ID值
# count 可选
# block 可选,阻塞毫秒数不设置,则默认 非阻塞方式
# 0 0 表示获取全部
# 测试之前,在队列中多添加一些数据
xread count 1 streams testStream testStream1 0 0
xread count 1 block 1000 streams testStream testStream1 0 0 # 阻塞 1000 毫秒
# 7. 裁剪消息 - 限制长度
# 格式: xtrim 队列名 maxlen|minid [=|~] 操作值 [LIMIT count]
# 当使用 maxlen 时,操作值为 消息队列最终保留的数量,先进的先删
# 当使用 minid 时,操作值为 消息ID值,删除小于 消息ID值 的消息
xtrim testStream1 maxlen 3 # 先进先删,只保留 3 个消息内容
xtrim testStream1 minid 1658999744682-0 # 删除 消息ID 小于 1658999744682-0 的消息
# 8. 获取队列信息
# 格式: xinfo stream 队列名
xinfo stream testStream1
# 9. 获取 队列 在 所在组 中的信息
# 格式: xinfo groups 队列名
xinfo groups testStream1
2.2 消费者组常用命令
常用消费者组命令:
xgroup create
、xgroup createconsumer
、xreadgroup
、xgroup setid
、xgroup delconsumer
、xgroup destroy
、xgroup help
# 1. 创建消费者组
# 格式: xgroup create 队列名 分组名 id|$ [mkstream]
# id: 有两种表示:
# 表示获取范围,使用 开始消息ID值 结束消息ID值
# 表示从头获取,使用 0
# $: $ 表示从尾部开始,当前队列中的消息全部忽略
# mkstream: 表示如果消息队列不存在则自动创建
xgroup create testStream Group0 $ mkstream # 创建新 testStream0 消息队列并放入组中
xgroup create testStream1 Group1 0 # 获取全部消息放入组中
# 2. 创建使用者 - 创建 消息队列 在 消费组 中的使用者
# 格式: xgroup createconsumer 队列名 组名 使用者名
xgroup createconsumer testStream1 Group1 Chon
# 3. 获取列表消息 - 从消费者组
# 格式: xreadgroup group 分组名 使用者名 [count 数量] [block 阻塞毫秒数] streams 队列名 >
xreadgroup group Group1 Chon count 10 streams testStream1 >
# 4. 重新获取列表消息 - 从消费者组
# 格式: xgroup setid 队列名 分组名 id
# id: 有两种表示:
# 表示获取起始值,使用 开始消息ID值,大于该值才会获取到
# 表示从头获取,使用 0
# 必须是之前已经存放过的分组
xgroup setid testStream1 Group1 0 # 获取全部消息放入组中
xgroup setid testStream1 Group1 1659003269075-0 # 将 消息id 大于所给值的队列消息放入组中
# 5. 删除使用者 - 删除 消息队列 在 消费组 中的使用者
xgroup delconsumer testStream1 Group1 Chon
# 6. 删除消费者组
# 格式: xgroup destroy 队列名 组名
xgroup destroy testStream1 Group1
# 7. 查看分组命令帮助
xgroup help
边栏推荐
- "Introduction to Natural Language Processing Practice" Question Answering Robot Based on Knowledge Graph
- 有效进行自动化测试,这几个软件测试工具一定要收藏好!!!
- The characteristics and principle of typescript29 - enumeration type
- 滴滴秋招提前批正式开始,现在投递免笔试
- Named parameter implementation of JDBC PreparedStatement
- typescript36-class的构造函数实例方法
- Detailed explanation of fastjson
- TKU记一次单点QPS优化(顺祝ITEYE终于回来了)
- typescript29-枚举类型的特点和原理
- Navicat data shows incomplete resolution
猜你喜欢
随机推荐
html+css+php+mysql实现注册+登录+修改密码(附完整代码)
C语言实验十 函数(二)
Flex布局详解
Test Cases: Four-Step Test Design Approach
hash table
Can‘t connect to MySQL server on ‘localhost3306‘ (10061) 简洁明了的解决方法
canal实现mysql数据同步
IDEA如何运行web程序
Reflex WMS中阶系列6:对一个装货重复run pick会有什么后果?
Kubernetes — Calico
tf.keras.callbacks.EarlyStopping()
百度、百图生科 | HelixFold-Single: 使用蛋白质语言模型作为替代进行无MSA蛋白质结构预测
Kubernetes — 核心资源对象 — 存储
¶Backtop 回到顶部 不生效
Flink_CDC搭建及简单使用
Why is on-chain governance so important, and how will Polkadot Gov 2.0 lead the development of on-chain governance?
Rust P2P Network Application Combat-1 P2P Network Core Concepts and Ping Program
datagrip 报错 “The specified database userpassword combination is rejected...”的解决方法
力扣 1374. 生成每种字符都是奇数个的字符串
YGG Guild Development Plan Season 1 Summary