PostgreSQL database replication: receive and send logic of walreceiver, a first-class background process
2022-06-11 04:31:00 【Tertium ferrugosum】

Receiving logic
To understand the receive logic of the walreceiver background process, start with its infinite for loop. The loop exits only when an error occurs or on end-of-streaming (the primary has terminated streaming replication). Each iteration proceeds as follows:
- First call walrcv_receive. If the returned data length len is zero, call WaitLatchOrSocket and block on the events WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE | WL_TIMEOUT | WL_LATCH_SET, i.e., wait until the network socket becomes readable, the timeout expires, or the latch is set.
- If len is non-zero, there is either data (len > 0) or end-of-streaming (len < 0). The walreceiver then enters the inner read loop, which processes the received data: whenever data has been read, it is passed to XLogWalRcvProcessMsg, and walrcv_receive is called again to fetch the next chunk; this repeats as long as more data can be read. If len < 0, the primary has terminated streaming replication (end-of-streaming), so the walreceiver ends streaming and leaves COPY mode (walrcv_endstreaming). If len is zero, no more data can be read from the socket right now, so the walreceiver goes back to blocking in WaitLatchOrSocket until the network is ready again.
When data is received, right before XLogWalRcvProcessMsg is called, the walreceiver knows it has just heard from the primary. It therefore sets last_recv_timestamp to the current time (the most recent receive time) and resets ping_sent to false, which records that no ping to the primary's walsender is outstanding, so a new ping may be sent if the primary goes quiet again.
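The control flow of the inner read loop described above can be sketched in plain C. This is a simplified model, not PostgreSQL code: `FakeConn`, `fake_receive` and `inner_read_loop` are hypothetical names, and `fake_receive` merely replays scripted return values in place of walrcv_receive (positive means a message was read, 0 means nothing is available without blocking, negative means end-of-streaming).

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical outcome of one pass through the inner read loop. */
typedef enum { LOOP_WAIT, LOOP_END_OF_STREAMING } LoopOutcome;

/* Hypothetical stand-in for the replication connection: a script of lengths. */
typedef struct {
    const int *lens;
    size_t n;
    size_t pos;
} FakeConn;

/* Hypothetical stand-in for walrcv_receive(): returns the next scripted length. */
static int fake_receive(FakeConn *conn)
{
    assert(conn != NULL);
    if (conn->pos >= conn->n)
        return 0;                       /* nothing more to read: would block */
    return conn->lens[conn->pos++];
}

/* Keep processing while len > 0; stop on len == 0 (go wait on the socket)
 * or len < 0 (end-of-streaming). */
static LoopOutcome inner_read_loop(FakeConn *conn, int *processed, bool *ping_sent)
{
    int len = fake_receive(conn);

    *processed = 0;
    while (len > 0) {
        /* the real code also sets last_recv_timestamp here */
        *ping_sent = false;             /* heard from the primary: re-arm the ping */
        (*processed)++;                 /* the real code calls XLogWalRcvProcessMsg */
        len = fake_receive(conn);
    }
    return len < 0 ? LOOP_END_OF_STREAMING : LOOP_WAIT;
}
```

For example, the script {10, 20, 0} processes two messages and then returns LOOP_WAIT, while {5, -1} processes one message and reports end-of-streaming.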
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
{
    int         hdrlen;
    XLogRecPtr  dataStart, walEnd;
    TimestampTz sendTime;
    bool        replyRequested;

    resetStringInfo(&incoming_message);

    switch (type)
    {
        case 'w':               /* WAL records */
        {
            /* copy message to StringInfo */
            hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
            if (len < hdrlen)
                ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION),
                                errmsg_internal("invalid WAL message received from primary")));
            appendBinaryStringInfo(&incoming_message, buf, hdrlen);  // copy the header bytes of buf into incoming_message

            /* read the fields */
            dataStart = pq_getmsgint64(&incoming_message);  // WAL start position of the data in this message
            walEnd = pq_getmsgint64(&incoming_message);     // current end of WAL on the primary
            sendTime = pq_getmsgint64(&incoming_message);   // time the primary sent the message
            ProcessWalSndrMessage(walEnd, sendTime);

            buf += hdrlen;
            len -= hdrlen;
            XLogWalRcvWrite(buf, len, dataStart);
            break;
        }
        case 'k':               /* Keepalive */
        {
            /* copy message to StringInfo */
            hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
            if (len != hdrlen)
                ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION),
                                errmsg_internal("invalid keepalive message received from primary")));
            appendBinaryStringInfo(&incoming_message, buf, hdrlen);

            /* read the fields */
            walEnd = pq_getmsgint64(&incoming_message);          // current end of WAL on the primary
            sendTime = pq_getmsgint64(&incoming_message);        // time the primary sent the message
            replyRequested = pq_getmsgbyte(&incoming_message);   // whether the primary asks the standby to reply immediately

            ProcessWalSndrMessage(walEnd, sendTime);

            /* If the primary requested a reply, send one immediately */
            if (replyRequested)
                XLogWalRcvSendReply(true, false);
            break;
        }
        default:
            ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION),
                            errmsg_internal("invalid replication message type %d", type)));
    }
}
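To make the two header layouts concrete, here is a standalone sketch of how the 'w' and 'k' message bodies could be decoded. It assumes, as in the streaming replication protocol, that integers are sent in network (big-endian) byte order, which is what pq_getmsgint64 reads, and that the type byte has already been split off, as in the walreceiver's call XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1). `read_be64`, `ParsedMsg` and `parse_walsender_msg` are hypothetical names; returning false stands in for the ERRCODE_PROTOCOL_VIOLATION errors.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Read an 8-byte big-endian (network order) integer. */
static uint64_t read_be64(const unsigned char *p)
{
    uint64_t v = 0;
    for (int i = 0; i < 8; i++)
        v = (v << 8) | p[i];
    return v;
}

typedef struct {
    uint64_t data_start;       /* 'w' only: WAL start of this message's data */
    uint64_t wal_end;          /* current end of WAL on the primary */
    int64_t  send_time;        /* primary's clock when the message was sent */
    bool     reply_requested;  /* 'k' only */
    size_t   payload_len;      /* 'w' only: WAL bytes following the header */
} ParsedMsg;

/* Returns false on a protocol violation (bad length or unknown type). */
static bool parse_walsender_msg(unsigned char type, const unsigned char *buf,
                                size_t len, ParsedMsg *out)
{
    assert(buf != NULL && out != NULL);
    *out = (ParsedMsg){0};
    switch (type) {
    case 'w':                          /* WAL data: 3 x int64 header */
        if (len < 24)
            return false;
        out->data_start = read_be64(buf);
        out->wal_end = read_be64(buf + 8);
        out->send_time = (int64_t) read_be64(buf + 16);
        out->payload_len = len - 24;
        return true;
    case 'k':                          /* keepalive: 2 x int64 + 1 byte */
        if (len != 17)
            return false;
        out->wal_end = read_be64(buf);
        out->send_time = (int64_t) read_be64(buf + 8);
        out->reply_requested = buf[16] != 0;
        return true;
    default:
        return false;
    }
}
```

Note the asymmetry that mirrors the real code: a 'w' message only needs to be at least as long as its header (the rest is WAL payload), while a keepalive must be exactly 17 bytes.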
ProcessWalSndrMessage updates the shared-memory status after a message has been received from the primary. walEnd and sendTime are the WAL end position and the send timestamp carried by the latest message.
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
{
    WalRcvData *walrcv = WalRcv;
    TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();

    /* Update shared-memory status */
    SpinLockAcquire(&walrcv->mutex);
    if (walrcv->latestWalEnd < walEnd)
        walrcv->latestWalEndTime = sendTime;   // WAL end advanced: remember when the primary sent this newer position
    walrcv->latestWalEnd = walEnd;             // latest end-of-WAL position reported by the primary
    walrcv->lastMsgSendTime = sendTime;
    walrcv->lastMsgReceiptTime = lastMsgReceiptTime;   // when this message was received and processed
    SpinLockRelease(&walrcv->mutex);
}
Send logic
XLogWalRcvSendReply sends a reply message to the primary, reporting our current WAL positions (write, flush and apply) and the current time.
- If force is not set, a message is sent once wal_receiver_status_interval has elapsed since the last status update.
- If force is not set and the interval has not yet elapsed, a message is still sent when the write or flush position has changed, so that the primary learns that we have received new data.
- If wal_receiver_status_interval is disabled altogether (<= 0) and force is false, the function is a no-op.
- If force is true and requestReply is true, the primary is asked to reply as soon as it receives this message. This is used as a heartbeat when wal_receiver_timeout is approaching.
- If force is true and requestReply is false, no reply from the primary is needed; this is the case used when the startup process asks us to send apply feedback.
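The rules in the list above reduce to a small predicate. `reply_would_be_sent` is a hypothetical helper, not a PostgreSQL function: it receives the time elapsed since the last reply was actually sent as a parameter instead of keeping static variables, and it uses the same "at least this many milliseconds" comparison as TimestampDifferenceExceeds.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical helper: would XLogWalRcvSendReply send a message?
 * interval_s plays the role of wal_receiver_status_interval (seconds);
 * last_write/last_flush are the positions in the last reply sent;
 * elapsed_ms is the time since that reply. */
static bool reply_would_be_sent(bool force, int interval_s,
                                uint64_t write, uint64_t flush,
                                uint64_t last_write, uint64_t last_flush,
                                int64_t elapsed_ms)
{
    assert(elapsed_ms >= 0);
    if (!force && interval_s <= 0)
        return false;                  /* status reporting disabled: no-op */
    if (!force && write == last_write && flush == last_flush &&
        elapsed_ms < (int64_t) interval_s * 1000)
        return false;                  /* nothing new and interval not yet up */
    return true;
}
```

With a 10-second interval and unchanged positions, the predicate is false at 9999 ms and true at 10000 ms; any change in the write or flush position makes it true immediately.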
static void XLogWalRcvSendReply(bool force, bool requestReply)
{
    static XLogRecPtr writePtr = 0;
    static XLogRecPtr flushPtr = 0;
    XLogRecPtr  applyPtr;
    static TimestampTz sendTime = 0;
    TimestampTz now;

    /*
     * If the user doesn't want status to be reported to the primary, be sure
     * to exit before doing anything at all.
     */
    // wal_receiver_status_interval: minimum frequency at which the standby reports
    // replication progress to the primary; <= 0 disables these reports
    if (!force && wal_receiver_status_interval <= 0)
        return;

    now = GetCurrentTimestamp();    /* Get current timestamp. */

    /*
     * We can compare the write and flush positions to the last message we
     * sent without taking any lock, but the apply position requires a spin
     * lock, so we don't check that unless something else has changed or 10
     * seconds have passed. This means that the apply WAL location will
     * appear, from the primary's point of view, to lag slightly, but since
     * this is only for reporting purposes and only on idle systems, that's
     * probably OK.
     */
    if (!force
        && writePtr == LogstreamResult.Write
        && flushPtr == LogstreamResult.Flush
        && !TimestampDifferenceExceeds(sendTime, now, wal_receiver_status_interval * 1000))
        return;
    sendTime = now;

    /* Construct a new message */
    writePtr = LogstreamResult.Write;
    flushPtr = LogstreamResult.Flush;
    applyPtr = GetXLogReplayRecPtr(NULL);

    resetStringInfo(&reply_message);
    pq_sendbyte(&reply_message, 'r');
    pq_sendint64(&reply_message, writePtr);
    pq_sendint64(&reply_message, flushPtr);
    pq_sendint64(&reply_message, applyPtr);
    pq_sendint64(&reply_message, GetCurrentTimestamp());
    pq_sendbyte(&reply_message, requestReply ? 1 : 0);
    walrcv_send(wrconn, reply_message.data, reply_message.len);
}
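The 'r' (standby status update) message assembled above has a fixed 34-byte layout: one type byte, four 8-byte integers (write, flush and apply positions, then the send time) and one reply-request byte. The sketch below builds the same layout by hand, assuming pq_sendint64 writes integers in network (big-endian) byte order; `put_be64` and `build_reply_message` are hypothetical helpers.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Write an 8-byte integer in network (big-endian) byte order. */
static size_t put_be64(unsigned char *p, uint64_t v)
{
    for (int i = 7; i >= 0; i--) {
        p[i] = (unsigned char) (v & 0xff);
        v >>= 8;
    }
    return 8;
}

/* Layout: 'r' | write | flush | apply | send time | reply flag = 34 bytes. */
static size_t build_reply_message(unsigned char out[34], uint64_t write,
                                  uint64_t flush, uint64_t apply,
                                  int64_t now, int request_reply)
{
    size_t n = 0;

    assert(out != NULL);
    out[n++] = 'r';
    n += put_be64(out + n, write);
    n += put_be64(out + n, flush);
    n += put_be64(out + n, apply);
    n += put_be64(out + n, (uint64_t) now);
    out[n++] = request_reply ? 1 : 0;
    return n;
}
```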
XLogWalRcvSendHSFeedback sends a hot standby feedback message to the primary, along with the current time. If the user disables feedback, one final message is sent to tell the walsender to forget the xmin of this standby. The message is also sent on first connect, because a previous connection may have set an xmin on the replication slot. (If no slot is used, sending a feedback message that explicitly sets InvalidTransactionId is harmless.)
static void XLogWalRcvSendHSFeedback(bool immed)
{
    TimestampTz now;
    FullTransactionId nextFullXid;
    TransactionId nextXid, xmin, catalog_xmin;
    uint32      xmin_epoch, catalog_xmin_epoch;
    static TimestampTz sendTime = 0;
    static bool primary_has_standby_xmin = true;    /* initially true so we always send at least one feedback message */

    /*
     * If the user doesn't want status to be reported to the primary, be sure
     * to exit before doing anything at all.
     */
    if ((wal_receiver_status_interval <= 0 || !hot_standby_feedback) && !primary_has_standby_xmin)
        return;

    now = GetCurrentTimestamp();    /* Get current timestamp. */

    if (!immed)
    {
        // not an immediate send: respect wal_receiver_status_interval
        /* Send feedback at most once per wal_receiver_status_interval. */
        if (!TimestampDifferenceExceeds(sendTime, now, wal_receiver_status_interval * 1000))
            return;
        sendTime = now;
    }

    /*
     * If Hot Standby is not yet accepting connections there is nothing to
     * send. Check this after the interval has expired to reduce number of
     * calls.
     *
     * Bailing out here also ensures that we don't send feedback until we've
     * read our own replication slot state, so we don't tell the primary to
     * discard needed xmin or catalog_xmin from any slots that may exist on
     * this replica.
     */
    if (!HotStandbyActive())
        return;

    /*
     * Make the expensive call to get the oldest xmin once we are certain
     * everything else has been checked.
     */
    if (hot_standby_feedback)
    {
        // hot standby feedback is enabled: report the real horizons
        GetReplicationHorizons(&xmin, &catalog_xmin);
    }
    else
    {
        // otherwise report InvalidTransactionId for both xmin and catalog_xmin
        xmin = InvalidTransactionId;
        catalog_xmin = InvalidTransactionId;
    }

    /*
     * Get epoch and adjust if nextXid and oldestXmin are different sides of
     * the epoch boundary.
     */
    nextFullXid = ReadNextFullTransactionId();
    nextXid = XidFromFullTransactionId(nextFullXid);
    xmin_epoch = EpochFromFullTransactionId(nextFullXid);
    catalog_xmin_epoch = xmin_epoch;
    if (nextXid < xmin)
        xmin_epoch--;
    if (nextXid < catalog_xmin)
        catalog_xmin_epoch--;

    /* Construct the message and send it. */
    resetStringInfo(&reply_message);
    pq_sendbyte(&reply_message, 'h');
    pq_sendint64(&reply_message, GetCurrentTimestamp());
    pq_sendint32(&reply_message, xmin);
    pq_sendint32(&reply_message, xmin_epoch);
    pq_sendint32(&reply_message, catalog_xmin);
    pq_sendint32(&reply_message, catalog_xmin_epoch);
    walrcv_send(wrconn, reply_message.data, reply_message.len);     // send it
    if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
        primary_has_standby_xmin = true;
    else
        primary_has_standby_xmin = false;   // the primary now holds no xmin for us; while feedback stays disabled, later calls return early
}
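The epoch adjustment near the end of XLogWalRcvSendHSFeedback can be isolated as follows. The sketch assumes a FullTransactionId is a 64-bit value with the epoch in the high 32 bits and the xid in the low 32 bits, which is what XidFromFullTransactionId and EpochFromFullTransactionId extract; `epoch_for_xid` is a hypothetical name. Because 32-bit xids wrap around, an xmin that is numerically greater than nextXid must come from the previous epoch.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical helper: epoch to report alongside xid, given the next full xid. */
static uint32_t epoch_for_xid(uint64_t next_full_xid, uint32_t xid)
{
    uint32_t next_xid = (uint32_t) next_full_xid;        /* low 32 bits */
    uint32_t epoch = (uint32_t) (next_full_xid >> 32);   /* high 32 bits */

    if (next_xid < xid) {
        /* xid was assigned before nextXid wrapped: previous epoch */
        assert(epoch > 0);
        epoch--;
    }
    return epoch;
}
```

InvalidTransactionId (0) never satisfies nextXid < xmin, so when feedback is disabled both epochs are reported unchanged.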
When a keepalive (heartbeat) message is processed and the primary requests a reply (replyRequested is true), the walreceiver calls XLogWalRcvSendReply(true, false) to respond immediately. The reply carries the XLOG positions the walreceiver has written and flushed, the XLOG position the startup process has applied, and the standby's current time, and it is marked as not requiring a reply from the primary.
When walrcv_receive in the inner loop returns a length less than or equal to zero, the loop exits, and the walreceiver calls XLogWalRcvSendReply(false, false) (to let the primary know that it has received some data) and XLogWalRcvFlush(false) (to flush the written XLOG to disk and report the progress to the startup process and the primary).
XLogWalRcvSendReply(false, false) returns immediately, without sending anything, if status reporting to the primary is disabled (wal_receiver_status_interval, the minimum frequency at which the standby reports replication progress to the primary, is less than or equal to zero). If wal_receiver_status_interval is set but no new XLOG has arrived (LogstreamResult.Write and LogstreamResult.Flush still equal the values from the last reply sent) and less than wal_receiver_status_interval has passed since that reply, it also returns without sending. In all other cases it sends the primary the XLOG positions the walreceiver has written and flushed, the XLOG position the startup process has applied, and the standby's current time, marked as not requiring a reply. XLogWalRcvFlush(false), when it has actually flushed something, also calls XLogWalRcvSendReply(false, false), which follows the same rules, and then calls XLogWalRcvSendHSFeedback(false) to send a hot standby feedback message.
static void XLogWalRcvFlush(bool dying)
{
    if (LogstreamResult.Flush < LogstreamResult.Write)
    {
        WalRcvData *walrcv = WalRcv;

        issue_xlog_fsync(recvFile, recvSegNo);
        LogstreamResult.Flush = LogstreamResult.Write;

        /* Update shared-memory status */
        SpinLockAcquire(&walrcv->mutex);
        if (walrcv->flushedUpto < LogstreamResult.Flush)
        {
            walrcv->latestChunkStart = walrcv->flushedUpto;  // previous flush position = start of the newly flushed chunk
            walrcv->flushedUpto = LogstreamResult.Flush;     // new position up to which XLOG is flushed to disk
            walrcv->receivedTLI = ThisTimeLineID;
        }
        SpinLockRelease(&walrcv->mutex);

        /* Signal the startup process and walsender that new WAL has arrived */
        WakeupRecovery();                 // wake up the startup process
        if (AllowCascadeReplication())
            WalSndWakeup();               // with cascading replication, also wake up the walsenders
        ...
        if (!dying)
        {
            /* Also let the primary know that we made some progress */
            XLogWalRcvSendReply(false, false);
            XLogWalRcvSendHSFeedback(false);
        }
    }
}
If WaitLatchOrSocket times out, nothing new has been received. If nothing has been heard from the server for more than wal_receiver_timeout / 2, the walreceiver pings the server. In addition, if more than wal_receiver_status_interval has passed since the last update it sent, it sends a status update to the primary anyway, to report any progress in applying WAL.
bool requestReply = false;

if (wal_receiver_timeout > 0)
{
    /* Check if time since last receive from primary has reached the configured limit. */
    TimestampTz now = GetCurrentTimestamp();
    // last_recv_timestamp is updated whenever data is received
    TimestampTz timeout = TimestampTzPlusMilliseconds(last_recv_timestamp, wal_receiver_timeout);

    if (now >= timeout)     // nothing received for wal_receiver_timeout
        ereport(ERROR, (errcode(ERRCODE_CONNECTION_FAILURE),
                        errmsg("terminating walreceiver due to timeout")));

    /* We didn't receive anything new, for half of receiver replication timeout. Ping the server. */
    if (!ping_sent)
    {
        // ping_sent is reset to false when data is received; true means we already pinged since then
        // ping the server if nothing has been received for wal_receiver_timeout / 2
        timeout = TimestampTzPlusMilliseconds(last_recv_timestamp, (wal_receiver_timeout / 2));
        if (now >= timeout)
        {
            requestReply = true;
            ping_sent = true;
        }
    }
}
// send the status update; with requestReply = true it doubles as the ping
XLogWalRcvSendReply(requestReply, requestReply);
XLogWalRcvSendHSFeedback(false);
XLogWalRcvSendReply(true, true) is called when nothing has been received from the server for more than wal_receiver_timeout / 2: it pings the server and asks the primary to reply. The message carries the XLOG positions the walreceiver has written and flushed, the XLOG position the startup process has applied, and the standby's current time, and it is marked as requiring a reply from the primary. Otherwise the call is XLogWalRcvSendReply(false, false), which follows the same rules as in the receive path: it returns without sending anything if wal_receiver_status_interval <= 0, or if no new XLOG has been written or flushed since the last reply and less than wal_receiver_status_interval has passed since then; in all other cases it sends the same position report, marked as not requiring a reply.
When the walreceiver's latch (MyLatch) is set, for example by the startup process, the walreceiver checks whether the startup process has asked it to send a reply to the primary right away, i.e., the startup process wants apply feedback sent now. Before the reply is sent, the force_reply flag in shared memory is first reset to false, so that a new reply request arriving in the meantime is not missed. XLogWalRcvSendReply(true, false) then sends the primary the XLOG positions the walreceiver has written and flushed, the XLOG position the startup process has applied, and the standby's current time, marked as not requiring a reply.
if (rc & WL_LATCH_SET)
{
    ResetLatch(MyLatch);
    ProcessWalRcvInterrupts();

    if (walrcv->force_reply)
    {
        /*
         * The recovery process has asked us to send apply feedback now. Make
         * sure the flag is really set to false in shared memory before
         * sending the reply, so we don't miss a new request for a reply.
         */
        walrcv->force_reply = false;
        pg_memory_barrier();
        XLogWalRcvSendReply(true, false);
    }
}
WalRcvForceReply sets WalRcv->force_reply to true and then sets the walreceiver's latch to wake it up. (WaitForWALToBecomeAvailable and StartupXLOG call WalRcvForceReply.)
void WalRcvForceReply(void)
{
    Latch      *latch;

    WalRcv->force_reply = true;
    /* fetching the latch pointer might not be atomic, so use spinlock */
    SpinLockAcquire(&WalRcv->mutex);
    latch = WalRcv->latch;
    SpinLockRelease(&WalRcv->mutex);
    if (latch)
        SetLatch(latch);
}
[Figure: startup/walreceiver interactions. Labels: startup sets receiveStart and receiveStartTLI and requests streaming (walreceiver waits in WalRcvWaitForStartPosition); startup requests immediate apply feedback; XLogWalRcvFlush wakes startup after flushing XLOG; shutdown via ShutdownWalRcv, WalRcvDie, WalReceiverMain's walRcvState handling and ConditionVariableBroadcast on walRcvStoppedCV; states WalRcvStreaming, WalRcvRunning.]