当前位置:网站首页>最终一致性性分布式事务 TCC
最终一致性性分布式事务 TCC
2022-07-26 16:01:00 【InfoQ】
- 强一致性分布式事务解决方案:基于 CAP 理论
- 最终一致性分布式事务解决方案:基于 Base 理论
- TCC 解决方案
- 可靠消息最终一致性解决方案
- 最大努力通知型解决方案
TCC 解决方案

- try 阶段: 仅做业务的一致性检查和预留相应的资源。
- confirm 阶段: 当 try 阶段所有分支事务执行成功后开始执行 confirm 阶段。
- 默认一定成功。出错(异常):就要 重试或者人工处理,对出错的事务进行干预。
- cancel 阶段: 在业务执行异常或出现错误的情况下,需要回滚事务的操作,执行分支事务的取消操作,并且释放 try 阶段预留的资源。
- 默认一定成功。出错(异常):就要 重试或者人工处理,对出错的事务进行干预。
- 锁定资源的粒度变小: 提升系统的性能。
- 保证分布式事务执行后数据一致性: confirm 阶段 和 cancel 阶段需具备幂等性。
- 解决 XA 规范的单点故障问题: 主业务和分支业务都能集群部署。
- 耦合性: 代码需要耦合到具体业务。
- 开发成本: 业务方法都要拆分成 try、confirm 和 cancel 三个阶段。
TCC 需要注意的问题
(1)空回滚问题
- 全局事务 ID:生成全局事务记录,贯穿整个分布式事务的执行流程。
- 分支事务记录表:用于记录分支事务,将全局事务ID 和 分支事务 ID 保存到分支事务表中。
- 执行 cancel 阶段前,先读取分支事务表中的数据:
- 若存在 try 阶段插入的记录,则执行正常操作 - 回滚事务
- 若不存在,则为空回滚,不做任何操作
(2)悬挂问题
- try 阶段:先注册分支事务,再执行 RPC 调用
- 此时发生服务器宕机、应用崩溃或者网络异常等,RPC 调用超时
- 判定RPC 调用超时,就会回滚事务
- 这时,RPC 请求到了对应业务方,但此时事务已经回滚,try 阶段预留的资源就无法释放了
- 在执行 try 阶段的方法时,判断是否已有执行 confirm 或 cancel 阶段的记录
- 如果存在,则不再执行 try 阶段的方法
(3)幂等问题
- 增加事务的执行状态
- 每次执行分支事务以及 confirm 阶段 和 cancel 阶段的方法时,都查询此事务的执行状态
实际工作中 TCC 三种方案
(1)通用型
// 消息队列 + 事务消息public void doBusiness() { // 消息队列名称 String queueName = "queue"; // 消息内容:json 格式 String msg = "{}"; // 调用 MQ,预发送消息 String msgId = msgService.createPreMsg(queueName, msg); try { // 执行业务1 try(业务层面需要做好幂等、悬挂) // 执行业务2 try(业务层面需要做好幂等、悬挂) // 执行业务3 try(业务层面需要做好幂等、悬挂) } catch (Exception e) { // 回滚业务1 cancel(业务层面需要做好幂等、悬挂、空回滚) // 回滚业务2 cancel(业务层面需要做好幂等、悬挂、空回滚) // 回滚业务3 cancel(业务层面需要做好幂等、悬挂、空回滚) } RpcContext.getContext().asyncCall(() -> { // 执行业务1 commit(业务层面需要做好幂等、悬挂) // 执行业务2 commit(业务层面需要做好幂等、悬挂) // 执行业务3 commit(业务层面需要做好幂等、悬挂) msgService.confirmMsg(queueName, msgId); });}(2)异步确保型
- 优点:不要从业务服务进行配合改造,提供 try、confirm、cancel 3个接口
(3)补偿型
- Do 接口:执行接口,执行业务逻辑
- Compensate 接口:补偿接口

Hmily-TCC 实战
<!-- dubbo --><dependency> <groupId>org.dromara</groupId> <artifactId>hmily-spring-boot-starter-dubbo</artifactId> <version>${hmily.version}</version> <exclusions> <exclusion> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> </exclusion> </exclusions></dependency><!-- springcloud --><dependency> <groupId>org.dromara</groupId> <artifactId>hmily-spring-boot-starter-springcloud</artifactId> <version>${hmily.version}</version></dependency>// 1. try 阶段@Transactional(rollbackFor = Exception.class)@HmilyTCC(confirmMethod = "confirm", cancelMethod = "cancel")public void doBusiness() { String txNo; // 1. 幂等处理:若已处理过,则直接返回 // 2. 悬挂处理:若已处理过,则直接返回 // 3. 业务、RPC调用等}// 2. confirm 阶段@Transactional(rollbackFor = Exception.class)public void confirm(){ String txNo; // 幂等处理 // do something}// 3. cancel 阶段@Transactional(rollbackFor = Exception.class)public void cancel(){ String txNo; // 幂等处理 // do something}// 以 SpringCloud 中 Feign 为栗// 需要加上注解:@[email protected](value = "account-service")public interface AccountClient { @Hmily @RequestMapping("/account-service/account/payment") Boolean payment(@RequestBody AccountDTO accountDO);}实战:模拟下订单减库存
- 拉取代码,编译
$ git clone [email protected]:dromara/hmily.git$ cd hmily/$ mvn -DskipTests clean install -U复制代码- 构建项目,这里以 springcloud 为例,使用 hmily-demo-tcc-springcloud 工程

- 执行 MySQL 脚本
# 脚本在$ cd ./hmily-demo/sqlhmily-demo.sql- 修改项目配置
- 订单服务:修改 application.yml 和 hmily.yml 中 MySQL 连接配置
- 账号服务:修改 application.yml 和 hmily.yml 中 MySQL 连接配置
- 库存服务:修改 application.yml 和 hmily.yml 中 MySQL 连接配置
- 注册中心:使用的是 eureka,不用修改
- 启动服务,先启动 eureka,再相继启动其他服务

- 验证:访问 swagger 中 /order/orderPay 接口
- 浏览器访问:http://127.0.0.1:8090/swagger-ui.html
Hmily-TCC 源码浅析
(1)框架初始化阶段
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\org.dromara.hmily.spring.boot.starter.parent.configuration.HmilyAutoConfiguration@[email protected](proxyTargetClass = true)public class HmilyAutoConfiguration { // 1. 处理添加 @HmilyTCC 注解的切面入口 @Bean public SpringHmilyTransactionAspect hmilyTransactionAspect() { return new SpringHmilyTransactionAspect(); } // 2. 支持使用注解调用的 RPC 框架 @Bean @ConditionalOnProperty(value = "hmily.support.rpc.annotation", havingValue = "true") public BeanPostProcessor refererAnnotationBeanPostProcessor() { return new RefererAnnotationBeanPostProcessor(); } // 3. 框架启动初始化类 @Bean @Qualifier("hmilyTransactionBootstrap") @Primary public HmilyApplicationContextAware hmilyTransactionBootstrap() { return new HmilyApplicationContextAware(); }}(2)TCC 的 Try 阶段
// AbstractHmilyTransctionAspect是SpringHmilyTransactionAspect的父类。@Aspectpublic abstract class AbstractHmilyTransactionAspect { private final HmilyTransactionInterceptor interceptor = new HmilyGlobalInterceptor(); @Pointcut("@annotation(org.dromara.hmily.annotation.HmilyTCC) || @annotation(org.dromara.hmily.annotation.HmilyTAC) || @annotation(org.dromara.hmily.annotation.HmilyXA)") public void hmilyInterceptor() { } // 切面环绕执行 @Around("hmilyInterceptor()") public Object interceptTccMethod(final ProceedingJoinPoint proceedingJoinPoint) throws Throwable { return interceptor.invoke(proceedingJoinPoint); }}- select(context):根据 Hmily 事务上下文,获取事务处理器
- 首次执行,事务上下文为 null,事务处理器是 StarterHmilyTccTransactionHandler
- handleTransaction():执行 Hmily 事务
public class HmilyGlobalInterceptor implements HmilyTransactionInterceptor { // ... ... static { // 根据引入不同的 RPC 支持包,获取不同的 RPC 参数加载器 // ===== 重点 ===== // 因为使用的是 SpringCloud,所以获取的是 SpringCloudParameterLoader parameterLoader; } @Override public Object invoke(final ProceedingJoinPoint pjp) throws Throwable { HmilyTransactionContext context = parameterLoader.load(); return invokeWithinTransaction(context, pjp); } private Object invokeWithinTransaction(final HmilyTransactionContext context, final ProceedingJoinPoint point) throws Throwable { MethodSignature signature = (MethodSignature) point.getSignature(); // ===== 重点 ===== // 获取事务处理器,进行事务处理 // 首次执行,事务上下文为 null,事务处理器是 StarterHmilyTccTransactionHandler return getRegistry(signature.getMethod()).select(context) .handleTransaction(point, context); } // ... ...}public class StarterHmilyTccTransactionHandler implements HmilyTransactionHandler, AutoCloseable { @Override public Object handleTransaction(final ProceedingJoinPoint point, final HmilyTransactionContext context) throws Throwable { Object returnValue; try { // 0. 这块主要做:创建主事务、存储、构建分支事务,创建事务上下文等。 HmilyTransaction hmilyTransaction = executor.preTry(point); try { // 执行切面进入点的原始 try 方法,也就是上文提到的 makePayment 方法 returnValue = point.proceed(); // try 执行成功事务日志状态 hmilyTransaction.setStatus(HmilyActionEnum.TRYING.getCode()); executor.updateStartStatus(hmilyTransaction); } catch (Throwable throwable) { // 如果出现异常, 异步执行 cancel 方法 disruptor.getProvider().onData(() -> { executor.globalCancel(currentTransaction); }); throw throwable; } // try 方法执行成功,执行 confirm方法 disruptor.getProvider().onData(() -> { executor.globalConfirm(currentTransaction); }); } finally { // 清理资源与缓存 // 记录调用耗时时间 } return returnValue; }}- 默认 ThreadLocal
- TransimttableThreadLocal:阿里提供的跨线程 ThreadLocal 的实现
RPC 调用:Feign
@Configurationpublic class HmilyFeignConfiguration { // 1. 对 RPC 调用进行参数的传递 @Bean @Qualifier("hmilyFeignInterceptor") public RequestInterceptor hmilyFeignInterceptor() { return new HmilyFeignInterceptor(); } // 2. 对添加了 Hmily 注解的 Bean 实例进行代理 @Bean public HmilyFeignBeanPostProcessor feignPostProcessor() { return new HmilyFeignBeanPostProcessor(); } // 3. 处理 Hystrix 跨线程传递参数问题 @Bean @ConditionalOnProperty(name = "feign.hystrix.enabled") public HystrixConcurrencyStrategy hmilyHystrixConcurrencyStrategy() { return new HmilyHystrixConcurrencyStrategy(); }}public class HmilyFeignInterceptor implements RequestInterceptor { @Override public void apply(final RequestTemplate requestTemplate) { // 在 header 中设置事务上下文 RpcMediator.getInstance().transmit(requestTemplate::header, HmilyContextHolder.get()); }}public class HmilyFeignHandler implements InvocationHandler { @Override public Object invoke(final Object proxy, final Method method, final Object[] args) throws Throwable { if (Object.class.equals(method.getDeclaringClass())) { return method.invoke(this, args); } else { // 获取事务上下文 final HmilyTransactionContext context = HmilyContextHolder.get(); if (Objects.isNull(context)) { // 如果为空,则进行正常调用 return this.delegate.invoke(proxy, method, args); } final Hmily hmily = method.getAnnotation(Hmily.class); if (Objects.isNull(hmily)) { // 如果为空,则进行正常调用 return this.delegate.invoke(proxy, method, args); } try { // 构建参与者对象,进行缓存 // ... // 发起真正的调用 final Object invoke = delegate.invoke(proxy, method, args); // 如果调用成功,缓存参与者对象至发起者 if (context.getRole() == HmilyRoleEnum.PARTICIPANT.getCode()) { // ... } else { // ... } return invoke; } catch (Throwable e) { LOGGER.error("HmilyFeignHandler invoker exception :", e); throw e; } } }}(3)TCC 的 Confirm 阶段
- 使用 disrupto 队列异步执行executor.globalConfirm(currentTransaction);
public final class HmilyTccTransactionExecutor { public void globalConfirm(final HmilyTransaction currentTransaction) throws HmilyRuntimeException { // 更新事务状态为 confirm currentTransaction.setStatus(HmilyActionEnum.CONFIRMING.getCode()); HmilyRepositoryStorage.updateHmilyTransactionStatus(currentTransaction); // 从本地缓存里面获取所有的参与者对象 for (HmilyParticipant hmilyParticipant : hmilyParticipants) { try { // 如果参与者的角色是发起者 if (hmilyParticipant.getRole() == HmilyRoleEnum.START.getCode()) { // 执行本地调用 } else { // 执行 RPC 调用 } successList.add(true); } catch (Throwable e) { //... } finally { HmilyContextHolder.remove(); } } if (successList.stream().allMatch(e -> e)) { // 如果每个参与者都执行成功,删除主事务 HmilyRepositoryStorage.removeHmilyTransaction(currentTransaction); } }}(4)TCC 的 Cancel 阶段
public final class HmilyTccTransactionExecutor { public void globalCancel(final HmilyTransaction currentTransaction) { // 更新事务日志状态为 cancel currentTransaction.setStatus(HmilyActionEnum.CANCELING.getCode()); HmilyRepositoryStorage.updateHmilyTransactionStatus(currentTransaction); for (HmilyParticipant hmilyParticipant : hmilyParticipants) { try { // 如果是发起者,执行本地调用 if (hmilyParticipant.getRole() == HmilyRoleEnum.START.getCode()) { } else { // 执行远端 RPC 调用 } } catch (Throwable e) { // ... ... } finally { HmilyContextHolder.remove(); } } }}(5)事务恢复
- 在执行 try 阶段方法时,服务宕机
- 执行 confirm 阶段方法时,有RPC 服务调用不成功
- 执行 cancel 阶段方法时,有RPC 服务调用不成功
- 默认 60s 执行一次
- 默认最大重试次数 10次,超过则将事务日志设置成 DEATH 状态,需要人工处理。
public class HmilyTransactionSelfRecoveryScheduled implements AutoCloseable { private void selfTccRecovery() { selfTccRecoveryExecutor.scheduleWithFixedDelay(() -> { try { // ... for (HmilyParticipant hmilyParticipant : hmilyParticipantList) { // 1. 判断是否超过最大重试次数 if (hmilyParticipant.getRetry() > hmilyConfig.getRetryMax()) { // 更新日志状态为 DEATH continue; } // 2. 如果事务处于 PRE_TRY 状态,即 try 还没执行,则无需处理 // 3. 锁事务日志:避免多个定时任务同时执行 // 若采用数据库来存储,则通过更新 version 字段来获取锁 final boolean successful = hmilyRepository.lockHmilyParticipant(hmilyParticipant); if (successful) { // 根据全局事务id 获取全局事务对象 HmilyTransaction globalHmilyTransaction; // 如果没有全局事务,证明事务流程已经完成 // 则根据自身的事务状态进行恢复 // 这种场景常见于 RPC 接口调用超时,但是自身执行又成功 if (Objects.isNull(globalHmilyTransaction)) { tccRecovery(hmilyParticipant.getStatus(), hmilyParticipant); } else { // 根据全局事务状态进行恢复 tccRecovery(globalHmilyTransaction.getStatus(), hmilyParticipant); } } } } catch (Exception e) { LOGGER.error("hmily scheduled transaction log is error:", e); } }, hmilyConfig.getScheduledInitDelay(), hmilyConfig.getScheduledRecoveryDelay(), TimeUnit.SECONDS); } private void tccRecovery(final int status, final HmilyParticipant hmilyParticipant) { // 如果事务状态是 TRYING 和 CANCELING,执行 cancel 阶段 if (status == HmilyActionEnum.TRYING.getCode() || status == HmilyActionEnum.CANCELING.getCode()) { hmilyTransactionRecoveryService.cancel(hmilyParticipant); } else if (status == HmilyActionEnum.CONFIRMING.getCode()) { // 反之,执行 confirm 阶段 hmilyTransactionRecoveryService.confirm(hmilyParticipant); } }}(6)事务日志存储
- HmilyTransaction:事务主体类,包含多个 HmilyParticipant, 对应 hmily_transaction_global 表
- HmilyParticipant:分支事务类,包含多个 HmilyInvocation,对应 hmily_transaction_participant 表
- HmilyInvocation:事务方法的参数列表实体类
- TCC 事务恢复单线程池:selfTccRecoveryExecutor
- TAC 事务恢复单线程池:selfTacRecoveryExecutor
- 事务日志清理线程池:cleanHmilyTransactionExecutor
- 物理删除线程池:phyDeletedExecutor
- HmilyRepositoryEventPublisher:进行初始化
- TCC 事务中,事务状态变化均通过这个
边栏推荐
- Modify the password of the root user of MySQL database
- Summary of key knowledge of C language
- 初识OpenGL (4)链接着色器
- Parker pump pv140r1k1t1pmmc
- We were tossed all night by a Kong performance bug
- Google Earth engine - merra-2 m2t1nxlv: 1980 present global pressure, temperature, wind and other data sets
- How to configure tke cluster node Max pod
- Some cutting-edge research work sharing of SAP ABAP NetWeaver containerization
- Operating system migration practice: deploying MySQL database on openeuler
- Pat grade a 1049 counting ones
猜你喜欢

2022年最新北京建筑安全员模拟题库及答案

Implementation of SAP ABAP daemon

2022年最新西藏建筑施工架子工(建筑特种作业)模拟考试试题及答案
![[RCTF2015]EasySQL](/img/68/328ee5cffc8b267b6b0f284eb8db2c.png)
[RCTF2015]EasySQL

Understand │ XSS attack, SQL injection, CSRF attack, DDoS attack, DNS hijacking

DELTA控制器RMC200

操作系统迁移实战之在openEuler上部署MySQL数据库

Botu PLC Sequential switch function block (SCL)
![[tool sharing] automatic generation of file directory structure tool mddir](/img/bc/1071c0dfb20d16f5fdde641092c1af.png)
[tool sharing] automatic generation of file directory structure tool mddir

互联网协议
随机推荐
“核弹级” Log4j 漏洞仍普遍存在,并造成持续影响
Mapwithstate of spark streaming state flow
Some cutting-edge research work sharing of SAP ABAP NetWeaver containerization
Comprehensively design an oppe homepage -- Design of star models
综合设计一个OPPE主页--明星机型的设计
Botu PLC Sequential switch function block (SCL)
博途PLC顺序开关机功能块(SCL)
Taishan Office Technology Lecture: the zoom ratio of word is slightly different from the display
bucher齿轮泵QX81-400R301
Happy 10th birthday, clojure
Clojure 运行原理之字节码生成篇
The difference between oncreate and onrestoreinstancestate recovery data of activity
FTP protocol
Clojure operation principle bytecode generation
Technology vane | interpretation of cloud native technology architecture maturity model
阿里云DMS MySQL云数据库建表报错,求解!!
辨析 Ruby 中的 Method 与 Proc
共议公共数据开放,“数牍方案”亮相数字中国建设峰会
《From SICP to Lisp》视频回播
We were tossed all night by a Kong performance bug