当前位置:网站首页>10.回退消息
10.回退消息
2022-07-29 04:14:00 【Machoul】
rabbitmq回退消息
mandatory参数
在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这件事情的。
可以通过设置mandatory参数在当消息传递过程中不可达目的地时将消息返回给生产者。
案例演示
添加application.properties配置文件
server.port=8080
spring.rabbitmq.host=172.16.140.133
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
spring.rabbitmq.publisher-confirm-type=correlated
添加配置类
/** * 回退消息配置类 */
@Configuration
public class ConfirmConfig {
public static final String CONFIRM_EXCHANGE = "confirm.exchange";
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
@Bean("confirmExchange")
public DirectExchange confirmExchange(){
return new DirectExchange(CONFIRM_EXCHANGE);
}
@Bean("confirmQueue")
public Queue confirmQueue(){
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}
@Bean
public Binding queueBinding(@Qualifier("confirmQueue") Queue queue,@Qualifier("confirmExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("key1");
}
}
添加回调接口
/** * 消息回退回调接口 */
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id = correlationData != null ? correlationData.getId():"";
if (ack){
log.info("交换机已经收到id为:{}的消息",id);
}else {
log.info("交换机还未收到id为:{}的消息,由于原因:{}",id ,cause);
}
}
@Override
public void returnedMessage(ReturnedMessage returned) {
log.info("消息:{}被服务器退回,退回原因:{},交换机是:{},路由key:{}",new String(returned.getMessage().getBody()),returned.getReplyText(),returned.getExchange(),returned.getRoutingKey());
}
}
添加消费者
/** * 消息消费者 */
@Component
@Slf4j
public class ConfirmConsumer {
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
@RabbitListener(queues = CONFIRM_QUEUE_NAME)
public void receiveMsg(Message message){
String msg = new String(message.getBody());
log.info("接收到队列confirm.queue消息:{}",msg);
}
}
添加生产者
/** * 消息生产者 */
@RestController
@RequestMapping("/confirm")
@Slf4j
public class ProducerController {
public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private MyCallBack myCallBack;
/** * 初始化rabbitTemplate */
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(myCallBack);
/** * true:交换机无法将消息进行路由时,会讲该消息返回给生产者 * false:如果发现消息无法进行路由,则直接丢弃 */
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnsCallback(myCallBack);
}
@GetMapping("sendMessage/{message}")
public void sendMessage(@PathVariable String message){
CorrelationData correlationData = new CorrelationData("1");
String routingKey = "key1";
rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey,message+routingKey,correlationData);
CorrelationData correlationData2 = new CorrelationData("2");
String routingKey2 = "key2";
rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey2,message+routingKey,correlationData2);
log.info("发送消息内容:{}",message);
}
}
结果

边栏推荐
- 淘宝商品详情接口(商品详情页面数据接口)
- %s. %c, character constant, string constant, const char*, pointer array, string array summary
- Pat a1069/b1019 the black hole of numbers
- 基于STM32和阿里云的环境检测系统设计
- 索引的最左前缀原理
- Fuzzy query of SQL
- 对一个元素使用多种变形的方法
- How to query the submission number of a version
- Locally call tensorboard and Jupiter notebook on the server (using mobaxterm)
- Asp.net MVC中文件夹中的控制器如何跳转到根目录的控制器中?
猜你喜欢

安装ros的laser_scan_matche库所遇到的问题(一)

The solution of porting stm32f103zet6 program to c8t6+c8t6 download program flash timeout

Why are there so many unknowns when opengauss starts?

RMAN do not mark expired backups

MySQL gets the maximum value record by field grouping

C语言力扣第61题之旋转链表。双端队列与构造循环链表

MPU6050

Press the missing number of interview question 17.04 | | 260. the number that appears only once (including bit operation knowledge points)

开课!看smardaten如何分解复杂业务场景

Design of environment detection system based on STM32 and Alibaba cloud
随机推荐
There is a special cryptology language called asn.1
“蔚来杯“2022牛客暑期多校训练营1 J Serval and Essay(启发式合并)
Design of environment detection system based on STM32 and Alibaba cloud
Asp.net MVC中文件夹中的控制器如何跳转到根目录的控制器中?
mmdetection初步使用
[kvm] common commands
VScode连接ssh遇到的问题
Whole house WiFi solution: mesh router networking and ac+ap
不会就坚持71天吧 链表排序
Function pointer and callback function
SQL server当存储过程接收的参数是int类型时,如何做判断?
MPU6050
淘宝商品详情接口(商品详情页面数据接口)
LDP -- label distribution protocol
Communication between parent-child components and parent-child components provide and inject
[kvm] create virtual machine from kickstart file
Object detection: object_ Detection API +ssd target detection model
Multi rotor six axis hardware selection
这个报错是什么鬼啊,不影响执行结果,但是在执行sql时一直报错。。。连接maxComputer是使用
MPU6050