当前位置:网站首页>seata源码解析:事务状态及全局锁的存储
seata源码解析:事务状态及全局锁的存储
2022-08-05 09:01:00 【51CTO】
事务状态的存储
在seata中,无论你使用哪种事务模式,都会将全局事务状态和分支事务状态存储下来。有三种存储模式可供你选择,db,redis,file。
本文只会详细分析db这一种存储模式,其他的类似。
当使用db这一种模式时,seata server都会将全局事务的状态存在global_table表中,将分支事务的状态存在branch_table表中,如下所示
-- the table to store GlobalSession data
CREATE TABLE IF NOT EXISTS `global_table`
(
`xid` VARCHAR ( 128 ) NOT NULL ,
`transaction_id` BIGINT ,
`status` TINYINT NOT NULL ,
`application_id` VARCHAR ( 32 ) ,
`transaction_service_group` VARCHAR ( 32 ) ,
`transaction_name` VARCHAR ( 128 ) ,
`timeout` INT ,
`begin_time` BIGINT ,
`application_data` VARCHAR ( 2000 ) ,
`gmt_create` DATETIME ,
`gmt_modified` DATETIME ,
PRIMARY KEY (`xid` ) ,
KEY `idx_gmt_modified_status` (`gmt_modified` , `status` ) ,
KEY `idx_transaction_id` (`transaction_id` )
) ENGINE = InnoDB
DEFAULT CHARSET = utf8 ;
-- the table to store BranchSession data
CREATE TABLE IF NOT EXISTS `branch_table`
(
`branch_id` BIGINT NOT NULL ,
`xid` VARCHAR ( 128 ) NOT NULL ,
`transaction_id` BIGINT ,
`resource_group_id` VARCHAR ( 32 ) ,
`resource_id` VARCHAR ( 256 ) ,
`branch_type` VARCHAR ( 8 ) ,
`status` TINYINT ,
`client_id` VARCHAR ( 64 ) ,
`application_data` VARCHAR ( 2000 ) ,
`gmt_create` DATETIME ( 6 ) ,
`gmt_modified` DATETIME ( 6 ) ,
PRIMARY KEY (`branch_id` ) ,
KEY `idx_xid` (`xid` )
) ENGINE = InnoDB
DEFAULT CHARSET = utf8 ;
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
- 32.
- 33.
- 34.
- 35.
- 36.
- 37.
- 38.
global_table表中status表示的含义为全局事务的状态,对应的枚举类为GlobalStatus
branch_table表中status表示的含义为分支事务的状态,对应的枚举类为BranchStatus
当事务发生问题时,我们就可以通过global_table和branch_table的status字段判断事务是哪一步执行失败了。
全局事务的状态表
分支事务的状态表
状态解释来自官网:http://seata.io/zh-cn/docs/user/appendix/global-transaction-status.html
源码分析
前面的文章我们说过当执行如下命令时,会调用Server.java中的main方法
public class Server {
public static void main( String[] args) throws IOException {
// 省略部分代码...
// SessionHolder负责事务日志的持久化存储
// 设置存储模式,有三种可选类型,file,db,redis
SessionHolder. init( parameterParser. getStoreMode());
// 创建事务协调器
DefaultCoordinator coordinator = new DefaultCoordinator( nettyRemotingServer);
// 初始化5个定时任务
coordinator. init();
// 省略部分代码...
System. exit( 0);
}
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
SessionHolder#init根据传入的存储模式初始化4个不同的SessionManager,SessionManager是用来存储全局事务和分支事务状态的。其中全局事务用GlobalSession来表示,分支事务用BranchSession来表示,一个GlobalSession包含多个BranchSession
// 保存了所有的GlobalSession
private static SessionManager ROOT_SESSION_MANAGER;
// 需要异步commit的GlobalSession
private static SessionManager ASYNC_COMMITTING_SESSION_MANAGER;
// 需要重试commit的GlobalSession
private static SessionManager RETRY_COMMITTING_SESSION_MANAGER;
// 需要重试roollback的GlobalSession
private static SessionManager RETRY_ROLLBACKING_SESSION_MANAGER;
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
在init方法中通过SPI的方式实例化对应的类
// SessionHolder
public static void init( String mode) {
if ( StringUtils. isBlank( mode)) {
mode = CONFIG. getConfig( ConfigurationKeys. STORE_MODE);
}
StoreMode storeMode = StoreMode. get( mode);
if ( StoreMode. DB. equals( storeMode)) {
// 通过spi加载SessionManager
ROOT_SESSION_MANAGER = EnhancedServiceLoader. load( SessionManager. class, StoreMode. DB. getName());
ASYNC_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader. load( SessionManager. class, StoreMode. DB. getName(),
new Object[] { ASYNC_COMMITTING_SESSION_MANAGER_NAME});
RETRY_COMMITTING_SESSION_MANAGER = EnhancedServiceLoader. load( SessionManager. class, StoreMode. DB. getName(),
new Object[] { RETRY_COMMITTING_SESSION_MANAGER_NAME});
RETRY_ROLLBACKING_SESSION_MANAGER = EnhancedServiceLoader. load( SessionManager. class, StoreMode. DB. getName(),
new Object[] { RETRY_ROLLBACKING_SESSION_MANAGER_NAME});
} else if ( StoreMode. FILE. equals( storeMode)) {
// 省略其他存储方式的加载逻辑
}
// 删除已经完成的GlobalSession
reload( storeMode);
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
4种类型的SessionManager都是同一个实例,只是调用的构造方法不同而已。以DataBaseSessionManager为例,ROOT_SESSION_MANAGER调用了无参数构造函数,而其他SessionManager传入了taskName属性
为什么要搞4个SessionManager?
其实就是用不同的SessionManager管理不同状态的任务,这样逻辑比较清晰。
当不同的SessionManager调用allSessions方法时,返回的就是对应状态的GlobalSession,逻辑比较清晰
DataBaseSessionManager#allSessions
我们来看一下SessionManager的继承关系
SessionLifecycleListener看接口名字就是基于观察者模式设计的,当GlobalSession状态发生改变的时候,会发布通知给监听者,然后监听者做相应动作。目前SessionLifecycleListener接口的实现类只有各种SessionManager,当收到状态改变的通知时,将其状态存储下来
AbstractSessionManager则是一个抽象类,当SessionLifecycleListener接口方法被回调时,调用SessionManager定义的动作方法
public abstract class AbstractSessionManager implements SessionManager, SessionLifecycleListener {
protected TransactionStoreManager transactionStoreManager;
// 省略部分代码
// 重写了SessionManager接口方法
@Override
public void addGlobalSession( GlobalSession session) throws TransactionException {
if ( LOGGER. isDebugEnabled()) {
LOGGER. debug( "MANAGER[" + name + "] SESSION[" + session + "] " + LogOperation. GLOBAL_ADD);
}
writeSession( LogOperation. GLOBAL_ADD, session);
}
// 重写了SessionManager接口方法
@Override
public void updateGlobalSessionStatus( GlobalSession session, GlobalStatus status) throws TransactionException {
if ( LOGGER. isDebugEnabled()) {
LOGGER. debug( "MANAGER[" + name + "] SESSION[" + session + "] " + LogOperation. GLOBAL_UPDATE);
}
writeSession( LogOperation. GLOBAL_UPDATE, session);
}
// 重写了SessionLifecycleListener接口方法
@Override
public void onBegin( GlobalSession globalSession) throws TransactionException {
addGlobalSession( globalSession);
}
// 重写了SessionLifecycleListener接口方法
@Override
public void onStatusChange( GlobalSession globalSession, GlobalStatus status) throws TransactionException {
updateGlobalSessionStatus( globalSession, status);
}
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
- 32.
- 33.
- 34.
- 35.
- 36.
- 37.
可以看到AbstractSessionManager并没有实现SessionManager接口的方法,而是直接抛出异常,说明具体的存储逻辑交给子类来实现了
AbstractSessionManager有3个实现类,说明seata支持3种存储模式。而最终存储的工作是交给TransactionStoreManager来实现的
这些增加事务和事务状态变化的持久化操作非常简单,就是执行插入sql和更新sql
说回我们的启动流程,在DefaultCoordinator#init方法中,初始化5个定时任务
- retryRollbacking:分支事务回滚失败时,不断重试
- retryCommitting:分支事务提交失败时,不断重试
- asyncCommitting:执行异步commit,用在at模式,因为at模式的commit操作其实就是删除undolog,可以异步执行
- timeoutCheck:当事务处于开始状态,将状态设置为超时回滚,将其放入重试回滚管理器,让其回滚全局事务
- undoLogDelete:向rm端发送请求,删除7天(默认)之前的undolog
这些重试操作执行的逻辑和全局事务的提交/回滚逻辑一致,就不介绍了
// DefaultCoordinator
public void init() {
// 重试rollback定时任务
retryRollbacking. scheduleAtFixedRate(() -> {
boolean lock = SessionHolder. retryRollbackingLock();
if ( lock) {
try {
handleRetryRollbacking();
} catch ( Exception e) {
LOGGER. info( "Exception retry rollbacking ... ", e);
} finally {
SessionHolder. unRetryRollbackingLock();
}
}
}, 0, ROLLBACKING_RETRY_PERIOD, TimeUnit. MILLISECONDS);
// 重试commit定时任务
retryCommitting. scheduleAtFixedRate(() -> {
boolean lock = SessionHolder. retryCommittingLock();
if ( lock) {
try {
handleRetryCommitting();
} catch ( Exception e) {
LOGGER. info( "Exception retry committing ... ", e);
} finally {
SessionHolder. unRetryCommittingLock();
}
}
}, 0, COMMITTING_RETRY_PERIOD, TimeUnit. MILLISECONDS);
// 省略部分代码
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
- 32.
全局锁
在前面的文章中我们说过,在Seata AT模式中,我们用全局锁来避免脏写,同时也可以用全局锁将默认的隔离级别从读未提交升高为读已提交。
全局锁是存在TC端的,所以在TC端要提供相应的接口,来进行加锁,解锁,相应的资源是否被加锁等操作!
当分支事务提交的时候,需要把修改的资源锁定。当全局事务提交后才会把相应的资源解锁。
各种事务的加锁状态会存在lock_table表中
-- the table to store lock data
CREATE TABLE IF NOT EXISTS `lock_table`
(
`row_key` VARCHAR ( 128 ) NOT NULL , -- resource_id+table_name+pk的组合
`xid` VARCHAR ( 128 ) ,
`transaction_id` BIGINT ,
`branch_id` BIGINT NOT NULL ,
`resource_id` VARCHAR ( 256 ) ,
`table_name` VARCHAR ( 32 ) ,
`pk` VARCHAR ( 36 ) ,
`gmt_create` DATETIME ,
`gmt_modified` DATETIME ,
PRIMARY KEY (`row_key` ) ,
KEY `idx_branch_id` (`branch_id` )
) ENGINE = InnoDB
DEFAULT CHARSET = utf8 ;
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
用个例子演示一下全局锁的工作流程,
假如account_info表中有如下2个账户数据,其中id是主键
id | user_id |
1 | 1001 |
2 | 1002 |
假如说执行如下sql
当进行commit的时候,需要加全局锁,此时根据update语句构建出select语句,找出影响的主键,即1和2
通过主键构建出lockKey=account_info:1,2
lockKey的构建规则如下
如果主键是一列(id列),则形式如下
如果主键是多列(id列+user_id列),则形式如下
当然有可能一个事务中,有可能有多个表,多个表的构建规则如下(中间通过;分隔即可)
加锁
当加锁是会往lock_table中假如如下2个记录(省略无关的列)
row_key | xid |
jdbc:mysql://myhost:3306/db_account_1^^^account_info^^^1 | 127.21.0.14:18091:6449339005964652705 |
jdbc:mysql://myhost:3306/db_account_2^^^account_info^^^1 | 127.21.0.14:18091:6449339005964652705 |
row_key的构建规则如下
tcc模式下resourceId为@TwoPhaseBusinessAction注解的name属性
而在at和xa模式中resourceId都为数据库连接url
解锁
根据xid和row_key删除记录
查询是否能加锁
根据row_key从lock_table中查询记录,如果记录中的xid和查询传递过来的xid不一致则加锁失败,如果没有记录或者记录的xid和传递过来的xid一致,则加锁成功
源码分析
TC端定义的锁操作接口
public interface LockManager {
// 对分支事务的资源加锁
boolean acquireLock( BranchSession branchSession) throws TransactionException;
// 对分支事务的资源解锁
boolean releaseLock( BranchSession branchSession) throws TransactionException;
// 对全局事务中的所有分支事务的资源解锁
boolean releaseGlobalSessionLock( GlobalSession globalSession) throws TransactionException;
// 根据xid resourceId lockKey 查询是否已经加锁
boolean isLockable( String xid, String resourceId, String lockKey) throws TransactionException;
// 清除所有锁
void cleanAllLocks() throws TransactionException;
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
老规矩,我们还是只分析db这种存储模式,最终的加解锁操作会交给Locker
加锁
// AbstractLockManager
public boolean acquireLock( BranchSession branchSession) throws TransactionException {
if ( branchSession == null) {
throw new IllegalArgumentException( "branchSession can't be null for memory/file locker.");
}
String lockKey = branchSession. getLockKey();
if ( StringUtils. isNullOrEmpty( lockKey)) {
// no lock
return true;
}
// get locks of branch
// 创建 RowLock 集合,一条加锁记录对应一个RowLock对象
List < RowLock > locks = collectRowLocks( branchSession);
if ( CollectionUtils. isEmpty( locks)) {
// no lock
return true;
}
return getLocker( branchSession). acquireLock( locks);
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
把需要加锁的资源转换成RowLock集合
protected List < RowLock > collectRowLocks( BranchSession branchSession) {
List < RowLock > locks = new ArrayList <>();
if ( branchSession == null || StringUtils. isBlank( branchSession. getLockKey())) {
return locks;
}
String xid = branchSession. getXid();
// 得到资源id,也就是数据库连接url
String resourceId = branchSession. getResourceId();
long transactionId = branchSession. getTransactionId();
String lockKey = branchSession. getLockKey();
return collectRowLocks( lockKey, resourceId, xid, transactionId, branchSession. getBranchId());
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
转换RowLock的过程,基本就是对lockKey的解析过程,
protected List < RowLock > collectRowLocks( String lockKey, String resourceId, String xid, Long transactionId,
Long branchID) {
List < RowLock > locks = new ArrayList < RowLock >();
// 对多个记录加锁,中间使用;分隔
String[] tableGroupedLockKeys = lockKey. split( ";");
for ( String tableGroupedLockKey : tableGroupedLockKeys) {
// 表名和记录主键值之间用:分隔
int idx = tableGroupedLockKey. indexOf( ":");
if ( idx < 0) {
return locks;
}
// 要加锁的表名
String tableName = tableGroupedLockKey. substring( 0, idx);
// 加锁的记录主键值,如果需要一次加锁多条记录,记录之间用,分隔
String mergedPKs = tableGroupedLockKey. substring( idx + 1);
if ( StringUtils. isBlank( mergedPKs)) {
return locks;
}
String[] pks = mergedPKs. split( ",");
if ( pks == null || pks. length == 0) {
return locks;
}
// 一个主键创建一个RowLock对象
for ( String pk : pks) {
if ( StringUtils. isNotBlank( pk)) {
RowLock rowLock = new RowLock();
rowLock. setXid( xid);
rowLock. setTransactionId( transactionId);
rowLock. setBranchId( branchID);
rowLock. setTableName( tableName);
rowLock. setPk( pk);
rowLock. setResourceId( resourceId);
locks. add( rowLock);
}
}
}
return locks;
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
- 32.
- 33.
- 34.
- 35.
- 36.
- 37.
- 38.
- 39.
构造好RowLock集合,接下来就是调用DataBaseLocker执行真正的加锁操作
首先将RowLock转为LockDO
protected List < LockDO > convertToLockDO( List < RowLock > locks) {
List < LockDO > lockDOs = new ArrayList <>();
if ( CollectionUtils. isEmpty( locks)) {
return lockDOs;
}
for ( RowLock rowLock : locks) {
LockDO lockDO = new LockDO();
lockDO. setBranchId( rowLock. getBranchId());
lockDO. setPk( rowLock. getPk());
lockDO. setResourceId( rowLock. getResourceId());
// rowKey = resourceId + "^^^" + tableName + "^^^" + pk
lockDO. setRowKey( getRowKey( rowLock. getResourceId(), rowLock. getTableName(), rowLock. getPk()));
lockDO. setXid( rowLock. getXid());
lockDO. setTransactionId( rowLock. getTransactionId());
lockDO. setTableName( rowLock. getTableName());
lockDOs. add( lockDO);
}
return lockDOs;
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
// LockStoreDataBaseDAO
public boolean acquireLock( List < LockDO > lockDOs) {
Connection conn = null;
PreparedStatement ps = null;
ResultSet rs = null;
Set < String > dbExistedRowKeys = new HashSet <>();
boolean originalAutoCommit = true;
if ( lockDOs. size() > 1) {
// 过滤掉重复的加锁记录
lockDOs = lockDOs. stream(). filter( LambdaUtils. distinctByKey( LockDO:: getRowKey)). collect( Collectors. toList());
}
try {
conn = lockStoreDataSource. getConnection();
if ( originalAutoCommit = conn. getAutoCommit()) {
conn. setAutoCommit( false);
}
//check lock
StringJoiner sj = new StringJoiner( ",");
for ( int i = 0; i < lockDOs. size(); i ++) {
sj. add( "?");
}
boolean canLock = true;
//query
// select xid, transaction_id, branch_id, resource_id, table_name, pk, row_key, gmt_create, gmt_modified
// from lock_table where row_key in ('?', '?', '?')
String checkLockSQL = LockStoreSqlFactory. getLogStoreSql( dbType). getCheckLockableSql( lockTable, sj. toString());
ps = conn. prepareStatement( checkLockSQL);
for ( int i = 0; i < lockDOs. size(); i ++) {
ps. setString( i + 1, lockDOs. get( i). getRowKey());
}
rs = ps. executeQuery();
String currentXID = lockDOs. get( 0). getXid();
while ( rs. next()) {
String dbXID = rs. getString( ServerTableColumnsName. LOCK_TABLE_XID);
// 查出来的记录的row_key和当前的不一样,则说明被别的事务加锁了
if ( ! StringUtils. equals( dbXID, currentXID)) {
if ( LOGGER. isInfoEnabled()) {
String dbPk = rs. getString( ServerTableColumnsName. LOCK_TABLE_PK);
String dbTableName = rs. getString( ServerTableColumnsName. LOCK_TABLE_TABLE_NAME);
Long dbBranchId = rs. getLong( ServerTableColumnsName. LOCK_TABLE_BRANCH_ID);
LOGGER. info( "Global lock on [{}:{}] is holding by xid {} branchId {}", dbTableName, dbPk, dbXID,
dbBranchId);
}
canLock &= false;
break;
}
// 已经被自己加锁的记录
dbExistedRowKeys. add( rs. getString( ServerTableColumnsName. LOCK_TABLE_ROW_KEY));
}
// 有记录已经加过锁,回滚退出
if ( ! canLock) {
conn. rollback();
return false;
}
// 此次需要加锁的记录
List < LockDO > unrepeatedLockDOs = null;
if ( CollectionUtils. isNotEmpty( dbExistedRowKeys)) {
unrepeatedLockDOs = lockDOs. stream(). filter( lockDO -> ! dbExistedRowKeys. contains( lockDO. getRowKey()))
. collect( Collectors. toList());
} else {
unrepeatedLockDOs = lockDOs;
}
if ( CollectionUtils. isEmpty( unrepeatedLockDOs)) {
conn. rollback();
return true;
}
//lock
// 执行加锁操作
if ( unrepeatedLockDOs. size() == 1) {
LockDO lockDO = unrepeatedLockDOs. get( 0);
if ( ! doAcquireLock( conn, lockDO)) {
if ( LOGGER. isInfoEnabled()) {
LOGGER. info( "Global lock acquire failed, xid {} branchId {} pk {}", lockDO. getXid(), lockDO. getBranchId(), lockDO. getPk());
}
conn. rollback();
return false;
}
} else {
if ( ! doAcquireLocks( conn, unrepeatedLockDOs)) {
if ( LOGGER. isInfoEnabled()) {
LOGGER. info( "Global lock batch acquire failed, xid {} branchId {} pks {}", unrepeatedLockDOs. get( 0). getXid(),
unrepeatedLockDOs. get( 0). getBranchId(), unrepeatedLockDOs. stream(). map( lockDO -> lockDO. getPk()). collect( Collectors. toList()));
}
conn. rollback();
return false;
}
}
conn. commit();
return true;
} catch ( SQLException e) {
throw new StoreException( e);
} finally {
IOUtil. close( rs, ps);
if ( conn != null) {
try {
if ( originalAutoCommit) {
conn. setAutoCommit( true);
}
conn. close();
} catch ( SQLException e) {
}
}
}
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
- 28.
- 29.
- 30.
- 31.
- 32.
- 33.
- 34.
- 35.
- 36.
- 37.
- 38.
- 39.
- 40.
- 41.
- 42.
- 43.
- 44.
- 45.
- 46.
- 47.
- 48.
- 49.
- 50.
- 51.
- 52.
- 53.
- 54.
- 55.
- 56.
- 57.
- 58.
- 59.
- 60.
- 61.
- 62.
- 63.
- 64.
- 65.
- 66.
- 67.
- 68.
- 69.
- 70.
- 71.
- 72.
- 73.
- 74.
- 75.
- 76.
- 77.
- 78.
- 79.
- 80.
- 81.
- 82.
- 83.
- 84.
- 85.
- 86.
- 87.
- 88.
- 89.
- 90.
- 91.
- 92.
- 93.
- 94.
- 95.
- 96.
- 97.
- 98.
- 99.
- 100.
- 101.
- 102.
- 103.
- 104.
- 105.
解锁
解锁的逻辑和加锁的逻辑差不多,直接分析最终执行的部分了
public boolean unLock( List < LockDO > lockDOs) {
Connection conn = null;
PreparedStatement ps = null;
try {
conn = lockStoreDataSource. getConnection();
conn. setAutoCommit( true);
StringJoiner sj = new StringJoiner( ",");
for ( int i = 0; i < lockDOs. size(); i ++) {
sj. add( "?");
}
//batch release lock
// delete from lock_table where xid = ? and row_key in (?, ?, ?)
String batchDeleteSQL = LockStoreSqlFactory. getLogStoreSql( dbType). getBatchDeleteLockSql( lockTable, sj. toString());
ps = conn. prepareStatement( batchDeleteSQL);
ps. setString( 1, lockDOs. get( 0). getXid());
for ( int i = 0; i < lockDOs. size(); i ++) {
ps. setString( i + 2, lockDOs. get( i). getRowKey());
}
ps. executeUpdate();
} catch ( SQLException e) {
throw new StoreException( e);
} finally {
IOUtil. close( ps, conn);
}
return true;
}
- 1.
- 2.
- 3.
- 4.
- 5.
- 6.
- 7.
- 8.
- 9.
- 10.
- 11.
- 12.
- 13.
- 14.
- 15.
- 16.
- 17.
- 18.
- 19.
- 20.
- 21.
- 22.
- 23.
- 24.
- 25.
- 26.
- 27.
参考博客
事务状态逻辑
[1]http://seata.io/zh-cn/docs/user/appendix/global-transaction-status.html
边栏推荐
猜你喜欢
只有一台交换机,如何实现主从自动切换之nqa
Chapter 12 贝叶斯网络
15.1.1、md—md的基础语法,快速的写文本备忘录
What is a good movie to watch on Qixi Festival?Crawl movie ratings and save to csv file
工程制图试题
pnpm 是凭什么对 npm 和 yarn 降维打击的
Overall design and implementation of Kubernetes-based microservice project
IT研发/开发流程规范效能的思考总结
复现一次循环和两次循环
Comprehensively explain what is the essential difference between GET and POST requests?Turns out I always misunderstood
随机推荐
基因数据平台
Controller-----controller
512-color chromatogram
七夕给自己new一个chatRobot当对象
DataFrame insert row and column at specified position
请问大佬们 ,使用 Flink SQL CDC 是不是做不到两个数据库的实时同步啊
tear apart loneliness
让程序员崩溃的N个瞬间(非程序员误入)
Data source object management Druid and c3p0
thinkPHP5 realizes clicks (data increment/decrement)
ECCV 2022 Oral 视频实例分割新SOTA:SeqFormer&IDOL及CVPR 2022 视频实例分割竞赛冠军方案...
Three solutions to solve cross-domain in egg framework
【Excel实战】--图表联动demo_001
周报2022-8-4
Luogu P1966: [NOIP2013 提高组] 火柴排队 [树状数组+逆序对]
Beautifully painted MM set
The difference between beautiful MM and ordinary MM
MQTT X Newsletter 2022-07 | 自动更新、MQTT X CLI 支持 MQTT 5.0、新增 conn 命令…
【 a daily topic 】 1403. The increasing order of the sequence, boy
基于 Kubernetes 的微服务项目整体设计与实现