当前位置:网站首页>Redis 发布/订阅
Redis 发布/订阅
2022-07-30 05:43:00 【若明天不见】
Redis实现了发布/订阅功能,开发者可通过该轻量级的功能快速进行场景应用。
SUBSCRIBE、UNSUBSCRIBE和PUBLISH实现了发布/订阅消息传递范式,发布者和订阅者的这种解耦可以实现更大的可扩展性和更动态的网络拓扑。
注:Redis本身是不存储消息体信息。若实际生产环境在消费的时候,网络波动导致其中一个消费者挂掉了一段时间,那么当它重新连接上的时候,中间这一段时间产生的消息则不会被该消费者消费。
Pub/Sub
推送消息的格式
消息是包含三个元素的数组回复。
第一个元素是消息类型:
subscribe: 表示我们成功订阅了作为回复中第二个元素给出的频道。第三个参数代表我们当前订阅的频道数量。unsubscribe: 表示我们成功取消订阅作为回复中第二个元素给出的频道。第三个参数代表我们当前订阅的频道数量。当最后一个参数为零时,我们不再订阅任何频道,客户端可以发出任何类型的Redis命令,因为我们处于Pub/Sub状态之外。message:它是由于另一个客户端发出的PUBLISH命令而收到的消息。第二个元素是发起通道的名称,第三个参数是实际的消息负载(payload)。
# 订阅 msg 和 chat_room 两个频道
# 1 - 6 行是执行 subscribe 之后的反馈信息
# 第 7 - 9 行才是接收到的第一条信息
# 第 10 - 12 行是第二条
redis> subscribe msg chat_room
Reading messages... (press Ctrl-C to quit)
1) "subscribe" # 返回值的类型:显示订阅成功
2) "msg" # 订阅的频道名字
3) (integer) 1 # 目前已订阅的频道数量
1) "subscribe"
2) "chat_room"
3) (integer) 2
1) "message" # 返回值的类型:信息
2) "msg" # 来源(从那个频道发送过来)
3) "hello moto" # 信息内容
1) "message"
2) "chat_room"
3) "testing...haha"
数据库和范围
Pub/Sub与密钥空间(key space)无关。它设计为不会被在任何级别的密钥空间干扰,包括数据库编号。
在db 10上发布,将被db 1上的订阅者听到。
如果您需要某种范围的范围,请在通道前加上环境名称(测试、预发布、生产…)。
匹配模式和频道订阅的消息
如果客户端订阅了多个与已发布消息匹配的模式,或者订阅了与该消息匹配的模式和通道,则该客户端可能会多次收到一条消息。如下例所示:
SUBSCRIBE foo
PSUBSCRIBE f*
在上面的例子中,如果一条消息被发送到channel foo,客户端将收到两条消息:一条是type message,一条是type pmessage。
存储结构
redisServer.pubsub_patterns属性是一个链表,链表中保存着所有和模式相关的信息。链表中的每个节点都包含一个redis.h/pubsubPattern结构
struct redisServer {
// ...
list *pubsub_patterns;
// ...
};
typedef struct pubsubPattern {
redisClient *client;
robj *pattern;
} pubsubPattern;
client属性保存着订阅模式的客户端,而pattern属性则保存着被订阅的模式。
每当调用
PSUBSCRIBE命令订阅一个模式时,程序就创建一个包含客户端信息和被订阅模式的pubsubPattern结构, 并将该结构添加到redisServer.pubsub_patterns链表中。
作为例子,下图展示了一个包含两个模式的pubsub_patterns链表, 其中client123和client256都正在订阅tweet.shop.*模式:
如果这时客户端client10086执行PSUBSCRIBE broadcast.list.*, 那么pubsub_patterns链表将被更新成这样:
通过遍历整个
pubsub_patterns链表,程序可以检查所有正在被订阅的模式,以及订阅这些模式的客户端。
代码示例
bean注册
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(RedisOperations.class)
@EnableConfigurationProperties(RedisProperties.class)
@Import({
RedisAutoConfiguration.class })
@AutoConfigureBefore(RedissonAutoConfiguration.class)
public class MyRedisAutoConfiguration {
@Bean
public NameUpdateSubscriber nameUpdateSubscriber(){
return new NameUpdateSubscriber();
}
@Bean
public RedisMessageListenerContainer container(@Qualifier("redisConnectionFactory") RedisConnectionFactory connectionFactory, NameUpdateSubscriber nameUpdateSubscriber) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
//配置要订阅的订阅项
container.addMessageListener(nameUpdateSubscriber, new PatternTopic(RedisConstant.NAME_UPDATE_CHANNEL));
return container;
}
}
推送消息
public void delNameCache(Long id) {
Boolean delete = stringRedisTemplate.delete(getCacheKey(id));
if (Objects.nonNull(delete) && delete) {
stringRedisTemplate.convertAndSend(RedisConstant.NAME_UPDATE_CHANNEL, String.valueOf(id));
}
}
消费监听
public class GuildNameUpdateSubscriber extends MessageListenerAdapter {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private NameRepository nameRepository;
public NameUpdateSubscriber() {
}
@Override
public void onMessage(Message message, byte[] pattern) {
byte[] body = message.getBody();
byte[] channel = message.getChannel();
String msg = redisTemplate.getStringSerializer().deserialize(body);
String topic = redisTemplate.getStringSerializer().deserialize(channel);
log.info("监听到channel为 {} 的消息:{}", topic, msg);
if (Objects.equals(topic, RedisConstant.NAME_UPDATE_CHANNEL) && StringUtils.isNotBlank(msg)) {
nameRepository.refreshCache(Long.parseLong(msg));
}
}
}
参考资料:
边栏推荐
- misc-log analysis of CTF
- npm run serve starts error npm ERR Missing script "serve"
- sql concat() function
- sql中 exists的用法
- JVM Learning (2) Garbage Collector
- CTF之misc-流量分析
- jsonpath
- vulnhub-XXE ctf security question
- Communication middleware Fast DDS basic concepts and communication examples
- 在不同的服务器上基于docker部署redis主从同步
猜你喜欢
随机推荐
记一次流量分析实战——安恒科技(八月ctf)
DVWA安装教程(懂你的不懂·详细)
Arrays工具类的使用
MySQL - Function and Constraint Commands
sql concat()函数
SSTI range
oracle行转列、列转行总结
uni-app使用npm命令安装组件
CTF之misc-文件隐写
A Spark task tuning 】 【 one day suddenly slow down how to solve
【调优】一个 Spark 任务某天突然变慢怎么解决
国内数字藏品交易平台开发市场会开放二级市场吗
Misc of CTF-Memory Analysis (Volatility)
uni-app installs components using npm commands
[PASECA2019]honey_shop
awd——waf部署
互联网商城盲盒app为何如此火爆
Awd summary
DVWA installation tutorial (understand what you don't understand · in detail)
Monstache执行monstache -f config.toml出错No processor type exists with name [attachment] [type=parse_exc

![[MATLAB] Image Processing - Recognition of Traffic Signs](/img/45/1a5797a17ebf6db965a64c85e0f037.png)






