当前位置:网站首页>10.回退消息
10.回退消息
2022-07-29 04:14:00 【Machoul】
rabbitmq回退消息
mandatory参数
在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这件事情的。
可以通过设置mandatory参数在当消息传递过程中不可达目的地时将消息返回给生产者。
案例演示
添加application.properties配置文件
server.port=8080
spring.rabbitmq.host=172.16.140.133
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
spring.rabbitmq.publisher-confirm-type=correlated
添加配置类
/** * 回退消息配置类 */
@Configuration
public class ConfirmConfig {
public static final String CONFIRM_EXCHANGE = "confirm.exchange";
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
@Bean("confirmExchange")
public DirectExchange confirmExchange(){
return new DirectExchange(CONFIRM_EXCHANGE);
}
@Bean("confirmQueue")
public Queue confirmQueue(){
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}
@Bean
public Binding queueBinding(@Qualifier("confirmQueue") Queue queue,@Qualifier("confirmExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("key1");
}
}
添加回调接口
/** * 消息回退回调接口 */
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id = correlationData != null ? correlationData.getId():"";
if (ack){
log.info("交换机已经收到id为:{}的消息",id);
}else {
log.info("交换机还未收到id为:{}的消息,由于原因:{}",id ,cause);
}
}
@Override
public void returnedMessage(ReturnedMessage returned) {
log.info("消息:{}被服务器退回,退回原因:{},交换机是:{},路由key:{}",new String(returned.getMessage().getBody()),returned.getReplyText(),returned.getExchange(),returned.getRoutingKey());
}
}
添加消费者
/** * 消息消费者 */
@Component
@Slf4j
public class ConfirmConsumer {
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
@RabbitListener(queues = CONFIRM_QUEUE_NAME)
public void receiveMsg(Message message){
String msg = new String(message.getBody());
log.info("接收到队列confirm.queue消息:{}",msg);
}
}
添加生产者
/** * 消息生产者 */
@RestController
@RequestMapping("/confirm")
@Slf4j
public class ProducerController {
public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private MyCallBack myCallBack;
/** * 初始化rabbitTemplate */
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(myCallBack);
/** * true:交换机无法将消息进行路由时,会讲该消息返回给生产者 * false:如果发现消息无法进行路由,则直接丢弃 */
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnsCallback(myCallBack);
}
@GetMapping("sendMessage/{message}")
public void sendMessage(@PathVariable String message){
CorrelationData correlationData = new CorrelationData("1");
String routingKey = "key1";
rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey,message+routingKey,correlationData);
CorrelationData correlationData2 = new CorrelationData("2");
String routingKey2 = "key2";
rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey2,message+routingKey,correlationData2);
log.info("发送消息内容:{}",message);
}
}
结果
边栏推荐
- VScode连接ssh遇到的问题
- 基于STM32和阿里云的环境检测系统设计
- Fu Yingna: Yuan universe is the new generation of Internet!
- LCA board
- "Weilai Cup" 2022 Niuke summer multi school training camp 1 J serval and essay (heuristic merger)
- 请问为什么我进行mysql数据update时,kafka中采集到的是先删除原纪录(op d)再新增新
- 全屋WiFi方案:Mesh路由器组网和AC+AP
- 有一种密码学专用语言叫做ASN.1
- Why do I delete the original record (OP d) and then add a new one in Kafka when I update MySQL data
- Data mining -- Introduction to the basis of association analysis (Part 1)
猜你喜欢
Svg -- loading animation
Design of environment detection system based on STM32 and Alibaba cloud
从淘宝,天猫,1688,微店,京东,苏宁,淘特等其他平台一键复制商品到拼多多平台(批量上传宝贝详情接口教程)
Why are there so many unknowns when opengauss starts?
Lua语言(stm32+2G/4G模块)和C语言(stm32+esp8266)从字符串中提取相关数据的方法-整理
Const char* and char*, string constants
VScode连接ssh遇到的问题
Nacos registry
不会就坚持67天吧 平方根
[Openstack] keystone,nova
随机推荐
C语言:联合体知识点总结
Database SQL statement realizes function query of data decomposition
The data source is SQL server. I want to configure the incremental data of the last two days of the date field updatedate to add
After I get the winfrom specific control ID from the database, I need to find the corresponding control through this ID and assign a value to the text text of the control. What should I do
不会就坚持68天吧 狒狒吃香蕉
Rhel8 patch package production
Function pointer and callback function
顺序表和链表
Don't the JDBC SQL connector of the big guys Flink now support all databases, such as vertica?
C language to achieve three chess game (detailed explanation)
不会就坚持61天吧 最短的单词编码
When array is used as a function parameter, it is better to use the array size as a function parameter
C语言力扣第61题之旋转链表。双端队列与构造循环链表
Svg -- loading animation
C declaration and initialization and assignment
When defining an array, the size must be constant
Applet: Area scrolling, pull-down refresh, pull-up load more
Is the array name a pointer
Fuzzy query of SQL
Design of environment detection system based on STM32 and Alibaba cloud