seata源码解析:seata server各种消息处理流程
2022-08-04 19:23:00 【51CTO】
- 先检查对应的 channel 是否注册过:没注册过则直接关闭连接,否则进入第二步
- 针对不同的消息交给DefaultCoordinator类的不同方法来处理,并返回结果
// DefaultCoordinator
protected void doGlobalBegin( GlobalBeginRequest request, GlobalBeginResponse response, RpcContext rpcContext)
throws TransactionException {
response. setXid( core. begin( rpcContext. getApplicationId(), rpcContext. getTransactionServiceGroup(),
request. getTransactionName(), request. getTimeout()));
if ( LOGGER. isInfoEnabled()) {
LOGGER. info( "Begin new global transaction applicationId: {},transactionServiceGroup: {}, transactionName: {},timeout:{},xid:{}",
rpcContext. getApplicationId(), rpcContext. getTransactionServiceGroup(), request. getTransactionName(), request. getTimeout(), response. getXid());
// DefaultCore
public String begin( String applicationId, String transactionServiceGroup, String name, int timeout)
throws TransactionException {
// 创建一个 GlobalSession
GlobalSession session = GlobalSession. createGlobalSession( applicationId, transactionServiceGroup, name,
MDC. put( RootContext. MDC_KEY_XID, session. getXid());
// 将 ROOT_SESSION_MANAGER 加入到这个 GlobalSession 的监听器列表中
session. addSessionLifecycleListener( SessionHolder. getRootSessionManager());
// 开启 GlobalSession
session. begin();
// 发布事件,如果你对这个事件感兴趣,可以注册这个事件
// transaction start event
eventBus. post( new GlobalTransactionEvent( session. getTransactionId(), GlobalTransactionEvent. ROLE_TC,
session. getTransactionName(), applicationId, transactionServiceGroup, session. getBeginTime(), null, session. getStatus()));
// 返回 xid
return session. getXid();
public Long branchRegister( BranchType branchType, String resourceId, String clientId, String xid,
String applicationData, String lockKeys) throws TransactionException {
// 根据 xid 从 SessionManager 中获取到 GlobalSession
GlobalSession globalSession = assertGlobalSessionNotNull( xid, false);
return SessionHolder. lockAndExecute( globalSession, () -> {
globalSessionStatusCheck( globalSession);
globalSession. addSessionLifecycleListener( SessionHolder. getRootSessionManager());
// 创建新的分支事务即 branchSession
BranchSession branchSession = SessionHelper. newBranchByGlobal( globalSession, branchType, resourceId,
applicationData, lockKeys, clientId);
MDC. put( RootContext. MDC_KEY_BRANCH_ID, String. valueOf( branchSession. getBranchId()));
// 对分支事务需要的资源加锁,加锁的逻辑在别的文章详解
branchSessionLock( globalSession, branchSession);
try {
// 将 branchSession 加到 globalSession 的属性中
globalSession. addBranch( branchSession);
} catch ( RuntimeException ex) {
branchSessionUnlock( branchSession);
throw new BranchTransactionException( FailedToAddBranch, String
. format( "Failed to store branch xid = %s branchId = %s", globalSession. getXid(),
branchSession. getBranchId()), ex);
if ( LOGGER. isInfoEnabled()) {
LOGGER. info( "Register branch successfully, xid = {}, branchId = {}, resourceId = {} ,lockKeys = {}",
globalSession. getXid(), branchSession. getBranchId(), resourceId, lockKeys);
return branchSession. getBranchId();
- 根据 xid 从 SessionManager 中获取到 GlobalSession
- 创建新的分支事务即 BranchSession
- 将 branchSession 加到 globalSession 的属性中,此时GlobalSession会发布分支事务注册事件,SessionManager 收到事件后会在 branch_table 中插入一条记录
// DefaultCore
public GlobalStatus commit( String xid) throws TransactionException {
// 根据xid找到全局事务对象GlobalSession
GlobalSession globalSession = SessionHolder. findGlobalSession( xid);
if ( globalSession == null) {
// 已经被commit过了,直接返回成功
return GlobalStatus. Finished;
// 添加监听器
globalSession. addSessionLifecycleListener( SessionHolder. getRootSessionManager());
// just lock changeStatus
boolean shouldCommit = SessionHolder. lockAndExecute( globalSession, () -> {
// Highlight: Firstly, close the session, then no more branch can be registered.
// 关闭 GlobalSession 防止再次有新的 BranchSession 注册进来
globalSession. closeAndClean();
if ( globalSession. getStatus() == GlobalStatus. Begin) {
// 判断是否可以异步提交
// 目前只有at模式可以异步提交,因为是通过undolog的方式去做的
if ( globalSession. canBeCommittedAsync()) {
globalSession. asyncCommit();
return false;
} else {
globalSession. changeStatus( GlobalStatus. Committing);
return true;
return false;
// 同步提交
// XA/TCC只能同步提交
if ( shouldCommit) {
boolean success = doGlobalCommit( globalSession, false);
//If successful and all remaining branches can be committed asynchronously, do async commit.
if ( success && globalSession. hasBranch() && globalSession. canBeCommittedAsync()) {
globalSession. asyncCommit();
return GlobalStatus. Committed;
} else {
return globalSession. getStatus();
} else {
// 异步提交
// 只有AT模式能异步提交
return globalSession. getStatus() == GlobalStatus. AsyncCommitting ? GlobalStatus. Committed : globalSession. getStatus();
// NOTE(review): the method header is missing from this excerpt. Judging by the call
// `doGlobalCommit( globalSession, false)` elsewhere in this file and the use of `retrying`
// below, this is presumably the body of doGlobalCommit(GlobalSession globalSession,
// boolean retrying) - confirm against upstream. Closing braces are also absent throughout.
//
// Purpose: posts a "start committing" event, then commits the global session. Saga sessions
// are delegated to the SAGA core; otherwise each sorted branch is committed in turn. If the
// session ends with no branches and success is true, it is marked committed and a second
// event is posted.
boolean success = true;
// start committing event
// Publish the event.
eventBus. post( new GlobalTransactionEvent( globalSession. getTransactionId(), GlobalTransactionEvent. ROLE_TC,
globalSession. getTransactionName(), globalSession. getApplicationId(), globalSession. getTransactionServiceGroup(),
globalSession. getBeginTime(), null, globalSession. getStatus()));
if ( globalSession. isSaga()) {
success = getCore( BranchType. SAGA). doGlobalCommit( globalSession, retrying);
} else {
// Take every branch transaction and commit it.
Boolean result = SessionHelper. forEach( globalSession. getSortedBranches(), branchSession -> {
// if not retrying, skip the canBeCommittedAsync branches
if ( ! retrying && branchSession. canBeCommittedAsync()) {
return CONTINUE;
BranchStatus currentStatus = branchSession. getStatus();
// Phase one failed for this branch: drop it and move on.
if ( currentStatus == BranchStatus. PhaseOne_Failed) {
globalSession. removeBranch( branchSession);
return CONTINUE;
try {
BranchStatus branchStatus = getCore( branchSession. getBranchType()). branchCommit( globalSession, branchSession);
switch ( branchStatus) {
case PhaseTwo_Committed:
globalSession. removeBranch( branchSession);
return CONTINUE;
case PhaseTwo_CommitFailed_Unretryable:
if ( globalSession. canBeCommittedAsync()) {
LOGGER. error(
"Committing branch transaction[{}], status: PhaseTwo_CommitFailed_Unretryable, please check the business log.", branchSession. getBranchId());
return CONTINUE;
} else {
// The branch cannot be committed asynchronously and is not retryable: the global transaction fails.
SessionHelper. endCommitFailed( globalSession);
LOGGER. error( "Committing global transaction[{}] finally failed, caused by branch transaction[{}] commit failed.", globalSession. getXid(), branchSession. getBranchId());
return false;
// NOTE(review): a switch label (presumably `default:`) appears to be missing before the
// statements below, which handle every other branch status - confirm against upstream.
// Is this call already a retry?
// retrying=true means the task came from the retry queue, so it must not be queued again.
if ( ! retrying) {
globalSession. queueToRetryCommit();
return false;
if ( globalSession. canBeCommittedAsync()) {
LOGGER. error( "Committing branch transaction[{}], status:{} and will retry later",
branchSession. getBranchId(), branchStatus);
return CONTINUE;
} else {
LOGGER. error(
"Committing global transaction[{}] failed, caused by branch transaction[{}] commit failed, will retry later.", globalSession. getXid(), branchSession. getBranchId());
return false;
} catch ( Exception ex) {
StackTraceLogger. error( LOGGER, ex, "Committing branch transaction exception: {}",
new String[] { branchSession. toString()});
// On a first attempt, queue the session for retry and rethrow wrapped in a TransactionException.
if ( ! retrying) {
globalSession. queueToRetryCommit();
throw new TransactionException( ex);
return CONTINUE;
// Return if the result is not null
// Per the original author's note, a non-null result here is false.
if ( result != null) {
return result;
//If has branch and not all remaining branches can be committed asynchronously,
//do print log and return false
// Branches remain and async commit is not allowed, so the commit is not done.
if ( globalSession. hasBranch() && ! globalSession. canBeCommittedAsync()) {
LOGGER. info( "Committing global transaction is NOT done, xid = {}.", globalSession. getXid());
return false;
//If success and there is no branch, end the global transaction.
// All branch transactions were committed successfully.
if ( success && globalSession. getBranchSessions(). isEmpty()) {
// Mark the global transaction as committed.
SessionHelper. endCommitted( globalSession);
// committed event
eventBus. post( new GlobalTransactionEvent( globalSession. getTransactionId(), GlobalTransactionEvent. ROLE_TC,
globalSession. getTransactionName(), globalSession. getApplicationId(), globalSession. getTransactionServiceGroup(),
globalSession. getBeginTime(), System. currentTimeMillis(), globalSession. getStatus()));
LOGGER. info( "Committing global transaction is successfully done, xid = {}.", globalSession. getXid());
return success;
