Tag filtering and SQL filtering in RocketMQ
2022-06-29 18:35:00 【Brother baa is invincible】
Tag filtering
Application scenarios
In RocketMQ, topics can be used to divide the business: orders, goods, activities and so on each get their own topic. To make the business logic clearer, each topic can be divided again with tags. Orders, for example, can be split into clothing orders, home appliance orders, beverage orders, and so on.
Principle
On the server side, the broker filters by the hash code of the tag. When the hash code of a tag the consumer subscribes to matches the tag hash code recorded for a message in the queue, the message is returned directly. This only filters out most unwanted tags: because hash collisions exist, the client must filter again by the actual tag value.
The broker filters on the hash code because doing so reduces unnecessary network traffic, and because hash code filtering is fast: it compares fixed-size integer values instead of strings.
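The two-stage filtering described above can be sketched in plain Java. This is only an illustration of the idea, not the broker's actual code: the class and method names are made up for the example, and it assumes the hash code is the one returned by String.hashCode(). The tags "Aa" and "BB" are used because they are different strings with the same hash code, which shows why the client-side check is still needed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class TagFilterSketch {

    // Stage 1 (broker side, simplified): keep a message when the hash code of
    // its tag equals the hash code of any subscribed tag.
    static List<String> brokerFilter(List<String> messageTags, Set<String> subscribedTags) {
        List<String> passed = new ArrayList<>();
        for (String tag : messageTags) {
            for (String subscribed : subscribedTags) {
                if (tag.hashCode() == subscribed.hashCode()) {
                    passed.add(tag);
                    break;
                }
            }
        }
        return passed;
    }

    // Stage 2 (client side): compare the actual tag strings, which removes
    // messages that only passed stage 1 because of a hash collision.
    static List<String> clientFilter(List<String> deliveredTags, Set<String> subscribedTags) {
        List<String> accepted = new ArrayList<>();
        for (String tag : deliveredTags) {
            if (subscribedTags.contains(tag)) {
                accepted.add(tag);
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        Set<String> subscribed = Set.of("Aa");
        List<String> queue = List.of("Aa", "BB", "tag_c");

        List<String> afterBroker = brokerFilter(queue, subscribed);
        List<String> afterClient = clientFilter(afterBroker, subscribed);

        System.out.println(afterBroker); // [Aa, BB]  ("BB" collides with "Aa")
        System.out.println(afterClient); // [Aa]
    }
}
```

The first stage drops "tag_c" before it would travel over the network, and the second stage drops the colliding "BB".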
Sample code
Producer code
import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

// The class name is illustrative
public class TagProducer {
    public static void main(String[] args) {
        try {
            DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
            producer.setNamesrvAddr("localhost:9876");
            producer.start();
            // Create a message with the tag tag_a
            Message msg = new Message("topic_a", "tag_a", "test".getBytes(StandardCharsets.UTF_8));
            producer.send(msg);
            producer.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

After sending, the message can be viewed in the console, where its tag column shows the tag value that was set.

Consumer code
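import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
// Package of MessageModel in the 4.x client; it may differ in other versions
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

// The class name is illustrative
public class TagConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
        consumer.setNamesrvAddr("localhost:9876");
        // To receive every tag instead: consumer.subscribe("topic_a", "*");
        consumer.subscribe("topic_a", "tag_a");
        consumer.setMessageModel(MessageModel.CLUSTERING);
        consumer.registerMessageListener((MessageListenerConcurrently) (msgList, context) -> {
            for (MessageExt msg : msgList) {
                System.out.println(new String(msg.getBody(), StandardCharsets.UTF_8));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();
    }
}

Subscribing with * filters out nothing. To subscribe to several tags, separate them with ||, for example tag_a||tag_b.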
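As a rough illustration of how a subscription expression such as tag_a||tag_b can be read, the sketch below splits the expression on || and treats * as "accept every tag". It is a simplified stand-in written for this article, not the client library's parser, and the class and method names are hypothetical.

```java
import java.util.LinkedHashSet;
import java.util.Set;

public class SubExpressionSketch {

    // Splits "tag_a||tag_b" into its tags. An empty result stands for
    // "no tag filter", which is what "*" (or a blank expression) means here.
    static Set<String> parseTags(String subExpression) {
        Set<String> tags = new LinkedHashSet<>();
        if (subExpression == null) {
            return tags;
        }
        String trimmed = subExpression.trim();
        if (trimmed.isEmpty() || "*".equals(trimmed)) {
            return tags;
        }
        for (String part : trimmed.split("\\|\\|")) {
            String tag = part.trim();
            if (!tag.isEmpty()) {
                tags.add(tag);
            }
        }
        return tags;
    }

    // True when a message carrying messageTag would be accepted by the expression.
    static boolean matches(String subExpression, String messageTag) {
        Set<String> tags = parseTags(subExpression);
        return tags.isEmpty() || tags.contains(messageTag);
    }

    public static void main(String[] args) {
        System.out.println(matches("tag_a||tag_b", "tag_b")); // true
        System.out.println(matches("tag_a||tag_b", "tag_c")); // false
        System.out.println(matches("*", "tag_c"));            // true
    }
}
```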
SQL filtering
Introduction
Some cases need more complex filter conditions, and SQL filtering can be used for them. SQL filtering performs worse than tag filtering and defines only a small basic grammar, as follows:
- Numeric comparison, such as: >, >=, <, <=, BETWEEN, =;
- Character comparison, such as: =, <>, IN;
- IS NULL or IS NOT NULL;
- Logical operators: AND, OR, NOT;
The supported constant types are:
- Numbers, such as: 123, 3.1415;
- Characters, such as: 'abc', which must be enclosed in single quotes;
- NULL, a special constant;
- Boolean values, TRUE or FALSE
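To make the grammar concrete, here is a hand-written Java equivalent of one expression that uses several of these operators: age BETWEEN 10 AND 20 AND name IN ('Zhang San', 'Li Si') AND region IS NOT NULL. It only illustrates what such a condition means when applied to a message's user properties; it is not how the broker evaluates expressions. The region property, the second name and the class name are invented for the example, and the sketch assumes that a missing or non-numeric property simply fails the comparison.

```java
import java.util.Map;
import java.util.Set;

public class SqlFilterSketch {

    // Hand-written equivalent of:
    //   age BETWEEN 10 AND 20 AND name IN ('Zhang San', 'Li Si') AND region IS NOT NULL
    // User properties are strings, so the numeric comparison parses the value first.
    static boolean matches(Map<String, String> properties) {
        boolean ageInRange = false;
        String age = properties.get("age");
        if (age != null) {
            try {
                double value = Double.parseDouble(age);
                ageInRange = value >= 10 && value <= 20; // BETWEEN includes both bounds
            } catch (NumberFormatException e) {
                ageInRange = false; // assumption: a non-numeric value does not match
            }
        }

        String name = properties.get("name");
        boolean nameInList = name != null && Set.of("Zhang San", "Li Si").contains(name);

        boolean regionPresent = properties.get("region") != null;

        return ageInRange && nameInList && regionPresent;
    }

    public static void main(String[] args) {
        System.out.println(matches(Map.of("age", "11", "name", "Zhang San", "region", "east"))); // true
        System.out.println(matches(Map.of("age", "11", "name", "Zhang San")));                   // false
        System.out.println(matches(Map.of("age", "25", "name", "Li Si", "region", "east")));     // false
    }
}
```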
Configuration
Add the following setting to conf/broker.conf; otherwise subscribing with an SQL expression reports an error.
enablePropertyFilter=true
Then start the broker with that broker.conf file:
./mqbroker -n localhost:9876 -c ../conf/broker.conf
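Sample code
Producer code
import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

// The class name is illustrative
public class SqlProducer {
    public static void main(String[] args) {
        try {
            DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
            producer.setNamesrvAddr("localhost:9876");
            producer.start();
            Message msg = new Message("topic_a", "test".getBytes(StandardCharsets.UTF_8));
            // User properties are what the SQL expression is evaluated against
            msg.putUserProperty("age", "11");
            msg.putUserProperty("name", "Zhang San");
            producer.send(msg);
            producer.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

Consumer code
import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
// Package of MessageModel in the 4.x client; it may differ in other versions
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

// The class name is illustrative
public class SqlConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
        consumer.setNamesrvAddr("localhost:9876");
        // Subscribe to messages whose age is greater than 10 and whose name is Zhang San
        consumer.subscribe("topic_a", MessageSelector.bySql("age > 10 AND name = 'Zhang San'"));
        consumer.setMessageModel(MessageModel.CLUSTERING);
        consumer.registerMessageListener((MessageListenerConcurrently) (msgList, context) -> {
            for (MessageExt msg : msgList) {
                System.out.println(new String(msg.getBody(), StandardCharsets.UTF_8));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();
    }
}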