当前位置:网站首页>10. Fallback message
10. Fallback message
2022-07-29 04:15:00 【Machoul】
rabbitmq Fallback message
mandatory Parameters
When only the producer confirmation mechanism is turned on , After the switch receives the message , A confirmation message will be sent directly to the message producer , If the message is found to be non routable , Then the message will be discarded directly , At this time, the producer does not know that the message is discarded .
Can be set by mandatory Parameter returns the message to the producer when the destination cannot be reached during message delivery .
Case presentation
add to application.properties The configuration file
server.port=8080
spring.rabbitmq.host=172.16.140.133
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
spring.rabbitmq.publisher-confirm-type=correlated
Add configuration class
/** * Fallback message configuration class */
@Configuration
public class ConfirmConfig {
public static final String CONFIRM_EXCHANGE = "confirm.exchange";
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
@Bean("confirmExchange")
public DirectExchange confirmExchange(){
return new DirectExchange(CONFIRM_EXCHANGE);
}
@Bean("confirmQueue")
public Queue confirmQueue(){
return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
}
@Bean
public Binding queueBinding(@Qualifier("confirmQueue") Queue queue,@Qualifier("confirmExchange") DirectExchange exchange){
return BindingBuilder.bind(queue).to(exchange).with("key1");
}
}
Add callback interface
/** * Message fallback callback interface */
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback,RabbitTemplate.ReturnsCallback {
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause) {
String id = correlationData != null ? correlationData.getId():"";
if (ack){
log.info(" The switch has received id by :{} The news of ",id);
}else {
log.info(" The switch hasn't received id by :{} The news of , For some reason :{}",id ,cause);
}
}
@Override
public void returnedMessage(ReturnedMessage returned) {
log.info(" news :{} Returned by server , Reasons for return :{}, The switch is :{}, route key:{}",new String(returned.getMessage().getBody()),returned.getReplyText(),returned.getExchange(),returned.getRoutingKey());
}
}
Add consumer
/** * Message consumer */
@Component
@Slf4j
public class ConfirmConsumer {
public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
@RabbitListener(queues = CONFIRM_QUEUE_NAME)
public void receiveMsg(Message message){
String msg = new String(message.getBody());
log.info(" Received queue confirm.queue news :{}",msg);
}
}
Add producers
/** * Message producer */
@RestController
@RequestMapping("/confirm")
@Slf4j
public class ProducerController {
public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";
@Autowired
private RabbitTemplate rabbitTemplate;
@Autowired
private MyCallBack myCallBack;
/** * initialization rabbitTemplate */
@PostConstruct
public void init(){
rabbitTemplate.setConfirmCallback(myCallBack);
/** * true: When the switch cannot route messages , The message will be returned to the producer * false: If a message is found to be unable to route , Throw away */
rabbitTemplate.setMandatory(true);
rabbitTemplate.setReturnsCallback(myCallBack);
}
@GetMapping("sendMessage/{message}")
public void sendMessage(@PathVariable String message){
CorrelationData correlationData = new CorrelationData("1");
String routingKey = "key1";
rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey,message+routingKey,correlationData);
CorrelationData correlationData2 = new CorrelationData("2");
String routingKey2 = "key2";
rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME,routingKey2,message+routingKey,correlationData2);
log.info(" Send message content :{}",message);
}
}
result

边栏推荐
- Value transmission and address transmission of C language, pointer of pointer
- UnicodeDecodeError: ‘ascii‘ codec can‘t decode byte 0x90 in position 614: ordinal not in range(128)
- pat A1041 Be Unique
- GBase 8a特殊场景下屏蔽 ODBC 负载均衡方式?
- Lua语言(stm32+2G/4G模块)和C语言(stm32+esp8266)从字符串中提取相关数据的方法-整理
- 2021 sist summer camp experience + record post of School of information, Shanghai University of science and technology
- Note: restframe work records many to one tables, how to serialize in that table (reverse query)
- When array is used as a function parameter, it is better to use the array size as a function parameter
- kotlin的List,Map,Set等集合类不指定类型
- Change the value of the argument by address through malloc and pointer
猜你喜欢

伏英娜:元宇宙就是新一代互联网!

Beginner: array & String

开课!看smardaten如何分解复杂业务场景

Machine vision series 3:vs2019 opencv environment configuration
![[Openstack] keystone,nova](/img/de/70b654a29a813c8fe828c4018bd4e7.png)
[Openstack] keystone,nova

编译与链接

不会就坚持58天吧 实现前缀树

Lua语言(stm32+2G/4G模块)和C语言(stm32+esp8266)从字符串中提取相关数据的方法-整理

Record of problems encountered in ROS learning

Locally call tensorboard and Jupiter notebook on the server (using mobaxterm)
随机推荐
The difference between dynamic, VaR and object in fluent
How to query the submission number of a version
Semantic segmentation correlation
"Weilai Cup" 2022 Niuke summer multi school training camp 1 J serval and essay (heuristic merger)
A little understanding of pointer, secondary pointer, wild pointer, pointer as function return value
Nacos registry
C语言力扣第61题之旋转链表。双端队列与构造循环链表
伏英娜:元宇宙就是新一代互联网!
MySQL gets the maximum value record by field grouping
如何查询版本的提交号
C语言:浅谈各种复杂的声明
这个报错是什么鬼啊,不影响执行结果,但是在执行sql时一直报错。。。连接maxComputer是使用
信号处理中的反傅里叶变换(IFFT)原理
The data source is SQL server. I want to configure the incremental data of the last two days of the date field updatedate to add
Pointer of pointer???...
Target detection learning process
Common components of solder pad (2021.4.6)
Beginner: array & String
不会就坚持71天吧 链表排序
How to solve the problem of store ranking?