PostgreSQL source code study (23): Transaction log, part 4: record assembly
2022-06-29 15:05:00 【Hehuyi_In】
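I. Overview of record assembly
As mentioned at the end of the previous article, after registration the XLOG record is filled in as follows. The header parts on the first line contain no data yet; the registered data on the second line is already in place:
XLogRecord + XLogRecordBlockHeader + RelFileNode + BlockNumber + mainrdata_len (XLogRecordDataHeaderShort or XLogRecordDataHeaderLong) +
xl_heap_header (block data) + actual tuple data + xl_heap_insert (main data)
The assembly function XLogRecordAssemble is responsible for filling in the header parts. It also assembles all of the above into an XLogRecData linked list, which is the complete XLOG record.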

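Before reading the source, look at the assembly flowchart and compare it with the record format described earlier. The function essentially makes a second pass over the data in the registered_buffers array. For each block it decides, for example, whether a full-page write (FPW) is needed and whether the page image should be compressed. It then fills in the corresponding optional fields of the record headers.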





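(Figure: flowchart of the core code, showing only the assembly steps and omitting the long series of checks at the start of the function)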

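II. Reading the XLogRecordAssemble source
First, note the function's return type, XLogRecData. This is the element type of the rdatas array (which holds the WAL record data) mentioned for the registration functions in the previous article.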

/*
* Assemble a WAL record from the registered data and buffers into an
* XLogRecData chain, ready for insertion with XLogInsertRecord().
* (In other words: the data registered in the registered_buffers array is assembled into an XLogRecData chain, which XLogInsertRecord() then inserts into the WAL buffers.)
*/
static XLogRecData *
XLogRecordAssemble(RmgrId rmid, uint8 info,
XLogRecPtr RedoRecPtr, bool doPageWrites,
XLogRecPtr *fpw_lsn, int *num_fpi)
{
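XLogRecData *rdt; // XLogRecData pointer, used to walk the chain
uint32 total_len = 0; // total size of the XLOG record
int block_id; // block ID
pg_crc32c rdata_crc; // CRC
registered_buffer *prev_regbuf = NULL; // previous registered_buffer, for the same-relation check
XLogRecData *rdt_datas_last; // tail pointer of the chain
XLogRecord *rechdr; // record header, located at the start of hdr_scratch
char *scratch = hdr_scratch; // write position in the header scratch buffer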
/*
* Note: this function can be called multiple times for the same record.
* All the modifications we do to the rdata chains below must handle that.
*/
/* The record begins with the fixed-size header */
rechdr = (XLogRecord *) scratch;
/* so the scratch pointer can skip straight past it */
scratch += SizeOfXLogRecord;
/* Initialize the head element: hdr_rdt is the head of the chain, and rdt_datas_last points at the chain's tail */
hdr_rdt.next = NULL;
rdt_datas_last = &hdr_rdt;
hdr_rdt.data = hdr_scratch;
/*
* Enforce consistency checks for this record if user is looking for it.
* Do this before at the beginning of this routine to give the possibility
* for callers of XLogInsert() to pass XLR_CHECK_CONSISTENCY directly for
* a record.
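* In other words: if wal_consistency_checking is enabled for this resource
* manager, force a consistency check for this record. Doing it at the very
* start lets callers of XLogInsert() also pass XLR_CHECK_CONSISTENCY directly.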
*/
if (wal_consistency_checking[rmid])
info |= XLR_CHECK_CONSISTENCY;
/*
* Process, one by one, each block registered by XLogRegisterBuffer() (the elements of the registered_buffers array).
*/
*fpw_lsn = InvalidXLogRecPtr;
for (block_id = 0; block_id < max_registered_block_id; block_id++)
{
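registered_buffer *regbuf = &registered_buffers[block_id];
bool needs_backup; // take a full-page image (FPW)?
bool needs_data; // include the caller's registered data?
XLogRecordBlockHeader bkpb; // generic block header
XLogRecordBlockImageHeader bimg; // image header, needed only when an image is included
XLogRecordBlockCompressHeader cbimg = {0}; // holds the hole length; written only for a compressed image with a hole
bool samerel; // does this block belong to the same relation as the previous one?
bool is_compressed = false; // was the page image compressed?
bool include_image; // include a page image (FPW or consistency check)?
if (!regbuf->in_use) // set by XLogRegisterBuffer(); skip slots that were not registered
continue;
/* Determine if this block needs to be backed up. The flags are checked first, then doPageWrites (full_page_writes, or an online backup in progress), and finally the page LSN */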
if (regbuf->flags & REGBUF_FORCE_IMAGE)
needs_backup = true;
else if (regbuf->flags & REGBUF_NO_IMAGE)
needs_backup = false;
else if (!doPageWrites)
needs_backup = false;
else
{
/*
* We assume page LSN is first data on *every* page that can be
* passed to XLogInsert, whether it has the standard page layout
* or not.
*/
XLogRecPtr page_lsn = PageGetLSN(regbuf->page);
needs_backup = (page_lsn <= RedoRecPtr);
if (!needs_backup)
{
if (*fpw_lsn == InvalidXLogRecPtr || page_lsn < *fpw_lsn)
*fpw_lsn = page_lsn;
}
}
/* Determine if the buffer data needs to included */
if (regbuf->rdata_len == 0) // no data registered for this block
needs_data = false;
else if ((regbuf->flags & REGBUF_KEEP_DATA) != 0) // caller explicitly asked to keep the data
needs_data = true;
else // otherwise include the data only if no FPW is taken
needs_data = !needs_backup;
// fill in the XLogRecordBlockHeader
bkpb.id = block_id;
bkpb.fork_flags = regbuf->forkno;
bkpb.data_length = 0;
if ((regbuf->flags & REGBUF_WILL_INIT) == REGBUF_WILL_INIT)
bkpb.fork_flags |= BKPBLOCK_WILL_INIT;
/*
* If needs_backup is true or WAL checking is enabled for current
* resource manager, log a full-page write for the current block.
*/
include_image = needs_backup || (info & XLR_CHECK_CONSISTENCY) != 0;
if (include_image)
{
Page page = regbuf->page;
uint16 compressed_len = 0;
/*
* The page needs to be backed up, so calculate its hole length
* and offset. On a standard page the range between pd_lower and pd_upper holds no data, so it can be cut out to save space.
*/
if (regbuf->flags & REGBUF_STANDARD)
{
/* Assume we can omit data between pd_lower and pd_upper; if there is a hole, record its offset and length */
uint16 lower = ((PageHeader) page)->pd_lower;
uint16 upper = ((PageHeader) page)->pd_upper;
if (lower >= SizeOfPageHeaderData &&
upper > lower &&
upper <= BLCKSZ)
{
bimg.hole_offset = lower;
cbimg.hole_length = upper - lower;
}
else
{
/* No "hole" to remove */
bimg.hole_offset = 0;
cbimg.hole_length = 0;
}
}
else
{
/* Not a standard page header, don't try to eliminate "hole" */
bimg.hole_offset = 0;
cbimg.hole_length = 0;
}
/*
* Try to compress a block image if wal_compression is enabled
*/
if (wal_compression)
{
is_compressed =
XLogCompressBackupBlock(page, bimg.hole_offset,
cbimg.hole_length,
regbuf->compressed_page,
&compressed_len);
}
/*
* Fill in the remaining fields in the XLogRecordBlockHeader struct
*/
bkpb.fork_flags |= BKPBLOCK_HAS_IMAGE;
/* Report a full page image constructed for the WAL record */
*num_fpi += 1;
/*
* Construct XLogRecData entries for the page content. It is too late to call the XLogRegister* functions at this point, so the spare bkp_rdatas slots inside registered_buffer are used instead.
*/
rdt_datas_last->next = &regbuf->bkp_rdatas[0];
rdt_datas_last = rdt_datas_last->next;
bimg.bimg_info = (cbimg.hole_length == 0) ? 0 : BKPIMAGE_HAS_HOLE;
/*
* If WAL consistency checking is enabled for the resource manager
* of this WAL record, a full-page image is included in the record
* for the block modified. During redo, the full-page is replayed
* only if BKPIMAGE_APPLY is set.
*/
if (needs_backup)
bimg.bimg_info |= BKPIMAGE_APPLY;
if (is_compressed) // case 1: compressed image; the hole was already left out during compression
{
bimg.length = compressed_len;
bimg.bimg_info |= BKPIMAGE_IS_COMPRESSED;
rdt_datas_last->data = regbuf->compressed_page;
rdt_datas_last->len = compressed_len;
}
else
{
bimg.length = BLCKSZ - cbimg.hole_length;
if (cbimg.hole_length == 0) // case 2: no hole, store the whole page
{
rdt_datas_last->data = page;
rdt_datas_last->len = BLCKSZ;
}
else // case 3: uncompressed with a hole; use both bkp_rdatas slots so the hole is skipped
{
/* must skip the hole */
rdt_datas_last->data = page;
rdt_datas_last->len = bimg.hole_offset;
rdt_datas_last->next = &regbuf->bkp_rdatas[1];
rdt_datas_last = rdt_datas_last->next;
rdt_datas_last->data =
page + (bimg.hole_offset + cbimg.hole_length);
rdt_datas_last->len =
BLCKSZ - (bimg.hole_offset + cbimg.hole_length);
}
}
total_len += bimg.length;
}
(Figure: case 2)

(Figure: case 3)

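// Link the data registered in registered_buffer by XLogRegisterBufData() into the chain.
// This is usually the opposite of FPW: since an FPW records the whole page,
// the modified data itself is usually not logged when an FPW is taken.
// There are exceptions, however; logical decoding, for example, may need the data.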
if (needs_data)
{
/*
* Link the caller-supplied rdata chain for this buffer to the
* overall list.
*/
bkpb.fork_flags |= BKPBLOCK_HAS_DATA;
bkpb.data_length = regbuf->rdata_len;
total_len += regbuf->rdata_len;
rdt_datas_last->next = regbuf->rdata_head;
rdt_datas_last = regbuf->rdata_tail;
}
// If two consecutive blocks belong to the same relation, the second one can omit its RelFileNode.
// Set a flag here; the header-copying code below acts on it.
if (prev_regbuf && RelFileNodeEquals(regbuf->rnode, prev_regbuf->rnode))
{
samerel = true;
bkpb.fork_flags |= BKPBLOCK_SAME_REL;
}
else
samerel = false;
prev_regbuf = regbuf;

The following is the core part, where assembly really begins; most of the code above is preparation.
/* Ok, copy the header to the scratch buffer: the block-related headers are copied into hdr_scratch one after another */
// 1. copy the XLogRecordBlockHeader
memcpy(scratch, &bkpb, SizeOfXLogRecordBlockHeader);
scratch += SizeOfXLogRecordBlockHeader;
if (include_image)
{
// 2. copy the XLogRecordBlockImageHeader
memcpy(scratch, &bimg, SizeOfXLogRecordBlockImageHeader);
scratch += SizeOfXLogRecordBlockImageHeader;
if (cbimg.hole_length != 0 && is_compressed)
{
// 3. copy the XLogRecordBlockCompressHeader
memcpy(scratch, &cbimg,
SizeOfXLogRecordBlockCompressHeader);
scratch += SizeOfXLogRecordBlockCompressHeader;
}
}
if (!samerel) // the RelFileNode can be skipped if it matches the previous block's
{
// 4. copy the RelFileNode, taken from regbuf->rnode recorded at registration
memcpy(scratch, &regbuf->rnode, sizeof(RelFileNode));
scratch += sizeof(RelFileNode);
}
// 5. copy the BlockNumber, which is always present
memcpy(scratch, &regbuf->block, sizeof(BlockNumber));
scratch += sizeof(BlockNumber);
}
/* followed by the record's origin, if any */
if ((curinsert_flags & XLOG_INCLUDE_ORIGIN) &&
replorigin_session_origin != InvalidRepOriginId)
{
*(scratch++) = (char) XLR_BLOCK_ID_ORIGIN;
memcpy(scratch, &replorigin_session_origin, sizeof(replorigin_session_origin));
scratch += sizeof(replorigin_session_origin);
}
/* followed by toplevel XID, if not already included in previous record */
if (IsSubTransactionAssignmentPending())
{
TransactionId xid = GetTopTransactionIdIfAny();
/* update the flag (later used by XLogResetInsertion) */
XLogSetRecordFlags(XLOG_INCLUDE_XID);
*(scratch++) = (char) XLR_BLOCK_ID_TOPLEVEL_XID;
memcpy(scratch, &xid, sizeof(TransactionId));
scratch += sizeof(TransactionId);
}
/* followed by main data, if any. Only the main data length is written here; the main data itself is linked into the chain that starts at hdr_rdt */
// 6. copy mainrdata_len (short or long form)
if (mainrdata_len > 0)
{
if (mainrdata_len > 255)
{
*(scratch++) = (char) XLR_BLOCK_ID_DATA_LONG; // length stored in 4 bytes
memcpy(scratch, &mainrdata_len, sizeof(uint32));
scratch += sizeof(uint32);
}
else
{
*(scratch++) = (char) XLR_BLOCK_ID_DATA_SHORT; // length stored in 1 byte
*(scratch++) = (uint8) mainrdata_len;
}
rdt_datas_last->next = mainrdata_head;
rdt_datas_last = mainrdata_last;
total_len += mainrdata_len;
}
rdt_datas_last->next = NULL;
hdr_rdt.len = (scratch - hdr_scratch);
total_len += hdr_rdt.len;

Computing the data CRC:
/*
* Calculate CRC of the data
*
* Note that the record header isn't added into the CRC initially since we
* don't know the prev-link yet. Thus, the CRC will represent the CRC of
* the whole record in the order: rdata, then backup blocks, then record
* header.
*/
INIT_CRC32C(rdata_crc);
COMP_CRC32C(rdata_crc, hdr_scratch + SizeOfXLogRecord, hdr_rdt.len - SizeOfXLogRecord);
for (rdt = hdr_rdt.next; rdt != NULL; rdt = rdt->next)
COMP_CRC32C(rdata_crc, rdt->data, rdt->len);
/*
* Fill in the fields in the record header. Prev-link is filled in later,
* once we know where in the WAL the record will be inserted. The CRC does
* not include the record header yet.
*/
rechdr->xl_xid = GetCurrentTransactionIdIfAny();
rechdr->xl_tot_len = total_len;
rechdr->xl_info = info;
rechdr->xl_rmid = rmid;
rechdr->xl_prev = InvalidXLogRecPtr;
rechdr->xl_crc = rdata_crc;
return &hdr_rdt;
}
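III. How the parts of the XLOG record are chained together
XLogRecordAssemble ultimately returns hdr_rdt, so we need to look at how the function manipulates hdr_rdt.
1. Initialization
hdr_rdt is the head of the list, so rdt_datas_last is pointed at it.
hdr_rdt.next = NULL; // initialize the next pointer
rdt_datas_last = &hdr_rdt; // point at the head of the list
2. Adding the XLOG header to the list
hdr_rdt is currently the head, so the XLOG header buffer is assigned directly to its data field. The header length is computed after the header has been built.
hdr_rdt.data = hdr_scratch;
// ... intermediate code omitted ...
hdr_rdt.len = (scratch - hdr_scratch);
3. Adding xl_heap_header and the tuple data to the list
From the registration phase we know that xl_heap_header and the tuple data are both stored in regbuf's XLogRecData list. xl_heap_header comes first and the tuple data after it, because xl_heap_header is registered first. So it is enough to append regbuf's XLogRecData list to the chain that starts at hdr_rdt: link its head after the current tail, then move the tail pointer to its last element.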
if (needs_data)
{
/*
* Link the caller-supplied rdata chain for this buffer to the
* overall list.
*/
bkpb.fork_flags |= BKPBLOCK_HAS_DATA;
bkpb.data_length = regbuf->rdata_len;
total_len += regbuf->rdata_len;
// link into the chain
rdt_datas_last->next = regbuf->rdata_head;
rdt_datas_last = regbuf->rdata_tail;
}
4. Adding xl_heap_insert to the list
This happens when the mainrdata_len part is assembled:
rdt_datas_last->next = mainrdata_head;
rdt_datas_last = mainrdata_last;
total_len += mainrdata_len;