当前位置:网站首页>分布式事务解决方案
分布式事务解决方案
2022-06-10 03:45:00 【liushangzaibeijing】
文章目录
前言
当业务量级扩大之后的分库,以及微服务落地之后的业务服务化,都会产生分布式数据不一致的问题。既然本地事务无法满足需求,因此分布式事务就要登上舞台。
我们有必要先来了解下 CAP 原则和 BASE 理论。CAP 原则是 Consistency(一致性)、Availablity(可用性)和 Partition-tolerance(分区容错性)的缩写,它是分布式系统中的平衡理论。
- 一致性要求所有节点每次读操作都能保证获取到最新数据;
- 可用性要求无论任何故障产生后都能保证服务仍然可用;
- 分区容错性要求被分区的节点可以正常对外提供服务。

事实上,任何系统只可同时满足其中二个,无法三者兼顾,分区容错性是一个最基本的要求。所以系统解决方案。放弃一定可用性保证强一致性。采用最终一致性保证高可用。
业内比较常用的分布式事务解决方案:
- 强一致性的两阶段提交协议,三阶段提交协议,比如seata.
- 最终一致性的可靠事件模式、补偿模式,阿里的 TCC 模式。
简介
XA是由X/Open组织提出的分布式事务的规范。 XA规范主要定义了**(全局)事务管理器和(局 部)资源管理器(RM)**之间的接口。主流的关系型 数据库产品都是实现了XA接口的。
XA接口是双向的系统接口,在事务管理器 (TM)以及一个或多个资源管理器(RM)之 间形成通信桥梁。
XA之所以需要引入事务管理器是因为,在分布 式系统中,从理论上讲两台机器理论上无法达 到一致的状态,需要引入一个单点进行协调。
由全局事务管理器管理和协调的事务,可以跨 越多个资源(如数据库或JMS队列)和进程。 全局事务管理器一般使用 XA 二阶段提交协议 与数据库进行交互。

组成部分
TC (Transaction Coordinator) 事务协调者维护全局和分支事务的状态,驱动全局事务提交或回滚。
RM资源管理器(resource manager):用来管理系统资源,是通向事务资源的途径。数据库就是一种资源管理器。资源管理还应该具有管理事务提交或回滚的能力。
TM事务管理器(transaction manager):事务管理器是分布式事务的核心管理者。事务管理器与每个资源管理器(resource manager)进行通信,协调并完成事务的处理。事务的各个分支由唯一命名进行标识
Xid 接口 Xid, Xid 接口是 X/Open 事务标识符 XID 结构的 Java 映射。此接口指定三个访问器方法,以检索全局事务格式 ID、全局事务 ID 和分支限定符。Xid 接口供事务管理器和资源管理器使用。此接口对应用程序不可见。
- 总体流程
1.TM向TC注册全局事务
2.调用各资源管理器(即对各数据库数据进行操作), RM向TC注册分支事务, 此时sql会暂存,不会立即提交
3.TM向TC下达全局事务提交, 此时TC会依次执行sql
4.如果有分支事务失败了,则会对之前提交的sql进行回滚
二阶段提交
XA需要两阶段提交: prepare 和 commit.
第一阶段为 准备(prepare)阶段。即所有的参与者准备执行事务并锁住需要的资源。参与者ready时,向transaction manager报告已准备就绪。
第二阶段为提交阶段(commit)。当transaction manager确认所有参与者都ready后,向所有参与者发送commit命令。
分布式事务的两阶段提交是把整个事务提交分为 prepare 和 commit 两个阶段。以电商系统为例,分布式系统中有订单、账户和库存三个服务,如下图:

第一阶段,事务协调者向事务参与者发送 prepare 请求,事务参与者收到请求后,如果可以提交事务,回复 yes,否则回复 no。
第二阶段,如果所有事务参与者都回复了 yes,事务协调者向所有事务参与者发送 commit 请求,否则发送 rollback 请求。
存在的问题
- 本地事务在 prepare 阶段锁定资源,如果有其他事务也要修改 xiaoming 这个账户,就必须等待前面的事务完成。这样就造成了系统性能下降。
- 协调节点单点故障,如果第一个阶段 prepare 成功了,但是第二个阶段协调节点发出 commit 指令之前宕机了,所有服务的数据资源处于锁定状态,事务将无限期地等待。
- 数据不一致,如果第一阶段 prepare 成功了,但是第二阶段协调节点向某个节点发送 commit 命令时失败,就会导致数据不一致。
三阶段提交
为了解决两阶段提交的问题,三阶段提交做了改进:
- 在协调节点和事务参与者都引入了超时机制。
- 第一阶段的 prepare 阶段分成了两步,canCommi 和 preCommit。
如下图:

引入 preCommit 阶段后,协调节点会在 commit 之前再次检查各个事务参与者的状态,保证它们的状态是一致的。但是也存在问题,那就是如果第三阶段发出 rollback 请求,有的节点没有收到,那没有收到的节点会在超时之后进行提交,造成数据不一致。
MySQL对XA的支持
MySQL 从5.0.3开始支持XA分布式事务,且只有InnoDB存储引擎支持。MySQL Connector/J 从5.0.0版本之后开始直接提供对XA的支持。
需要注意的是, 在DTP模型中,mysql属于资源管理器(RM)。而一个完整的分布式事务中,一般会存在多个RM,由事务管理器TM来统一进行协调。因此,这里所说的mysql对XA分布式事务的支持,一般指的是单台mysql实例如何执行自己的事务分支。
XA 事务SQL语法
https://dev.mysql.com/doc/refman/5.7/en/xa-statements.html
XA {START|BEGIN} xid [JOIN|RESUME] //开启XA事务,如果使用的是XA START而不是XA BEGIN,那么不支持[JOIN|RESUME],xid是一个唯一值,表示事务分支标识符
XA END xid [SUSPEND [FOR MIGRATE]] //结束一个XA事务,不支持[SUSPEND [FOR MIGRATE]]
XA PREPARE xid 准备提交
XA COMMIT xid [ONE PHASE] //提交,如果使用了ONE PHASE,则表示使用一阶段提交。两阶段提交协议中,如果只有一个RM参与,那么可以优化为一阶段提交
XA ROLLBACK xid //回滚
XA RECOVER [CONVERT XID] //列出所有处于PREPARE阶段的XA事务
下面是一个简单的msyql XA事务案例,演示了mysql作为全局事务中的一个事务分支,将一行记录插入到一个表中
-- 开启一个分布式事务
XA START "xatest";
-- 插入记录
INSERT INTO USER(id,NAME,age) VALUES(12,"tianshozuhi",22);
-- 完成分布式事务
XA END "xatest";
-- 分布式事务提交 阶段一 prepare阶段
XA PREPARE "xatest";
-- 分布式事务提交 阶段二 COMMIT阶段
XA COMMIT "xatest";
-- 分布式事务提交 阶段二 rollback阶段
XA ROLLBACK "xatest"
-- 列出所有处于PREPARE阶段的XA事务
XA RECOVER
XA事务执行流程
XA事务的状态,按照如下步骤进行展开
使用XA START来启动一个XA事务,并把它置于
ACTIVE状态。对于一个ACTIVE状态的 XA事务,我们可以执行构成事务的SQL语句,然后发布一个XA END语句。XA END把事务放入
IDLE状态。对于一个IDLE 状态XA事务,可以执行一个XA PREPARE语句或一个XA COMMIT…ONE PHASE语句:
XA PREPARE把事务放入
PREPARED状态。在此点上的XA RECOVER语句将在其输出中包括事务的xid值,因为XA RECOVER会列出处于PREPARED状态的所有XA事务。XA COMMIT…ONE PHASE用于预备和提交事务。xid值将不会被XA RECOVER列出,因为事务终止。
对于一个PREPARED状态的 XA事务,您可以发布一个XA COMMIT语句来提交和终止事务,或者发布XA ROLLBACK来回滚并终止事务。
针对一个给定的客户端连接而言,XA事务和非XA事务(即本地事务)是互斥的。例如,已经执行了”XA START”命令来开启一个XA事务,则本地事务不会被启动,直到XA事务已经被提交或被 回滚为止。相反的,如果已经使用START TRANSACTION启动一个本地事务,则XA语句不能被使用,直到该事务被提交或被 回滚为止。
最后,如果一个XA事务处于ACTIVE状态,是不能直接进行提交的,如果这样做,mysql会抛出异常:
ERROR 1399 (XAE07): XAER_RMFAIL: The command cannot be executed
when global transaction is in the ACTIVE state
XID数据结构
mysql中使用xid来作为一个事务分支的标识符。事实上xid作为事务分支标识符是在XA规范中定义的,
XA规范定义了一个xid有4个部分组成:
- gtrid: 全局事务标识符(global transaction identifier),最大不能超过64字节
- bqual: 分支限定符(branch qualifier),最大不能超过64字节
- data: xid的值,其是 gtrid和bqual拼接后的内容。因为gtrid和bqual最大都是64个字节,因此data的最大长度为128
- formatId: 而formatId的作用就是记录gtrid、bqual的格式,。XA规范建议使用OSI CCR风格定义其格式
通过jdbc操作mysql xa事务
MySQL Connector/J 从5.0.0版本之后开始直接提供对XA的支持,也就是提供了java版本XA接口的实现。意味着我们可以直接通过java代码来执行mysql xa事务。
需要注意的是,业务开发人员在编写代码时,不应该直接操作这些XA事务操作的接口。因为在DTP模型中,RM上的事务分支的开启、结束、准备、提交、回滚等操作,都应该是由事务管理器TM来统一管理。
由于目前我们还没有接触到TM,那么我们不妨做一回"人肉事务管理器",用你智慧的大脑,来控制多个mysql实例上xa事务分支的执行,提交/回滚。通过直接操作这些接口,你将对xa事务有更深刻的认识。
public class MysqlXAConnectionTest {
/** * 分布式事务管理器 * @throws SQLException */
@Test
public void testTM() throws SQLException {
//true表示打印XA语句,,用于调试
boolean logXaCommands = true;
Connection connOne = getConnection("jdbc:mysql://182.92.189.235:3310/demo","root", "xiu123");
Connection connTwo = getConnection("jdbc:mysql://182.92.189.235:3310/demo2","root", "xiu123");
XAResource rm1 = getXAResource(logXaCommands,connOne);
XAResource rm2 = getXAResource(logXaCommands,connTwo);
// AP请求TM执行一个分布式事务,TM生成全局事务id
byte[] gtrid = "g12345".getBytes();
int formatId = 1;
try {
// ==============分别执行RM1和RM2上的事务分支====================
// TM生成rm1上的事务分支id
byte[] bqual1 = "b00001".getBytes();
Xid xid1 = new MysqlXid(gtrid, bqual1, formatId);
执行rm1上的事务分支
rm1.start(xid1, XAResource.TMNOFLAGS);
PreparedStatement ps1 = connOne.prepareStatement("INSERT into `user`(name,age) VALUES ('tianshouzhi','23')");
ps1.execute();
rm1.end(xid1, XAResource.TMSUCCESS);
// TM生成rm2上的事务分支id
byte[] bqual2 = "b00002".getBytes();
Xid xid2 = new MysqlXid(gtrid, bqual2, formatId);
// 执行rm2上的事务分支
rm2.start(xid2, XAResource.TMNOFLAGS);
PreparedStatement ps2 = connTwo.prepareStatement("INSERT INTO `order`(order_num) VALUES ('2')");
ps2.execute();
rm2.end(xid2, XAResource.TMSUCCESS);
// ===================两阶段提交================================
// phase1:询问所有的RM 准备提交事务分支
int rm1_prepare = rm1.prepare(xid1);
int rm2_prepare = rm2.prepare(xid2);
// phase2:提交所有事务分支
//TM判断有2个事务分支,所以不能优化为一阶段提交
boolean onePhase = false;
//所有事务分支都prepare成功,提交所有事务分支
if (rm1_prepare == XAResource.XA_OK
&& rm2_prepare == XAResource.XA_OK
) {
rm1.commit(xid1, onePhase);
rm2.commit(xid2, onePhase);
} else {
//如果有事务分支没有成功,则回滚
rm1.rollback(xid1);
rm1.rollback(xid2);
}
} catch (XAException e) {
// 如果出现异常,也要进行回滚
e.printStackTrace();
}
}
/** * 获取数据库连接 */
private Connection getConnection(String url,String name,String password) throws SQLException {
return DriverManager.getConnection(url, name, password);
}
/** * 获取XA资源 */
private XAResource getXAResource(boolean logXaCommands,Connection connection) throws SQLException {
// 获得资源管理器操作接口实例 RM1
XAConnection xaConn1 = new MysqlXAConnection((com.mysql.jdbc.Connection) connection, logXaCommands);
return xaConn1.getXAResource();
}
在这个案例中,演示了2个RM的情况下分布式事务的工作流程。因为我们充当了"人肉事务管理器”TM,因此很多本应该由TM来处理的工作处理细节也直接体现在上述代码中,如:生成全局事务id和分支事务id、在RM上开启事务分支、两阶段提交等。虽然我们自己作为"人肉事务管理器”是很不可靠的,但是上述代码可以让我们了解一个TM内部的主要工作流程是怎样的。
在实际开发中,代码绝不会像上表面那样复杂,因为我们通常都会使用第三方或者容器提供的TM功能,因此在操作分布式事务时,代码可以得到极大的简化。
MySQL Connector/J XA事务支持源码简单分析**
最后,我们对上述源码进行一下简单的分析。在前面直接使用mysql命令操作的时候,我们通过"XA START xid”等XA命令来执行XA事务。而在上述java代码中,我们是获取了一个普通的链接Connection之后,封装成了MysqlXAConnection。如下:
com.mysql.jdbc.jdbc2.optional.MysqlXAConnection
public class MysqlXAConnection extends MysqlPooledConnection implements XAConnection, XAResource {
private com.mysql.jdbc.Connection underlyingConnection;
private Log log;
protected boolean logXaCommands;
//构造方法
public MysqlXAConnection(com.mysql.jdbc.Connection connection, boolean logXaCommands) throws SQLException {
super(connection);
this.underlyingConnection = connection;
this.log = connection.getLog();
this.logXaCommands = logXaCommands;
}
}
可以看到,MysqlXAConnection本身就实现了XAResource接口,因此当调用getXAResource()方法时,返回的就是其自己
com.mysql.jdbc.jdbc2.optional.MysqlXAConnection#getXAResource
public XAResource getXAResource() throws SQLException {
return this;
}
之后,我们调用XAResource的start方法来开启XA事务。start方法源码如下所示:
com.mysql.jdbc.jdbc2.optional.MysqlXAConnection#start
public void start(Xid xid, int flags) throws XAException {
//1、封装XA命令
StringBuilder commandBuf = new StringBuilder(MAX_COMMAND_LENGTH);
commandBuf.append("XA START ");
appendXid(commandBuf, xid);
//2、添加flag标记
switch (flags) {
case TMJOIN:
commandBuf.append(" JOIN");
break;
case TMRESUME:
commandBuf.append(" RESUME");
break;
case TMNOFLAGS:
// no-op
break;
default:
throw new XAException(XAException.XAER_INVAL);
}
//执行命令
dispatchCommand(commandBuf.toString());
this.underlyingConnection.setInGlobalTx(true);
}
可以看到,当我们调用MysqlXAConnection的start方法时,实际上就是执行了一个”XA START xid [JOIN|RESUME]”命令而已,和我们直接在命令行中的操作是一样一样的,只不过通过封装简化了我们的操作。
对于MysqlXAConnection的end、prepare、commit、rollback等方法,也都是是类似的,不再赘述。
最后提示, MySQL Connector/J 中提供的XA操作接口,如上面提到的XAConnection、XAResource、Xid等,实际上都遵循了JTA规范。
分布式事务解决方案
seata框架(强一致性)
Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。Seata 将为用户提供了 AT、TCC、SAGA 和 XA 事务模式,为用户打造一站式的分布式解决方案。其中AT模式是Seata主推的模式,是基于二阶段提交来实现的。
以用户购买商品的业务逻辑。整个业务逻辑由3个微服务提供支持:
- 仓储服务(storage-service):对给定的商品扣除仓储数量。
- 订单服务(order-service):根据采购需求创建订单。
- 帐户服务(account-service):从用户帐户中扣除余额。

安装seata服务
解压服务 tar -zxvf seata-server-1.5.1.tar.gz -C /usr/local/seata/

修改配置执行初始化脚本
- 启动服务 sh seata-server.sh 服务安装完成。
用例
项目架构如下:
- seata server 1.5.1
- dubbo
- spring boot
AT模式下全局事务处理
AT模式需要依赖不同的业务数据,所以需要在不同的微服务对应的数据库下创建undo_log表,用于全局事务回滚。
测试正常结果
localhost:8084/order/create
请求参数
{"id":null,"userId":"1","commodityCode":"P001","count":2,"money":140}
响应参数
{
"code": "0",
"msg": "success",
"data": "下单成功!"
}
- 订单创建成功

金额扣减成功

库存扣减成功x

某个服务出错 调用
调用结果如下:
{
"code": "1",
"msg": "error",
"data": "扣款失败,可能是余额不足!"
}
同时订单创建失败、库存和金额都没有扣减
TCC模式下全局事务处理
还是以下单操作为例。
TCC 模式,不依赖于底层数据资源的事务支持,比较灵活但是
- 一阶段 prepare 行为:调用 自定义 的 prepare 逻辑。
- 二阶段 commit 行为:调用 自定义 的 commit 逻辑。
- 二阶段 rollback 行为:调用 自定义 的 rollback 逻辑。
@LocalTCC
public interface OrderTCCService {
/** * 创建订单(try逻辑) */
@TwoPhaseBusinessAction(name = "create", commitMethod = "confirm", rollbackMethod = "cancel")
void create(@BusinessActionContextParameter(paramName = "order") Order order);
/** * Try逻辑,@TwoPhaseBusinessAction中的name属性要与当前方法名一致,用于指定Try逻辑对应的方法 */
void prepare(@BusinessActionContextParameter(paramName = "userId") String userId,
@BusinessActionContextParameter(paramName = "money") int money);
/** * 二阶段Confirm确认方法,可以另命名,但要保证和上面commitMethod的值一致 * @param context 上下文,可以传递try方法的参数 * @return boolean 执行是否成功 */
boolean confirm(BusinessActionContext context);
/** * 二阶段回滚方法,方法名要和上面rollbackMethod的值一致 * 这里进行订单的取消、退款和库存恢复的回滚操作 */
boolean cancel(BusinessActionContext context);
}
- @LocalTCC:作用于服务接口上,表示实现该接口的实现类被 seata 来管理,seata 根据事务的状态,自动调用我们定义的方法,如果没问题则调用 Commit 方法,否则调用 Rollback 方法。
- @TwoPhaseBusinessAction:该注解用在接口的 Try 方法上,name 为 tcc 方法的 bean 名称,需要全局唯一,一般写方法名即可;commitMethod指定事务成功后的commit方法;rollbackMethod 指定事务失败后的rollback方法。
- @BusinessActionContextParameter: 该注解用来修饰 Try 方法的入参,被修饰的入参可以在 Commit 方法和 Rollback 方法中通过 BusinessActionContext 获取。
SAGA 模式事务处理
Saga是这一篇数据库论文saga提到的一个方案。其核心思想是将长事务拆分为多个本地短事务,由Saga事务协调器协调,如果正常结束那就正常完成,如果某个步骤失败,则根据相反顺序一次调用补偿操作。
SAGA事务的特点:
- 并发度高,不用像XA事务那样长期锁定资源
- 需要定义正常操作以及补偿操作,开发量比XA大
- 一致性较弱,对于转账,可能发生A用户已扣款,最后转账又失败的情况
最终一致性
基于上述CAP原理,市面上大部分的微服务应用系统都是满足AP,即放弃强一致性通过最终一致性从而保证高可用。针对最终一致性有如下三种模式。
可靠消息模式属于事件驱动架构,还是已上述下单模式为例,如下图

上述事件消费无法正确消费或者重复问题,所以可靠事件模型的特点在于保证可靠事件投递和避免重复消费。
避免重复消费:业务需要保证服务幂等性(唯一标识判断)
可靠事件投递: 需要满足
- 确保事件至少投递一次: mq框架的相关机制满足该特性
- 原子性的业务操作和事件发布,业务原子操作可以通过数据库的单机事务保证,事件发布原子操作可以使用ack确认机制确保。
可靠事件投递还可以通过本地事件表和外部事件表的形式进行处理。
本地事件表
本地消息事件表方法将事件和业务数据保存在同一个数据库中,使用一个额外的事件恢复服务来恢复事件,有本地事务保证更新业务和发布事件的原子性。
本地事件表的说明如下:
1、微服务在同一个本地事务中记录业务数据和事件
2、当微服务实时发布一个事件时,立即通知关联的业务服务,如果事件发布成功则立即删除记录的事件。
3、事件恢复服务定时从本地事务表中恢复未发布成功的事件,并重新发布,直到重新发布成功时才删除记录的事件。
其中操作 2 主要是为了增加发布事件的实时性,操作 3 是为了保证事件一定被发布。
外部事件表
外部事件表方法将事件持久化到外部的事件系统,事件系统需要提供实时事件消息服务以接收微服务发布的事件,同时事件系统还需要提供事件恢复服务来确认和恢复事件,如下图所示:
外部事件表的说明如下:
1、业务服务在事务提交前进行,通过实时事件服务向事件服务系统请求发送事件,事件系统只记录事件并不真正发送。
2、业务服务提交后,通过实时事件服务向事件系统确认事件需要被成功发送,确认“事件需要被成功发送”后事件系统才真正将事件发布到消息代理中。
3、业务服务在业务回滚时,通过实时事件服务取消事件系统中的事件。
4、如果业务服务在发送确认或取消之前停止了服务怎么办?事件系统的事件恢复服务会定期找到未确认发送的事件,并向业务服务查询事件状态,根据业务服务返回的状态决定事件是发布还是取消。
业务补偿
业务异常:业务逻辑产生错误的情况,如账户余额不足、商品库存不足等。
技术异常:非业务逻辑产生的异常,如网络连接异常、网络超时等。
补偿模式使用一个额外的协调服务(补偿框架)来协调各个需要保证一致性的微服务,协调服务按顺序调用各个服务,如果某个微服务调用异常(包括业务异常和技术异常),则取消之前所有已经调用成功的微服务。
还是以上述下单为例,所有业务补偿,其实是相关服务都需要失败后的回滚业务接口
比如 订单服务需要有创建订单接口外 还需要有取消订单接口(失败的回滚操作),库存和账户服务除了有扣减库存和扣减金额外还需要提供对应的恢复库存和恢复金额的操作。统一使用补偿框架进行成功提交或者失败回滚调用。

边栏推荐
- 先序遍历二叉树
- Monotone queue optimization DP example
- [pytorch pre training model modification, addition and deletion of specific layers]
- MySQL——安装
- Error code of text broadcast diagram
- 【L1、L2、smooth L1三类损失函数】
- Understanding of analog-to-digital conversion
- Dapr - what are the advantages of the microservice framework used by major manufacturers?
- 值得一看的共享文件实现方案
- Informatics Aosai yibentong 1260 [example 9.4] interceptor missile (noip1999) | Luogu p1020 [noip1999 popularization group] missile interception
猜你喜欢

Cultural and natural heritage day, mission airdrop

Post Microsoft build chatting about the new trend of Technology

vulnhub之hacksudo:Thor
![[actual combat] redis cluster (Part 2) - system version support](/img/dd/c4863d4c34ccc37dad9ed6d0b3413d.jpg)
[actual combat] redis cluster (Part 2) - system version support

RPC 实战与核心原理-进阶篇笔记

Opencv_ 100 questions_ Chapter I (1-5)

ACL 2022 | the latest hot research in NLP field, you must not miss it!

Decision engine system & real-time index calculation & risk situation awareness system & risk data list system & fraud intelligence system

How to customize ThreadPoolExecutor thread pool gracefully

【 YOLOv3中Loss部分计算】
随机推荐
【Pytorch的优化器总结归纳】
[MySQL] 数据库-视图
QT window, viewport, logical coordinates, physical coordinates
How to open an account for stocks? Is it safe to open an account online?
C language question brushing series (III)
OpenAI新博客:训练大模型的技术
redisson yml配置出错
Ssti (injection de gabarit) - (7)
flowable 三种方式部署流程
[actual combat] redis cluster (Part 1) - environment construction
Decision engine system & real-time index calculation & risk situation awareness system & risk data list system & fraud intelligence system
Code writing method of wechat applet search box
推特同意开放数据库供马斯克核查
MySQL——安装
先序遍历二叉树
阿里注册中心 Nacos 启动报错 Unable to start web server
C#封装FluentValidation,用了之后通篇还是AbstractValidator,真的看不下去了
[yolov5.yaml analysis]
汇编:代码示例
汇编:关于函数完整流程的栈桢解析
