当前位置:网站首页>RockerMQ消息发送与消费模式
RockerMQ消息发送与消费模式
2022-06-27 08:15:00 【咩哥无敌】
目录
消息发送模式
简介
RocketMQ有三种发送模式,分别是同步发送、异步发送、单向发送,不同的模式适用于不同的业务场景
代码
public static void main(String[] args) {
DefaultMQProducer defaultProducer = getDefaultProducer();
Producer producer = new Producer();
producer.Sync(defaultProducer);
producer.Async(defaultProducer);
producer.oneway(defaultProducer);
defaultProducer.shutdown();
}
public static DefaultMQProducer getDefaultProducer() {
try {
DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();
return producer;
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 同步发送
* @param producer
*/
public void Sync(DefaultMQProducer producer) {
try {
// 同步消息发送失败后,重新发送几次
producer.setRetryTimesWhenSendFailed(0);
Message msg = new Message("topic", "同步发送".getBytes());
SendResult res = producer.send(msg);
System.out.println("res" + res);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 异步发送
* @param producer
*/
public void Async(DefaultMQProducer producer) {
try {
// 异步消息发送失败后,重新发送几次
producer.setRetryTimesWhenSendAsyncFailed(0);
Message msg = new Message("topic", "异步消息".getBytes());
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("sendResult:" + sendResult);
}
@Override
public void onException(Throwable throwable) {
System.out.println("throwable:" + throwable);
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 单向发送
* @param producer
*/
public void oneway(DefaultMQProducer producer) {
try {
Message msg = new Message("topic", "单向发送".getBytes());
producer.sendOneway(msg);
}catch (Exception e) {
e.printStackTrace();
}
}三种发送模式的区别
同步发送:消息发送到master broker后并同步到slave broker后,才会响应客户端,效率慢,但丢失数据的风险小
异步发送:消息发送到master broker后就响应客户端,无需等待成功同步到slave broker,效率高,风险也高。例如master broker处理完消息后,响应完客户端后,未同步到slave broker
单向发送:生产者只需要生产消息,无需broker返回结果,效率最快,风险也最高,适用于允许丢失消息的场景中
消息消费模式
简介
RocketMQ有两种消费模式,分别是集群(CLUSTERING)和广播(BROADCASTING),默认是集群模式。消费模式通过消费者来定义。
集群消费模式
介绍
集群消费模式是指消息只会被集群中的一个消费者消费,如果有多个集群,每个集群都只会消费一次,消息重投时不能保证路由到同一台机器,消息状态由broker维护
代码
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("topic", "*");
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.registerMessageListener((MessageListenerConcurrently) (msgList, context) -> {
for (MessageExt msg : msgList) {
System.out.println(new String(msg.getBody()));;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
System.out.println("ConsumerA start...");
}步骤
用上面的代码启动两个消费者分别是A、B

使用生产者代码发送一条消息或在监控平台中创建topic并发送一条消息
监控平台发送消息步骤如下

结果只会有一个消费者接收到了消息
广播消费模式
介绍
广播消息是指会给集群内当前所有注册的消费者推送消息,当前没有消费者在线则会等到有一个消费者拉到消息,但消费失败不会重投
演示
将上述代码的MessageModel换成BROADCASTING即可
consumer.setMessageModel(MessageModel.BROADCASTING);其他步骤都一样
边栏推荐
- Redis installation under Linux
- Mysql-8 download, installation and configuration tutorial under Windows
- "Short video" Linxia fire rescue detachment carries out fire safety training
- 100%弄明白5种IO模型
- Set the address book function to database maintenance, and add user name and password
- 【批处理DOS-CMD命令-汇总和小结】-批处理命令中的参数%0、%1、%2、%[0-9]、%0-9和批处理命令参数位置切换命令shift,dos命令中操作符%用法
- What is futures reverse documentary?
- Code source AQS sous - jacent pour la programmation simultanée juc
- 游戏资产复用:更快找到所需游戏资产的新方法
- Helix QAC is updated to 2022.1 and will continue to provide high standard compliance coverage
猜你喜欢
![[11. two dimensional difference]](/img/b2/da624f8a7f97c46b8e346cf6d6da49.png)
[11. two dimensional difference]

Zabbix部署说明(Server+Win客户端+交换机(H3C))

2022.06.26(LC_6100_统计放置房子的方式数)

oracle用一条sql查出哪些数据不在某个表里

vim 从嫌弃到依赖(19)——替换

Binary tree structure and heap structure foundation

(resolved) the following raise notimplementederror occurs when Minet tests

Mysql事务中MVCC理解超简单

AQS underlying source code of concurrent programming JUC

After working in a large factory for ten years with an annual salary of 400000 yuan, I was suddenly laid off. If the company wanted to abandon you, it wouldn't leave any kindness
随机推荐
并发编程JUC的AQS底层源码
期货反向跟单—交易员的培训问题
[paper reading] internally semi supervised methods
若xn>0,且x(n+1)/xn>1-1/n(n=1,2,...),证明级数∑xn发散
第6届蓝桥杯
ServletConfig与ServletContext
Code source AQS sous - jacent pour la programmation simultanée juc
C how to call line and rows when updating the database
[daily practice] realization of product card animation effect
Binary tree structure and heap structure foundation
【论文阅读】Intrinsically semi-supervised methods
2022.6.26-----leetcode.710
关联GIS:条条道路通UE5城
Recognize the ordering of O (nlogn)
【原创】TypeScript字符串utf-8编码解码
Preliminary understanding of C #
Blind survey shows that female code farmers are better than male code farmers
lvgl使用demo及说明2
爬一个网页的所有导师信息
vim 从嫌弃到依赖(19)——替换