11. Backup exchange
2022-07-29 04:15:00 【Machoul】
Backup exchange
Concept
When an exchange receives a message it cannot route, it forwards the message to its backup exchange (RabbitMQ calls this an alternate exchange), which then routes and handles it. The backup exchange is usually of type fanout, so every message it receives is delivered to all queues bound to it. If we bind a queue to the backup exchange, every message the original exchange could not route ends up in that queue. We can also bind a separate alarm queue and use a dedicated consumer for monitoring and alerting.
Key point: earlier we used a dead letter queue to handle failed messages, but unroutable messages never enter any queue, so they can never be dead-lettered. A dead letter queue therefore cannot capture them.
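The routing rule described above can be sketched as a small in-memory model. This is only an illustration, not the RabbitMQ client API: the class `AlternateExchangeSketch` and its methods are hypothetical names, and the model assumes one direct exchange whose backup exchange is a fanout exchange.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model (hypothetical, not RabbitMQ code) of a direct exchange with a fanout backup exchange. */
public class AlternateExchangeSketch {

    // Bindings of the primary direct exchange: routing key -> queue name.
    private final Map<String, String> directBindings = new LinkedHashMap<>();
    // Queues bound to the fanout backup exchange; the routing key is ignored there.
    private final List<String> backupQueues = new ArrayList<>();
    // Queue contents: queue name -> delivered message bodies.
    private final Map<String, List<String>> queues = new LinkedHashMap<>();

    public void bindDirect(String routingKey, String queue) {
        directBindings.put(routingKey, queue);
        queues.putIfAbsent(queue, new ArrayList<>());
    }

    public void bindBackup(String queue) {
        backupQueues.add(queue);
        queues.putIfAbsent(queue, new ArrayList<>());
    }

    /** Publishes to the primary exchange; an unroutable message is handed to the backup exchange. */
    public void publish(String routingKey, String body) {
        String target = directBindings.get(routingKey);
        if (target != null) {
            queues.get(target).add(body);
            return;
        }
        // No binding matched: fan the message out to every queue on the backup exchange.
        for (String queue : backupQueues) {
            queues.get(queue).add(body);
        }
    }

    public List<String> messagesIn(String queue) {
        return queues.getOrDefault(queue, Collections.emptyList());
    }

    public static void main(String[] args) {
        AlternateExchangeSketch broker = new AlternateExchangeSketch();
        broker.bindDirect("key1", "confirm.queue");
        broker.bindBackup("backup.queue");
        broker.bindBackup("warning.queue");

        broker.publish("key1", "hello-key1"); // routable
        broker.publish("key2", "hello-key2"); // unroutable on the primary exchange

        System.out.println(broker.messagesIn("confirm.queue")); // prints [hello-key1]
        System.out.println(broker.messagesIn("backup.queue"));  // prints [hello-key2]
        System.out.println(broker.messagesIn("warning.queue")); // prints [hello-key2]
    }
}
```

The point of the model is that the unroutable message reaches both the backup queue and the alarm queue, while a dead letter queue would never see it because it was never enqueued anywhere on the primary exchange.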

Demonstration
Add the configuration file
server.port=8080
spring.rabbitmq.host=172.16.140.133
spring.rabbitmq.port=5672
spring.rabbitmq.username=admin
spring.rabbitmq.password=123456
spring.rabbitmq.publisher-confirm-type=correlated
Add the configuration class
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.ExchangeBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.QueueBuilder;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/** Backup exchange configuration class */
@Configuration
public class ConfirmConfig {
    public static final String CONFIRM_EXCHANGE = "confirm.exchange";
    public static final String CONFIRM_QUEUE_NAME = "confirm.queue";
    public static final String BACKUP_EXCHANGE_NAME = "backup.exchange";
    public static final String BACKUP_QUEUE_NAME = "backup.queue";
    public static final String WARNING_QUEUE_NAME = "warning.queue";

    // Backup exchange: fanout, so it delivers to every queue bound to it
    @Bean("backupExchange")
    public FanoutExchange backupExchange(){
        return new FanoutExchange(BACKUP_EXCHANGE_NAME);
    }

    @Bean("warningQueue")
    public Queue warningQueue(){
        return QueueBuilder.durable(WARNING_QUEUE_NAME).build();
    }

    @Bean("backupQueue")
    public Queue backupQueue(){
        return QueueBuilder.durable(BACKUP_QUEUE_NAME).build();
    }

    @Bean("confirmQueue")
    public Queue confirmQueue(){
        return QueueBuilder.durable(CONFIRM_QUEUE_NAME).build();
    }

    // Primary exchange: the "alternate-exchange" argument points it at the backup exchange
    @Bean("confirmExchange")
    public DirectExchange confirmExchange(){
        ExchangeBuilder exchangeBuilder = ExchangeBuilder.directExchange(CONFIRM_EXCHANGE)
                .durable(true)
                .withArgument("alternate-exchange", BACKUP_EXCHANGE_NAME);
        return exchangeBuilder.build();
    }

    @Bean
    public Binding warningBinding(@Qualifier("warningQueue") Queue queue, @Qualifier("backupExchange") FanoutExchange exchange){
        return BindingBuilder.bind(queue).to(exchange);
    }

    @Bean
    public Binding backupBinding(@Qualifier("backupQueue") Queue queue, @Qualifier("backupExchange") FanoutExchange exchange){
        return BindingBuilder.bind(queue).to(exchange);
    }

    // Only routing key "key1" is bound on the primary exchange
    @Bean
    public Binding queueBinding(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with("key1");
    }
}
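Add the callback class
import java.nio.charset.StandardCharsets;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.ReturnedMessage;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

/** Publisher confirm and message return callbacks */
@Component
@Slf4j
public class MyCallBack implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
    // Called when the broker acknowledges (or rejects) a published message
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        String id = correlationData != null ? correlationData.getId() : "";
        if (ack) {
            log.info("The exchange received the message with id: {}", id);
        } else {
            log.info("The exchange did not receive the message with id: {}, cause: {}", id, cause);
        }
    }

    // Called when a mandatory message could not be routed and is returned to the producer
    @Override
    public void returnedMessage(ReturnedMessage returned) {
        log.info("Message: {} was returned by the server, reason: {}, exchange: {}, routing key: {}",
                new String(returned.getMessage().getBody(), StandardCharsets.UTF_8),
                returned.getReplyText(), returned.getExchange(), returned.getRoutingKey());
    }
}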
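Add a regular consumer
import java.nio.charset.StandardCharsets;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class ConfirmConsumer {
    public static final String CONFIRM_QUEUE_NAME = "confirm.queue";

    @RabbitListener(queues = CONFIRM_QUEUE_NAME)
    public void receiveMsg(Message message){
        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
        log.info("Received message from queue confirm.queue: {}", msg);
    }
}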
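Add the alarm consumer
import java.nio.charset.StandardCharsets;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/** Alarm message consumer */
@Component
@Slf4j
public class WarningConsumer {
    public static final String WARNING_QUEUE_NAME = "warning.queue";

    @RabbitListener(queues = WARNING_QUEUE_NAME)
    public void receiveMsg(Message message){
        String msg = new String(message.getBody(), StandardCharsets.UTF_8);
        log.info("Alarm: found an unroutable message: {}", msg);
    }
}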
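Add the message producer
// javax.annotation on Spring Boot 2.x; use jakarta.annotation.PostConstruct on Spring Boot 3.x
import javax.annotation.PostConstruct;

import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/** Message producer */
@RestController
@RequestMapping("/confirm")
@Slf4j
public class ProducerController {
    public static final String CONFIRM_EXCHANGE_NAME = "confirm.exchange";

    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Autowired
    private MyCallBack myCallBack;

    /** Initialize rabbitTemplate */
    @PostConstruct
    public void init(){
        rabbitTemplate.setConfirmCallback(myCallBack);
        // true: when the exchange cannot route a message, the message is returned to the producer
        // false: when a message cannot be routed, it is discarded
        rabbitTemplate.setMandatory(true);
        rabbitTemplate.setReturnsCallback(myCallBack);
    }

    @GetMapping("sendMessage/{message}")
    public void sendMessage(@PathVariable String message){
        // "key1" is bound to confirm.queue, so this message is routable
        CorrelationData correlationData = new CorrelationData("1");
        String routingKey = "key1";
        rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingKey, message + routingKey, correlationData);

        // "key2" has no binding, so this message is unroutable on confirm.exchange
        CorrelationData correlationData2 = new CorrelationData("2");
        String routingKey2 = "key2";
        rabbitTemplate.convertAndSend(CONFIRM_EXCHANGE_NAME, routingKey2, message + routingKey2, correlationData2);

        log.info("Sent message content: {}", message);
    }
}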
Result analysis

- Two messages were sent: confirm.queue received the one with routing key key1, and the alarm queue received the one with routing key key2, which could not be routed.
- When the mandatory parameter and a backup exchange are used together, the backup exchange takes priority: as long as the backup exchange can route the message, the return callback is not executed.
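The priority between the backup exchange and the mandatory flag can be written down as a small decision function. This is a simplified sketch, not broker code: `MandatoryVsAlternateSketch`, `Outcome`, and `route` are hypothetical names, and the model assumes that a configured backup exchange always has at least one queue bound to it.

```java
import java.util.Collections;
import java.util.Set;

/** Simplified decision model (hypothetical names) for an unroutable message. */
public class MandatoryVsAlternateSketch {

    public enum Outcome { ROUTED, SENT_TO_BACKUP, RETURNED_TO_PRODUCER, DROPPED }

    /**
     * @param boundKeys         routing keys bound on the primary direct exchange
     * @param routingKey        routing key of the published message
     * @param hasBackupExchange whether the primary exchange declares a backup exchange
     * @param mandatory         whether the message was published with mandatory=true
     */
    public static Outcome route(Set<String> boundKeys, String routingKey,
                                boolean hasBackupExchange, boolean mandatory) {
        if (boundKeys.contains(routingKey)) {
            return Outcome.ROUTED;
        }
        // The backup exchange is tried first, so the mandatory return never fires here.
        if (hasBackupExchange) {
            return Outcome.SENT_TO_BACKUP;
        }
        if (mandatory) {
            return Outcome.RETURNED_TO_PRODUCER;
        }
        return Outcome.DROPPED;
    }

    public static void main(String[] args) {
        Set<String> bound = Collections.singleton("key1");
        System.out.println(route(bound, "key1", true, true));   // prints ROUTED
        System.out.println(route(bound, "key2", true, true));   // prints SENT_TO_BACKUP
        System.out.println(route(bound, "key2", false, true));  // prints RETURNED_TO_PRODUCER
        System.out.println(route(bound, "key2", false, false)); // prints DROPPED
    }
}
```

The second call mirrors the demo above: key2 is unroutable and mandatory is true, yet the message goes to the backup exchange instead of triggering the return callback.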