当前位置:网站首页>postgresql源码学习(十八)—— MVCC③-创建(获取)快照
postgresql源码学习(十八)—— MVCC③-创建(获取)快照
2022-06-11 02:48:00 【Hehuyi_In】
一、 GetTransactionSnapshot函数
GetTransactionSnapshot函数中,通过FirstSnapshotSet标志来判断当前要获得的是不是事务的第一个快照。如果是,则通过GetSnapshotData获得快照并将快照缓存。在已提交读隔离级别下,直接返回获得的快照;在可重复读及串行化隔离级别下,返回缓存的快照(就是前一篇介绍的源码实现方法)。
函数主要流程图如下:

Snapshot
GetTransactionSnapshot(void)
{
/*
* 如果是逻辑解码,返回对应的HistoricSnapshot
*/
if (HistoricSnapshotActive())
{
Assert(!FirstSnapshotSet);
return HistoricSnapshot;
}关于下面这段代码,先简单解释一下:
- FirstSnapshotSet初始值是false(尚无快照或者不是事务的第一个快照),if中再加!表示如果是事务的第一个快照就进到if内
- IsolationUsesXactSnapshot()的定义是隔离级别>=可重复读
- IsolationIsSerializable()的定义是隔离级别=串行化
// xact.h文件中
#define IsolationUsesXactSnapshot() (XactIsoLevel >= XACT_REPEATABLE_READ)
#define IsolationIsSerializable() (XactIsoLevel == XACT_SERIALIZABLE)继续看函数后面的代码
/* First call in transaction? 事务中第一次调用?*/
if (!FirstSnapshotSet)
{
/*
* Don't allow catalog snapshot to be older than xact snapshot. Must
* do this first to allow the empty-heap Assert to succeed. 不允许catalog快照比事务快照更旧,必须首先执行该函数以确保empty-heap验证是成功的
*/
InvalidateCatalogSnapshot(); //失效catalog快照
Assert(pairingheap_is_empty(&RegisteredSnapshots));
Assert(FirstXactSnapshot == NULL);
//如果在并行模式下,返回报错
if (IsInParallelMode())
elog(ERROR,
"cannot take query snapshot during a parallel operation");
/*
* 如果是可重复读或串行化隔离级别,则整个事务都使用同一个快照
*/
if (IsolationUsesXactSnapshot())
{
/* 串行化隔离级别除了获得快照,还需要初始化SSI所需的各种结构,因此它调用自己专有的函数 */
if (IsolationIsSerializable())
CurrentSnapshot = GetSerializableTransactionSnapshot(&CurrentSnapshotData);
// 可重复读隔离级别,调用GetSnapshotData函数获取当前快照
else
CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);
/* Make a saved copy,因为两者都是整个事务都使用同一个快照,这里将获得的快照复制一份,作为第一个快照FirstXactSnapshot */
CurrentSnapshot = CopySnapshot(CurrentSnapshot);
FirstXactSnapshot = CurrentSnapshot;
/* Mark it as "registered" in FirstXactSnapshot,在FirstXactSnapshot中标记该快照已注册*/
FirstXactSnapshot->regd_count++;
pairingheap_add(&RegisteredSnapshots, &FirstXactSnapshot->ph_node);
}
else // 如果是已提交读读隔离级别,调用GetSnapshotData函数获取当前快照
CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);
// 修改标记,表示是第一次获得的快照,下次事务再调用该函数,就不会进到这层if了
FirstSnapshotSet = true;
return CurrentSnapshot;
}
//如果不是事务中第一次调用(已经有第一个快照了)
//可重复读或串行化隔离级别,返回第一个快照的复制品
if (IsolationUsesXactSnapshot())
return CurrentSnapshot;
//已提交读隔离级别
/* 失效catalog快照 */
InvalidateCatalogSnapshot();
//再次调用GetSnapshotData函数获取当前快照,返回最新的快照
CurrentSnapshot = GetSnapshotData(&CurrentSnapshotData);
return CurrentSnapshot;
}二、 GetSnapshotData函数
GetSnapshotData函数主要的作用是确定快照的xmin,xmax,xip,其中xmin的确定较其他两个复杂一些,下面画了一个流程图。
在早期版本中,相关变量都保存在PGPROC结构体,因此PGPROC结构体非常大。由于获取快照需要遍历PGPROC数组(即所有进程),且获取快照是高频操作,因此不同进程会频繁遍历PGPROC,这样就容易产生cache miss。因此pg将一些变量从PGPROC中抽取出来,组成了PGXACT结构体。
即便如此,在高并发场景下,获取快照仍是pg的瓶颈点,因此pg 14又增加了一些新特性,对xmin和xid进行优化:
- 实现了一套GlobalVis*系列函数,判断元组是否可清理
- 在ProcGlobal(PROC_HDR结构体)对PGPROC中的xid做镜像,每个PGPROC都含有一个pgxactoff变量,用户可以通过ProcGlobal->xids[PGPROC->pgxactoff]来获得活跃事务id。这样,当前活跃的事务id都紧凑地保存在一个数组中,可以避免读取整个PGPROC而产生cache miss
- 原本在已提交读模式下,事务块中的每个命令都应重新获取快照,但如果两次快照间没有事务状态发生变化,则它们的快照应该是相同的,因此可以重用前一次的快照。
Snapshot
GetSnapshotData(Snapshot snapshot)
{
ProcArrayStruct *arrayP = procArray;
TransactionId *other_xids = ProcGlobal->xids;
TransactionId xmin;
TransactionId xmax;
int count = 0;
int subcount = 0;
bool suboverflowed = false;
FullTransactionId latest_completed;
TransactionId oldestxid;
int mypgxactoff;
TransactionId myxid;
uint64 curXactCompletionCount;
TransactionId replication_slot_xmin = InvalidTransactionId;
TransactionId replication_slot_catalog_xmin = InvalidTransactionId;
Assert(snapshot != NULL);
/*
* 如果snapshot->xip还为空,首先创建该数组
*/
if (snapshot->xip == NULL)
{
/*
* First call for this snapshot. Snapshot is same size whether or not
* we are in recovery, see later comments.
*/
snapshot->xip = (TransactionId *)
malloc(GetMaxSnapshotXidCount() * sizeof(TransactionId));
if (snapshot->xip == NULL) // 如果创建完还为空,说明由于OOM创建失败了
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of memory")));
Assert(snapshot->subxip == NULL);
snapshot->subxip = (TransactionId *) // 子事务同理
malloc(GetMaxSnapshotSubxidCount() * sizeof(TransactionId));
if (snapshot->subxip == NULL)
ereport(ERROR,
(errcode(ERRCODE_OUT_OF_MEMORY),
errmsg("out of memory")));
}
/*
* It is sufficient to get shared lock on ProcArrayLock, even if we are
* going to set MyProc->xmin. 遍历ProcArrayLock数组,加共享锁即可
*/
LWLockAcquire(ProcArrayLock, LW_SHARED);
// 判断能否重用快照,如果可以,直接返回
if (GetSnapshotDataReuse(snapshot))
{
LWLockRelease(ProcArrayLock);
return snapshot;
}
//获取已提交事务的最大事务id
latest_completed = ShmemVariableCache->latestCompletedXid;
//获得当前事务id。虽然PGPROC中保存了xid,但为了降低cache miss,会从ProcGlobal->xids中获取xid(看函数最开始的变量定义)。MyProc->pgxactoff是PGPROC在xids数组中对应的下标
mypgxactoff = MyProc->pgxactoff;
myxid = other_xids[mypgxactoff];
Assert(myxid == MyProc->xid);
//最老的frozen xid
oldestxid = ShmemVariableCache->oldestXid;
//递增计数,作为判断能否重用快照的状态值(参考GetSnapshotDataReuse函数)
curXactCompletionCount = ShmemVariableCache->xactCompletionCount;
/* xmax is always latestCompletedXid + 1 */
xmax = XidFromFullTransactionId(latest_completed);
TransactionIdAdvance(xmax);
Assert(TransactionIdIsNormal(xmax));从下面开始,本质上就是一路比较哪个活跃事务ID最小,找到后赋值给xmin即可。NormalTransactionIdPrecedes函数实质就是一个比大小的函数,算法可参考pg事务篇(二)—— 事务ID回卷与事务冻结(freeze)_Hehuyi_In的博客-CSDN博客_datfrozenxid
/* compare two XIDs already known to be normal; this is a macro for speed */
//比较两个已知是常规事务的XIDs;宏定义是为了性能考虑.
#define NormalTransactionIdPrecedes(id1, id2) \
(AssertMacro(TransactionIdIsNormal(id1) && TransactionIdIsNormal(id2)), \
(int32) ((id1) - (id2)) < 0)主要流程如下:

/* initialize xmin calculation with xmax,初始化xmin=xmax,后面会遍历ProcGlobal->xids数组(即other_xids数组),获得真正的xmin */
xmin = xmax;
/* take own xid into account, saves a check inside the loop,优先考虑当前事务,避免循环查找。TransactionIdIsNormal排除系统保留的xid;TransactionIdPrecedes是一个比较大小的函数,xid越小的事务越旧,如果当前事务id更小,就将xmin设为当前事务id(snapshot->xmin是当前所有活跃事务中最小的事务xid) */
if (TransactionIdIsNormal(myxid) && NormalTransactionIdPrecedes(myxid, xmin))
xmin = myxid;
//是否在Recovery中,或者是否是从库
snapshot->takenDuringRecovery = RecoveryInProgress();
if (!snapshot->takenDuringRecovery)
{
int numProcs = arrayP->numProcs;
TransactionId *xip = snapshot->xip;
int *pgprocnos = arrayP->pgprocnos;
XidCacheStatus *subxidStates = ProcGlobal->subxidStates;
uint8 *allStatusFlags = ProcGlobal->statusFlags;
/*
* First collect set of pgxactoff/xids that need to be included in the
* snapshot.遍历other_xids数组获取真正的xmin */
for (int pgxactoff = 0; pgxactoff < numProcs; pgxactoff++)
{
/* Fetch xid just once - see GetNewTransactionId,获得事务id */
TransactionId xid = UINT32_ACCESS_ONCE(other_xids[pgxactoff]);
uint8 statusFlags;
Assert(allProcs[arrayP->pgprocnos[pgxactoff]].pgxactoff == pgxactoff);
…
/*
* If the XID is >= xmax, we can skip it; such transactions will
* be treated as running anyway (and any sub-XIDs will also be >=
* xmax). 还是比大小,若xid>=xmax,一定是活跃事务,直接跳过,因为xmin不会大于xmax
*/
if (!NormalTransactionIdPrecedes(xid, xmax))
continue;
/*
* Skip over backends doing logical decoding which manages xmin
* separately (check below) and ones running LAZY VACUUM.跳过逻辑解码和vacuum会话(PGPROC)中的xid
*/
statusFlags = allStatusFlags[pgxactoff];
if (statusFlags & (PROC_IN_LOGICAL_DECODING | PROC_IN_VACUUM))
continue;
//还是比大小,如果数组中的活跃事务xid<xmin,将xmin设为xid,一路循环直到找到最小的
if (NormalTransactionIdPrecedes(xid, xmin))
xmin = xid;
/* Add XID to snapshot.将事务id加入xip活跃事务列表 */
xip[count++] = xid;
//获得子事务活跃事务表,如果子事务过多,则设置溢出标记(略)
if (!suboverflowed)
{ …
}
}
}
else //Recovery中,或者是从库中的处理(略)
{
…
// 事务槽中保存了数据xmin和catalog xmin,防止备库所需的元组被回收
replication_slot_xmin = procArray->replication_slot_xmin;
replication_slot_catalog_xmin = procArray->replication_slot_catalog_xmin;
//设置第一次生成快照时的xmin
if (!TransactionIdIsValid(MyProc->xmin))
MyProc->xmin = TransactionXmin = xmin;
LWLockRelease(ProcArrayLock);
…
//一大堆GlobalVis*函数,判断元组是否可清理
}
RecentXmin = xmin;
Assert(TransactionIdPrecedesOrEquals(TransactionXmin, RecentXmin));
// 填充快照的各个变量
snapshot->xmin = xmin;
snapshot->xmax = xmax;
snapshot->xcnt = count;
snapshot->subxcnt = subcount;
snapshot->suboverflowed = suboverflowed;
snapshot->snapXactCompletionCount = curXactCompletionCount;
snapshot->curcid = GetCurrentCommandId(false);
/*
* This is a new snapshot, so set both refcounts are zero, and mark it as
* not copied in persistent memory.
*/
snapshot->active_count = 0;
snapshot->regd_count = 0;
snapshot->copied = false;
GetSnapshotDataInitOldSnapshot(snapshot);
return snapshot;
}参考
《PostgreSQL技术内幕:事务处理深度探索》第3章
边栏推荐
- sonarqube平台基础使用
- CPT 102_ LEC 16
- The two departments jointly issued the nine provisions on fire safety management of off campus training institutions
- Win10 安装Office 2016出现错误代码30204-44怎么处理?
- B_QuRT_User_Guide(17)
- pip 安装 qt5 。
- WinDbg-虚拟机-双机调试-驱动文件的调试
- Go 语言的优势和学习路线图
- 巴歇尔槽流量计远程采集物联网关在明渠流量监测的应用
- Go quick start of go language (I): the first go program
猜你喜欢

GraphAcademy 課程講解:《Neo4j 圖數據科學基礎》

org. apache. solr. common. SolrException:Could not load core configuration for core hotel

【大咖秀】博睿数据眼中的AIOps,选择正确的赛道正确的人

The Google search console webmaster tool cannot read the sitemap?

Unity项目优化详解(持续补充ing)

Go quick start of go language (I): the first go program

MySQL学习笔记:JSON嵌套数组查询

B_QuRT_User_Guide(17)

Chapter VII introduction to common protocols (1)
![[big guy show] aiops in the eyes of Borui data, choosing the right track and the right people](/img/a6/61d125326fc81532a56858c384460f.jpg)
[big guy show] aiops in the eyes of Borui data, choosing the right track and the right people
随机推荐
VMware虚拟机IP,网关设置。虚拟机ping不通外网
Problèmes de classe d'outils JDBC
怎样确保消息的可靠性投递?
C语言数组的简单认识
Why did those who left Beijing, Shanghai and Guangzhou with a smile cry in the end?
B_QuRT_User_Guide(17)
已解决: JDBC连接Mysql失败报错: 'The last packet sent successfully to the server was 0 milliseconds ago. '
Construction of Flink development environment and wordcount
[long time series prediction] aotoformer code detailed [3] model overall architecture analysis
数据库唯一索引和普通索引的区别?
B_ QuRT_ User_ Guide(20)
判断一串数字是否是快速排序某一次的结果
2022年熔化焊接与热切割操作证考试题库及答案
B_QuRT_User_Guide(16)
Forest v1.5.22 release! Kotlin support
Go quick start of go language (I): the first go program
VMware virtual machine IP, gateway settings. The virtual machine cannot be pinged to the Internet
The Google search console webmaster tool cannot read the sitemap?
OpenJudge NOI 1.13 17:文字排版
GraphAcademy 课程讲解:《Neo4j 图数据科学基础》