当前位置:网站首页>面试官:谈谈如何防止消息丢失和消息重复
面试官:谈谈如何防止消息丢失和消息重复
2022-08-02 18:04:00 【lxw1844912514】
一、什么是分布式事务
我们的服务器从单机发展到拥有多台机器的分布式系统,各个系统之间需要借助于网络进行通信,原有单机中相对可靠的方法调用以及进程间通信方式已经没有办法使用,同时网络环境也是不稳定的,造成了我们多个机器之间的数据同步问题,这就是典型的分布式事务问题。
在分布式事务中事务的参与者、支持事务的服务器、资源服务器以及事务管理器分别位于不同的分布式系统的不同节点之上。分布式事务就是要保证不同节点之间的数据一致性。
二、常见的分布式事务解决方案
1、2PC(二阶段提交)方案 - 强一致性
2、3PC(三阶段提交)方案
3、TCC (Try-Confirm-Cancel)事务 - 最终一致性
4、Saga事务 - 最终一致性
5、本地消息表 - 最终一致性
6、MQ事务 - 最终一致性
这里重点关注下使用消息队列实现分布式的一致性
三、基于 MQ 实现的分布式事务
1.本地消息表-最终一致性
消息的生产方,除了维护自己的业务逻辑之外,同时需要维护一个消息表。这个消息表里面记录的就是需要同步到别的服务的信息,当然这个消息表,每个消息都有一个状态值,来标识这个消息有没有被成功处理。
发送方的业务逻辑以及消息表中数据的插入将在一个事务中完成,这样避免了业务处理成功 + 事务消息发送失败
,或业务处理失败 + 事务消息发送成功
,这个问题。
举个栗子:
我们假定目前有两个服务,订单服务,购物车服务,用户在购物车中对几个商品进行合并下单,之后需要清空购物车中刚刚已经下单的商品信息。
1、消息的生产方也就是订单服务,完成了自己的逻辑(对商品进行下单操作)然后把这个消息通过 mq 发送到需要进行数据同步的其他服务中,也就是我们栗子中的购物车服务。
2、其他服务(购物车服务)会监听这个队列;
2、很久没收到这个消息,这种情况是不会发生的,消息的发送方会有一个定时的任务,会定时重试发送消息表中还没有处理的消息;
3、消息的生产方(订单服务)如果收到消息回执;
1、成功的话就修改本次消息已经处理完,也就是本次分布式事务的同步已经完成;2、如果消息的结果是执行失败,同时在本地回滚本次事务,标识消息已经处理完成;
1、服务器处理消息需要是幂等的,消息的生产方和接收方都需要做到幂等性;
2、发送放需要添加一个定时器来遍历重推未处理的消息,避免消息丢失,造成的事务执行断裂。
优点:
1、在设计层面上实现了消息数据的可靠性,不依赖消息中间件,弱化了对 mq 特性的依赖。
2、简单,易于实现。
缺点:
主要是需要和业务数据绑定到一起,耦合性比较高,使用相同的数据库,会占用业务数据库的一些资源。
2.MQ事务-最终一致性
下面分析下几种消息队列对事务的支持
(1)RocketMQ中如何处理事务
RocketMQ 中的事务,它解决的问题是,确保执行本地事务和发消息这两个操作,要么都成功,要么都失败。并且,RocketMQ 增加了一个事务反查的机制,来尽量提高事务执行的成功率和数据一致性。
主要是两个方面,正常的事务提交和事务消息补偿
正常的事务提交
1、发送消息(half消息),这个 half 消息和普通消息的区别,在事务提交 之前,对于消费者来说,这个消息是不可见的。
2、MQ SERVER
写入信息,并且返回响应的结果;
3、根据MQ SERVER
响应的结果,决定是否执行本地事务,如果MQ SERVER
写入信息成功执行本地事务,否则不执行;
4、根据本地事务执行的状态,决定是否对事务进行 Commit 或者 Rollback。MQ SERVER
收到 Commit,之后就会投递该消息到下游的订阅服务,下游的订阅服务就能进行数据同步,如果是 Rollback 则该消息就会被丢失;
如果MQ SERVER
没有收到 Commit 或者 Rollback 的消息,这种情况就需要进行补偿流程了
补偿流程
1、MQ SERVER
如果没有收到来自消息发送方的 Commit 或者 Rollback 消息,就会向消息发送端也就是我们的服务器发起一次查询,查询当前消息的状态;
2、消息发送方收到对应的查询请求,查询事务的状态,然后把状态重新推送给MQ SERVER
,MQ SERVER
就能之后后续的流程了。
相比于本地消息表来处理分布式事务,MQ 事务是把原本应该在本地消息表中处理的逻辑放到了 MQ 中来完成。
(2)Kafka中如何处理事务
Kafka 中的事务解决问题,确保在一个事务中发送的多条信息,要么都成功,要么都失败。也就是保证对多个分区写入操作的原子性。Kafka
通过配合 Kafka 的幂等机制来实现 Kafka 的 Exactly Once
,满足了读取-处理-写入
这种模式的应用程序。当然 Kafka 中的事务主要也是来处理这种模式的。
什么是读取-处理-写入
模式呢?
它的实现原理和 RocketMQ 的事务是差不多的,都是基于两阶段提交来实现的,在实现上可能更麻烦
Kafka 集群中也有一个特殊的用于记录事务日志的主题,里面记录的都是事务的日志。同时会有多个协调者的存在,每个协调者负责管理和使用事务日志中的几个分区。这样能够并行的执行事务,提高性能。
1、协调者设置事务的状态为PrepareCommit,写入到事务日志中;
2、协调者在每个分区中写入事务结束的标识,然后客户端就能把之前过滤的未提交的事务消息放行给消费端进行消费了;
1、协调者设置事务的状态为PrepareAbort,写入到事务日志中;
2、协调者在每个分区中写入事务回滚的标识,然后之前未提交的事务消息就能被丢弃了;
(3)RabbitMQ中的事务
RabbitMQ 中事务解决的问题是确保生产者的消息到达MQ SERVER
,这和其他 MQ 事务还是有点差别的,这里也不展开讨论了。
消息防丢失
先来分析下一条消息在 MQ 中流转所经历的阶段。
生产阶段:生产者产生消息,通过网络发送到 Broker 端。
存储阶段:Broker 拿到消息,需要进行落盘,如果是集群版的 MQ 还需要同步数据到其他节点。
消费阶段:消费者在 Broker 端拉数据,通过网络传输到达消费者端。
生产阶段防止消息丢失
发生网络丢包、网络故障等这些会导致消息的丢失
RabbitMQ 中的防丢失措施
在生产者发送消息之前,通过channel.txSelect
开启一个事务,接着发送消息, 如果消息投递 server 失败,进行事务回滚channel.txRollback
,然后重新发送, 如果 server 收到消息,就提交事务channel.txCommit
不过使用事务性能不好,这是同步操作,一条消息发送之后会使发送端阻塞,以等待RabbitMQ Server
的回应,之后才能继续发送下一条消息,生产者生产消息的吞吐量和性能都会大大降低。
3、使用发送确认机制。
确认机制有三种类型
1、同步确认
2、批量确认
3、异步确认
同步模式的效率很低,因为每一条消息度都需要等待确认好之后,才能处理下一条;
批量确认模式相比同步模式效率是很高,不过有个致命的缺陷,一旦回复确认失败,当前确认批次的消息会全部重新发送,导致消息重复发送;
异步模式就是个很好的选择了,不会有同步模式的阻塞问题,同时效率也很高,是个不错的选择。
Kafka 中的防丢失措施
Kafaka 中引入了一个 broker。broker 会对生产者和消费者进行消息的确认,生产者发送消息到 broker,如果没有收到 broker 的确认就可以选择继续发送。
只要 Producer 收到了 Broker 的确认响应,就可以保证消息在生产阶段不会丢失。有些消息队列在长时间没收到发送确认响应后,会自动重试,如果重试再失败,就会以返回值或者异常的方式告知用户。
只要正确处理 Broker 的确认响应,就可以避免消息的丢失。
RocketMQ 中的防丢失措施
使用 SYNC 的发送消息方式,等待 broker 处理结果
同步发送:Producer 向 broker 发送消息,阻塞当前线程等待 broker 响应 发送结果。
异步发送:Producer 首先构建一个向 broker 发送消息的任务,把该任务提交给线程池,等执行完该任务时,回调用户自定义的回调函数,执行处理结果。
Oneway发送:Oneway 方式只负责发送请求,不等待应答,Producer 只负责把请求发出去,而不处理响应结果。
存储阶段
在存储阶段正常情况下,只要 Broker 在正常运行,就不会出现丢失消息的问题,但是如果 Broker 出现了故障,比如进程死掉了或者服务器宕机了,还是可能会丢失消息的。
RabbitMQ 中的防丢失措施
防止在存储阶段消息额丢失,可以做持久化,防止异常情况(重启,关闭,宕机)。。。
RabbitMQ 持久化中有三部分:
交换器的持久化
交换器的持久化,是通过在声明队列时将 durable 参数置为 true 实现的,不设置持久化的话,交换器的信息将会丢失。
队列持久化
队列的持久化,是通过在声明队列时将 durable 参数置为 true 实现的,队列的持久化能保证其本身的元数据不会因异常情况而丢失,但是并不能保证内部所存储的消息不会丢失。
消息的持久化
消息的持久化,在投递时指定 delivery_mode=2
(1是非持久化),消息的持久化,需要配合队列的持久,只设置消息的持久化,重启之后队列消失,继而消息也会丢失。所以如果只设置消息持久化而不设置队列的持久化意义不大。
对于持久化,如果所有的消息都设置持久化,会影响写入的性能,所以可以选择对可靠性要求比较高的消息进行持久化处理。
比如数据在落盘的过程中宕机了,消息还没及时同步到内存中,这也是会丢数据的,这种问题可以通过引入镜像队列来解决。
镜像队列的作用:引入镜像队列,可已将队列镜像到集群中的其他 Broker 节点之上,如果集群中的一个节点失效了,队列能够自动切换到镜像中的另一个节点上来保证服务的可用性。(更细节的这里不展开讨论了)
Kafka 中的防丢失措施
操作系统本身有一层缓存,叫做 Page Cache,当往磁盘文件写入的时候,系统会先将数据流写入缓存中。
1、控制竞选分区 leader 的 Broker。如果一个 Broker 落后原先的 Leader 太多,那么它一旦成为新的 Leader,必然会造成消息的丢失。
2、控制消息能够被写入到多个副本中才能提交,这样避免上面的问题1。
RocketMQ 中的防丢失措施
1、将刷盘方式改成同步刷盘;高并发架构消息队列面试题(全面解剖面试官心理)
2、对于多个节点的 Broker,需要将 Broker 集群配置成:至少将消息发送到 2 个以上的节点,再给客户端回复发送确认响应。这样当某个 Broker 宕机时,其他的 Broker 可以替代宕机的 Broker,也不会发生消息丢失。
消费阶段
消费阶段就很简单了,如果在网络传输中丢失,这个消息之后还会持续的推送给消费者,在消费阶段我们只需要控制在业务逻辑处理完成之后再去进行消费确认就行了。
总结:对于消息的丢失,也可以借助于本地消息表的思路,消息产生的时候进行消息的落盘,长时间未处理的消息,使用定时重推到队列中。
消息重复发送
消息在 MQ 中的传递,大致可以归类为下面三种:
1、At most once: 至多一次。消息在传递时,最多会被送达一次。是不安全的,可能会丢数据。
2、At least once: 至少一次。消息在传递时,至少会被送达一次。也就是说,不允许丢消息,但是允许有少量重复消息出现。
3、Exactly once:恰好一次。消息在传递时,只会被送达一次,不允许丢失也不允许重复,这个是最高的等级。
大部分消息队列满足的都是At least once
,也就是可以允许重复的消息出现。
我们消费者需要满足幂等性,通常有下面几种处理方案
1、利用数据库的唯一性
根据业务情况,选定业务中能够判定唯一的值作为数据库的唯一键,新建一个流水表,然后执行业务操作和流水表数据的插入放在同一事务中,如果流水表数据已经存在,那么就执行失败,借此保证幂等性。也可先查询流水表的数据,没有数据然后执行业务,插入流水表数据。不过需要注意,数据库读写延迟的情况。
2、数据库的更新增加前置条件
3、给消息带上唯一ID
每条消息加上唯一ID,利用方法1中通过增加流水表,借助数据库的唯一性来处理重复消息的消费。
边栏推荐
- Code Inspection for DevOps
- 从技术全景到场景实战,透析「窄带高清」的演进突破
- 无法超越的100米_百兆以太网传输距离_网线有哪几种?
- 数据治理:数据集成和应用模式的演进
- 如何应对机器身份带来的安全风险
- Monitor is easy to Mars debut: distributed operations help TOP3000 across management gap
- IDEA相关配置(特别完整)看完此篇就将所有的IDEA的相关配置都配置好了、设置鼠标滚轮修改字体大小、设置鼠标悬浮提示、设置主题、设置窗体及菜单的字体及字体大小、设置编辑区主题、通过插件更换主题
- 天翼云4.0来了!千城万池,无所不至!
- 洛谷P1966 火柴排队
- 宝塔搭建实测-基于ThinkPHP5.1的wms进销存源码
猜你喜欢
sed 命令
Taking advantage of cloud-network integration, e-Surfing Cloud has paved the way for digital transformation for government and enterprises
HDF驱动框架的API(1)
E-Surfing Cloud 4.0 Distributed Cloud Enables Digital Transformation of Thousands of Industries
Technical life | How to draw a big picture of business
Why young people are snapping up domestic iPhone, because it is much cheaper and more populist
LeetCode 2353. 设计食物评分系统(sortedcontainers)
mongodb的游标
redis总结_基础
NeRF: The Secret of 3D Reconstruction Technology in the Popular Scientific Research Circle
随机推荐
Gear 月度更新|6 月
shell中awk命令的if条件语句引入外置变量
How to build a quasi-real-time data warehouse?
cache2go-源码阅读
Enterprise cloud cost control, are you really doing it right?
如何应对机器身份带来的安全风险
记一次 .NET 某工控自动化控制系统 卡死分析
企业云成本管控,你真的做对了吗?
E-Surfing Cloud 4.0 Distributed Cloud Enables Digital Transformation of Thousands of Industries
编译型语言与解释型语言的区别
TSF微服务治理实战系列(一)——治理蓝图
Code Inspection for DevOps
新公链时代的跨链安全性解决方案
HDF驱动框架的API(3)
POE交换机全方位解读(下)
洛谷P2345 MooFest G
DevOps之代码检查
下载mysql的源码包
C# 术语
通信大学生走向岗位,哪些技能最实用?