当前位置:网站首页>RocketMQ的tag过滤和sql过滤
RocketMQ的tag过滤和sql过滤
2022-06-29 18:03:00 【咩哥无敌】
目录
tag过滤
应用场景
在RockerMQ中可以用topic将业务划分,例如将订单、商品、活动等业务划分在不同的topic。为了使业务逻辑更清晰还可以用tag再次划分,例如将订单划分为服装订单、家电订单、酒水订单等。
原理
在服务端采用的是tag的hashcode过滤,当消费者的tag与订阅的queue中消息的tag的hashcode一致时就会直接返回,但这样只能过滤大部分tag,因为存在hash碰撞,所以还要在客户端还要根据tag值进行过滤
服务端使用hashcode过滤是因为减少了不必要的网络传输,并且hashcode过滤快,底层直接可以使用位运算
示例代码
生产者代码
public static void main(String[] args) {
try {
DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();
// 建立了标签为tag_a的消息
Message msg = new Message("topic_a", "tag_a", ("test").getBytes());
producer.send(msg);
producer.shutdown();
} catch (Exception e) {
e.printStackTrace();
}
}发送完之后在console中就能看到消息标签那列存在定义的值

消费者代码
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
consumer.setNamesrvAddr("localhost:9876");
// consumer.subscribe("topic_a", "*");
consumer.subscribe("topic_a", "tag_a");
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.registerMessageListener((MessageListenerConcurrently) (msgList, context) -> {
for (MessageExt msg : msgList) {
System.out.println(new String(msg.getBody()));;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
}为*时不过滤任何消息,需要订阅多个tag用||隔开,例如tag_a||tag_b
sql过滤
介绍
在某些情况下,可能需要更复杂的过滤条件,这时候就可以使用sql过滤,sql过滤性能比tag低,只定义了一些基本的语法,如下:
- 数值比较,比如:
>,>=,<,<=,BETWEEN,=; - 字符比较,比如:
=,<>,IN; IS NULL或者IS NOT NULL;- 逻辑符号
AND,OR,NOT;
常量支持类型为:
- 数值,比如:
123,3.1415; - 字符,比如:
'abc',必须用单引号包裹起来; NULL,特殊的常量;- 布尔值,
TRUE或FALSE
配置
需要在conf/broker.conf下添加以下配置,不然会报错
enablePropertyFilter=true
启动时使用指定的broker.conf文件
./mqbroker -n localhost:9876 -c ../conf/broker.conf
实例代码
生产者
public static void main(String[] args) {
try {
DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();
Message msg = new Message("topic_a", ("test").getBytes());
msg.putUserProperty("age", "11");
msg.putUserProperty("name", "张三");
producer.send(msg);
producer.shutdown();
} catch (Exception e) {
e.printStackTrace();
}
}消费者
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
consumer.setNamesrvAddr("localhost:9876");
// 订阅age大于10并且name是张三的消息
consumer.subscribe("topic_a", MessageSelector.bySql("age > 10 and name = '张三'"));
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.registerMessageListener((MessageListenerConcurrently) (msgList, context) -> {
for (MessageExt msg : msgList) {
System.out.println(new String(msg.getBody()));;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
}边栏推荐
- Sister Juan takes you to learn database -- 5-day dash day4
- Adobe Premiere基础-时间重映射(十)
- mysql -connector/j驱动下载
- JS merge two one-dimensional arrays and remove the same items (collation)
- EasyCVR部署服务器集群时,出现一台在线一台不在线是什么原因?
- 3H proficient in opencv (V) - perspective transformation
- js两个二维数组合并并去除相同项(整理)
- shell教程之循环语句for,while,until用法
- 6.29模拟赛总结
- Find the maximum XOR value in the sequence given a number (01 Dictionary)
猜你喜欢

JDBC Codes connexes

Parental delegation mechanism

Visio annotation, annotation location

Precondition end of script headers or end of script output before headers

Adobe Premiere基础-时间重映射(十)

MySQL -connector/j driver download

Adobe Premiere基础-不透明度(混合模式)(十二)

源码安装MAVROS

Top 30 open source software

mysql -connector/j驱动下载
随机推荐
Lodash deep copy usage
Yolov6+tensorrt+onnx: deployment based on win10+tensorrt8+yolov6+onnx
MaxCompute Studio
Bloom filter:
Detailed introduction and Simulation of bitmap
Relationship among controller, service and Dao
Adobe Premiere基础-不透明度(混合模式)(十二)
3H proficient in opencv (VI) - image stacking
WBF:检测任务NMS后虑框新方式?
给定一个数在序列中求最大异或值(01字典)
If the evaluation conclusion of waiting insurance is poor, does it mean that waiting insurance has been done in vain?
JDBC Codes connexes
shell教程之循环语句for,while,until用法
Proxmox VE Install 7.2
Adobe Premiere基础-声音调整(音量矫正,降噪,电话音,音高换挡器,参数均衡器)(十八)
Maxcompute string replacement function -replace
JWT登录验证
3h精通OpenCV(六)-图像堆叠
jdbc认识上手
【TcaplusDB知识库】TcaplusDB单据受理-建表审批介绍