PostgreSQL Database Replication: Receive and Send Logic of WalReceiver, a First-Class Background Process
2022-06-11 04:18:00 【肥叔菌】

Receive logic
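To understand the receive logic of WalReceiver, start with the infinite for loop in the figure. It exits only when an error occurs inside the loop or on end-of-streaming (the primary terminated streaming replication). The flow is:
- First, try walrcv_receive once. If the returned length len is zero, call WaitLatchOrSocket to block on the events WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE | WL_TIMEOUT | WL_LATCH_SET, i.e., wait until the network socket becomes readable, the latch is set, or the wait times out.
- If len is nonzero, there is either data (len > 0) or end-of-streaming (len < 0), and we enter the inner loop that processes received data (the inner for loop in the figure). Data that was read is handed to XLogWalRcvProcessMsg, and walrcv_receive is called again; as long as more data comes back, XLogWalRcvProcessMsg keeps processing it. If len < 0, the primary has ended streaming replication: streaming stops and the walreceiver leaves streaming COPY mode (walrcv_endstreaming). If len == 0, nothing more can be read from the socket, so the walreceiver goes back to WaitLatchOrSocket and blocks until the network is ready again.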
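The two nested loops described above can be illustrated with a small, self-contained C simulation. This is a sketch, not PostgreSQL code: MockConn, mock_receive, LoopStats, and run_receive_loop are hypothetical names. mock_receive replays a scripted sequence of lengths in place of walrcv_receive (positive means a message arrived, 0 means nothing is readable right now, negative means end-of-streaming), and counters stand in for XLogWalRcvProcessMsg, the reply/flush step after each batch, and WaitLatchOrSocket.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for walrcv_receive(): replays a scripted sequence.
 * > 0: a message of that length arrived; 0: nothing readable right now;
 * < 0: the primary ended streaming (end-of-streaming). */
typedef struct {
    const int *script;
    size_t     n;
    size_t     pos;
} MockConn;

static int mock_receive(MockConn *conn)
{
    if (conn->pos >= conn->n)
        return -1;                  /* script exhausted: treat as end-of-streaming */
    return conn->script[conn->pos++];
}

typedef struct {
    int processed;  /* messages that XLogWalRcvProcessMsg() would handle */
    int waits;      /* times we would block in WaitLatchOrSocket() */
    int replies;    /* times the post-batch reply + flush step runs */
} LoopStats;

/* Mirrors the outer for(;;) / inner for(;;) structure described above. */
static LoopStats run_receive_loop(MockConn *conn)
{
    LoopStats st = {0, 0, 0};

    for (;;) {
        bool endofwal = false;
        int  len = mock_receive(conn);      /* try once without blocking */

        if (len != 0) {
            /* Drain everything that can be read without blocking. */
            for (;;) {
                if (len > 0) {
                    st.processed++;         /* XLogWalRcvProcessMsg() */
                } else if (len == 0) {
                    break;                  /* socket drained: go wait */
                } else {
                    endofwal = true;        /* primary terminated replication */
                    break;
                }
                len = mock_receive(conn);
            }
            st.replies++;                   /* XLogWalRcvSendReply + XLogWalRcvFlush */
        }

        if (endofwal)
            break;                          /* then walrcv_endstreaming() */

        st.waits++;                         /* WaitLatchOrSocket() would block here */
    }
    return st;
}
```

With the script {0, 10, 20, 0, 5, -1}, the sketch waits once before any data arrives, drains two messages, waits again, then processes one more message before seeing end-of-streaming: 3 processed messages, 2 waits, and 2 reply/flush steps.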
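When data is received, the walreceiver, just before calling XLogWalRcvProcessMsg, sets last_recv_timestamp to the current time (the time data was last received from the primary) and resets ping_sent to false: the primary has just shown it is alive, so no ping is outstanding, and a new reply-requesting ping may be sent later if the primary goes quiet again.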
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) {
int hdrlen;
XLogRecPtr dataStart, walEnd;
TimestampTz sendTime;
bool replyRequested;
resetStringInfo(&incoming_message);
switch (type) {
case 'w':{
/* WAL records */
hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64); /* copy message to StringInfo */
if (len < hdrlen) ereport(ERROR,(errcode(ERRCODE_PROTOCOL_VIOLATION),errmsg_internal("invalid WAL message received from primary")));
appendBinaryStringInfo(&incoming_message, buf, hdrlen); // copy the header bytes from buf into the incoming_message StringInfo
dataStart = pq_getmsgint64(&incoming_message); /* read the fields */ // WAL position at which this message's data starts
walEnd = pq_getmsgint64(&incoming_message); // current end of WAL on the primary
sendTime = pq_getmsgint64(&incoming_message); // primary's clock when the message was sent
ProcessWalSndrMessage(walEnd, sendTime);
buf += hdrlen; len -= hdrlen;
XLogWalRcvWrite(buf, len, dataStart);
break;
}
case 'k': {
/* Keepalive */
hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char); /* copy message to StringInfo */
if (len != hdrlen) ereport(ERROR,(errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg_internal("invalid keepalive message received from primary")));
appendBinaryStringInfo(&incoming_message, buf, hdrlen);
/* read the fields */
walEnd = pq_getmsgint64(&incoming_message); // current end of WAL on the primary
sendTime = pq_getmsgint64(&incoming_message); // primary's clock when the message was sent
replyRequested = pq_getmsgbyte(&incoming_message); // whether the primary asks the standby to reply
ProcessWalSndrMessage(walEnd, sendTime);
/* If the primary requested a reply, send one immediately */
if (replyRequested) XLogWalRcvSendReply(true, false);
break;
}
default:
ereport(ERROR,(errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg_internal("invalid replication message type %d", type)));
}
}
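To make the 'w' header layout concrete, here is a sketch that decodes the three 8-byte fields in the same order as the code above (dataStart, walEnd, sendTime), in network (big-endian) byte order as pq_getmsgint64 reads them. XLogDataHeader, read_be64, and parse_xlogdata are hypothetical names, and the buffer is assumed to start right after the message type byte, like the buf passed to XLogWalRcvProcessMsg.

```c
#include <stddef.h>
#include <stdint.h>

/* Header of a 'w' (XLogData) message after the type byte. */
typedef struct {
    uint64_t data_start;  /* WAL position of the first payload byte */
    uint64_t wal_end;     /* current end of WAL on the primary */
    int64_t  send_time;   /* primary's clock at send time */
} XLogDataHeader;

#define XLOGDATA_HDRLEN (3 * 8)

/* Decode one big-endian 64-bit integer. */
static uint64_t read_be64(const unsigned char *p)
{
    uint64_t v = 0;
    for (int i = 0; i < 8; i++)
        v = (v << 8) | p[i];
    return v;
}

/* Returns 0 and points *payload / *payload_len at the WAL bytes on success;
 * returns -1 if the message is shorter than the header (protocol violation). */
static int parse_xlogdata(const unsigned char *buf, size_t len, XLogDataHeader *hdr,
                          const unsigned char **payload, size_t *payload_len)
{
    if (len < XLOGDATA_HDRLEN)
        return -1;
    hdr->data_start = read_be64(buf);
    hdr->wal_end    = read_be64(buf + 8);
    hdr->send_time  = (int64_t) read_be64(buf + 16);
    *payload     = buf + XLOGDATA_HDRLEN;   /* what XLogWalRcvWrite() receives */
    *payload_len = len - XLOGDATA_HDRLEN;
    return 0;
}
```

Everything after the 24-byte header is raw WAL, which is why the original code advances buf by hdrlen and passes the rest to XLogWalRcvWrite together with dataStart.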
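ProcessWalSndrMessage updates the shared-memory status after a message is received from the primary. 'walEnd' and 'sendTime' are the end-of-WAL position and the timestamp reported by the primary in the latest message.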
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime) {
WalRcvData *walrcv = WalRcv;
TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
/* Update shared-memory status */
SpinLockAcquire(&walrcv->mutex);
if (walrcv->latestWalEnd < walEnd) walrcv->latestWalEndTime = sendTime; // the primary reported a newer end of WAL, so record when that position was reported
walrcv->latestWalEnd = walEnd; // latest end-of-WAL position reported by the primary
walrcv->lastMsgSendTime = sendTime;
walrcv->lastMsgReceiptTime = lastMsgReceiptTime; // when this message was received and processed
SpinLockRelease(&walrcv->mutex);
}
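The update rule can be sketched outside PostgreSQL as below. RcvShared, spin_acquire, spin_release, and process_sender_message are hypothetical names; a minimal C11 atomic_flag spinlock stands in for SpinLockAcquire/SpinLockRelease, and timestamps are plain integers passed in by the caller. It shows that latestWalEndTime moves only when the reported WAL end advances, while the last send and receipt times are refreshed by every message, keepalives included.

```c
#include <stdatomic.h>
#include <stdint.h>

/* Hypothetical, simplified stand-in for the WalRcvData fields used above. */
typedef struct {
    atomic_flag lock;                   /* minimal spinlock instead of walrcv->mutex */
    uint64_t    latest_wal_end;         /* latestWalEnd */
    int64_t     latest_wal_end_time;    /* latestWalEndTime */
    int64_t     last_msg_send_time;     /* lastMsgSendTime */
    int64_t     last_msg_receipt_time;  /* lastMsgReceiptTime */
} RcvShared;

static void spin_acquire(atomic_flag *f)
{
    while (atomic_flag_test_and_set_explicit(f, memory_order_acquire))
        ;                               /* busy-wait until the holder releases */
}

static void spin_release(atomic_flag *f)
{
    atomic_flag_clear_explicit(f, memory_order_release);
}

/* Same update rules as ProcessWalSndrMessage; 'now' is passed in for testability. */
static void process_sender_message(RcvShared *s, uint64_t wal_end,
                                   int64_t send_time, int64_t now)
{
    spin_acquire(&s->lock);
    if (s->latest_wal_end < wal_end)    /* primary reported a newer end of WAL */
        s->latest_wal_end_time = send_time;
    s->latest_wal_end        = wal_end;
    s->last_msg_send_time    = send_time;
    s->last_msg_receipt_time = now;
    spin_release(&s->lock);
}
```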
Send logic
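XLogWalRcvSendReply sends a reply message to the primary, reporting our current WAL write, flush, and apply positions and the current time.
- If 'force' is not set, a message is sent once wal_receiver_status_interval has elapsed since the last status update.
- If 'force' is not set and the interval has not elapsed yet, a message is still sent when the write or flush position has advanced since the last reply, so that the primary learns we received data.
- If wal_receiver_status_interval is disabled entirely and 'force' is false, this is a no-op.
- If 'force' is true and 'requestReply' is true, the primary is asked to reply immediately upon receiving this message. This is used as a heartbeat when wal_receiver_timeout is approaching.
- If 'force' is true and 'requestReply' is false, the primary does not need to reply; this is used when the startup process asks us to send apply feedback.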
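The rules in the list reduce to two early-return checks. The hypothetical helper should_send_reply below restates them as a pure function so they can be tested in isolation; it assumes millisecond timestamps, whereas PostgreSQL uses microsecond TimestampTz values and TimestampDifferenceExceeds.

```c
#include <stdbool.h>
#include <stdint.h>

/* What the previous reply reported (kept in static variables in the real code). */
typedef struct {
    uint64_t last_write;    /* write position sent in the previous reply */
    uint64_t last_flush;    /* flush position sent in the previous reply */
    int64_t  last_send_ms;  /* when the previous reply was sent */
} ReplyState;

static bool should_send_reply(const ReplyState *st, bool force, int status_interval_s,
                              uint64_t write, uint64_t flush, int64_t now_ms)
{
    if (!force && status_interval_s <= 0)
        return false;       /* reporting disabled: no-op */
    if (!force &&
        write == st->last_write &&
        flush == st->last_flush &&
        now_ms - st->last_send_ms < (int64_t) status_interval_s * 1000)
        return false;       /* nothing new and the interval has not been reached */
    return true;            /* forced, progress made, or interval reached */
}
```

Note that the apply position is deliberately not part of the check: as the comment in the function explains, reading it needs a spinlock, so an idle standby reports apply progress only when the interval expires.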
static void XLogWalRcvSendReply(bool force, bool requestReply) {
static XLogRecPtr writePtr = 0;
static XLogRecPtr flushPtr = 0;
XLogRecPtr applyPtr;
static TimestampTz sendTime = 0;
TimestampTz now;
/* If the user doesn't want status to be reported to the primary, be sure to exit before doing anything at all. */
// wal_receiver_status_interval: minimum frequency at which the standby reports replication progress to the primary; a value <= 0 disables these reports
if (!force && wal_receiver_status_interval <= 0) return;
now = GetCurrentTimestamp(); /* Get current timestamp. */
/* We can compare the write and flush positions to the last message we sent without taking any lock, but the apply position requires a spin lock, so we don't check that unless something else has changed or 10 seconds have passed. This means that the apply WAL location will appear, from the primary's point of view, to lag slightly, but since this is only for reporting purposes and only on idle systems, that's probably OK. */
if (!force && writePtr == LogstreamResult.Write && flushPtr == LogstreamResult.Flush && !TimestampDifferenceExceeds(sendTime, now, wal_receiver_status_interval * 1000)) return;
sendTime = now;
/* Construct a new message */
writePtr = LogstreamResult.Write;
flushPtr = LogstreamResult.Flush;
applyPtr = GetXLogReplayRecPtr(NULL);
resetStringInfo(&reply_message);
pq_sendbyte(&reply_message, 'r');
pq_sendint64(&reply_message, writePtr);
pq_sendint64(&reply_message, flushPtr);
pq_sendint64(&reply_message, applyPtr);
pq_sendint64(&reply_message, GetCurrentTimestamp());
pq_sendbyte(&reply_message, requestReply ? 1 : 0);
walrcv_send(wrconn, reply_message.data, reply_message.len);
}
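The 'r' message built above has a fixed layout: the type byte, four 8-byte integers (write, flush, and apply positions, then the send time), and a one-byte reply-request flag, 34 bytes in total. A standalone sketch of that encoding, assuming big-endian integers as written by pq_sendint64; build_status_update and put_be64 are hypothetical names:

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define STATUS_UPDATE_LEN (1 + 4 * 8 + 1)

/* Write v as 8 big-endian bytes; returns the number of bytes written. */
static size_t put_be64(unsigned char *p, uint64_t v)
{
    for (int i = 7; i >= 0; i--) {
        p[i] = (unsigned char) (v & 0xff);
        v >>= 8;
    }
    return 8;
}

/* Same field order as the pq_send* calls in XLogWalRcvSendReply. */
static size_t build_status_update(unsigned char out[STATUS_UPDATE_LEN],
                                  uint64_t write, uint64_t flush, uint64_t apply,
                                  int64_t now, bool request_reply)
{
    size_t n = 0;
    out[n++] = 'r';
    n += put_be64(out + n, write);
    n += put_be64(out + n, flush);
    n += put_be64(out + n, apply);
    n += put_be64(out + n, (uint64_t) now);
    out[n++] = request_reply ? 1 : 0;
    return n;
}
```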
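XLogWalRcvSendHSFeedback sends a hot standby feedback message to the primary, together with the current time. If the user has disabled feedback, it sends one final message telling the walsender to forget this standby's xmin. We also send this message on first connect, because a previous connection might have set an xmin on a replication slot. (If we are not using a slot, it is harmless to send a feedback message that explicitly sets InvalidTransactionId.)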
static void XLogWalRcvSendHSFeedback(bool immed) {
TimestampTz now;
FullTransactionId nextFullXid;
TransactionId nextXid, xmin, catalog_xmin;
uint32 xmin_epoch, catalog_xmin_epoch;
static TimestampTz sendTime = 0;
static bool primary_has_standby_xmin = true; /* initially true so we always send at least one feedback message */
/* If the user doesn't want status to be reported to the primary, be sure to exit before doing anything at all. */
if ((wal_receiver_status_interval <= 0 || !hot_standby_feedback) && !primary_has_standby_xmin)
return;
now = GetCurrentTimestamp(); /* Get current timestamp. */
if (!immed) {
// not immediate: send only once wal_receiver_status_interval has elapsed since the last send
/* Send feedback at most once per wal_receiver_status_interval. */
if (!TimestampDifferenceExceeds(sendTime, now, wal_receiver_status_interval * 1000)) return;
sendTime = now;
}
/* If Hot Standby is not yet accepting connections there is nothing to send. Check this after the interval has expired to reduce number of calls. Bailing out here also ensures that we don't send feedback until we've read our own replication slot state, so we don't tell the primary to discard needed xmin or catalog_xmin from any slots that may exist on this replica. */
if (!HotStandbyActive()) return;
/* Make the expensive call to get the oldest xmin once we are certain everything else has been checked. */
if (hot_standby_feedback) {
// hot standby feedback enabled: compute the horizons to report
GetReplicationHorizons(&xmin, &catalog_xmin);
} else {
// feedback disabled: report InvalidTransactionId for both xmin and catalog_xmin
xmin = InvalidTransactionId;
catalog_xmin = InvalidTransactionId;
}
/* Get epoch and adjust if nextXid and oldestXmin are different sides of the epoch boundary. */
nextFullXid = ReadNextFullTransactionId();
nextXid = XidFromFullTransactionId(nextFullXid);
xmin_epoch = EpochFromFullTransactionId(nextFullXid);
catalog_xmin_epoch = xmin_epoch;
if (nextXid < xmin) xmin_epoch--;
if (nextXid < catalog_xmin) catalog_xmin_epoch--;
resetStringInfo(&reply_message); /* Construct the message and send it. */
pq_sendbyte(&reply_message, 'h');
pq_sendint64(&reply_message, GetCurrentTimestamp());
pq_sendint32(&reply_message, xmin);
pq_sendint32(&reply_message, xmin_epoch);
pq_sendint32(&reply_message, catalog_xmin);
pq_sendint32(&reply_message, catalog_xmin_epoch);
walrcv_send(wrconn, reply_message.data, reply_message.len); // send the feedback message
if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
primary_has_standby_xmin = true;
else
primary_has_standby_xmin = false; // the primary now holds no xmin for this standby, so while feedback stays disabled, later calls return early
}
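The epoch adjustment near the end of XLogWalRcvSendHSFeedback relies on xmin never being newer than nextXid: if the 32-bit xmin is numerically larger than the 32-bit nextXid, the XID counter must have wrapped in between, so xmin belongs to the previous epoch. A minimal sketch of that rule, with a hypothetical helper epoch_for_xid and the full XID holding the epoch in its high 32 bits:

```c
#include <stdint.h>

/* Return the epoch that a 32-bit xid (assumed not newer than next) belongs to. */
static uint32_t epoch_for_xid(uint64_t next_full_xid, uint32_t xid)
{
    uint32_t next_xid = (uint32_t) next_full_xid;          /* low 32 bits */
    uint32_t epoch    = (uint32_t) (next_full_xid >> 32);  /* high 32 bits */
    if (next_xid < xid)
        epoch--;   /* xid is from before the most recent wraparound */
    return epoch;
}
```

For example, with nextFullXid at epoch 5 and xid 100, an xmin of 4000000000 must come from epoch 4, while an xmin of 50 (or InvalidTransactionId, which is 0) keeps epoch 5.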
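When processing a keepalive message in which the primary requests a reply (replyRequested is true), the walreceiver calls XLogWalRcvSendReply(true, false) to reply immediately. The reply reports the WAL positions the walreceiver has written and flushed, the position the startup process has applied, and the standby's current time, and it does not ask the primary to reply in turn.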
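When walrcv_receive in the inner loop returns a length less than or equal to zero, the loop exits, and the walreceiver calls XLogWalRcvSendReply(false, false) (to let the primary know it received some data) and XLogWalRcvFlush(false) (to flush the written WAL to disk and report the progress to the startup process and the primary).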
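XLogWalRcvSendReply(false, false) returns immediately without replying if status reporting is disabled (wal_receiver_status_interval, the minimum frequency at which the standby reports replication progress to the primary, is less than or equal to zero). Otherwise, if no new WAL has been written or flushed since the previous reply (LogstreamResult.Write and LogstreamResult.Flush equal the values sent last time) and less than wal_receiver_status_interval has passed since that reply was sent, it also returns without replying. In all other cases it sends the primary the write and flush positions of the walreceiver, the apply position of the startup process, and the standby's current time, without requesting a reply.
XLogWalRcvFlush(false), if it actually flushed something, likewise calls XLogWalRcvSendReply(false, false), following the flow above, and then calls XLogWalRcvSendHSFeedback(false) to send a hot standby feedback message.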
static void XLogWalRcvFlush(bool dying) {
if (LogstreamResult.Flush < LogstreamResult.Write) {
WalRcvData *walrcv = WalRcv;
issue_xlog_fsync(recvFile, recvSegNo);
LogstreamResult.Flush = LogstreamResult.Write;
SpinLockAcquire(&walrcv->mutex); /* Update shared-memory status */
if (walrcv->flushedUpto < LogstreamResult.Flush) {
walrcv->latestChunkStart = walrcv->flushedUpto; // previous flush position, i.e., start of the newly flushed chunk
walrcv->flushedUpto = LogstreamResult.Flush; // advance the flushed-up-to position
walrcv->receivedTLI = ThisTimeLineID;
}
SpinLockRelease(&walrcv->mutex);
/* Signal the startup process and walsender that new WAL has arrived */
WakeupRecovery(); // wake up the startup process
if (AllowCascadeReplication()) WalSndWakeup(); // if cascading replication is allowed, wake up the walsenders
...
if (!dying) {
/* Also let the primary know that we made some progress */
XLogWalRcvSendReply(false, false);
XLogWalRcvSendHSFeedback(false);
}
}
}
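Ignoring locking, the fsync call, and the wakeups, the bookkeeping in XLogWalRcvFlush can be sketched as follows. StreamResult, FlushShared, and flush_sketch are hypothetical names; the sketch only shows that a flush happens when Flush lags Write, and that latestChunkStart records where the newly flushed range begins.

```c
#include <stdbool.h>
#include <stdint.h>

typedef struct { uint64_t write, flush; } StreamResult;      /* like LogstreamResult */
typedef struct {
    uint64_t latest_chunk_start;  /* latestChunkStart */
    uint64_t flushed_upto;        /* flushedUpto */
    uint32_t received_tli;        /* receivedTLI */
} FlushShared;

/* Returns true if a flush happened; in the real code the startup process is
 * then woken and, when not dying, a reply and HS feedback are attempted. */
static bool flush_sketch(StreamResult *r, FlushShared *s, uint32_t tli)
{
    if (r->flush >= r->write)
        return false;                            /* nothing written since last flush */
    /* issue_xlog_fsync() would run here */
    r->flush = r->write;
    if (s->flushed_upto < r->flush) {
        s->latest_chunk_start = s->flushed_upto; /* start of the newly flushed chunk */
        s->flushed_upto       = r->flush;
        s->received_tli       = tli;
    }
    return true;
}
```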
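If WaitLatchOrSocket timed out, we have not received anything new. If more than wal_receiver_timeout / 2 has passed without any message from the server, ping it. In addition, if it has been longer than wal_receiver_status_interval since we last sent a status update, send one to the primary anyway, to report any progress in applying WAL.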
bool requestReply = false;
if (wal_receiver_timeout > 0) {
/* Check if time since last receive from primary has reached the configured limit. */
TimestampTz now = GetCurrentTimestamp(); // last_recv_timestamp is updated whenever data is received
TimestampTz timeout = TimestampTzPlusMilliseconds(last_recv_timestamp, wal_receiver_timeout);
if (now >= timeout) // nothing received for wal_receiver_timeout
ereport(ERROR,(errcode(ERRCODE_CONNECTION_FAILURE), errmsg("terminating walreceiver due to timeout")));
/* We didn't receive anything new, for half of receiver replication timeout. Ping the server. */
if (!ping_sent){
// ping_sent is reset to false whenever data arrives; if it is already true, we have pinged since the last receipt
// nothing received from the server for more than wal_receiver_timeout / 2: ping it
timeout = TimestampTzPlusMilliseconds(last_recv_timestamp, (wal_receiver_timeout / 2));
if (now >= timeout) {
requestReply = true;
ping_sent = true;
}
}
}
// send the status reply and hot standby feedback
XLogWalRcvSendReply(requestReply, requestReply);
XLogWalRcvSendHSFeedback(false);
XLogWalRcvSendReply(true, true) is called when nothing has been received from the server for more than wal_receiver_timeout / 2: it pings the server and asks the primary to reply. The message reports the write and flush positions of the walreceiver, the apply position of the startup process, and the standby's current time. XLogWalRcvSendReply(false, false) behaves as described earlier: it returns without sending if wal_receiver_status_interval is less than or equal to zero, or if no new WAL has been written or flushed and less than wal_receiver_status_interval has passed since the last reply; otherwise it sends the same positions and time without requesting a reply.
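The timeout handling can be restated as a small pure function. This is a sketch under simplifying assumptions: times are plain millisecond integers, check_receiver_timeout and TimeoutAction are hypothetical names, and TIMEOUT_FATAL stands for the ereport(ERROR, ...) that terminates the walreceiver.

```c
#include <stdbool.h>
#include <stdint.h>

typedef enum { TIMEOUT_NONE, TIMEOUT_PING, TIMEOUT_FATAL } TimeoutAction;

static TimeoutAction check_receiver_timeout(int64_t now, int64_t last_recv,
                                            int receiver_timeout_ms, bool *ping_sent)
{
    if (receiver_timeout_ms <= 0)
        return TIMEOUT_NONE;                    /* wal_receiver_timeout disabled */
    if (now >= last_recv + receiver_timeout_ms)
        return TIMEOUT_FATAL;                   /* terminate: no data for the full timeout */
    if (!*ping_sent && now >= last_recv + receiver_timeout_ms / 2) {
        *ping_sent = true;                      /* ping only once per silent period */
        return TIMEOUT_PING;                    /* XLogWalRcvSendReply(true, true) */
    }
    return TIMEOUT_NONE;                        /* XLogWalRcvSendReply(false, false) */
}
```

With wal_receiver_timeout = 60000 ms and the last message at time 0, a check at 30000 ms returns TIMEOUT_PING, a later check before 60000 ms returns TIMEOUT_NONE because ping_sent is already set, and a check at 60000 ms returns TIMEOUT_FATAL.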
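If the WalReceiver's MyLatch was set (for example by the startup process), check whether the startup process has asked the WalReceiver to send a reply to the primary right away, i.e., the startup process wants apply feedback now. Make sure the force_reply flag is really set to false in shared memory before sending the reply, so that a new reply request is not missed. XLogWalRcvSendReply(true, false) then sends the primary the write and flush positions of the walreceiver, the apply position of the startup process, and the standby's current time, without requesting a reply.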
if (rc & WL_LATCH_SET){
ResetLatch(MyLatch);
ProcessWalRcvInterrupts();
if (walrcv->force_reply){
/* The recovery process has asked us to send apply feedback now. Make sure the flag is really set to false in shared memory before sending the reply, so we don't miss a new request for a reply. */
walrcv->force_reply = false;
pg_memory_barrier();
XLogWalRcvSendReply(true, false);
}
}
WalRcvForceReply sets WalRcv->force_reply to true and then sets the walreceiver's latch. (It is called from WaitForWALToBecomeAvailable and StartupXLOG.)
void WalRcvForceReply(void) {
Latch *latch;
WalRcv->force_reply = true;
/* fetching the latch pointer might not be atomic, so use spinlock */
SpinLockAcquire(&WalRcv->mutex);
latch = WalRcv->latch;
SpinLockRelease(&WalRcv->mutex);
if (latch) SetLatch(latch);
}
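The force_reply handshake between the startup process and the walreceiver can be sketched with C11 atomics. This swaps in atomic_exchange for the plain store plus pg_memory_barrier() used above, the names force_reply, request_reply, and consume_reply_request are hypothetical, and latch handling is omitted. Clearing the flag before sending means a request raised while the reply is being sent stays set and is picked up on the next wakeup.

```c
#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical shared flag standing in for WalRcv->force_reply. */
static atomic_bool force_reply = false;

/* Startup side (cf. WalRcvForceReply): raise the flag; the real code then
 * sets the walreceiver's latch to wake it up. */
static void request_reply(void)
{
    atomic_store(&force_reply, true);
}

/* Walreceiver side: test and clear the flag in one step, before sending,
 * so that a request arriving during the send is not lost. */
static bool consume_reply_request(void)
{
    return atomic_exchange(&force_reply, false);
}
```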
[Figures not reproduced: the startup process sets receiveStart and receiveStartTLI to ask the WalReceiver to start streaming; WalRcvWaitForStartPosition in the walReceiver waits for receiveStart and receiveStartTLI to be set; the startup process asks the WalReceiver to send apply feedback now; after flushing WAL in XLogWalRcvFlush, the walReceiver wakes the startup process; WalRcvDie and walRcvStoppedCV; WalReceiverMain handling walRcvState and walRcvStoppedCV; the startup process calling ShutdownWalRcv and ConditionVariableBroadcast on walRcvStoppedCV; states WalRcvStreaming and WalRcvRunning.]