当前位置:网站首页>seata源码解析:事务状态及全局锁的存储
seata源码解析:事务状态及全局锁的存储
2022-08-05 09:01:00 【51CTO】

事务状态的存储
在seata中,无论你使用哪种事务模式,都会将全局事务状态和分支事务状态存储下来。有三种存储模式可供你选择,db,redis,file。
本文只会详细分析db这一种存储模式,其他的类似。
当使用db这一种模式时,seata server都会将全局事务的状态存在global_table表中,将分支事务的状态存在branch_table表中,如下所示
-- the table to store GlobalSession data
CREATE
TABLE IF
NOT EXISTS `global_table`
(
`xid`
VARCHAR
(
128
)
NOT
NULL
,
`transaction_id`
BIGINT
,
`status`
TINYINT
NOT
NULL
,
`application_id`
VARCHAR
(
32
)
,
`transaction_service_group`
VARCHAR
(
32
)
,
`transaction_name`
VARCHAR
(
128
)
,
`timeout`
INT
,
`begin_time`
BIGINT
,
`application_data`
VARCHAR
(
2000
)
,
`gmt_create`
DATETIME
,
`gmt_modified`
DATETIME
,
PRIMARY KEY
(`xid`
)
,
KEY `idx_gmt_modified_status`
(`gmt_modified`
, `status`
)
,
KEY `idx_transaction_id`
(`transaction_id`
)
) ENGINE
= InnoDB
DEFAULT CHARSET
= utf8
;
-- the table to store BranchSession data
CREATE
TABLE IF
NOT EXISTS `branch_table`
(
`branch_id`
BIGINT
NOT
NULL
,
`xid`
VARCHAR
(
128
)
NOT
NULL
,
`transaction_id`
BIGINT
,
`resource_group_id`
VARCHAR
(
32
)
,
`resource_id`
VARCHAR
(
256
)
,
`branch_type`
VARCHAR
(
8
)
,
`status`
TINYINT
,
`client_id`
VARCHAR
(
64
)
,
`application_data`
VARCHAR
(
2000
)
,
`gmt_create`
DATETIME
(
6
)
,
`gmt_modified`
DATETIME
(
6
)
,
PRIMARY KEY
(`branch_id`
)
,
KEY `idx_xid`
(`xid`
)
) ENGINE
= InnoDB
DEFAULT CHARSET
= utf8
;
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
- 32.
- 33.
- 34.
- 35.
- 36.
- 37.
- 38.
global_table表中status表示的含义为全局事务的状态,对应的枚举类为GlobalStatus
branch_table表中status表示的含义为分支事务的状态,对应的枚举类为BranchStatus
当事务发生问题时,我们就可以通过global_table和branch_table的status字段判断事务是哪一步执行失败了。
全局事务的状态表

分支事务的状态表

状态解释来自官网:http://seata.io/zh-cn/docs/user/appendix/global-transaction-status.html
源码分析
前面的文章我们说过当执行如下命令时,会调用Server.java中的main方法
public
class
Server {
public
static
void
main(
String[]
args)
throws
IOException {
// 省略部分代码...
// SessionHolder负责事务日志的持久化存储
// 设置存储模式,有三种可选类型,file,db,redis
SessionHolder.
init(
parameterParser.
getStoreMode());
// 创建事务协调器
DefaultCoordinator
coordinator
=
new
DefaultCoordinator(
nettyRemotingServer);
// 初始化5个定时任务
coordinator.
init();
// 省略部分代码...
System.
exit(
0);
}
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
SessionHolder#init根据传入的存储模式初始化4个不同的SessionManager,SessionManager是用来存储全局事务和分支事务状态的。其中全局事务用GlobalSession来表示,分支事务用BranchSession来表示,一个GlobalSession包含多个BranchSession
// 保存了所有的GlobalSession
private
static
SessionManager
ROOT_SESSION_MANAGER;
// 需要异步commit的GlobalSession
private
static
SessionManager
ASYNC_COMMITTING_SESSION_MANAGER;
// 需要重试commit的GlobalSession
private
static
SessionManager
RETRY_COMMITTING_SESSION_MANAGER;
// 需要重试roollback的GlobalSession
private
static
SessionManager
RETRY_ROLLBACKING_SESSION_MANAGER;
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
在init方法中通过SPI的方式实例化对应的类
// SessionHolder
public
static
void
init(
String
mode) {
if (
StringUtils.
isBlank(
mode)) {
mode
=
CONFIG.
getConfig(
ConfigurationKeys.
STORE_MODE);
}
StoreMode
storeMode
=
StoreMode.
get(
mode);
if (
StoreMode.
DB.
equals(
storeMode)) {
// 通过spi加载SessionManager
ROOT_SESSION_MANAGER
=
EnhancedServiceLoader.
load(
SessionManager.
class,
StoreMode.
DB.
getName());
ASYNC_COMMITTING_SESSION_MANAGER
=
EnhancedServiceLoader.
load(
SessionManager.
class,
StoreMode.
DB.
getName(),
new
Object[] {
ASYNC_COMMITTING_SESSION_MANAGER_NAME});
RETRY_COMMITTING_SESSION_MANAGER
=
EnhancedServiceLoader.
load(
SessionManager.
class,
StoreMode.
DB.
getName(),
new
Object[] {
RETRY_COMMITTING_SESSION_MANAGER_NAME});
RETRY_ROLLBACKING_SESSION_MANAGER
=
EnhancedServiceLoader.
load(
SessionManager.
class,
StoreMode.
DB.
getName(),
new
Object[] {
RETRY_ROLLBACKING_SESSION_MANAGER_NAME});
}
else
if (
StoreMode.
FILE.
equals(
storeMode)) {
// 省略其他存储方式的加载逻辑
}
// 删除已经完成的GlobalSession
reload(
storeMode);
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
4种类型的SessionManager都是同一个实例,只是调用的构造方法不同而已。以DataBaseSessionManager为例,ROOT_SESSION_MANAGER调用了无参数构造函数,而其他SessionManager传入了taskName属性
为什么要搞4个SessionManager?
其实就是用不同的SessionManager管理不同状态的任务,这样逻辑比较清晰。
当不同的SessionManager调用allSessions方法时,返回的就是对应状态的GlobalSession,逻辑比较清晰
DataBaseSessionManager#allSessions

我们来看一下SessionManager的继承关系

SessionLifecycleListener看接口名字就是基于观察者模式设计的,当GlobalSession状态发生改变的时候,会发布通知给监听者,然后监听者做相应动作。目前SessionLifecycleListener接口的实现类只有各种SessionManager,当收到状态改变的通知时,将其状态存储下来
AbstractSessionManager则是一个抽象类,当SessionLifecycleListener接口方法被回调时,调用SessionManager定义的动作方法
public
abstract
class
AbstractSessionManager
implements
SessionManager,
SessionLifecycleListener {
protected
TransactionStoreManager
transactionStoreManager;
// 省略部分代码
// 重写了SessionManager接口方法
@Override
public
void
addGlobalSession(
GlobalSession
session)
throws
TransactionException {
if (
LOGGER.
isDebugEnabled()) {
LOGGER.
debug(
"MANAGER["
+
name
+
"] SESSION["
+
session
+
"] "
+
LogOperation.
GLOBAL_ADD);
}
writeSession(
LogOperation.
GLOBAL_ADD,
session);
}
// 重写了SessionManager接口方法
@Override
public
void
updateGlobalSessionStatus(
GlobalSession
session,
GlobalStatus
status)
throws
TransactionException {
if (
LOGGER.
isDebugEnabled()) {
LOGGER.
debug(
"MANAGER["
+
name
+
"] SESSION["
+
session
+
"] "
+
LogOperation.
GLOBAL_UPDATE);
}
writeSession(
LogOperation.
GLOBAL_UPDATE,
session);
}
// 重写了SessionLifecycleListener接口方法
@Override
public
void
onBegin(
GlobalSession
globalSession)
throws
TransactionException {
addGlobalSession(
globalSession);
}
// 重写了SessionLifecycleListener接口方法
@Override
public
void
onStatusChange(
GlobalSession
globalSession,
GlobalStatus
status)
throws
TransactionException {
updateGlobalSessionStatus(
globalSession,
status);
}
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
- 32.
- 33.
- 34.
- 35.
- 36.
- 37.
可以看到AbstractSessionManager并没有实现SessionManager接口的方法,而是直接抛出异常,说明具体的存储逻辑交给子类来实现了
AbstractSessionManager有3个实现类,说明seata支持3种存储模式。而最终存储的工作是交给TransactionStoreManager来实现的

这些增加事务和事务状态变化的持久化操作非常简单,就是执行插入sql和更新sql
说回我们的启动流程,在DefaultCoordinator#init方法中,初始化5个定时任务
- retryRollbacking:分支事务回滚失败时,不断重试
- retryCommitting:分支事务提交失败时,不断重试
- asyncCommitting:执行异步commit,用在at模式,因为at模式的commit操作其实就是删除undolog,可以异步执行
- timeoutCheck:当事务处于开始状态,将状态设置为超时回滚,将其放入重试回滚管理器,让其回滚全局事务
- undoLogDelete:向rm端发送请求,删除7天(默认)之前的undolog

这些重试操作执行的逻辑和全局事务的提交/回滚逻辑一致,就不介绍了
// DefaultCoordinator
public
void
init() {
// 重试rollback定时任务
retryRollbacking.
scheduleAtFixedRate(()
-> {
boolean
lock
=
SessionHolder.
retryRollbackingLock();
if (
lock) {
try {
handleRetryRollbacking();
}
catch (
Exception
e) {
LOGGER.
info(
"Exception retry rollbacking ... ",
e);
}
finally {
SessionHolder.
unRetryRollbackingLock();
}
}
},
0,
ROLLBACKING_RETRY_PERIOD,
TimeUnit.
MILLISECONDS);
// 重试commit定时任务
retryCommitting.
scheduleAtFixedRate(()
-> {
boolean
lock
=
SessionHolder.
retryCommittingLock();
if (
lock) {
try {
handleRetryCommitting();
}
catch (
Exception
e) {
LOGGER.
info(
"Exception retry committing ... ",
e);
}
finally {
SessionHolder.
unRetryCommittingLock();
}
}
},
0,
COMMITTING_RETRY_PERIOD,
TimeUnit.
MILLISECONDS);
// 省略部分代码
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
- 32.
全局锁
在前面的文章中我们说过,在Seata AT模式中,我们用全局锁来避免脏写,同时也可以用全局锁将默认的隔离级别从读未提交升高为读已提交。
全局锁是存在TC端的,所以在TC端要提供相应的接口,来进行加锁,解锁,相应的资源是否被加锁等操作!
当分支事务提交的时候,需要把修改的资源锁定。当全局事务提交后才会把相应的资源解锁。
各种事务的加锁状态会存在lock_table表中
-- the table to store lock data
CREATE
TABLE IF
NOT EXISTS `lock_table`
(
`row_key`
VARCHAR
(
128
)
NOT
NULL
,
-- resource_id+table_name+pk的组合
`xid`
VARCHAR
(
128
)
,
`transaction_id`
BIGINT
,
`branch_id`
BIGINT
NOT
NULL
,
`resource_id`
VARCHAR
(
256
)
,
`table_name`
VARCHAR
(
32
)
,
`pk`
VARCHAR
(
36
)
,
`gmt_create`
DATETIME
,
`gmt_modified`
DATETIME
,
PRIMARY KEY
(`row_key`
)
,
KEY `idx_branch_id`
(`branch_id`
)
) ENGINE
= InnoDB
DEFAULT CHARSET
= utf8
;
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
用个例子演示一下全局锁的工作流程,
假如account_info表中有如下2个账户数据,其中id是主键
id | user_id |
1 | 1001 |
2 | 1002 |
假如说执行如下sql
当进行commit的时候,需要加全局锁,此时根据update语句构建出select语句,找出影响的主键,即1和2
通过主键构建出lockKey=account_info:1,2
lockKey的构建规则如下
如果主键是一列(id列),则形式如下
如果主键是多列(id列+user_id列),则形式如下
当然有可能一个事务中,有可能有多个表,多个表的构建规则如下(中间通过;分隔即可)
加锁
当加锁是会往lock_table中假如如下2个记录(省略无关的列)
row_key | xid |
jdbc:mysql://myhost:3306/db_account_1^^^account_info^^^1 | 127.21.0.14:18091:6449339005964652705 |
jdbc:mysql://myhost:3306/db_account_2^^^account_info^^^1 | 127.21.0.14:18091:6449339005964652705 |
row_key的构建规则如下
tcc模式下resourceId为@TwoPhaseBusinessAction注解的name属性
而在at和xa模式中resourceId都为数据库连接url
解锁
根据xid和row_key删除记录
查询是否能加锁
根据row_key从lock_table中查询记录,如果记录中的xid和查询传递过来的xid不一致则加锁失败,如果没有记录或者记录的xid和传递过来的xid一致,则加锁成功

源码分析
TC端定义的锁操作接口
public
interface
LockManager {
// 对分支事务的资源加锁
boolean
acquireLock(
BranchSession
branchSession)
throws
TransactionException;
// 对分支事务的资源解锁
boolean
releaseLock(
BranchSession
branchSession)
throws
TransactionException;
// 对全局事务中的所有分支事务的资源解锁
boolean
releaseGlobalSessionLock(
GlobalSession
globalSession)
throws
TransactionException;
// 根据xid resourceId lockKey 查询是否已经加锁
boolean
isLockable(
String
xid,
String
resourceId,
String
lockKey)
throws
TransactionException;
// 清除所有锁
void
cleanAllLocks()
throws
TransactionException;
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
老规矩,我们还是只分析db这种存储模式,最终的加解锁操作会交给Locker

加锁
// AbstractLockManager
public
boolean
acquireLock(
BranchSession
branchSession)
throws
TransactionException {
if (
branchSession
==
null) {
throw
new
IllegalArgumentException(
"branchSession can't be null for memory/file locker.");
}
String
lockKey
=
branchSession.
getLockKey();
if (
StringUtils.
isNullOrEmpty(
lockKey)) {
// no lock
return
true;
}
// get locks of branch
// 创建 RowLock 集合,一条加锁记录对应一个RowLock对象
List
<
RowLock
>
locks
=
collectRowLocks(
branchSession);
if (
CollectionUtils.
isEmpty(
locks)) {
// no lock
return
true;
}
return
getLocker(
branchSession).
acquireLock(
locks);
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
把需要加锁的资源转换成RowLock集合
protected
List
<
RowLock
>
collectRowLocks(
BranchSession
branchSession) {
List
<
RowLock
>
locks
=
new
ArrayList
<>();
if (
branchSession
==
null
||
StringUtils.
isBlank(
branchSession.
getLockKey())) {
return
locks;
}
String
xid
=
branchSession.
getXid();
// 得到资源id,也就是数据库连接url
String
resourceId
=
branchSession.
getResourceId();
long
transactionId
=
branchSession.
getTransactionId();
String
lockKey
=
branchSession.
getLockKey();
return
collectRowLocks(
lockKey,
resourceId,
xid,
transactionId,
branchSession.
getBranchId());
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
转换RowLock的过程,基本就是对lockKey的解析过程,
protected
List
<
RowLock
>
collectRowLocks(
String
lockKey,
String
resourceId,
String
xid,
Long
transactionId,
Long
branchID) {
List
<
RowLock
>
locks
=
new
ArrayList
<
RowLock
>();
// 对多个记录加锁,中间使用;分隔
String[]
tableGroupedLockKeys
=
lockKey.
split(
";");
for (
String
tableGroupedLockKey :
tableGroupedLockKeys) {
// 表名和记录主键值之间用:分隔
int
idx
=
tableGroupedLockKey.
indexOf(
":");
if (
idx
<
0) {
return
locks;
}
// 要加锁的表名
String
tableName
=
tableGroupedLockKey.
substring(
0,
idx);
// 加锁的记录主键值,如果需要一次加锁多条记录,记录之间用,分隔
String
mergedPKs
=
tableGroupedLockKey.
substring(
idx
+
1);
if (
StringUtils.
isBlank(
mergedPKs)) {
return
locks;
}
String[]
pks
=
mergedPKs.
split(
",");
if (
pks
==
null
||
pks.
length
==
0) {
return
locks;
}
// 一个主键创建一个RowLock对象
for (
String
pk :
pks) {
if (
StringUtils.
isNotBlank(
pk)) {
RowLock
rowLock
=
new
RowLock();
rowLock.
setXid(
xid);
rowLock.
setTransactionId(
transactionId);
rowLock.
setBranchId(
branchID);
rowLock.
setTableName(
tableName);
rowLock.
setPk(
pk);
rowLock.
setResourceId(
resourceId);
locks.
add(
rowLock);
}
}
}
return
locks;
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
- 32.
- 33.
- 34.
- 35.
- 36.
- 37.
- 38.
- 39.
构造好RowLock集合,接下来就是调用DataBaseLocker执行真正的加锁操作
首先将RowLock转为LockDO
protected
List
<
LockDO
>
convertToLockDO(
List
<
RowLock
>
locks) {
List
<
LockDO
>
lockDOs
=
new
ArrayList
<>();
if (
CollectionUtils.
isEmpty(
locks)) {
return
lockDOs;
}
for (
RowLock
rowLock :
locks) {
LockDO
lockDO
=
new
LockDO();
lockDO.
setBranchId(
rowLock.
getBranchId());
lockDO.
setPk(
rowLock.
getPk());
lockDO.
setResourceId(
rowLock.
getResourceId());
// rowKey = resourceId + "^^^" + tableName + "^^^" + pk
lockDO.
setRowKey(
getRowKey(
rowLock.
getResourceId(),
rowLock.
getTableName(),
rowLock.
getPk()));
lockDO.
setXid(
rowLock.
getXid());
lockDO.
setTransactionId(
rowLock.
getTransactionId());
lockDO.
setTableName(
rowLock.
getTableName());
lockDOs.
add(
lockDO);
}
return
lockDOs;
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
// LockStoreDataBaseDAO
public
boolean
acquireLock(
List
<
LockDO
>
lockDOs) {
Connection
conn
=
null;
PreparedStatement
ps
=
null;
ResultSet
rs
=
null;
Set
<
String
>
dbExistedRowKeys
=
new
HashSet
<>();
boolean
originalAutoCommit
=
true;
if (
lockDOs.
size()
>
1) {
// 过滤掉重复的加锁记录
lockDOs
=
lockDOs.
stream().
filter(
LambdaUtils.
distinctByKey(
LockDO::
getRowKey)).
collect(
Collectors.
toList());
}
try {
conn
=
lockStoreDataSource.
getConnection();
if (
originalAutoCommit
=
conn.
getAutoCommit()) {
conn.
setAutoCommit(
false);
}
//check lock
StringJoiner
sj
=
new
StringJoiner(
",");
for (
int
i
=
0;
i
<
lockDOs.
size();
i
++) {
sj.
add(
"?");
}
boolean
canLock
=
true;
//query
// select xid, transaction_id, branch_id, resource_id, table_name, pk, row_key, gmt_create, gmt_modified
// from lock_table where row_key in ('?', '?', '?')
String
checkLockSQL
=
LockStoreSqlFactory.
getLogStoreSql(
dbType).
getCheckLockableSql(
lockTable,
sj.
toString());
ps
=
conn.
prepareStatement(
checkLockSQL);
for (
int
i
=
0;
i
<
lockDOs.
size();
i
++) {
ps.
setString(
i
+
1,
lockDOs.
get(
i).
getRowKey());
}
rs
=
ps.
executeQuery();
String
currentXID
=
lockDOs.
get(
0).
getXid();
while (
rs.
next()) {
String
dbXID
=
rs.
getString(
ServerTableColumnsName.
LOCK_TABLE_XID);
// 查出来的记录的row_key和当前的不一样,则说明被别的事务加锁了
if (
!
StringUtils.
equals(
dbXID,
currentXID)) {
if (
LOGGER.
isInfoEnabled()) {
String
dbPk
=
rs.
getString(
ServerTableColumnsName.
LOCK_TABLE_PK);
String
dbTableName
=
rs.
getString(
ServerTableColumnsName.
LOCK_TABLE_TABLE_NAME);
Long
dbBranchId
=
rs.
getLong(
ServerTableColumnsName.
LOCK_TABLE_BRANCH_ID);
LOGGER.
info(
"Global lock on [{}:{}] is holding by xid {} branchId {}",
dbTableName,
dbPk,
dbXID,
dbBranchId);
}
canLock
&=
false;
break;
}
// 已经被自己加锁的记录
dbExistedRowKeys.
add(
rs.
getString(
ServerTableColumnsName.
LOCK_TABLE_ROW_KEY));
}
// 有记录已经加过锁,回滚退出
if (
!
canLock) {
conn.
rollback();
return
false;
}
// 此次需要加锁的记录
List
<
LockDO
>
unrepeatedLockDOs
=
null;
if (
CollectionUtils.
isNotEmpty(
dbExistedRowKeys)) {
unrepeatedLockDOs
=
lockDOs.
stream().
filter(
lockDO
->
!
dbExistedRowKeys.
contains(
lockDO.
getRowKey()))
.
collect(
Collectors.
toList());
}
else {
unrepeatedLockDOs
=
lockDOs;
}
if (
CollectionUtils.
isEmpty(
unrepeatedLockDOs)) {
conn.
rollback();
return
true;
}
//lock
// 执行加锁操作
if (
unrepeatedLockDOs.
size()
==
1) {
LockDO
lockDO
=
unrepeatedLockDOs.
get(
0);
if (
!
doAcquireLock(
conn,
lockDO)) {
if (
LOGGER.
isInfoEnabled()) {
LOGGER.
info(
"Global lock acquire failed, xid {} branchId {} pk {}",
lockDO.
getXid(),
lockDO.
getBranchId(),
lockDO.
getPk());
}
conn.
rollback();
return
false;
}
}
else {
if (
!
doAcquireLocks(
conn,
unrepeatedLockDOs)) {
if (
LOGGER.
isInfoEnabled()) {
LOGGER.
info(
"Global lock batch acquire failed, xid {} branchId {} pks {}",
unrepeatedLockDOs.
get(
0).
getXid(),
unrepeatedLockDOs.
get(
0).
getBranchId(),
unrepeatedLockDOs.
stream().
map(
lockDO
->
lockDO.
getPk()).
collect(
Collectors.
toList()));
}
conn.
rollback();
return
false;
}
}
conn.
commit();
return
true;
}
catch (
SQLException
e) {
throw
new
StoreException(
e);
}
finally {
IOUtil.
close(
rs,
ps);
if (
conn
!=
null) {
try {
if (
originalAutoCommit) {
conn.
setAutoCommit(
true);
}
conn.
close();
}
catch (
SQLException
e) {
}
}
}
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
- 32.
- 33.
- 34.
- 35.
- 36.
- 37.
- 38.
- 39.
- 40.
- 41.
- 42.
- 43.
- 44.
- 45.
- 46.
- 47.
- 48.
- 49.
- 50.
- 51.
- 52.
- 53.
- 54.
- 55.
- 56.
- 57.
- 58.
- 59.
- 60.
- 61.
- 62.
- 63.
- 64.
- 65.
- 66.
- 67.
- 68.
- 69.
- 70.
- 71.
- 72.
- 73.
- 74.
- 75.
- 76.
- 77.
- 78.
- 79.
- 80.
- 81.
- 82.
- 83.
- 84.
- 85.
- 86.
- 87.
- 88.
- 89.
- 90.
- 91.
- 92.
- 93.
- 94.
- 95.
- 96.
- 97.
- 98.
- 99.
- 100.
- 101.
- 102.
- 103.
- 104.
- 105.
解锁
解锁的逻辑和加锁的逻辑差不多,直接分析最终执行的部分了
public
boolean
unLock(
List
<
LockDO
>
lockDOs) {
Connection
conn
=
null;
PreparedStatement
ps
=
null;
try {
conn
=
lockStoreDataSource.
getConnection();
conn.
setAutoCommit(
true);
StringJoiner
sj
=
new
StringJoiner(
",");
for (
int
i
=
0;
i
<
lockDOs.
size();
i
++) {
sj.
add(
"?");
}
//batch release lock
// delete from lock_table where xid = ? and row_key in (?, ?, ?)
String
batchDeleteSQL
=
LockStoreSqlFactory.
getLogStoreSql(
dbType).
getBatchDeleteLockSql(
lockTable,
sj.
toString());
ps
=
conn.
prepareStatement(
batchDeleteSQL);
ps.
setString(
1,
lockDOs.
get(
0).
getXid());
for (
int
i
=
0;
i
<
lockDOs.
size();
i
++) {
ps.
setString(
i
+
2,
lockDOs.
get(
i).
getRowKey());
}
ps.
executeUpdate();
}
catch (
SQLException
e) {
throw
new
StoreException(
e);
}
finally {
IOUtil.
close(
ps,
conn);
}
return
true;
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
参考博客
事务状态逻辑
[1]http://seata.io/zh-cn/docs/user/appendix/global-transaction-status.html
边栏推荐
猜你喜欢

DataFrame insert row and column at specified position

ps怎么拼图,自学ps软件photoshop2022,PS制作拼图效果

php fails to write data to mysql

Assembly language (8) x86 inline assembly

DNS 查询原理详解

Comprehensively explain what is the essential difference between GET and POST requests?Turns out I always misunderstood

Code Audit - PHP

吴恩达深度学习deeplearning.ai——第一门课:神经网络与深度学习——第二节:神经网络基础(下)

ECCV 2022 Oral 视频实例分割新SOTA:SeqFormer&IDOL及CVPR 2022 视频实例分割竞赛冠军方案...

【ASM】字节码操作 方法的初始化 Frame
随机推荐
“充钱”也难治快手的“亏亏亏”?
【ASM】字节码操作 方法的初始化 Frame
Random code generation
Neuron Newsletter 2022-07|新增非 A11 驱动、即将支持 OPC DA
love is a sad song
CVPR 2022 | 将X光图片用于垃圾分割,港中大(深圳)探索大规模智能垃圾分类
Overall design and implementation of Kubernetes-based microservice project
苹果官网商店新上架Mophie系列Powerstation Pro、GaN充电头等产品
基于多块信息提取和马氏距离的k近邻故障监测
Going to book tickets tomorrow, ready to go home~~
How to replace colors in ps, self-study ps software photoshop2022, replace one color of a picture in ps with another color
Creo 9.0 基准特征:基准轴
Data source object management Druid and c3p0
NC20164 :最大数MAXNUMBER [线段树]
egg框架
The Secrets of the Six-Year Team Leader | The Eight Most Important Soft Skills of Programmers
请问大佬们 ,使用 Flink SQL CDC 是不是做不到两个数据库的实时同步啊
The difference between beautiful MM and ordinary MM
树状数组模版+例题
Pagoda measurement - building small and medium-sized homestay hotel management source code