Tag filtering and SQL filtering in RocketMQ
2022-06-29 18:35:00 【Brother baa is invincible】
Tag filtering
Application scenarios
In RocketMQ, topics can be used to separate business domains: for example, orders, goods, and activities each get their own topic. To make the business logic clearer, a topic can be subdivided again with tags: for example, orders can be split into clothing orders, home appliance orders, beverage orders, and so on.
Principle
The broker filters by the hash code of the tag. When the hash code of the tag a consumer subscribes to matches the hash code of the tag of a message in the relevant queue, the broker returns that message directly. This removes most unwanted messages, but not all of them: because hash collisions exist, the client must filter again by the actual tag value.
The broker filters by hash code because this avoids unnecessary network traffic, and because hash code comparison is fast: hash codes are integers that can be compared directly at the bit level.
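The two-stage check described above can be illustrated with a small, self-contained sketch. It does not use the RocketMQ API; the class and method names are hypothetical, and it assumes the hash code is Java's String.hashCode(). It shows why the second, client-side comparison is needed: "Aa" and "BB" are different strings with the same hash code.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class TagFilterSketch {

    // Stage 1 (broker side, simplified): only the integer hash codes are compared.
    public static boolean brokerSideMatch(String subscribedTag, int messageTagHash) {
        return subscribedTag.hashCode() == messageTagHash;
    }

    // Stage 2 (client side, simplified): the actual tag strings are compared.
    public static boolean clientSideMatch(String subscribedTag, String messageTag) {
        return subscribedTag.equals(messageTag);
    }

    // Applies both stages to a list of message tags and returns the tags that survive.
    public static List<String> filter(String subscribedTag, List<String> messageTags) {
        List<String> delivered = new ArrayList<>();
        for (String tag : messageTags) {
            if (brokerSideMatch(subscribedTag, tag.hashCode())
                    && clientSideMatch(subscribedTag, tag)) {
                delivered.add(tag);
            }
        }
        return delivered;
    }

    public static void main(String[] args) {
        // "Aa" and "BB" collide: both have String.hashCode() == 2112.
        System.out.println(brokerSideMatch("Aa", "BB".hashCode())); // true
        System.out.println(clientSideMatch("Aa", "BB"));            // false
        System.out.println(filter("Aa", Arrays.asList("Aa", "BB", "tag_a"))); // [Aa]
    }
}
```

In this sketch the message tagged "BB" passes the hash comparison for a subscription to "Aa" and is only rejected by the string comparison, which is the case the client-side filter exists for.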
Sample code
Producer code
import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class TagProducer {
    public static void main(String[] args) {
        try {
            DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
            producer.setNamesrvAddr("localhost:9876");
            producer.start();
            // Create a message with the tag tag_a
            Message msg = new Message("topic_a", "tag_a", "test".getBytes(StandardCharsets.UTF_8));
            producer.send(msg);
            producer.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
After sending, the tag column of the message in the console shows the tag value that was set.

Consumer code
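import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
// Package as in RocketMQ 4.x; it may differ in other versions
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

public class TagConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
        consumer.setNamesrvAddr("localhost:9876");
        // consumer.subscribe("topic_a", "*");
        // Only receive messages tagged tag_a
        consumer.subscribe("topic_a", "tag_a");
        consumer.setMessageModel(MessageModel.CLUSTERING);
        consumer.registerMessageListener((MessageListenerConcurrently) (msgList, context) -> {
            for (MessageExt msg : msgList) {
                System.out.println(new String(msg.getBody(), StandardCharsets.UTF_8));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();
    }
}
Subscribing with * filters out no messages. To subscribe to multiple tags, separate them with ||, for example tag_a||tag_b.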
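To make the meaning of * and || concrete, here is a minimal plain-Java sketch of how such a subscription expression could be interpreted. It is an illustration only, not the client's actual parsing code; the class and method names are hypothetical, and treating a null or empty expression like * is an assumption of this sketch.

```java
import java.util.LinkedHashSet;
import java.util.Set;

public class SubscriptionExpressionSketch {

    // True when the expression means "do not filter by tag".
    // Assumption of this sketch: null and empty are treated like "*".
    public static boolean subscribesToAll(String expression) {
        return expression == null
                || expression.trim().isEmpty()
                || "*".equals(expression.trim());
    }

    // Splits an expression such as "tag_a||tag_b" into its individual tags.
    public static Set<String> parseTags(String expression) {
        Set<String> tags = new LinkedHashSet<>();
        if (subscribesToAll(expression)) {
            return tags;
        }
        for (String part : expression.split("\\|\\|")) {
            String tag = part.trim();
            if (!tag.isEmpty()) {
                tags.add(tag);
            }
        }
        return tags;
    }

    // True when a message carrying messageTag would pass the subscription.
    public static boolean accepts(String expression, String messageTag) {
        return subscribesToAll(expression) || parseTags(expression).contains(messageTag);
    }

    public static void main(String[] args) {
        System.out.println(accepts("tag_a||tag_b", "tag_b")); // true
        System.out.println(accepts("tag_a||tag_b", "tag_c")); // false
        System.out.println(accepts("*", "tag_c"));            // true
    }
}
```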
SQL filtering
Introduction
Some cases call for more complex filter conditions, and SQL filtering can be used for them. SQL filtering performs worse than tag filtering, and only a small basic grammar is defined, as follows:
- Numeric comparison, such as: >, >=, <, <=, BETWEEN, =;
- Character comparison, such as: =, <>, IN;
- IS NULL or IS NOT NULL;
- Logical operators: AND, OR, NOT.
The supported constant types are:
- Numbers, such as: 123, 3.1415;
- Characters, such as: 'abc', which must be enclosed in single quotes;
- NULL, a special constant;
- Boolean values: TRUE or FALSE.
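As a rough illustration of the grammar above, the following sketch collects a few filter expressions as plain Java strings. The property names (age, name, region) and their values are hypothetical; in a real consumer each string would be passed to MessageSelector.bySql(...) when subscribing.

```java
import java.util.Arrays;
import java.util.List;

public class SqlFilterExpressions {

    // Example expressions built only from the operators and constants listed above.
    // The property names age, name and region are hypothetical.
    public static final List<String> EXAMPLES = Arrays.asList(
            "age >= 18",                          // numeric comparison
            "age BETWEEN 10 AND 20",              // numeric range
            "name = 'abc'",                       // character comparison, single quotes required
            "name <> 'abc'",                      // character inequality
            "region IN ('east', 'west')",         // membership test
            "region IS NOT NULL",                 // the property must be present
            "age > 10 AND NOT (name = 'abc')"     // logical operators
    );

    public static void main(String[] args) {
        for (String expression : EXAMPLES) {
            System.out.println(expression);
        }
    }
}
```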
Configuration
Add the following setting to conf/broker.conf; otherwise an error is reported when SQL filtering is used.
enablePropertyFilter=true
Start the broker with the specified broker.conf file
./mqbroker -n localhost:9876 -c ../conf/broker.conf
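Sample code
Producer code
import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

public class SqlProducer {
    public static void main(String[] args) {
        try {
            DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
            producer.setNamesrvAddr("localhost:9876");
            producer.start();
            Message msg = new Message("topic_a", "test".getBytes(StandardCharsets.UTF_8));
            // User properties are what the SQL filter is evaluated against
            msg.putUserProperty("age", "11");
            msg.putUserProperty("name", "Zhang San");
            producer.send(msg);
            producer.shutdown();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Consumer code
import java.nio.charset.StandardCharsets;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.MessageSelector;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
// Package as in RocketMQ 4.x; it may differ in other versions
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

public class SqlConsumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
        consumer.setNamesrvAddr("localhost:9876");
        // Subscribe to messages whose age is greater than 10 and whose name is Zhang San
        consumer.subscribe("topic_a", MessageSelector.bySql("age > 10 and name = 'Zhang San'"));
        consumer.setMessageModel(MessageModel.CLUSTERING);
        consumer.registerMessageListener((MessageListenerConcurrently) (msgList, context) -> {
            for (MessageExt msg : msgList) {
                System.out.println(new String(msg.getBody(), StandardCharsets.UTF_8));
            }
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        });
        consumer.start();
    }
}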
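To show what the subscription condition in the consumer means for a given message, here is a minimal plain-Java sketch of an equivalent check over the user properties. It is not how the broker evaluates the expression; the class and method names are hypothetical. It assumes that a missing or non-numeric age property simply fails the match, and it reflects that user properties are set as strings even when the condition compares them as numbers.

```java
import java.util.HashMap;
import java.util.Map;

public class SqlFilterPredicateSketch {

    // Plain-Java reading of: age > minAge AND name = expectedName.
    // Assumption of this sketch: a missing or non-numeric property fails the match.
    public static boolean matches(Map<String, String> properties, double minAge, String expectedName) {
        String age = properties.get("age");
        String name = properties.get("name");
        if (age == null || name == null) {
            return false;
        }
        try {
            return Double.parseDouble(age) > minAge && expectedName.equals(name);
        } catch (NumberFormatException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();
        // Same values the producer sets with putUserProperty
        properties.put("age", "11");
        properties.put("name", "Zhang San");
        System.out.println(matches(properties, 10, "Zhang San")); // true

        properties.put("age", "9");
        System.out.println(matches(properties, 10, "Zhang San")); // false
    }
}
```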