当前位置:网站首页>RocketMQ的tag过滤和sql过滤
RocketMQ的tag过滤和sql过滤
2022-06-29 18:03:00 【咩哥无敌】
目录
tag过滤
应用场景
在RockerMQ中可以用topic将业务划分,例如将订单、商品、活动等业务划分在不同的topic。为了使业务逻辑更清晰还可以用tag再次划分,例如将订单划分为服装订单、家电订单、酒水订单等。
原理
在服务端采用的是tag的hashcode过滤,当消费者的tag与订阅的queue中消息的tag的hashcode一致时就会直接返回,但这样只能过滤大部分tag,因为存在hash碰撞,所以还要在客户端还要根据tag值进行过滤
服务端使用hashcode过滤是因为减少了不必要的网络传输,并且hashcode过滤快,底层直接可以使用位运算
示例代码
生产者代码
public static void main(String[] args) {
try {
DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();
// 建立了标签为tag_a的消息
Message msg = new Message("topic_a", "tag_a", ("test").getBytes());
producer.send(msg);
producer.shutdown();
} catch (Exception e) {
e.printStackTrace();
}
}
发送完之后在console中就能看到消息标签那列存在定义的值
消费者代码
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
consumer.setNamesrvAddr("localhost:9876");
// consumer.subscribe("topic_a", "*");
consumer.subscribe("topic_a", "tag_a");
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.registerMessageListener((MessageListenerConcurrently) (msgList, context) -> {
for (MessageExt msg : msgList) {
System.out.println(new String(msg.getBody()));;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
}
为*时不过滤任何消息,需要订阅多个tag用||隔开,例如tag_a||tag_b
sql过滤
介绍
在某些情况下,可能需要更复杂的过滤条件,这时候就可以使用sql过滤,sql过滤性能比tag低,只定义了一些基本的语法,如下:
- 数值比较,比如:
>
,>=
,<
,<=
,BETWEEN
,=
; - 字符比较,比如:
=
,<>
,IN
; IS NULL
或者IS NOT NUL
L;- 逻辑符号
AND
,OR
,NOT
;
常量支持类型为:
- 数值,比如:
123
,3.1415
; - 字符,比如:
'abc'
,必须用单引号包裹起来; NULL
,特殊的常量;- 布尔值,
TRUE
或FALSE
配置
需要在conf/broker.conf下添加以下配置,不然会报错
enablePropertyFilter=true
启动时使用指定的broker.conf文件
./mqbroker -n localhost:9876 -c ../conf/broker.conf
实例代码
生产者
public static void main(String[] args) {
try {
DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message("topic_a", ("test").getBytes());
msg.putUserProperty("age", "11");
msg.putUserProperty("name", "张三");
producer.send(msg);
producer.shutdown();
} catch (Exception e) {
e.printStackTrace();
}
}
消费者
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
consumer.setNamesrvAddr("localhost:9876");
// 订阅age大于10并且name是张三的消息
consumer.subscribe("topic_a", MessageSelector.bySql("age > 10 and name = '张三'"));
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.registerMessageListener((MessageListenerConcurrently) (msgList, context) -> {
for (MessageExt msg : msgList) {
System.out.println(new String(msg.getBody()));;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
}
边栏推荐
- mysql — 清空表中数据
- 【TcaplusDB知识库】TcaplusDB单据受理-建表审批介绍
- Abc253 D fizzbuzz sum hard (tolerance exclusion theorem)
- Xiaomai technology x hologres: high availability of real-time data warehouse construction of ten billion level advertising
- JS merge two one-dimensional arrays and remove the same items (collation)
- Bloom filter:
- YoloV6+TensorRT+ONNX:基于WIN10+TensorRT8+YoloV6+ONNX的部署
- It's really easy to make money in foreign lead and build a website
- How QQ opens online customer service
- 最长异或路径(dfs+01trie)
猜你喜欢
Xiaobai yuesai 51 supplement e g f
My first experience of remote office | community essay solicitation
自动化软件测试 - 利用短信转发器结合Selenium读取短信验证码
The soft youth under the blessing of devcloud makes education "smart" in the cloud
Adobe Premiere foundation - batch material import sequence - variable speed and rewind (recall) - continuous action shot switching - subtitle requirements (13)
Adobe Premiere foundation - opacity (matte) (11)
jdbc认识上手
Image migration and data migration synchronization of old and new servers with different Alibaba cloud accounts
Yolov6+tensorrt+onnx: deployment based on win10+tensorrt8+yolov6+onnx
Have you grasped the most frequently asked question in the interview about massive data processing?
随机推荐
Detailed analysis on the use of MySQL stored procedure loop
markdown知识轻轻来袭
记录服务器被入侵病毒:ssh密码被更改登录失败、恶意程序跑满了cpu、jar包启动失败自动kill、一直弹出You have new mail in /var/spool/mail/root
YoloV6+TensorRT+ONNX:基于WIN10+TensorRT8+YoloV6+ONNX的部署
shell教程之循环语句for,while,until用法
Adobe Premiere foundation - opacity (mixed mode) (XII)
/usr/bin/ld: warning: **libmysqlclient. so. 20**, needed by //usr/
Adobe Premiere Foundation - réglage du son (correction du volume, réduction du bruit, tonalité téléphonique, changement de hauteur, égaliseur de paramètres) (XVIII)
Proxmox VE Install 7.2
程序员资源推荐指南
Image migration and data migration synchronization of old and new servers with different Alibaba cloud accounts
jdbc_相关代码
通过 hosts文件配置本地域名
Adobe Premiere foundation - cool text flash (14)
/usr/bin/ld: warning: **libmysqlclient.so.20**, needed by //usr/
3h精通OpenCV(八)-形状检测
Maximum length of palindrome substring (string hash + binary)
[tcapulusdb knowledge base] tcapulusdb operation and maintenance doc introduction
Adobe Premiere基础-不透明度(混合模式)(十二)
源码安装MAVROS