当前位置:网站首页>消息可靠性处理
消息可靠性处理
2022-07-27 05:01:00 【一梦无痕bzy】
一、发送方消息可靠投递
发送方在投递消息时为了保证消息投递的可靠性,rabbitmq提供了两种模式。分别为confirm确认模式、return退回模式。
confirm确认模式:消息从发起方到交换机,无论成功与否都会调confirmCallback回调函数
return退回模式:消息从交换机到队列,如果失败会调returnCallback回调函数
springboot代码
1、修改配置文件
spring:
rabbitmq:
host: 192.168.4.150
port: 5672
username: test
password: test
virtual-host: /
#开启发送端确认
publisher-confirm-type: correlated
#开启发送消息抵达队列的确认
publisher-returns: true
#只要抵达队列,以异步发送优先回调returnConfirm
template:
mandatory: true
2、定制rabbitTemplate
package com.example.demo.config;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
import java.util.HashMap;
import java.util.Map;
@Slf4j
@Configuration
public class RabbitMqConfig {
@Autowired
private CachingConnectionFactory connectionFactory;
@Autowired
private RabbitTemplate rabbitTemplate;
/** * 重新定义rabbitTemplate */
@PostConstruct
public void initRabbitTemplate() {
//设置消息抵达交换机的确认回调。无论成功与否都会回调
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback() {
/** * @param correlationData 当前消息唯一关联数据 * @param b 是否成功收到 * @param cause 失败的原因 */
@Override
public void confirm(CorrelationData correlationData, boolean b, String cause) {
log.info("抵达交换机的回调------correlationData:{},ack:{},cause:{}", correlationData, b, cause);
}
});
//设置消息抵达队列的确认回调。只有消息没有投递到指定队列,才会回调
rabbitTemplate.setReturnCallback(new RabbitTemplate.ReturnCallback() {
/** * @param message; 投递失败的消息详细信息 * @param replyCode; 回复的状态吗 * @param replyText; 回复的文本内容 * @param exchange; 当时这个消息发给那个交换机 * @param routingKey; 当时这个消息用那个路由键 */
@Override
public void returnedMessage(Message message,int replyCode,String replyText,String exchange,String routingKey) {
log.info("抵达队列的回调------message:{},replyCode:{},replyText:{},exchange:{},routingKey:{}",
message,
replyCode,
replyText,
exchange,
routingKey);
}
});
}
/** * 声明队列 * @return */
@Bean
public Queue test1Queue() {
//durable:队列是否持久化,默认是false。为true时队列开启持久化功能
//exclusive:队列是否设置为排他队列,默认是false。为true时设置为排他队列,只对首次声明它的连接可见,
// 其他连接无法声明相同名称的其他队列,并且在连接断开时自动删除,即使持久化也会被删除
//autoDelete:队列是否自动删除,默认false。为true时,当没有消费者使用此队列,该队列会自动删除
//一般设置一下队列的持久化就好,其余两个就是默认false
return new Queue("test1_Queue",true,false,false);
}
/** * 声明路由模式交换机 * @return */
@Bean
DirectExchange directExchange(){
return new DirectExchange("direct_Exchange", true, false);
}
/** * 交换机与队列绑定 * @return */
@Bean
Binding truckHistoryBinding(){
return BindingBuilder.bind(test1Queue()).to(directExchange()).with(test1Queue().getName());
}
}
测试时只要指定错误的交换机和routingKey便可通过打印的日志看到效果
二、接收方消息可靠接收
consumer ack:消息处理成功从队列删除、没成功队列里一直有。之前文章中消费者写法就是ack自动确认的方式

springboot手动确认代码
package com.example.demo.mq;
import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import java.io.IOException;
@Slf4j
@Component
@RabbitListener(queues = "test5_Queue", ackMode = "MANUAL")
public class Test5QueueReceiver {
@Value("${isReceiver}")
private boolean isReceiver;
@RabbitHandler
public void process(String msg, Channel channel, Message message) {
//判断该消费者是否能处理该消息
if (false == isReceiver) {
try {
//拒绝处理消息
//第一个参数是消息唯一标识;第二个参数为true批量拒绝,为false一个个拒绝;第三个参数为false让rabbitmq销毁被拒绝的消息,为true让rabbitmq不销毁消息,继续发送到其他节点
//详情见:https://blog.csdn.net/d20062056/article/details/109648909?utm_medium=distribute.pc_relevant.none-task-blog-baidujs_title-0&spm=1001.2101.3001.4242
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
} catch (IOException e) {
log.debug("直接确认MQ消息出错。",e);
}
return;
}
try {
// int i = 1 / 0;
System.err.println(msg);
// RabbitMQ的ack机制中,第二个参数返回true,表示需要将这条消息投递给其他的消费者重新消费
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
log.error("出现错误", e);
// RabbitMQ的ack机制中,第二个参数返回true,表示需要将这条消息投递给其他的消费者重新消费
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
} catch (IOException ioException) {
e.printStackTrace();
}
}
}
}
三、消息补偿(没有固定的方式,合理即可)
之所以有消息补偿是因为上面的可靠投递、可靠接受都是基于rabbitmq服务本身没问题的情况下,但是如果rabbitmq服务本身出现问题,那消息还是没有保障的。
- 生产者发消息时可以往DB中存一份
- 每次消费者处理完则干掉DB中相应的记录
- 每隔一段时间查看一下DB中的记录,重发存在的记录
- 当某个记录存在的时间特别长,可以人为处理
边栏推荐
- 牛客剑指offer--JZ12 矩阵中的路径
- 老子云携手福昕鲲鹏,首次实现3D OFD三维版式文档的重大突破
- 实用小工具: Kotlin 代码片段
- JVM Part 1: memory and garbage collection part 8 - runtime data area - Method area
- 2、 MySQL advanced
- A math problem cost the chip giant $500million
- Acceptance and neglect of events
- Detailed description of binary search tree
- Why is count (*) slow
- 弹球小游戏
猜你喜欢

Installation and template setting of integrated development environment pychar

Tcp server是如何一个端口处理多个客户端连接的(一对一还是一对多)

JVM Part 1: memory and garbage collection part 7 -- runtime data area heap

How to test the payment process?

Derivation and explanation of PBR physical illumination calculation formula

Gradio quickly builds ml/dl Web Services

Introduction to Kali system ARP (network disconnection sniffing password packet capturing)

微淼联合创始人孙延芳:以合规为第一要义,做财商教育“正规军”

支付流程如何测试?

How to sinicize the JMeter interface?
随机推荐
实用小工具: Kotlin 代码片段
TypeScript 详解
How to test the payment process?
JVM Part 1: memory and garbage collection part 10 - runtime data area - direct memory
稀疏数组→五子棋的存盘续盘等操作
String class
OFDM 16 lecture 2-ofdm and the DFT
Introduction to Kali system ARP (network disconnection sniffing password packet capturing)
二、MySQL高级
集合框架的使用
来自“飞人”乔丹的启示!奥尼尔开启的另一个“赛场”
Advantages of smart exhibition hall design and applicable industry analysis
JDBC API 详解
[untitled] I is circularly accumulated under certain conditions. The condition is usually the length of the loop array. When it exceeds the length, the loop will stop. Because the object cannot judge
Deep Qt5 signal slot new syntax
Solution to Dlib installation failure
35. Scroll
Domestic mainstream ERP software market
SQL数据库→约束→设计→多表查询→事务
JVM上篇:内存与垃圾回收篇五--运行时数据区-虚拟机栈