
PostgreSQL Replication: Receive and Send Logic of the First-Class Background Process WalReceiver

2022-06-11 04:18:00 肥叔菌


Receive Logic

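WalReceiver's receive logic is driven by the outer for(;;) loop in WalReceiverMain. The loop exits only if an error occurs or at end-of-streaming, which means the primary has terminated streaming replication. Each iteration runs as follows: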

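  1. First, call walrcv_receive once without blocking. If the returned length len is zero, call WaitLatchOrSocket to block on the events WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE | WL_TIMEOUT | WL_LATCH_SET. The walreceiver then waits until the network socket becomes readable, the latch is set, or the wait times out. If the postmaster dies, the process exits.
  2. If len is non-zero, there is either data (len > 0) or end-of-streaming (len < 0), and the inner loop that processes received data begins. Each received message is passed to XLogWalRcvProcessMsg, with the first byte as the message type, and then walrcv_receive is called again. Processing continues for as long as data keeps arriving. If len < 0, the primary has ended streaming: the walreceiver leaves the loop, ends streaming, and exits COPY mode via walrcv_endstreaming. If len == 0, nothing more can be read from the socket. The inner loop ends, and the walreceiver blocks in WaitLatchOrSocket until the network or the latch is ready.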
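The two steps above can be condensed into a small, runnable control-flow sketch. It is not PostgreSQL code. MockConn, mock_receive(), LoopStats, receive_once() and run_loop() are hypothetical stand-ins. A scripted list of return values plays the role of walrcv_receive(): a value > 0 is a message, 0 means nothing is readable, and a value < 0 means end of streaming. The comments only mark where the real calls would go.

```c
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for walrcv_receive(): replays scripted return values.
 * >0: a message of that length, 0: nothing readable now, <0: end of streaming. */
typedef struct {
    const int *script;
    size_t n;
    size_t pos;
} MockConn;

static int mock_receive(MockConn *c)
{
    if (c->pos >= c->n)
        return 0;                       /* script exhausted: behave like "no data yet" */
    return c->script[c->pos++];
}

typedef struct {
    int processed;                      /* messages handed to the message processor */
    int waits;                          /* times we would block waiting for events */
    bool end_of_wal;                    /* primary ended the stream */
} LoopStats;

/* One pass of the outer loop. */
static void receive_once(MockConn *c, LoopStats *st)
{
    int len = mock_receive(c);          /* try once without blocking */

    if (len != 0) {
        for (;;) {                      /* drain everything readable right now */
            if (len > 0) {
                st->processed++;        /* real code: XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1) */
            } else if (len == 0) {
                break;                  /* socket drained */
            } else {
                st->end_of_wal = true;  /* len < 0: end-of-streaming */
                break;
            }
            len = mock_receive(c);
        }
        /* real code: XLogWalRcvSendReply(false, false); XLogWalRcvFlush(false); */
    }
    if (!st->end_of_wal)
        st->waits++;                    /* real code: WaitLatchOrSocket(...) */
}

static LoopStats run_loop(const int *script, size_t n, int max_iters)
{
    MockConn c = { script, n, 0 };
    LoopStats st = { 0, 0, false };

    for (int i = 0; i < max_iters && !st.end_of_wal; i++)
        receive_once(&c, &st);
    /* real code: after leaving the loop, walrcv_endstreaming(...) */
    return st;
}
```

For the script {5, 3, 0, 7, -1}, the sketch processes three messages and blocks once after the first burst. It then stops at the end-of-streaming result without waiting again.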

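When data is received (len > 0), the primary has sent something. Before calling XLogWalRcvProcessMsg, the walreceiver sets last_recv_timestamp to the current time, which records the most recent receive time. It also resets ping_sent to false, meaning that no keepalive ping to the primary's walsender is outstanding, so the timeout logic may send one again later.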

static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) {
	int			hdrlen;
	XLogRecPtr	dataStart, walEnd;
	TimestampTz sendTime;
	bool		replyRequested;

	resetStringInfo(&incoming_message);
	switch (type) {
		case 'w': {
			/* WAL records */
			hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64); /* copy message to StringInfo */
			if (len < hdrlen)
				ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg_internal("invalid WAL message received from primary")));
			appendBinaryStringInfo(&incoming_message, buf, hdrlen); // copy the fixed-size header from buf into the incoming_message StringInfo
			/* read the fields */
			dataStart = pq_getmsgint64(&incoming_message);  // WAL position at which the data in this message starts
			walEnd = pq_getmsgint64(&incoming_message);     // current end of WAL on the sender
			sendTime = pq_getmsgint64(&incoming_message);   // sender's clock at transmission time
			ProcessWalSndrMessage(walEnd, sendTime);
			buf += hdrlen;                                   // the rest of the message is WAL data
			len -= hdrlen;
			XLogWalRcvWrite(buf, len, dataStart);
			break;
		}
		case 'k': {
			/* Keepalive */
			hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char); /* copy message to StringInfo */
			if (len != hdrlen)
				ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg_internal("invalid keepalive message received from primary")));
			appendBinaryStringInfo(&incoming_message, buf, hdrlen);
			/* read the fields */
			walEnd = pq_getmsgint64(&incoming_message);         // current end of WAL on the sender
			sendTime = pq_getmsgint64(&incoming_message);       // sender's clock at transmission time
			replyRequested = pq_getmsgbyte(&incoming_message);  // whether the primary asks the standby to reply right away
			ProcessWalSndrMessage(walEnd, sendTime);
			/* If the primary requested a reply, send one immediately */
			if (replyRequested)
				XLogWalRcvSendReply(true, false);
			break;
		}
		default:
			ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg_internal("invalid replication message type %d", type)));
	}
}
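To show what XLogWalRcvProcessMsg actually reads, here is a standalone decoder for the two message bodies after the type byte has been stripped. XLogData ('w') is three big-endian Int64 fields (WAL start, sender's end of WAL, send time) followed by WAL bytes. A primary keepalive ('k') is two Int64 fields plus a one-byte reply request flag. read_be64(), WalSndrHeader and parse_walsndr_msg() are hypothetical names. Where the real code raises ERRCODE_PROTOCOL_VIOLATION, this sketch simply returns false.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Read a big-endian (network order) 64-bit integer, as pq_getmsgint64() does. */
static uint64_t read_be64(const unsigned char *p)
{
    uint64_t v = 0;
    for (int i = 0; i < 8; i++)
        v = (v << 8) | p[i];
    return v;
}

typedef struct {
    uint64_t data_start;      /* 'w' only: WAL position of the first payload byte */
    uint64_t wal_end;         /* sender's current end of WAL */
    int64_t  send_time;       /* sender's clock at transmission */
    bool     reply_requested; /* 'k' only */
    size_t   payload_len;     /* 'w' only: bytes of WAL after the header */
} WalSndrHeader;

/* Hypothetical helper: validate and decode a 'w' or 'k' body.
 * Returns false where the real code would report a protocol violation. */
static bool parse_walsndr_msg(unsigned char type, const unsigned char *buf,
                              size_t len, WalSndrHeader *out)
{
    if (type == 'w') {
        const size_t hdrlen = 8 + 8 + 8;
        if (len < hdrlen)
            return false;                 /* "invalid WAL message" */
        out->data_start = read_be64(buf);
        out->wal_end = read_be64(buf + 8);
        out->send_time = (int64_t) read_be64(buf + 16);
        out->reply_requested = false;
        out->payload_len = len - hdrlen;
        return true;
    }
    if (type == 'k') {
        const size_t hdrlen = 8 + 8 + 1;
        if (len != hdrlen)
            return false;                 /* "invalid keepalive message" */
        out->data_start = 0;
        out->wal_end = read_be64(buf);
        out->send_time = (int64_t) read_be64(buf + 8);
        out->reply_requested = buf[16] != 0;
        out->payload_len = 0;
        return true;
    }
    return false;                         /* "invalid replication message type" */
}
```

The length checks mirror the original. A 'w' header may be followed by any amount of WAL, but a keepalive must be exactly 17 bytes.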

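ProcessWalSndrMessage updates the shared-memory status after a message arrives from the primary. 'walEnd' and 'sendTime' are the end of WAL and the timestamp reported in the latest message.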

static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime) {
	WalRcvData *walrcv = WalRcv;
	TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();

	/* Update shared-memory status */
	SpinLockAcquire(&walrcv->mutex);
	if (walrcv->latestWalEnd < walEnd)
		walrcv->latestWalEndTime = sendTime;          // the sender's end of WAL advanced: remember when it was reported
	walrcv->latestWalEnd = walEnd;                    // latest end of WAL reported by the sender
	walrcv->lastMsgSendTime = sendTime;
	walrcv->lastMsgReceiptTime = lastMsgReceiptTime;  // local time at which this message was received and processed
	SpinLockRelease(&walrcv->mutex);
}
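The one subtle rule here is that latestWalEndTime only changes when the sender's end of WAL moves forward. The other fields are updated on every message, including keepalives. The sketch below models that rule single-threaded: the spinlock is omitted, and SenderStatus and note_sender_message() are hypothetical names.

```c
#include <stdint.h>

/* Hypothetical single-threaded model of the WalRcvData fields touched by
 * ProcessWalSndrMessage(); the real code updates them under walrcv->mutex. */
typedef struct {
    uint64_t latestWalEnd;
    int64_t  latestWalEndTime;
    int64_t  lastMsgSendTime;
    int64_t  lastMsgReceiptTime;
} SenderStatus;

static void note_sender_message(SenderStatus *s, uint64_t walEnd,
                                int64_t sendTime, int64_t receiptTime)
{
    /* Remember the send time only when the sender's end of WAL advanced. */
    if (s->latestWalEnd < walEnd)
        s->latestWalEndTime = sendTime;
    s->latestWalEnd = walEnd;
    s->lastMsgSendTime = sendTime;
    s->lastMsgReceiptTime = receiptTime;
}
```

These shared fields are what the pg_stat_wal_receiver view exposes as latest_end_lsn, latest_end_time, last_msg_send_time and last_msg_receipt_time.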

Send Logic

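XLogWalRcvSendReply sends a reply message to the primary reporting our current WAL positions (write, flush and apply) and the current time.

  • If wal_receiver_status_interval is disabled (<= 0) and 'force' is false, the call is a no-op.
  • If 'force' is not set, a message is sent only if the write or flush position has advanced since the last reply, or at least wal_receiver_status_interval has passed since then. This is how a plain XLogWalRcvSendReply(false, false) lets the primary know that we received data.
  • If 'force' is true and 'requestReply' is true, the primary is asked to reply immediately upon receiving the message. This is used as a heartbeat when wal_receiver_timeout is approaching.
  • If 'force' is true and 'requestReply' is false, no reply from the primary is needed. This is used when the startup process asks us to send apply feedback, and when a primary keepalive requests a reply.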
static void XLogWalRcvSendReply(bool force, bool requestReply) {
	static XLogRecPtr writePtr = 0;
	static XLogRecPtr flushPtr = 0;
	XLogRecPtr	applyPtr;
	static TimestampTz sendTime = 0;
	TimestampTz now;

	/* If the user doesn't want status to be reported to the primary, be sure to exit before doing anything at all. */
	// wal_receiver_status_interval: minimum interval at which the standby reports replication status to the primary; <= 0 disables reporting
	if (!force && wal_receiver_status_interval <= 0)
		return;
	now = GetCurrentTimestamp(); /* Get current timestamp. */
	/*
	 * We can compare the write and flush positions to the last message we sent without taking any lock,
	 * but the apply position requires a spin lock, so we don't check that unless something else has
	 * changed or 10 seconds have passed. This means that the apply WAL location will appear, from the
	 * primary's point of view, to lag slightly, but since this is only for reporting purposes and only
	 * on idle systems, that's probably OK.
	 */
	if (!force && writePtr == LogstreamResult.Write && flushPtr == LogstreamResult.Flush &&
		!TimestampDifferenceExceeds(sendTime, now, wal_receiver_status_interval * 1000))
		return;
	sendTime = now;

	/* Construct a new message */
	writePtr = LogstreamResult.Write;
	flushPtr = LogstreamResult.Flush;
	applyPtr = GetXLogReplayRecPtr(NULL);

	resetStringInfo(&reply_message);
	pq_sendbyte(&reply_message, 'r');
	pq_sendint64(&reply_message, writePtr);
	pq_sendint64(&reply_message, flushPtr);
	pq_sendint64(&reply_message, applyPtr);
	pq_sendint64(&reply_message, GetCurrentTimestamp());
	pq_sendbyte(&reply_message, requestReply ? 1 : 0);
	walrcv_send(wrconn, reply_message.data, reply_message.len);
}
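The early-return checks at the top of XLogWalRcvSendReply decide whether a reply goes out at all. The sketch below isolates that decision. It assumes times in milliseconds and an interval in seconds, as with wal_receiver_status_interval. ReplyState and should_send_reply() are hypothetical names. The elapsed-time test uses ">=", matching how TimestampDifferenceExceeds compares the difference.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical model of the "should we reply?" checks in XLogWalRcvSendReply(). */
typedef struct {
    uint64_t last_write;   /* writePtr reported in the previous reply */
    uint64_t last_flush;   /* flushPtr reported in the previous reply */
    int64_t  last_send_ms; /* sendTime of the previous reply */
} ReplyState;

static bool should_send_reply(ReplyState *st, bool force, int interval_s,
                              uint64_t write, uint64_t flush, int64_t now_ms)
{
    if (!force && interval_s <= 0)
        return false;                         /* status reporting disabled */
    if (!force && write == st->last_write && flush == st->last_flush &&
        now_ms - st->last_send_ms < (int64_t) interval_s * 1000)
        return false;                         /* nothing new, interval not yet elapsed */
    st->last_write = write;                   /* remember what this reply reports */
    st->last_flush = flush;
    st->last_send_ms = now_ms;
    return true;
}
```

With an interval of 10 seconds, a reply goes out as soon as the write position advances. An idle standby then stays silent until 10 seconds have passed, and 'force' bypasses both checks.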

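XLogWalRcvSendHSFeedback sends a hot standby feedback message to the primary, together with the current time. If the user disables feedback, one final message is sent to tell the walsender to forget this standby's xmin. The message is also sent on first connection, because a previous connection might have set an xmin on a replication slot. (If no slot is in use, sending a feedback message that explicitly sets InvalidTransactionId is harmless.)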

static void XLogWalRcvSendHSFeedback(bool immed) {
	TimestampTz now;
	FullTransactionId nextFullXid;
	TransactionId nextXid, xmin, catalog_xmin;
	uint32		xmin_epoch, catalog_xmin_epoch;
	static TimestampTz sendTime = 0;
	static bool primary_has_standby_xmin = true; /* initially true so we always send at least one feedback message */

	/* If the user doesn't want status to be reported to the primary, be sure to exit before doing anything at all. */
	if ((wal_receiver_status_interval <= 0 || !hot_standby_feedback) && !primary_has_standby_xmin)
		return;

	now = GetCurrentTimestamp();  /* Get current timestamp. */
	if (!immed) {
		// not an immediate send: rate-limit by wal_receiver_status_interval
		/* Send feedback at most once per wal_receiver_status_interval. */
		if (!TimestampDifferenceExceeds(sendTime, now, wal_receiver_status_interval * 1000))
			return;
		sendTime = now;
	}

	/*
	 * If Hot Standby is not yet accepting connections there is nothing to send. Check this after the
	 * interval has expired to reduce number of calls.
	 * Bailing out here also ensures that we don't send feedback until we've read our own replication
	 * slot state, so we don't tell the primary to discard needed xmin or catalog_xmin from any slots
	 * that may exist on this replica.
	 */
	if (!HotStandbyActive())
		return;

	/* Make the expensive call to get the oldest xmin once we are certain everything else has been checked. */
	if (hot_standby_feedback) {
		// hot standby feedback enabled: report the oldest xmin and catalog_xmin this standby still needs
		GetReplicationHorizons(&xmin, &catalog_xmin);
	} else {
		// feedback disabled: send InvalidTransactionId so the primary drops our xmin
		xmin = InvalidTransactionId;
		catalog_xmin = InvalidTransactionId;
	}
	/* Get epoch and adjust if nextXid and oldestXmin are different sides of the epoch boundary. */
	nextFullXid = ReadNextFullTransactionId();
	nextXid = XidFromFullTransactionId(nextFullXid);
	xmin_epoch = EpochFromFullTransactionId(nextFullXid);
	catalog_xmin_epoch = xmin_epoch;
	if (nextXid < xmin)
		xmin_epoch--;
	if (nextXid < catalog_xmin)
		catalog_xmin_epoch--;

	resetStringInfo(&reply_message);  /* Construct the message and send it. */
	pq_sendbyte(&reply_message, 'h');
	pq_sendint64(&reply_message, GetCurrentTimestamp());
	pq_sendint32(&reply_message, xmin);
	pq_sendint32(&reply_message, xmin_epoch);
	pq_sendint32(&reply_message, catalog_xmin);
	pq_sendint32(&reply_message, catalog_xmin_epoch);
	walrcv_send(wrconn, reply_message.data, reply_message.len);  // send it
	if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
		primary_has_standby_xmin = true;
	else
		primary_has_standby_xmin = false;  // the primary no longer holds an xmin for us; while feedback stays disabled, stop sending
}
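The epoch adjustment near the end of XLogWalRcvSendHSFeedback can be checked in isolation. A FullTransactionId keeps the epoch in its high 32 bits and the 32-bit xid in its low 32 bits. If the reported xmin is numerically larger than nextXid, the xid counter must have wrapped between them, so xmin belongs to the previous epoch. epoch_of(), xid_of() and epoch_for_xmin() are hypothetical helpers that mirror the xmin_epoch-- logic.

```c
#include <stdint.h>

/* Split a 64-bit full transaction id: high 32 bits = epoch, low 32 bits = xid. */
static uint32_t epoch_of(uint64_t full_xid) { return (uint32_t) (full_xid >> 32); }
static uint32_t xid_of(uint64_t full_xid)   { return (uint32_t) full_xid; }

/* Epoch to report alongside xmin: step back one epoch when the 32-bit xmin
 * is numerically ahead of nextXid, i.e. the counter wrapped in between. */
static uint32_t epoch_for_xmin(uint64_t next_full_xid, uint32_t xmin)
{
    uint32_t epoch = epoch_of(next_full_xid);
    if (xid_of(next_full_xid) < xmin)
        epoch--;
    return epoch;
}
```

InvalidTransactionId (0) is never greater than nextXid, so an "invalid" xmin is reported with the current epoch unchanged.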

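When a keepalive message arrives and the primary requests a reply (replyRequested is true), the walreceiver immediately calls XLogWalRcvSendReply(true, false). The reply carries three positions: the XLOG written by the walreceiver, the XLOG flushed by the walreceiver, and the XLOG applied by the startup process. It also carries the standby's current time and is marked as not requiring a reply from the primary.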
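The reply built by XLogWalRcvSendReply is a fixed 34-byte standby status update. It consists of the type byte 'r', then four big-endian Int64 fields (write, flush, apply, client clock in PostgreSQL microseconds since 2000-01-01), then one reply-request byte. The encoder below reproduces that byte layout. put_be64(), STANDBY_REPLY_LEN and encode_standby_reply() are hypothetical names, and the real code builds the message with pq_sendbyte/pq_sendint64.

```c
#include <stddef.h>
#include <stdint.h>

/* Write a 64-bit integer in network (big-endian) byte order, like pq_sendint64(). */
static size_t put_be64(unsigned char *p, uint64_t v)
{
    for (int i = 7; i >= 0; i--) {
        p[i] = (unsigned char) (v & 0xff);
        v >>= 8;
    }
    return 8;
}

#define STANDBY_REPLY_LEN (1 + 8 + 8 + 8 + 8 + 1)   /* 34 bytes */

/* Hypothetical encoder for the 'r' message; out must hold STANDBY_REPLY_LEN bytes. */
static size_t encode_standby_reply(unsigned char *out, uint64_t write,
                                   uint64_t flush, uint64_t apply,
                                   int64_t now, int reply_requested)
{
    size_t n = 0;
    out[n++] = 'r';
    n += put_be64(out + n, write);
    n += put_be64(out + n, flush);
    n += put_be64(out + n, apply);
    n += put_be64(out + n, (uint64_t) now);
    out[n++] = reply_requested ? 1 : 0;   /* 1 = ask the primary to reply at once */
    return n;
}
```

A keepalive-triggered reply sets the last byte to 0. The timeout ping described further on, XLogWalRcvSendReply(true, true), sets it to 1.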
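When walrcv_receive in the inner loop returns a length of zero or less, the loop ends and two functions are called. XLogWalRcvSendReply(false, false) lets the primary know that the walreceiver received some data. XLogWalRcvFlush(false) flushes the written XLOG to disk and notifies the startup process and the primary of the progress.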
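XLogWalRcvSendReply(false, false) returns immediately without replying in two cases. The first is when status reporting is disabled: wal_receiver_status_interval, the minimum interval at which the standby reports replication status to the primary, is <= 0. The second is when no new XLOG has arrived (LogstreamResult.Write and Flush equal the values sent last time) and less than wal_receiver_status_interval has passed since the last reply. In all other cases it sends the written and flushed XLOG positions, the startup process's applied position, and the standby's current time, marked as not requiring a reply from the primary. XLogWalRcvFlush(false) acts only when it has something to flush (Flush < Write). In that case it fsyncs the WAL and calls XLogWalRcvSendReply(false, false) with the same logic as above. It then calls XLogWalRcvSendHSFeedback(false) to send a hot standby feedback message.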

static void XLogWalRcvFlush(bool dying) {
	if (LogstreamResult.Flush < LogstreamResult.Write) {
		WalRcvData *walrcv = WalRcv;
		issue_xlog_fsync(recvFile, recvSegNo);
		LogstreamResult.Flush = LogstreamResult.Write;
		SpinLockAcquire(&walrcv->mutex);  /* Update shared-memory status */
		if (walrcv->flushedUpto < LogstreamResult.Flush) {
			walrcv->latestChunkStart = walrcv->flushedUpto; // XLOG position flushed up to last time
			walrcv->flushedUpto = LogstreamResult.Flush;    // XLOG position flushed up to now
			walrcv->receivedTLI = ThisTimeLineID;
		}
		SpinLockRelease(&walrcv->mutex);
		/* Signal the startup process and walsender that new WAL has arrived */
		WakeupRecovery();  // wake the startup process
		if (AllowCascadeReplication())
			WalSndWakeup(); // cascading replication allowed: wake walsenders on this standby
		...

		if (!dying) {
			/* Also let the primary know that we made some progress */
			XLogWalRcvSendReply(false, false);
			XLogWalRcvSendHSFeedback(false);
		}
	}
}
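The position bookkeeping in XLogWalRcvFlush can be modeled with plain integers, with the fsync, the spinlock and the wakeups reduced to a return value. FlushState and flush_written_wal() are hypothetical names. The fields stand in for LogstreamResult.Write, LogstreamResult.Flush, walrcv->flushedUpto and walrcv->latestChunkStart.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical model of the positions updated by XLogWalRcvFlush(). */
typedef struct {
    uint64_t write;              /* LogstreamResult.Write */
    uint64_t flush;              /* LogstreamResult.Flush */
    uint64_t flushed_upto;       /* walrcv->flushedUpto (shared memory) */
    uint64_t latest_chunk_start; /* walrcv->latestChunkStart (shared memory) */
} FlushState;

/* Returns true when there was something to flush, i.e. when the real code
 * would fsync, wake the startup process (and cascading walsenders) and,
 * unless dying, send a reply and hot standby feedback. */
static bool flush_written_wal(FlushState *s)
{
    if (s->flush >= s->write)
        return false;                      /* nothing written since the last flush */
    /* real code: issue_xlog_fsync(recvFile, recvSegNo) */
    s->flush = s->write;
    if (s->flushed_upto < s->flush) {
        s->latest_chunk_start = s->flushed_upto;
        s->flushed_upto = s->flush;
    }
    return true;
}
```

A second call with no new writes is a no-op. This is why the reply and feedback calls inside XLogWalRcvFlush only run after WAL has actually been written.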

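If WaitLatchOrSocket times out, nothing new has been received. If nothing has been heard from the server for more than wal_receiver_timeout / 2, the walreceiver pings the server. Separately, if more than wal_receiver_status_interval has passed since the last update we sent, a status update goes to the primary anyway, to report any progress in applying WAL. If nothing has been heard for the full wal_receiver_timeout, the walreceiver terminates with an error.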

bool		requestReply = false;
if (wal_receiver_timeout > 0) {
	/* Check if time since last receive from primary has reached the configured limit. */
	TimestampTz now = GetCurrentTimestamp();  // last_recv_timestamp is updated whenever data is received
	TimestampTz timeout = TimestampTzPlusMilliseconds(last_recv_timestamp, wal_receiver_timeout);
	if (now >= timeout) // nothing received for wal_receiver_timeout
		ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE), errmsg("terminating walreceiver due to timeout")));

	/* We didn't receive anything new, for half of receiver replication timeout. Ping the server. */
	if (!ping_sent) {
		// ping_sent is reset to false whenever data arrives; if it is true, a ping has already been sent since then
		// ping the server if nothing has been heard from it for more than wal_receiver_timeout / 2
		timeout = TimestampTzPlusMilliseconds(last_recv_timestamp, (wal_receiver_timeout / 2));
		if (now >= timeout) {
			requestReply = true;
			ping_sent = true;
		}
	}
}
// send the status update (a ping when requestReply is true) and hot standby feedback
XLogWalRcvSendReply(requestReply, requestReply);
XLogWalRcvSendHSFeedback(false);

XLogWalRcvSendReply(true, true) is the ping. It is sent when nothing has been received from the server for more than wal_receiver_timeout / 2, and it asks the primary to reply. It carries the written and flushed XLOG positions, the startup process's applied position, and the standby's current time. When no ping is due, XLogWalRcvSendReply(false, false) is called instead. As described earlier, it returns without sending when status reporting is disabled, or when nothing new was received and less than wal_receiver_status_interval has passed. Otherwise it sends the same positions without requesting a reply.
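The timeout branch amounts to a small state machine over last_recv_timestamp and ping_sent. The sketch below models it with integer milliseconds: timeout_ms plays the role of wal_receiver_timeout, and TickResult and on_wait_timeout() are hypothetical names. It returns what the real code would do instead of raising an error or sending messages. The caller is assumed to reset ping_sent (and last_recv_ms) whenever data arrives, as the receive loop does.

```c
#include <stdbool.h>
#include <stdint.h>

typedef enum { TICK_IDLE, TICK_PING, TICK_TIMEOUT } TickResult;

/* Hypothetical model of the WL_TIMEOUT branch (times in milliseconds). */
static TickResult on_wait_timeout(int64_t now_ms, int64_t last_recv_ms,
                                  int timeout_ms, bool *ping_sent)
{
    if (timeout_ms <= 0)
        return TICK_IDLE;                  /* timeout checking disabled */
    if (now_ms >= last_recv_ms + timeout_ms)
        return TICK_TIMEOUT;               /* real code: ERROR "terminating walreceiver due to timeout" */
    if (!*ping_sent && now_ms >= last_recv_ms + timeout_ms / 2) {
        *ping_sent = true;                 /* at most one ping per silent period */
        return TICK_PING;                  /* real code: XLogWalRcvSendReply(true, true) */
    }
    return TICK_IDLE;                      /* real code: XLogWalRcvSendReply(false, false) */
}
```

With a 60 s timeout, the first ping goes out 30 s after the last receipt. No second ping follows until data arrives again, and at 60 s the walreceiver gives up.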

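When the walreceiver's MyLatch has been set by the startup process, the walreceiver checks whether the startup process has requested a forced reply to the primary, that is, apply feedback sent right away. The force_reply flag in shared memory is cleared to false, followed by a memory barrier, before the reply is sent, so a new reply request that arrives in the meantime is not missed. XLogWalRcvSendReply(true, false) then sends the written and flushed XLOG positions, the startup process's applied position, and the standby's current time, without requesting a reply from the primary.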

if (rc & WL_LATCH_SET) {
	ResetLatch(MyLatch);
	ProcessWalRcvInterrupts();
	if (walrcv->force_reply) {
		/* The recovery process has asked us to send apply feedback now. Make sure the flag is really set to false in shared memory before sending the reply, so we don't miss a new request for a reply. */
		walrcv->force_reply = false;
		pg_memory_barrier();
		XLogWalRcvSendReply(true, false);
	}
}
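Why the flag is cleared before the reply is sent can be shown with a single-threaded simulation. Here a new request "arrives" while a reply is being sent. Clearing first keeps that new request pending, while clearing afterwards erases it. force_reply, replies_sent, send_reply() and the two handle_latch_* functions are hypothetical. A C11 sequentially consistent fence stands in for pg_memory_barrier().

```c
#include <stdatomic.h>
#include <stdbool.h>

static atomic_bool force_reply;   /* stands in for walrcv->force_reply */
static int replies_sent;

/* Simulated reply; optionally a new request arrives while it is being sent. */
static void send_reply(bool new_request_arrives_during_send)
{
    replies_sent++;
    if (new_request_arrives_during_send)
        atomic_store(&force_reply, true);   /* startup process: WalRcvForceReply() */
}

/* Order used by the walreceiver: clear the flag, fence, then send. */
static void handle_latch_clear_first(bool race)
{
    if (atomic_load(&force_reply)) {
        atomic_store(&force_reply, false);
        atomic_thread_fence(memory_order_seq_cst);   /* like pg_memory_barrier() */
        send_reply(race);
    }
}

/* Wrong order, for comparison: send first, then clear. */
static void handle_latch_clear_last(bool race)
{
    if (atomic_load(&force_reply)) {
        send_reply(race);
        atomic_store(&force_reply, false);           /* may erase a newer request */
    }
}
```

With the walreceiver's order, the request that arrived mid-send is still set and triggers another reply on the next wakeup. With the reversed order, that request is silently lost.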

WalRcvForceReply sets WalRcv->force_reply to true and then sets the walreceiver's latch. It is called from WaitForWALToBecomeAvailable and StartupXLOG.

void WalRcvForceReply(void) {
	Latch	   *latch;

	WalRcv->force_reply = true;
	/* fetching the latch pointer might not be atomic, so use spinlock */
	SpinLockAcquire(&WalRcv->mutex);
	latch = WalRcv->latch;
	SpinLockRelease(&WalRcv->mutex);
	if (latch)
		SetLatch(latch);
}
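The producer side follows a common pattern: raise the flag, copy the latch pointer while holding the spinlock, and set the latch only after the lock is released. The sketch below reproduces that shape with C11 atomics. Latch, SharedRcv, spin_acquire(), spin_release() and force_reply_request() are hypothetical. An atomic_flag busy-wait stands in for PostgreSQL's slock_t, and setting a boolean stands in for SetLatch().

```c
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

typedef struct { atomic_bool is_set; } Latch;   /* hypothetical minimal latch */

typedef struct {
    atomic_flag mutex;          /* stands in for the slock_t spinlock */
    Latch      *latch;          /* walreceiver's latch; NULL when not running */
    atomic_bool force_reply;
} SharedRcv;

static void spin_acquire(atomic_flag *m)
{
    while (atomic_flag_test_and_set_explicit(m, memory_order_acquire))
        ;                       /* busy-wait, as a spinlock does */
}

static void spin_release(atomic_flag *m)
{
    atomic_flag_clear_explicit(m, memory_order_release);
}

/* Same shape as WalRcvForceReply(): flag first, pointer read under the lock,
 * latch set outside the lock. */
static void force_reply_request(SharedRcv *rcv)
{
    Latch *latch;

    atomic_store(&rcv->force_reply, true);
    spin_acquire(&rcv->mutex);
    latch = rcv->latch;
    spin_release(&rcv->mutex);
    if (latch != NULL)
        atomic_store(&latch->is_set, true);   /* real code: SetLatch(latch) */
}
```

Keeping the critical section down to a single pointer copy matches the comment in WalRcvForceReply. The lock is there only because reading the latch pointer might not be atomic.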

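Interactions between the startup process and the walreceiver:

  • The startup process sets receiveStart and receiveStartTLI to ask the walreceiver to stream. The walreceiver's WalRcvWaitForStartPosition waits until they are set.
  • The startup process asks the walreceiver to send apply feedback now (WalRcvForceReply). After flushing XLOG in XLogWalRcvFlush, the walreceiver wakes the startup process.
  • Shutdown: two places broadcast on the condition variable walRcvStoppedCV. One is the walreceiver's exit callback WalRcvDie. The other is WalReceiverMain, when it finds in walRcvState that it was already asked to stop. The startup process's ShutdownWalRcv waits on walRcvStoppedCV. WalRcvRunning and WalRcvStreaming report the walreceiver's state.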
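WalRcvRunning and WalRcvStreaming are predicates over walRcvState. ShutdownWalRcv keeps sleeping on walRcvStoppedCV for as long as WalRcvRunning() is true. The enum below uses the state names from PostgreSQL's walreceiver.h. The two predicates rcv_running() and rcv_streaming() are simplified, hypothetical versions: the real functions also treat a walreceiver stuck in WALRCV_STARTING for too long as stopped, and that startup timeout is omitted here.

```c
#include <stdbool.h>

/* walreceiver states, as named in walreceiver.h */
typedef enum {
    WALRCV_STOPPED,     /* stopped and mustn't start up again */
    WALRCV_STARTING,    /* launched, but not yet initialized */
    WALRCV_STREAMING,   /* streaming */
    WALRCV_WAITING,     /* stopped streaming, waiting for orders */
    WALRCV_RESTARTING,  /* asked to restart streaming */
    WALRCV_STOPPING     /* requested to stop, but still running */
} WalRcvState;

/* Simplified WalRcvRunning(): any state other than STOPPED counts as running. */
static bool rcv_running(WalRcvState s)
{
    return s != WALRCV_STOPPED;
}

/* Simplified WalRcvStreaming(): streaming, or about to (re)start streaming. */
static bool rcv_streaming(WalRcvState s)
{
    return s == WALRCV_STREAMING || s == WALRCV_STARTING ||
           s == WALRCV_RESTARTING;
}
```

A walreceiver in WALRCV_WAITING or WALRCV_STOPPING is still "running" but not "streaming". ShutdownWalRcv therefore returns only once the state reaches WALRCV_STOPPED.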

Copyright notice
This article was written by 肥叔菌. Please include a link to the original when reposting. Thanks.
https://feishujun.blog.csdn.net/article/details/125201105