PostgreSQL database replication: receive and send logic of the first-class background process walreceiver

2022-06-11 04:31:00 Tertium ferrugosum

Receiving logic

To understand the receive logic of the WalReceiver background process, start with its endless for loop. The loop exits when an error occurs or when end-of-streaming is reached (the primary has terminated streaming replication). It proceeds as follows:

  1. First try walrcv_receive. If the returned length len is zero, call WaitLatchOrSocket to block on the WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE | WL_TIMEOUT | WL_LATCH_SET events, that is, wait until the network socket becomes readable, the latch is set, or the wait times out.
  2. If len is nonzero, there is either data (len > 0) or end-of-streaming (len < 0), and the inner read loop (a for loop that processes the received data) is entered. If data was read, XLogWalRcvProcessMsg is called to process it, and then walrcv_receive is called again. As long as more data arrives, it is passed to XLogWalRcvProcessMsg. If len < 0, end-of-streaming has been reached (the primary has terminated streaming replication), so streaming is ended and the connection leaves COPY mode (walrcv_endstreaming). If len is zero, nothing more can be read from the network socket for now, and the process goes back to WaitLatchOrSocket to block until the socket has data again.
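
The two steps above can be sketched as a small simulation. This is only an illustration of the control flow, not the PostgreSQL code: FakeConn, fake_receive, LoopStats and run_receive_loop are hypothetical names, and the scripted lengths stand in for what walrcv_receive would return.

```c
#include <stddef.h>

/* Hypothetical stand-in for a connection: a scripted list of lengths that
 * a receive call would return (>0 data, 0 nothing yet, <0 end of stream). */
typedef struct {
    const int *lens;
    size_t     count;
    size_t     pos;
} FakeConn;

typedef struct {
    int messages;  /* messages handed to the processing step */
    int waits;     /* times the loop blocked waiting for events */
    int ended;     /* 1 once end-of-streaming was seen */
} LoopStats;

/* Stand-in for walrcv_receive; reports end-of-stream when the script runs out. */
static int fake_receive(FakeConn *conn)
{
    return conn->pos < conn->count ? conn->lens[conn->pos++] : -1;
}

LoopStats run_receive_loop(FakeConn *conn)
{
    LoopStats st = {0, 0, 0};

    while (!st.ended) {
        int len = fake_receive(conn);

        /* Inner loop: drain everything that is immediately available. */
        while (len != 0) {
            if (len < 0) {
                st.ended = 1;      /* real code: walrcv_endstreaming */
                break;
            }
            st.messages++;         /* real code: XLogWalRcvProcessMsg */
            len = fake_receive(conn);
        }

        if (!st.ended)
            st.waits++;            /* real code: WaitLatchOrSocket */
    }
    return st;
}
```

With the script {0, 10, 20, 0, 5, -1}, the sketch processes three messages, blocks twice, and then stops on end-of-streaming.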

Whenever data is received and XLogWalRcvProcessMsg is about to be called, data has arrived from the primary. last_recv_timestamp is therefore set to the current time (the time data was last received), and ping_sent is reset to false, so walreceiver has no pending ping to the walsender on the primary.

static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len) {
    
	int			hdrlen;
	XLogRecPtr	dataStart,	walEnd;
	TimestampTz sendTime;
	bool		replyRequested;
	resetStringInfo(&incoming_message);
	switch (type) {
    
		case 'w':{
				/* WAL records */
				hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
				if (len < hdrlen) ereport(ERROR,(errcode(ERRCODE_PROTOCOL_VIOLATION),errmsg_internal("invalid WAL message received from primary")));
				/* copy message to StringInfo */
				appendBinaryStringInfo(&incoming_message, buf, hdrlen); // copy the header bytes from buf into the incoming_message StringInfo
				/* read the fields */
				dataStart = pq_getmsgint64(&incoming_message); // WAL position at which the data in this message starts
				walEnd = pq_getmsgint64(&incoming_message);    // current end of WAL on the primary
				sendTime = pq_getmsgint64(&incoming_message);  // time at which the primary sent the message
				ProcessWalSndrMessage(walEnd, sendTime);
				buf += hdrlen; len -= hdrlen;
				XLogWalRcvWrite(buf, len, dataStart);
				break;
			}
		case 'k': {
				/* Keepalive */
				hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
				if (len != hdrlen) ereport(ERROR,(errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg_internal("invalid keepalive message received from primary")));
				/* copy message to StringInfo */
				appendBinaryStringInfo(&incoming_message, buf, hdrlen);
				/* read the fields */
				walEnd = pq_getmsgint64(&incoming_message);         // current end of WAL on the primary
				sendTime = pq_getmsgint64(&incoming_message);       // time at which the primary sent the message
				replyRequested = pq_getmsgbyte(&incoming_message);  // whether the primary asks the standby to reply
				ProcessWalSndrMessage(walEnd, sendTime);
				/* If the primary requested a reply, send one immediately */
				if (replyRequested) XLogWalRcvSendReply(true, false);
				break;
			}
		default:
			ereport(ERROR,(errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg_internal("invalid replication message type %d", type)));
	}
}
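
As a hedged illustration of the 'w' header handled above, the sketch below decodes the three 8-byte fields by hand. It assumes the fields are in network byte order (which is what pq_getmsgint64 reads); WalDataHeader, read_be64 and parse_wal_data_header are hypothetical names, not PostgreSQL functions.

```c
#include <stddef.h>
#include <stdint.h>

/* Hypothetical view of the header that follows the 'w' type byte. */
typedef struct {
    uint64_t data_start;  /* WAL position where the payload starts */
    uint64_t wal_end;     /* end of WAL reported by the sender */
    int64_t  send_time;   /* sender's timestamp */
} WalDataHeader;

/* Read a big-endian (network order) 64-bit integer. */
static uint64_t read_be64(const unsigned char *p)
{
    uint64_t v = 0;
    for (int i = 0; i < 8; i++)
        v = (v << 8) | p[i];
    return v;
}

/* Returns the number of payload bytes after the header,
 * or -1 if the message is too short to hold the header. */
long parse_wal_data_header(const unsigned char *buf, size_t len,
                           WalDataHeader *out)
{
    const size_t hdrlen = 3 * sizeof(uint64_t);

    if (len < hdrlen)
        return -1;
    out->data_start = read_be64(buf);
    out->wal_end    = read_be64(buf + 8);
    out->send_time  = (int64_t) read_be64(buf + 16);
    return (long) (len - hdrlen);
}
```

The return value mirrors the `buf += hdrlen; len -= hdrlen;` step in the listing: whatever remains after the 24-byte header is WAL data to be written.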

ProcessWalSndrMessage updates the shared-memory status after a message is received from the primary. 'walEnd' and 'sendTime' are the end-of-WAL position and the timestamp carried by the latest message.

static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime) {
    
	WalRcvData *walrcv = WalRcv;
	TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
	/* Update shared-memory status */
	SpinLockAcquire(&walrcv->mutex);
	if (walrcv->latestWalEnd < walEnd) walrcv->latestWalEndTime = sendTime;  // the reported end of WAL advanced, so record when the new value was sent
	walrcv->latestWalEnd = walEnd; // end-of-WAL position reported by this message
	walrcv->lastMsgSendTime = sendTime;
	walrcv->lastMsgReceiptTime = lastMsgReceiptTime;  // time at which this message was received and processed
	SpinLockRelease(&walrcv->mutex);
}
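
The effect of the `latestWalEnd < walEnd` test can be seen in a lock-free toy version of the same update. This is a sketch under assumptions: RcvStatus and record_sender_message are hypothetical names, timestamps are plain integers, and the spinlock is left out.

```c
#include <stdint.h>

/* Hypothetical, simplified copy of the status fields updated above. */
typedef struct {
    uint64_t latest_wal_end;
    int64_t  latest_wal_end_time;
    int64_t  last_msg_send_time;
    int64_t  last_msg_receipt_time;
} RcvStatus;

void record_sender_message(RcvStatus *s, uint64_t wal_end,
                           int64_t send_time, int64_t now)
{
    /* Only move the "end of WAL" timestamp when the end of WAL advanced. */
    if (s->latest_wal_end < wal_end)
        s->latest_wal_end_time = send_time;
    s->latest_wal_end = wal_end;
    s->last_msg_send_time = send_time;
    s->last_msg_receipt_time = now;
}
```

A message that repeats the previous walEnd (a keepalive on an idle primary, for example) refreshes the two "last message" times but leaves latest_wal_end_time where it was.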

Send logic

XLogWalRcvSendReply sends a reply message to the primary that reports our current WAL positions and the current time. (The description this is based on also mentions the oldest xmin, but in the code below the 'r' message carries no xmin; that is sent by XLogWalRcvSendHSFeedback.)

  • If "force" is not set, the message is sent only once enough time has passed since the last status update to reach wal_receiver_status_interval.
  • If "force" is not set and that interval has not yet elapsed, a message is still sent when the write or flush position has changed, so that the primary learns we received data.
  • If wal_receiver_status_interval is disabled altogether and "force" is false, the call is a no-op.
  • If "force" is true and "requestReply" is true, the primary is asked to reply immediately on receiving this message. This serves as a heartbeat when approaching wal_receiver_timeout.
  • If "force" is true and "requestReply" is false, no reply from the primary is needed. This is used when the startup process asks us to send apply feedback.
static void XLogWalRcvSendReply(bool force, bool requestReply) {
    
	static XLogRecPtr writePtr = 0;
	static XLogRecPtr flushPtr = 0;
	XLogRecPtr	applyPtr;
	static TimestampTz sendTime = 0;
	TimestampTz now;
	/* If the user doesn't want status to be reported to the primary, be sure to exit before doing anything at all. */
	// wal_receiver_status_interval: the minimum frequency at which the standby reports replication status to the primary; zero or less disables the reports
	if (!force && wal_receiver_status_interval <= 0) return;
	now = GetCurrentTimestamp(); /* Get current timestamp. */
	/* We can compare the write and flush positions to the last message we sent without taking any lock, but the apply position requires a spin lock, so we don't check that unless something else has changed or 10 seconds have passed. This means that the apply WAL location will appear, from the primary's point of view, to lag slightly, but since this is only for reporting purposes and only on idle systems, that's probably OK. */
	if (!force && writePtr == LogstreamResult.Write && flushPtr == LogstreamResult.Flush && !TimestampDifferenceExceeds(sendTime, now, wal_receiver_status_interval * 1000)) return;
	
	sendTime = now;

	/* Construct a new message */
	writePtr = LogstreamResult.Write;
	flushPtr = LogstreamResult.Flush;
	applyPtr = GetXLogReplayRecPtr(NULL);

	resetStringInfo(&reply_message);
	pq_sendbyte(&reply_message, 'r');
	pq_sendint64(&reply_message, writePtr);
	pq_sendint64(&reply_message, flushPtr);
	pq_sendint64(&reply_message, applyPtr);
	pq_sendint64(&reply_message, GetCurrentTimestamp());
	pq_sendbyte(&reply_message, requestReply ? 1 : 0);
	walrcv_send(wrconn, reply_message.data, reply_message.len);
}
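
The two early returns in XLogWalRcvSendReply amount to a small predicate. The sketch below restates it as a pure function so the cases in the list above can be checked one by one; LastReply and should_send_reply are hypothetical names, and timestamps are assumed to be microseconds.

```c
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical record of what the previous reply reported. */
typedef struct {
    uint64_t write_ptr;
    uint64_t flush_ptr;
    int64_t  send_time_us;   /* when that reply was sent, in microseconds */
} LastReply;

bool should_send_reply(const LastReply *last,
                       uint64_t write_now, uint64_t flush_now,
                       int64_t now_us, int status_interval_s, bool force)
{
    /* Reporting disabled and nobody forces a reply: do nothing. */
    if (!force && status_interval_s <= 0)
        return false;

    /* Nothing new written or flushed, and the interval has not elapsed. */
    if (!force &&
        last->write_ptr == write_now &&
        last->flush_ptr == flush_now &&
        now_us - last->send_time_us < (int64_t) status_interval_s * 1000000)
        return false;

    return true;
}
```

For example, with a 10 second interval and unchanged positions, a call 5 seconds after the last reply sends nothing, while a call after 10 seconds or after any change in the write position does.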

XLogWalRcvSendHSFeedback sends a hot standby feedback message to the primary, along with the current time. If the user has disabled feedback, one final message is sent to tell the walsender to forget the xmin for this standby. The message is also sent on first connect, because a previous connection might have set an xmin on a replication slot. (If no slot is in use, sending a feedback message that explicitly sets InvalidTransactionId is harmless.)

static void XLogWalRcvSendHSFeedback(bool immed) {
    
	TimestampTz now;
	FullTransactionId nextFullXid;
	TransactionId nextXid, xmin, catalog_xmin;
	uint32		xmin_epoch, catalog_xmin_epoch;
	static TimestampTz sendTime = 0;	
	
	static bool primary_has_standby_xmin = true; /* initially true so we always send at least one feedback message */
	/* If the user doesn't want status to be reported to the primary, be sure to exit before doing anything at all. */
	if ((wal_receiver_status_interval <= 0 || !hot_standby_feedback) && !primary_has_standby_xmin)
		return;
	
	now = GetCurrentTimestamp();  /* Get current timestamp. */
	if (!immed) {
		// not immediate: send only once wal_receiver_status_interval has elapsed since the last feedback
		/* Send feedback at most once per wal_receiver_status_interval. */
		if (!TimestampDifferenceExceeds(sendTime, now, wal_receiver_status_interval * 1000)) return;
		sendTime = now;
	}

	/* If Hot Standby is not yet accepting connections there is nothing to send. Check this after the interval has expired to reduce number of calls. * Bailing out here also ensures that we don't send feedback until we've read our own replication slot state, so we don't tell the primary to discard needed xmin or catalog_xmin from any slots that may exist on this replica. */
	if (!HotStandbyActive()) return;

	/* Make the expensive call to get the oldest xmin once we are certain everything else has been checked. */
	if (hot_standby_feedback){
		// hot standby feedback is enabled: fetch the real horizons
		GetReplicationHorizons(&xmin, &catalog_xmin);
	}else{
		// otherwise report InvalidTransactionId for both xmin and catalog_xmin
		xmin = InvalidTransactionId;
		catalog_xmin = InvalidTransactionId;
	}
	/* Get epoch and adjust if nextXid and oldestXmin are different sides of the epoch boundary. */
	nextFullXid = ReadNextFullTransactionId();
	nextXid = XidFromFullTransactionId(nextFullXid);
	xmin_epoch = EpochFromFullTransactionId(nextFullXid);
	catalog_xmin_epoch = xmin_epoch;
	if (nextXid < xmin)  xmin_epoch--;
	if (nextXid < catalog_xmin) catalog_xmin_epoch--;
	
	resetStringInfo(&reply_message);  /* Construct the message and send it. */
	pq_sendbyte(&reply_message, 'h');
	pq_sendint64(&reply_message, GetCurrentTimestamp());
	pq_sendint32(&reply_message, xmin);
	pq_sendint32(&reply_message, xmin_epoch);
	pq_sendint32(&reply_message, catalog_xmin);
	pq_sendint32(&reply_message, catalog_xmin_epoch);
	walrcv_send(wrconn, reply_message.data, reply_message.len);  // send the message
	if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
		primary_has_standby_xmin = true;
	else
		primary_has_standby_xmin = false;  // the primary no longer holds an xmin for this standby, so later calls can return early while feedback is disabled
}
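
The epoch adjustment near the end of the function can be isolated as follows. This is a sketch, assuming a 64-bit full transaction ID with the epoch in the high 32 bits and the xid in the low 32 bits; XidWithEpoch and attach_epoch are hypothetical names.

```c
#include <stdint.h>

/* Hypothetical pair of values as they go into the feedback message. */
typedef struct {
    uint32_t xid;
    uint32_t epoch;
} XidWithEpoch;

/* Derive the epoch for xid from the next full transaction ID. */
XidWithEpoch attach_epoch(uint64_t next_full_xid, uint32_t xid)
{
    uint32_t next_xid = (uint32_t) next_full_xid;          /* low 32 bits */
    uint32_t epoch    = (uint32_t) (next_full_xid >> 32);  /* high 32 bits */
    XidWithEpoch result;

    /* If xid is numerically larger than next_xid, the counter has wrapped
     * since xid was assigned, so xid belongs to the previous epoch. */
    if (next_xid < xid)
        epoch--;

    result.xid = xid;
    result.epoch = epoch;
    return result;
}
```

With the next full xid at epoch 5, xid 100, an xmin of 50 is reported with epoch 5, while an xmin of 4000000000 must predate the wraparound and is reported with epoch 4.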

When a keepalive (heartbeat) message is processed and the primary requests a reply (replyRequested is true), the walreceiver process calls XLogWalRcvSendReply(true, false) to send a response. It reports to the primary the WAL positions that walreceiver has written and flushed, the WAL position that the startup process has applied, and the standby's current time, and marks the response as not requiring a reply from the primary.
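
The reply described here has a fixed layout: a type byte 'r', the write, flush and apply positions, a timestamp, and a one-byte reply-request flag. The sketch below packs it by hand, assuming network byte order for the 64-bit fields (what pq_sendint64 produces); put_be64, build_standby_reply and REPLY_LEN are hypothetical names.

```c
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

#define REPLY_LEN 34   /* 1 type byte + 4 * 8 bytes + 1 flag byte */

/* Write a 64-bit integer in big-endian (network) order. */
static void put_be64(unsigned char *p, uint64_t v)
{
    for (int i = 7; i >= 0; i--) {
        p[i] = (unsigned char) (v & 0xff);
        v >>= 8;
    }
}

/* Fill out[] with a status reply and return its length. */
size_t build_standby_reply(unsigned char out[REPLY_LEN],
                           uint64_t write_ptr, uint64_t flush_ptr,
                           uint64_t apply_ptr, int64_t now,
                           bool request_reply)
{
    out[0] = 'r';
    put_be64(out + 1,  write_ptr);
    put_be64(out + 9,  flush_ptr);
    put_be64(out + 17, apply_ptr);
    put_be64(out + 25, (uint64_t) now);
    out[33] = request_reply ? 1 : 0;
    return REPLY_LEN;
}
```

In the keepalive case above, the last byte would be 0, since XLogWalRcvSendReply(true, false) does not ask the primary to answer.
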
When the inner loop's walrcv_receive returns a length less than or equal to zero, the inner loop exits, and walreceiver calls XLogWalRcvSendReply(false, false) (to let the primary know that walreceiver received some data) and XLogWalRcvFlush(false) (to flush the written WAL to disk and report progress to the startup process and the primary).
For XLogWalRcvSendReply(false, false): if the configuration says walreceiver should not report to the primary (wal_receiver_status_interval, the minimum frequency at which the standby reports replication status to the primary, is zero or less), the function returns immediately without sending anything. If wal_receiver_status_interval is set but no new WAL has arrived (LogstreamResult.Write and Flush equal the values reported in the last reply) and less than wal_receiver_status_interval has passed since that reply, the function also returns without sending. In all other cases it reports to the primary the write and flush positions of walreceiver, the apply position of the startup process, and the standby's current time, and marks the response as not requiring a reply from the primary. XLogWalRcvFlush(false) in turn calls XLogWalRcvSendReply(false, false), which follows the same process, and then calls XLogWalRcvSendHSFeedback(false) to send a hot standby feedback message.

static void XLogWalRcvFlush(bool dying) {
    
	if (LogstreamResult.Flush < LogstreamResult.Write) {
    
		WalRcvData *walrcv = WalRcv;
		issue_xlog_fsync(recvFile, recvSegNo);
		LogstreamResult.Flush = LogstreamResult.Write;		
		SpinLockAcquire(&walrcv->mutex);  /* Update shared-memory status */
		if (walrcv->flushedUpto < LogstreamResult.Flush) {
    
			walrcv->latestChunkStart = walrcv->flushedUpto; // where the previous flush ended, i.e. the start of the newly flushed chunk
			walrcv->flushedUpto = LogstreamResult.Flush;  // WAL position that is now flushed to disk
			walrcv->receivedTLI = ThisTimeLineID;
		}
		SpinLockRelease(&walrcv->mutex);
		/* Signal the startup process and walsender that new WAL has arrived */
		WakeupRecovery();  // wake up the startup process
		if (AllowCascadeReplication()) WalSndWakeup(); // with cascading replication, wake up the walsender processes
        ...
		
		if (!dying) {
      /* Also let the primary know that we made some progress */
			XLogWalRcvSendReply(false, false);
			XLogWalRcvSendHSFeedback(false);
		}
	}
}

When WaitLatchOrSocket times out, nothing new has been received. If no message has arrived from the server for more than wal_receiver_timeout / 2, a packet is sent to ping the server. In addition, if more than wal_receiver_status_interval has passed since the last update was sent, a status update is sent to the primary anyway, to report any progress in applying WAL.

					bool		requestReply = false;
					if (wal_receiver_timeout > 0) {
						/* Check if time since last receive from primary has reached the configured limit. */
						TimestampTz now = GetCurrentTimestamp();  // last_recv_timestamp is updated whenever data is received
						TimestampTz timeout = TimestampTzPlusMilliseconds(last_recv_timestamp, wal_receiver_timeout);
						if (now >= timeout) // nothing received for wal_receiver_timeout: give up
							ereport(ERROR,(errcode(ERRCODE_CONNECTION_FAILURE), errmsg("terminating walreceiver due to timeout")));

						/* We didn't receive anything new, for half of receiver replication timeout. Ping the server. */
						if (!ping_sent){
							// ping_sent is reset to false when data arrives; if it is already true, the ping below has been sent
							timeout = TimestampTzPlusMilliseconds(last_recv_timestamp, (wal_receiver_timeout / 2));
							if (now >= timeout) {
								requestReply = true;
								ping_sent = true;
							}
						}
					}
					// send the status reply and the hot standby feedback
					XLogWalRcvSendReply(requestReply, requestReply);
					XLogWalRcvSendHSFeedback(false);
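
The timeout handling reduces to three outcomes: do nothing special, ping the primary, or give up. A minimal sketch of that decision follows; IdleAction and check_idle are hypothetical names, times are plain milliseconds, and the ereport(ERROR) is replaced by a return value.

```c
#include <stdbool.h>
#include <stdint.h>

typedef enum {
    IDLE_NOTHING,   /* no ping needed; a normal status update may still go out */
    IDLE_PING,      /* half the timeout elapsed: request a reply */
    IDLE_TIMEOUT    /* full timeout elapsed: terminate the walreceiver */
} IdleAction;

IdleAction check_idle(int64_t last_recv_ms, int64_t now_ms,
                      int timeout_ms, bool *ping_sent)
{
    if (timeout_ms <= 0)
        return IDLE_NOTHING;            /* timeout checking disabled */

    if (now_ms >= last_recv_ms + timeout_ms)
        return IDLE_TIMEOUT;

    /* Ping at most once per quiet period; the flag is cleared by the
     * caller when data arrives again. */
    if (!*ping_sent && now_ms >= last_recv_ms + timeout_ms / 2) {
        *ping_sent = true;
        return IDLE_PING;
    }
    return IDLE_NOTHING;
}
```

With a 60 second timeout and no traffic, the first wakeup at or after 30 seconds yields IDLE_PING, later wakeups yield IDLE_NOTHING, and a wakeup at or after 60 seconds yields IDLE_TIMEOUT.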

XLogWalRcvSendReply(true, true) is the call made when no message has been received from the server for more than wal_receiver_timeout / 2. It pings the server and asks the primary to answer: it reports the write and flush positions of walreceiver, the apply position of the startup process, and the standby's current time, and marks the response as requiring a reply from the primary. XLogWalRcvSendReply(false, false) behaves as described earlier: it returns without sending when wal_receiver_status_interval is zero or less, or when the write and flush positions are unchanged and less than wal_receiver_status_interval has passed since the last reply. Otherwise it sends the same positions and time, marked as not requiring a reply from the primary.

When the walreceiver's MyLatch has been set by the startup process, walreceiver checks whether the startup process has asked it to force a reply to the primary, that is, to send apply feedback now. Before the reply is sent, the force_reply flag in shared memory must really be reset to false, so that a new reply request is not missed. XLogWalRcvSendReply(true, false) then reports to the primary the write and flush positions of walreceiver, the apply position of the startup process, and the standby's current time, and marks the response as not requiring a reply from the primary.

				if (rc & WL_LATCH_SET){
    
					ResetLatch(MyLatch);
					ProcessWalRcvInterrupts();
					if (walrcv->force_reply){
    
						/* The recovery process has asked us to send apply feedback now. Make sure the flag is really set to false in shared memory before sending the reply, so we don't miss a new request for a reply. */
						walrcv->force_reply = false;
						pg_memory_barrier();
						XLogWalRcvSendReply(true, false);
					}
				}

WalRcvForceReply sets WalRcv->force_reply to true and then sets the walreceiver's latch. (It is called from WaitForWALToBecomeAvailable and StartupXLOG.)

void WalRcvForceReply(void) {
    
	Latch	   *latch;
	WalRcv->force_reply = true;
	/* fetching the latch pointer might not be atomic, so use spinlock */
	SpinLockAcquire(&WalRcv->mutex);
	latch = WalRcv->latch;
	SpinLockRelease(&WalRcv->mutex);
	if (latch)	SetLatch(latch);
}
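
The ordering that matters in this handshake is: the requester sets the flag and then the latch, while the receiver resets the latch, then clears the flag, and only then sends. The sketch below shows that ordering with C11 atomics, which is a substitute technique chosen for a self-contained example; PostgreSQL itself uses a plain store plus pg_memory_barrier() as in the listing above. SharedFlags, flags_init, request_reply and consume_reply_request are hypothetical names.

```c
#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical shared state: the flag plus a one-bit stand-in for the latch. */
typedef struct {
    atomic_bool force_reply;
    atomic_int  latch;        /* nonzero means "latch is set" */
} SharedFlags;

void flags_init(SharedFlags *s)
{
    atomic_init(&s->force_reply, false);
    atomic_init(&s->latch, 0);
}

/* Requester side (the startup process in the article). */
void request_reply(SharedFlags *s)
{
    atomic_store(&s->force_reply, true);   /* publish the request first */
    atomic_store(&s->latch, 1);            /* then wake the receiver */
}

/* Receiver side, run after being woken. Returns true if a reply is due. */
bool consume_reply_request(SharedFlags *s)
{
    atomic_store(&s->latch, 0);            /* reset the latch before checking */
    /* Clear the flag before replying: a request that arrives after this
     * point sets the flag and the latch again and is seen on the next pass. */
    return atomic_exchange(&s->force_reply, false);
}
```

Clearing the flag before sending means a request raised while the reply is being built is not lost, only deferred to the next wakeup.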

Startup process: sets receiveStart and receiveStartTLI to ask WalReceiver to start streaming replication.
walreceiver process: waits in WalRcvWaitForStartPosition for the startup process to set receiveStart and receiveStartTLI.

Startup process: asks WalReceiver to send apply feedback now.
walreceiver process: calls XLogWalRcvFlush and, once WAL has been flushed, wakes up the startup process.

walreceiver process: WalRcvDie, walRcvStoppedCV.
walreceiver process: WalReceiverMain handles the walRcvState state, walRcvStoppedCV.
Startup process: calls ShutdownWalRcv; ConditionVariableBroadcast on walRcvStoppedCV for the walreceiver process.
WalRcvStreaming, WalRcvRunning.

Copyright notice
This article was written by [Tertium ferrugosum]. Please include a link to the original when reposting. Thank you.
https://yzsam.com/2022/162/202206110417496797.html