11. Backup exchange
2022-07-29 04:15:00 【Machoul】
Backup exchange
Concept
When an exchange receives a message it cannot route, it forwards the message to its backup exchange (called an alternate exchange in RabbitMQ), which then routes it. The backup exchange is usually of type fanout, so every message it receives is delivered to all queues bound to it. Binding a queue to the backup exchange therefore collects every message that the original exchange could not route. We can also bind a separate warning queue and attach a dedicated consumer to it for monitoring and alerting.
Key point: previously we used a dead letter queue to handle failed messages. Unroutable messages, however, never enter any queue, so a dead letter queue cannot capture them.
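The routing rule described above can be sketched with a small in-memory model. This is not RabbitMQ client code: the class `AlternateExchangeSketch` and its methods are hypothetical names made up for illustration, and the model only assumes one direct exchange whose unroutable messages fall through to a fanout backup exchange.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy in-memory model of backup-exchange routing (hypothetical, not the RabbitMQ API). */
class AlternateExchangeSketch {
    /** Queue name -> message bodies stored in that queue. */
    final Map<String, List<String>> queues = new LinkedHashMap<>();
    /** Bindings of the direct exchange: routing key -> queue name. */
    final Map<String, String> directBindings = new LinkedHashMap<>();
    /** Queues bound to the fanout backup exchange. */
    final List<String> backupBindings = new ArrayList<>();

    void declareQueue(String name) {
        queues.put(name, new ArrayList<>());
    }

    /** Routes one message and returns the names of the queues that received it. */
    List<String> publish(String routingKey, String body) {
        List<String> targets = new ArrayList<>();
        String direct = directBindings.get(routingKey);
        if (direct != null) {
            targets.add(direct);            // the direct exchange found a matching binding
        } else {
            targets.addAll(backupBindings); // unroutable: fan out to every backup binding
        }
        for (String queue : targets) {
            queues.get(queue).add(body);
        }
        return targets;
    }

    /** Builds the same topology as the demonstration in this article. */
    static AlternateExchangeSketch demoTopology() {
        AlternateExchangeSketch broker = new AlternateExchangeSketch();
        broker.declareQueue("confirm.queue");
        broker.declareQueue("backup.queue");
        broker.declareQueue("warning.queue");
        broker.directBindings.put("key1", "confirm.queue");
        broker.backupBindings.add("backup.queue");
        broker.backupBindings.add("warning.queue");
        return broker;
    }

    public static void main(String[] args) {
        AlternateExchangeSketch broker = demoTopology();
        System.out.println(broker.publish("key1", "hello-key1")); // [confirm.queue]
        System.out.println(broker.publish("key2", "hello-key2")); // [backup.queue, warning.queue]
    }
}
```

A message with an unbound routing key never reaches confirm.queue, so nothing could dead-letter it there; it only survives because the backup exchange copies it to its own queues.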
Demonstration
Add configuration file
server.port=8080
spring.rabbitmq.host=172.16.140.133
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
spring.rabbitmq.publisher-confirm-type=correlated
Add configuration class
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/** Backup exchange configuration class */
@Configuration
public class ConfirmConfig {
    public static final String CONFIRM_EXCHANGE = "confirm.exchange";
    public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
    public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";
    public static final String BACKUP_QUEUE_NAME = "backup.queue";
    public static final String WARNING_QUEUE_NAME = "warning.queue";

    // Fanout backup exchange: receives every message confirm.exchange cannot route
    @Bean("backupExchange")
    public FanoutExchange backupExchange() {
        return new FanoutExchange(BACKUP_EXCHANGE_NAME);
    }

    @Bean("warningQueue")
    public Queue warningQueue() {
        return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
    }

    @Bean("backupQueue")
    public Queue backupQueue() {
        return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
    }

    @Bean("confirmQueue")
    public Queue confirmQueue() {
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }

    // Main exchange; the "alternate-exchange" argument names its backup exchange
    @Bean("confirmExchange")
    public DirectExchange confirmExchange() {
        return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE)
                .durable(true)
                .withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME)
                .build();
    }

    // Both the warning queue and the backup queue are bound to the backup exchange
    @Bean
    public Binding warningBinding(@Qualifier("warningQueue") Queue queue,
                                  @Qualifier("backupExchange") FanoutExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange);
    }

    @Bean
    public Binding backupBinding(@Qualifier("backupQueue") Queue queue,
                                 @Qualifier("backupExchange") FanoutExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange);
    }

    // Only routing key "key1" is bound on the main exchange
    @Bean
    public Binding queueBinding(@Qualifier("confirmQueue") Queue queue,
                                @Qualifier("confirmExchange") DirectExchange exchange) {
        return BindingBuilder.bind(queue).to(exchange).with("key1");
    }
}
Add callback interface
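import java.nio.charset.StandardCharsets;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

/** Publisher confirm and returned-message callbacks */
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
    // Called when the broker acks or nacks a published message
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (ack) {
            log.info("Exchange received the message with id: {}", id);
        } else {
            log.info("Exchange did not receive the message with id: {}, cause: {}", id, cause);
        }
    }

    // Called when a mandatory message could not be routed and was returned
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        log.info("Message: {} was returned by the broker, reason: {}, exchange: {}, routing key: {}",
                new String(returned.getMessage().getBody(), StandardCharsets.UTF_8),
                returned.getReplyText(), returned.getExchange(), returned.getRoutingKey());
    }
}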
Add ordinary consumers
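import java.nio.charset.StandardCharsets;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/** Consumer for messages that were routed normally */
@Component
@Slf4j
public class ConfirmConsumer {
    public static final String CONFIRM_QUEUE_NAME = "confirm.queue";

    @RabbitListener(queues = CONFIRM_QUEUE_NAME)
    public void receiveMsg(Message message) {
        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
        log.info("Received message from queue confirm.queue: {}", msg);
    }
}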
Add alarm consumer
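import java.nio.charset.StandardCharsets;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/** Alarm consumer for unroutable messages */
@Component
@Slf4j
public class WarningConsumer {
    public static final String WARNING_QUEUE_NAME = "warning.queue";

    @RabbitListener(queues = WARNING_QUEUE_NAME)
    public void receiveMsg(Message message) {
        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
        log.info("Warning: found an unroutable message: {}", msg);
    }
}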
Add message producers
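import javax.annotation.PostConstruct; // jakarta.annotation.PostConstruct on Spring Boot 3+
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/** Message producer */
@RestController
@RequestMapping("/confirm")
@Slf4j
public class ProducerController {
    public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";

    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private MyCallBack myCallBack;

    /** Initialize rabbitTemplate */
    @PostConstruct
    public void init() {
        rabbitTemplate.setConfirmCallback(myCallBack);
        // true: when the exchange cannot route a message, the message is returned to the producer
        // false: an unroutable message is discarded
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnsCallback(myCallBack);
    }

    @GetMapping("sendMessage/{message}")
    public void sendMessage(@PathVariable String message) {
        // "key1" is bound to confirm.queue, so this message is routed normally
        CorrelationData correlationData = new CorrelationData("1");
        String routingKey = "key1";
        rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingKey, message + routingKey, correlationData);
        // "key2" has no binding, so this message is unroutable on confirm.exchange
        CorrelationData correlationData2 = new CorrelationData("2");
        String routingKey2 = "key2";
        rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingKey2, message + routingKey2, correlationData2);
        log.info("Sent message content: {}", message);
    }
}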
Result analysis
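- Two messages were sent. confirm.queue received the message with routing key key1; the message with key2 could not be routed, so it went through the backup exchange and the warning queue received it. The fanout exchange also delivered a copy to backup.queue, which has no consumer in this demo.
- When the mandatory parameter is used together with a backup exchange, the backup exchange takes priority: as long as the backup exchange can route the message, the returned-message callback is not executed.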
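The priority rule in the last point can be written down as a small decision function. This is a sketch of the rule only, not broker or Spring code; `UnroutableOutcomeSketch`, `Outcome`, and `resolve` are hypothetical names, and the three boolean inputs are a simplification of what the broker actually evaluates.

```java
/** Toy decision table for one publish (hypothetical, not the RabbitMQ API). */
class UnroutableOutcomeSketch {

    enum Outcome { DELIVERED, SENT_TO_BACKUP, RETURNED_TO_PRODUCER, DROPPED }

    /**
     * @param routable       the main exchange has a binding matching the routing key
     * @param backupCanRoute a backup exchange is configured and has at least one bound queue
     * @param mandatory      the producer published with mandatory=true
     */
    static Outcome resolve(boolean routable, boolean backupCanRoute, boolean mandatory) {
        if (routable) {
            return Outcome.DELIVERED;
        }
        if (backupCanRoute) {
            // Checked before mandatory: the message counts as routed, so no return happens
            return Outcome.SENT_TO_BACKUP;
        }
        return mandatory ? Outcome.RETURNED_TO_PRODUCER : Outcome.DROPPED;
    }

    public static void main(String[] args) {
        System.out.println(resolve(false, true, true));   // SENT_TO_BACKUP
        System.out.println(resolve(false, false, true));  // RETURNED_TO_PRODUCER
        System.out.println(resolve(false, false, false)); // DROPPED
    }
}
```

The first call corresponds to the key2 message in this demonstration: mandatory is true, yet the return callback stays silent because the backup exchange handled the message.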