当前位置:网站首页>Redis publish/subscribe
Redis publish/subscribe
2022-07-30 06:56:00 【If I don't see you tomorrow】
Redis实现了发布/订阅功能,Developers can use this lightweight function to quickly implement scene applications.
SUBSCRIBE、UNSUBSCRIBE和PUBLISH实现了发布/Subscription messaging paradigm,This decoupling of publishers and subscribers enables greater scalability and more dynamic network topologies.
注:RedisIt does not store message body information itself.If the actual production environment is at the time of consumption,Network fluctuations caused one of the consumers to hang for a while,那么当它重新连接上的时候,Messages generated during this period of time will not be consumed by the consumer.
Pub/Sub
推送消息的格式
The message is an array reply with three elements.
第一个元素是消息类型:
subscribe: Indicates that we successfully subscribed to the channel given as the second element in the reply.The third parameter represents the number of channels we are currently subscribed to.unsubscribe: Indicates that we successfully unsubscribed from the channel given as the second element in the reply.The third parameter represents the number of channels we are currently subscribed to.当最后一个参数为零时,我们不再订阅任何频道,客户端可以发出任何类型的Redis命令,因为我们处于Pub/Sub状态之外.message:It's due to another clientPUBLISHThe message received by the command.The second element is the name of the originating channel,The third parameter is the actual message payload(payload).
# 订阅 msg 和 chat_room 两个频道
# 1 - 6 行是执行 subscribe 之后的反馈信息
# 第 7 - 9 行才是接收到的第一条信息
# 第 10 - 12 行是第二条
redis> subscribe msg chat_room
Reading messages... (press Ctrl-C to quit)
1) "subscribe" # 返回值的类型:显示订阅成功
2) "msg" # 订阅的频道名字
3) (integer) 1 # 目前已订阅的频道数量
1) "subscribe"
2) "chat_room"
3) (integer) 2
1) "message" # 返回值的类型:信息
2) "msg" # 来源(从那个频道发送过来)
3) "hello moto" # 信息内容
1) "message"
2) "chat_room"
3) "testing...haha"
Database and scope
Pub/Subwith keyspace(key space)无关.It is designed not to be disturbed by the key space at any level,Include the database number.
在db 10上发布,将被db 1上的订阅者听到.
If you need a range of some kind,Please prefix the channel with the environment name(测试、预发布、生产…).
Messages that match patterns and channel subscriptions
如果客户端订阅了多个与已发布消息匹配的模式,或者订阅了与该消息匹配的模式和通道,then the client may receive a message multiple times.如下例所示:
SUBSCRIBE foo
PSUBSCRIBE f*
在上面的例子中,If a message is sent tochannel foo,The client will receive two messages:一条是type message,一条是type pmessage.
存储结构
redisServer.pubsub_patterns属性是一个链表,链表中保存着所有和模式相关的信息.链表中的每个节点都包含一个redis.h/pubsubPattern结构
struct redisServer {
// ...
list *pubsub_patterns;
// ...
};
typedef struct pubsubPattern {
redisClient *client;
robj *pattern;
} pubsubPattern;
client属性保存着订阅模式的客户端,而pattern属性则保存着被订阅的模式.
每当调用
PSUBSCRIBE命令订阅一个模式时,程序就创建一个包含客户端信息和被订阅模式的pubsubPattern结构, 并将该结构添加到redisServer.pubsub_patterns链表中.
作为例子,下图展示了一个包含两个模式的pubsub_patterns链表, 其中client123和client256都正在订阅tweet.shop.*模式:
如果这时客户端client10086执行PSUBSCRIBE broadcast.list.*, 那么pubsub_patterns链表将被更新成这样:
通过遍历整个
pubsub_patterns链表,程序可以检查所有正在被订阅的模式,以及订阅这些模式的客户端.
代码示例
bean注册
@Configuration(proxyBeanMethods = false)
@ConditionalOnClass(RedisOperations.class)
@EnableConfigurationProperties(RedisProperties.class)
@Import({
RedisAutoConfiguration.class })
@AutoConfigureBefore(RedissonAutoConfiguration.class)
public class MyRedisAutoConfiguration {
@Bean
public NameUpdateSubscriber nameUpdateSubscriber(){
return new NameUpdateSubscriber();
}
@Bean
public RedisMessageListenerContainer container(@Qualifier("redisConnectionFactory") RedisConnectionFactory connectionFactory, NameUpdateSubscriber nameUpdateSubscriber) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
//Configure the subscription item to subscribe to
container.addMessageListener(nameUpdateSubscriber, new PatternTopic(RedisConstant.NAME_UPDATE_CHANNEL));
return container;
}
}
推送消息
public void delNameCache(Long id) {
Boolean delete = stringRedisTemplate.delete(getCacheKey(id));
if (Objects.nonNull(delete) && delete) {
stringRedisTemplate.convertAndSend(RedisConstant.NAME_UPDATE_CHANNEL, String.valueOf(id));
}
}
消费监听
public class GuildNameUpdateSubscriber extends MessageListenerAdapter {
@Autowired
private StringRedisTemplate redisTemplate;
@Autowired
private NameRepository nameRepository;
public NameUpdateSubscriber() {
}
@Override
public void onMessage(Message message, byte[] pattern) {
byte[] body = message.getBody();
byte[] channel = message.getChannel();
String msg = redisTemplate.getStringSerializer().deserialize(body);
String topic = redisTemplate.getStringSerializer().deserialize(channel);
log.info("监听到channel为 {} 的消息:{}", topic, msg);
if (Objects.equals(topic, RedisConstant.NAME_UPDATE_CHANNEL) && StringUtils.isNotBlank(msg)) {
nameRepository.refreshCache(Long.parseLong(msg));
}
}
}
参考资料:
边栏推荐
- sql concat() function
- 十八、Kotlin进阶学习:1、挂起函数执行的顺序;2、使用 async 和 await 异步执行挂起函数;3、协程的调度器;4、父子协程;
- 【SQL】first_value 应用场景 - 首单 or 复购
- C#下大批量一键空投实现
- 【MySQL功法】第5话 · SQL单表查询
- 《MySQL高级篇》四、索引的存储结构
- Flink-流/批/OLAP一体得到Flink引擎
- [Getting C language from zero basis - navigation summary]
- Reasons and solutions for Invalid bound statement (not found)
- The first WebAssembly program
猜你喜欢

C#下大批量一键空投实现

FastAPI Quick Start
![Monstache执行monstache -f config.toml出错No processor type exists with name [attachment] [type=parse_exc](/img/2d/50c9001125cd613087044d2b6c78b1.png)
Monstache执行monstache -f config.toml出错No processor type exists with name [attachment] [type=parse_exc
![[Mozhe Academy] Identity Authentication Failure Vulnerability Actual Combat](/img/c3/4a4e23a97e4650a17ff5cfc5233043.png)
[Mozhe Academy] Identity Authentication Failure Vulnerability Actual Combat

SQL Server 数据库之生成与执行 SQL 脚本

在线sql编辑查询工具sql-editor
misc-file steganography of CTF

SQL Server安装教程

oracle row to column, column to row summary

mysql delete duplicate data in the table, (retain only one row)
随机推荐
根据ip地址获取地理位置及坐标(离线方式)
The Request request body is repackaged to solve the problem that the request body can only be obtained once
GraphQL (1) Basic introduction and application examples
TDengine cluster construction
POI工具类
【MySQL功法】第5话 · SQL单表查询
awd --waf deployment
phpok website vulnerability exploitation analysis
Trust anchor for certification path not found.异常解决方法。
【Spark】Spark 高频面试题英语版(1)
"MySQL Advanced Chapter" four, the storage structure of the index
【数仓】数据质量
[PASECA2019]honey_shop
Competition WP in May
在不同的服务器上基于docker部署redis主从同步
FastAPI 快速入门
Flink PostgreSQL CDC配置和常见问题
C#下大批量一键空投实现
Invalid bound statement (not found)出现的原因和解决方法
3 minutes to tell you how to become a hacker | Zero foundation to hacker introductory guide, you only need to master these five skills