当前位置:网站首页>RockerMQ消息发送与消费模式
RockerMQ消息发送与消费模式
2022-06-27 08:15:00 【咩哥无敌】
目录
消息发送模式
简介
RocketMQ有三种发送模式,分别是同步发送、异步发送、单向发送,不同的模式适用于不同的业务场景
代码
public static void main(String[] args) {
DefaultMQProducer defaultProducer = getDefaultProducer();
Producer producer = new Producer();
producer.Sync(defaultProducer);
producer.Async(defaultProducer);
producer.oneway(defaultProducer);
defaultProducer.shutdown();
}
public static DefaultMQProducer getDefaultProducer() {
try {
DefaultMQProducer producer = new DefaultMQProducer("producerGroup");
producer.setNamesrvAddr("localhost:9876");
producer.start();
return producer;
} catch (Exception e) {
e.printStackTrace();
return null;
}
}
/**
* 同步发送
* @param producer
*/
public void Sync(DefaultMQProducer producer) {
try {
// 同步消息发送失败后,重新发送几次
producer.setRetryTimesWhenSendFailed(0);
Message msg = new Message("topic", "同步发送".getBytes());
SendResult res = producer.send(msg);
System.out.println("res" + res);
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 异步发送
* @param producer
*/
public void Async(DefaultMQProducer producer) {
try {
// 异步消息发送失败后,重新发送几次
producer.setRetryTimesWhenSendAsyncFailed(0);
Message msg = new Message("topic", "异步消息".getBytes());
producer.send(msg, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
System.out.println("sendResult:" + sendResult);
}
@Override
public void onException(Throwable throwable) {
System.out.println("throwable:" + throwable);
}
});
} catch (Exception e) {
e.printStackTrace();
}
}
/**
* 单向发送
* @param producer
*/
public void oneway(DefaultMQProducer producer) {
try {
Message msg = new Message("topic", "单向发送".getBytes());
producer.sendOneway(msg);
}catch (Exception e) {
e.printStackTrace();
}
}三种发送模式的区别
同步发送:消息发送到master broker后并同步到slave broker后,才会响应客户端,效率慢,但丢失数据的风险小
异步发送:消息发送到master broker后就响应客户端,无需等待成功同步到slave broker,效率高,风险也高。例如master broker处理完消息后,响应完客户端后,未同步到slave broker
单向发送:生产者只需要生产消息,无需broker返回结果,效率最快,风险也最高,适用于允许丢失消息的场景中
消息消费模式
简介
RocketMQ有两种消费模式,分别是集群(CLUSTERING)和广播(BROADCASTING),默认是集群模式。消费模式通过消费者来定义。
集群消费模式
介绍
集群消费模式是指消息只会被集群中的一个消费者消费,如果有多个集群,每个集群都只会消费一次,消息重投时不能保证路由到同一台机器,消息状态由broker维护
代码
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroup");
consumer.setNamesrvAddr("localhost:9876");
consumer.subscribe("topic", "*");
consumer.setMessageModel(MessageModel.CLUSTERING);
consumer.registerMessageListener((MessageListenerConcurrently) (msgList, context) -> {
for (MessageExt msg : msgList) {
System.out.println(new String(msg.getBody()));;
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
});
consumer.start();
System.out.println("ConsumerA start...");
}步骤
用上面的代码启动两个消费者分别是A、B

使用生产者代码发送一条消息或在监控平台中创建topic并发送一条消息
监控平台发送消息步骤如下

结果只会有一个消费者接收到了消息
广播消费模式
介绍
广播消息是指会给集群内当前所有注册的消费者推送消息,当前没有消费者在线则会等到有一个消费者拉到消息,但消费失败不会重投
演示
将上述代码的MessageModel换成BROADCASTING即可
consumer.setMessageModel(MessageModel.BROADCASTING);其他步骤都一样
边栏推荐
- 洛谷刷题心得记录
- Etcd tutorial - Chapter 5 etcd etcdctl usage
- 分析日志.log
- C how to call line and rows when updating the database
- [10. difference]
- AQS underlying source code of concurrent programming JUC
- (note) Anaconda navigator flashback solution
- c的时间函数算效率
- 【批处理DOS-CMD命令-汇总和小结】-批处理命令中的参数%0、%1、%2、%[0-9]、%0-9和批处理命令参数位置切换命令shift,dos命令中操作符%用法
- Refer to | the computer cannot access the Internet after the hotspot is turned on in win11
猜你喜欢
![[12. maximum continuous non repeating subsequence]](/img/eb/230cd6062e28374c86863f2122e43b.png)
[12. maximum continuous non repeating subsequence]

Preliminary understanding of C #

Redis master-slave replication and sentinel mode

L'introduction en bourse de Wild Wind Pharmaceutical a pris fin: Yu pinzeng, qui avait l'intention de lever 540 millions de RMB, a effectué un investissement P2P.

Persistence mechanism of redis

Programming life - what do you think of the 35 year old bottleneck of programmers?

爬一个网页的所有导师信息

Eight misunderstandings, broken one by one (final): the cloud is difficult to expand, the customization is poor, and the administrator will lose control?

MSSQL how to export and delete multi table data using statements

"Short video" Linxia fire rescue detachment carries out fire safety training
随机推荐
【10. 差分】
(note) Anaconda navigator flashback solution
Redis的持久化机制
【批处理DOS-CMD命令-汇总和小结】-将文件夹映射成虚拟磁盘——subst
JS EventListener
Eight misunderstandings, broken one by one (final): the cloud is difficult to expand, the customization is poor, and the administrator will lose control?
【13. 二进制中1的个数、位运算】
[batch dos-cmd command - summary and summary] - environment variables, path variables, search file location related instructions - set, path, where, what if there are spaces in the path parameters of
Etcd tutorial - Chapter 5 etcd etcdctl usage
Refer to | the computer cannot access the Internet after the hotspot is turned on in win11
100%弄明白5种IO模型
期货反向跟单靠谱吗?
并发编程JUC的AQS底层源码
win命令行中导入、导出数据库相关表
SQL attendance query interval: one hour
2022 love analysis · panoramic report of it operation and maintenance manufacturers
The difference between ArrayList and LinkedList
MSSQL how to export and delete multi table data using statements
Coggle 30 days of ML July competition learning
Index +sql exercise optimization