当前位置:网站首页>10.回退消息
10.回退消息
2022-07-29 04:14:00 【Machoul】
rabbitmq回退消息
mandatory参数
在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,此时生产者是不知道消息被丢弃这件事情的。
可以通过设置mandatory参数在当消息传递过程中不可达目的地时将消息返回给生产者。
案例演示
添加application.properties配置文件
server.port=8080
spring.rabbitmq.host=172.16.140.133
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
spring.rabbitmq.publisher-confirm-type=correlated
添加配置类
/** * 回退消息配置类 */
@Configuration
public class ConfirmConfig {
public static final String CONFIRM_EXCHANGE = "confirm.exchange";
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
@Bean("confirmExchange")
public DirectExchange confirmExchange(){
return new DirectExchange(CONFIRM_EXCHANGE);
}
@Bean("confirmQueue")
public Queue confirmQueue(){
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}
@Bean
public Binding queueBinding(@Qualifier("confirmQueue") Queue queue,@Qualifier("confirmExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("key1");
}
}
添加回调接口
/** * 消息回退回调接口 */
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id = correlationData != null ? correlationData.getId():"";
if (ack){
log.info("交换机已经收到id为:{}的消息",id);
}else {
log.info("交换机还未收到id为:{}的消息,由于原因:{}",id ,cause);
}
}
@Override
public void returnedMessage(ReturnedMessage returned) {
log.info("消息:{}被服务器退回,退回原因:{},交换机是:{},路由key:{}",new String(returned.getMessage().getBody()),returned.getReplyText(),returned.getExchange(),returned.getRoutingKey());
}
}
添加消费者
/** * 消息消费者 */
@Component
@Slf4j
public class ConfirmConsumer {
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
@RabbitListener(queues = CONFIRM_QUEUE_NAME)
public void receiveMsg(Message message){
String msg = new String(message.getBody());
log.info("接收到队列confirm.queue消息:{}",msg);
}
}
添加生产者
/** * 消息生产者 */
@RestController
@RequestMapping("/confirm")
@Slf4j
public class ProducerController {
public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private MyCallBack myCallBack;
/** * 初始化rabbitTemplate */
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(myCallBack);
/** * true:交换机无法将消息进行路由时,会讲该消息返回给生产者 * false:如果发现消息无法进行路由,则直接丢弃 */
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnsCallback(myCallBack);
}
@GetMapping("sendMessage/{message}")
public void sendMessage(@PathVariable String message){
CorrelationData correlationData = new CorrelationData("1");
String routingKey = "key1";
rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey,message+routingKey,correlationData);
CorrelationData correlationData2 = new CorrelationData("2");
String routingKey2 = "key2";
rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey2,message+routingKey,correlationData2);
log.info("发送消息内容:{}",message);
}
}
结果

边栏推荐
- SVG--loading动画
- 数据集成这个地方的过滤条件该咋写,用的啥语法?sql语法处理bizdate可以不
- 数据源是SQL server ,我要配置日期字段 updateDate 最后两天日期的增量数据,做增
- Shielding ODBC load balancing mode in gbase 8A special scenarios?
- RMAN do not mark expired backups
- 请问为什么我进行mysql数据update时,kafka中采集到的是先删除原纪录(op d)再新增新
- Compilation and linking
- When defining an array, the size must be constant
- 有一种密码学专用语言叫做ASN.1
- Differences and principles of bio, NiO and AIO
猜你喜欢

Copy products with one click from Taobao, tmall, 1688, wechat, jd.com, Suning, taote and other platforms to pinduoduo platform (batch upload baby details Interface tutorial)

Svg -- loading animation
![[paper translation] vectornet: encoding HD maps and agent dynamics from vectorized representation](/img/4b/150689d5e4809ae66a4297915ecd0c.png)
[paper translation] vectornet: encoding HD maps and agent dynamics from vectorized representation

顺序表和链表

Applet: Area scrolling, pull-down refresh, pull-up load more

信号处理中的反傅里叶变换(IFFT)原理

不会就坚持69天吧 合并区间

The solution of porting stm32f103zet6 program to c8t6+c8t6 download program flash timeout

Install the laser of ROS_ scan_ Problems encountered in match library (I)

MPU6050
随机推荐
HCIP BGP
Mmdetection preliminary use
C language - character array - string array - '\0' -sizeof-strlen() -printf()
The table of antd hides the pager when there is only one page
如何查询版本的提交号
The structure pointer must be initialized, and the pointer must also be initialized
Communication between parent-child components and parent-child components provide and inject
Machine vision Series 1: Visual Studio 2019 dynamic link library DLL establishment
[kvm] install KVM
The output comparison function of Tim is introduced in detail through PWM breathing lamp and PWM controlled DC motor
Data mining -- code implementation of association analysis example (Part 2)
C语言:typedef知识点总结
First knowledge of C language (3)
Safari's compatibility with Z-index
开课!看smardaten如何分解复杂业务场景
有没有大佬帮我看下flink sql连接kafka认证kerberos的参数配置是否有误
Function pointer and callback function
LCA board
Install the laser of ROS_ scan_ Problems encountered in match library (I)
LCA 板子