PostgreSQL database replication - background first-class citizen process walreceiver receiving and sending logic
2022-06-11 04:31:00 【Tertium ferrugosum】

Receiving logic
The receiving logic of the background first-class citizen process WalReceiver lives in an infinite for loop, which exits only on an error or at end-of-streaming (the primary has terminated streaming replication). Each iteration proceeds as follows:
- First call walrcv_receive. If the returned data length len is zero, call WaitLatchOrSocket to block on the events WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE | WL_TIMEOUT | WL_LATCH_SET, i.e. wait until the network socket becomes readable, the latch is set, or the wait times out.
- If len is not zero, either there is data (len > 0) or the primary has ended streaming (len < 0). Enter the inner read loop, which processes the received data: when a message has been read, hand it to XLogWalRcvProcessMsg, then call walrcv_receive again to get the next length. As long as data keeps arriving, keep passing it to XLogWalRcvProcessMsg. If len < 0, the primary has terminated streaming replication (end-of-streaming); the walreceiver ends the stream and leaves COPY mode (walrcv_endstreaming). If len == 0, nothing more can be read from the socket right now, so the walreceiver goes back to blocking in WaitLatchOrSocket until the network becomes readable again.
Whenever data is received, before calling XLogWalRcvProcessMsg the walreceiver sets last_recv_timestamp to the current time (the time the most recent data arrived) and resets ping_sent to false: since the primary has just been heard from, no ping (a reply request sent to the primary's walsender) is outstanding.
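To make the control flow of the receive loop concrete, here is a minimal, self-contained sketch in C. It is not PostgreSQL code: FakeConn, fake_receive, LoopState and receive_iteration are hypothetical stand-ins that replay a scripted sequence of lengths in place of walrcv_receive, and counters replace the real calls to XLogWalRcvProcessMsg, XLogWalRcvSendReply/XLogWalRcvFlush and WaitLatchOrSocket.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in for the connection: replays scripted return values
 * of walrcv_receive (>0 message, 0 nothing readable, <0 end of streaming). */
typedef struct
{
    const int *lens;
    size_t     n;
    size_t     pos;
} FakeConn;

static int fake_receive(FakeConn *c)
{
    assert(c->pos < c->n);          /* the script must not run dry */
    return c->lens[c->pos++];
}

typedef struct
{
    int  processed;   /* messages that would go to XLogWalRcvProcessMsg */
    int  replies;     /* XLogWalRcvSendReply(false, false) + XLogWalRcvFlush(false) */
    int  waits;       /* times we would block in WaitLatchOrSocket */
    bool endofwal;    /* primary ended streaming: walrcv_endstreaming follows */
    bool ping_sent;
    long last_recv;   /* plays the role of last_recv_timestamp */
} LoopState;

/* One pass of the outer loop. */
static void receive_iteration(FakeConn *c, LoopState *st, long now)
{
    int len = fake_receive(c);

    if (len != 0)
    {
        for (;;)                        /* inner read loop */
        {
            if (len > 0)
            {
                st->last_recv = now;    /* data arrived: remember when */
                st->ping_sent = false;  /* and forget any outstanding ping */
                st->processed++;
            }
            else if (len == 0)
                break;                  /* socket drained for now */
            else
            {
                st->endofwal = true;    /* len < 0: end of streaming */
                break;
            }
            len = fake_receive(c);
        }
        st->replies++;
    }
    if (st->endofwal)
        return;                         /* caller leaves the loop */
    st->waits++;
}
```

For example, the script {2, 5, 0, 0, 3, -1} yields two processed messages and a wait in the first iteration, only a wait in the second, and end-of-streaming in the third.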
static void
XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len)
{
    int         hdrlen;
    XLogRecPtr  dataStart;
    XLogRecPtr  walEnd;
    TimestampTz sendTime;
    bool        replyRequested;

    resetStringInfo(&incoming_message);

    switch (type)
    {
        case 'w':               /* WAL records */
            {
                /* header: dataStart, walEnd, sendTime (3 x int64) */
                hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
                if (len < hdrlen)
                    ereport(ERROR,
                            (errcode(ERRCODE_PROTOCOL_VIOLATION),
                             errmsg_internal("invalid WAL message received from primary")));
                /* copy the header from buf into the incoming_message StringInfo */
                appendBinaryStringInfo(&incoming_message, buf, hdrlen);

                /* read the fields */
                dataStart = pq_getmsgint64(&incoming_message);  /* WAL start position of the data in this message */
                walEnd = pq_getmsgint64(&incoming_message);     /* current end of WAL on the primary */
                sendTime = pq_getmsgint64(&incoming_message);   /* time the primary sent the message */
                ProcessWalSndrMessage(walEnd, sendTime);

                /* everything after the header is WAL data: write it out */
                buf += hdrlen;
                len -= hdrlen;
                XLogWalRcvWrite(buf, len, dataStart);
                break;
            }
        case 'k':               /* Keepalive */
            {
                /* walEnd, sendTime (2 x int64) and a reply-request byte */
                hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
                if (len != hdrlen)
                    ereport(ERROR,
                            (errcode(ERRCODE_PROTOCOL_VIOLATION),
                             errmsg_internal("invalid keepalive message received from primary")));
                /* copy message to StringInfo */
                appendBinaryStringInfo(&incoming_message, buf, hdrlen);

                /* read the fields */
                walEnd = pq_getmsgint64(&incoming_message);         /* current end of WAL on the primary */
                sendTime = pq_getmsgint64(&incoming_message);       /* time the primary sent the message */
                replyRequested = pq_getmsgbyte(&incoming_message);  /* does the primary want a reply now? */

                ProcessWalSndrMessage(walEnd, sendTime);

                /* If the primary requested a reply, send one immediately */
                if (replyRequested)
                    XLogWalRcvSendReply(true, false);
                break;
            }
        default:
            ereport(ERROR,
                    (errcode(ERRCODE_PROTOCOL_VIOLATION),
                     errmsg_internal("invalid replication message type %d",
                                     type)));
    }
}
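The byte layouts that XLogWalRcvProcessMsg checks can be decoded without any PostgreSQL headers. The sketch below assumes the layouts of the XLogData ('w') and primary keepalive ('k') messages of the streaming replication protocol, with all integers in network byte order as read by pq_getmsgint64, and, as in the function above, buf excludes the leading type byte. read_be64, WalMsg and parse_walsnd_msg are hypothetical names, and returning false stands in for ereport(ERROR).

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Read a big-endian (network order) 64-bit integer, like pq_getmsgint64(). */
static uint64_t read_be64(const unsigned char *p)
{
    uint64_t v = 0;
    for (int i = 0; i < 8; i++)
        v = (v << 8) | p[i];
    return v;
}

typedef struct
{
    uint64_t data_start;      /* 'w' only: WAL position of the payload */
    uint64_t wal_end;         /* current end of WAL on the primary */
    int64_t  send_time;       /* primary's clock when the message was sent */
    bool     reply_requested; /* 'k' only */
    size_t   payload_len;     /* 'w' only: bytes of WAL after the header */
} WalMsg;

/* Returns false on a malformed message (the real code raises ERROR). */
static bool parse_walsnd_msg(unsigned char type, const unsigned char *buf,
                             size_t len, WalMsg *out)
{
    switch (type)
    {
        case 'w':
            if (len < 24)                   /* header is 3 x int64 */
                return false;
            out->data_start = read_be64(buf);
            out->wal_end = read_be64(buf + 8);
            out->send_time = (int64_t) read_be64(buf + 16);
            out->reply_requested = false;
            out->payload_len = len - 24;
            return true;
        case 'k':
            if (len != 17)                  /* exactly 2 x int64 + 1 byte */
                return false;
            out->data_start = 0;
            out->wal_end = read_be64(buf);
            out->send_time = (int64_t) read_be64(buf + 8);
            out->reply_requested = buf[16] != 0;
            out->payload_len = 0;
            return true;
        default:
            return false;                   /* unknown message type */
    }
}
```

Note the asymmetry mirrored from the original: a 'w' message only needs to be at least as long as its header, while a 'k' message must match its length exactly.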
ProcessWalSndrMessage updates the shared-memory status after a message has been received from the primary. walEnd and sendTime are the end-of-WAL position and the send timestamp carried by the latest message.
static void
ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
{
    WalRcvData *walrcv = WalRcv;
    TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();

    /* Update shared-memory status */
    SpinLockAcquire(&walrcv->mutex);
    /* latestWalEndTime only advances when the primary reports new WAL */
    if (walrcv->latestWalEnd < walEnd)
        walrcv->latestWalEndTime = sendTime;
    walrcv->latestWalEnd = walEnd;                    /* end of WAL reported by the primary */
    walrcv->lastMsgSendTime = sendTime;               /* when the primary sent this message */
    walrcv->lastMsgReceiptTime = lastMsgReceiptTime;  /* when we received and processed it */
    SpinLockRelease(&walrcv->mutex);
}
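The one subtle rule in ProcessWalSndrMessage is that latestWalEndTime only moves when walEnd advances, so a keepalive that reports no new WAL updates the message times but not latestWalEndTime. A minimal sketch with a hypothetical RcvStatus struct; the real fields live in shared memory behind a spinlock, which is omitted here.

```c
#include <assert.h>
#include <stdint.h>

/* Hypothetical, single-threaded copy of the fields ProcessWalSndrMessage()
 * updates; no locking because nothing else touches it in this sketch. */
typedef struct
{
    uint64_t latest_wal_end;
    int64_t  latest_wal_end_time;
    int64_t  last_msg_send_time;
    int64_t  last_msg_receipt_time;
} RcvStatus;

static void process_sender_message(RcvStatus *s, uint64_t wal_end,
                                   int64_t send_time, int64_t now)
{
    /* only a message reporting new WAL moves latest_wal_end_time */
    if (s->latest_wal_end < wal_end)
        s->latest_wal_end_time = send_time;
    s->latest_wal_end = wal_end;
    s->last_msg_send_time = send_time;
    s->last_msg_receipt_time = now;
}
```

These shared-memory fields are what the pg_stat_wal_receiver view exposes as latest_end_lsn, latest_end_time, last_msg_send_time and last_msg_receipt_time.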
Send logic
XLogWalRcvSendReply sends a reply message to the primary reporting our current WAL positions (write, flush and apply) and the current time.
- If force is not set, a message is sent only if new WAL has been written or flushed since the last reply, or if at least wal_receiver_status_interval has elapsed since the last status update; otherwise the call returns without sending anything.
- If wal_receiver_status_interval is disabled altogether (<= 0) and force is false, the function is a no-op.
- If force is true and requestReply is true, the primary is asked to reply as soon as it receives this message. This is used as a heartbeat when wal_receiver_timeout is approaching.
- If force is true and requestReply is false, no reply from the primary is needed; this is used when the startup process asks us to send apply feedback.
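The rules above boil down to a small predicate. This sketch restates the early-return checks of XLogWalRcvSendReply as a pure function; reply_needed and its millisecond-based parameters are assumptions for illustration, and, like TimestampDifferenceExceeds, it treats an elapsed time equal to the interval as exceeded.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical restatement of XLogWalRcvSendReply's decision.
 * interval_s mirrors wal_receiver_status_interval (seconds); times are in
 * milliseconds; last_* are the values captured by the previous reply. */
static bool reply_needed(bool force, int interval_s,
                         uint64_t write, uint64_t flush,
                         uint64_t last_write, uint64_t last_flush,
                         int64_t last_send_ms, int64_t now_ms)
{
    if (!force && interval_s <= 0)
        return false;                 /* reporting disabled: no-op */
    if (force)
        return true;                  /* heartbeat or apply feedback */
    if (write != last_write || flush != last_flush)
        return true;                  /* new WAL written or flushed */
    /* nothing new: report only once the status interval has elapsed */
    return now_ms - last_send_ms >= (int64_t) interval_s * 1000;
}
```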
static void
XLogWalRcvSendReply(bool force, bool requestReply)
{
    static XLogRecPtr writePtr = 0;
    static XLogRecPtr flushPtr = 0;
    XLogRecPtr  applyPtr;
    static TimestampTz sendTime = 0;
    TimestampTz now;

    /*
     * If the user doesn't want status to be reported to the primary, be sure
     * to exit before doing anything at all. wal_receiver_status_interval is
     * the minimum interval between status reports; <= 0 disables them.
     */
    if (!force && wal_receiver_status_interval <= 0)
        return;

    now = GetCurrentTimestamp();    /* Get current timestamp. */

    /*
     * We can compare the write and flush positions to the last message we
     * sent without taking any lock, but the apply position requires a spin
     * lock, so we don't check that unless something else has changed or 10
     * seconds have passed. This means that the apply WAL location will
     * appear, from the primary's point of view, to lag slightly, but since
     * this is only for reporting purposes and only on idle systems, that's
     * probably OK.
     */
    if (!force
        && writePtr == LogstreamResult.Write
        && flushPtr == LogstreamResult.Flush
        && !TimestampDifferenceExceeds(sendTime, now,
                                       wal_receiver_status_interval * 1000))
        return;
    sendTime = now;

    /* Construct a new message */
    writePtr = LogstreamResult.Write;
    flushPtr = LogstreamResult.Flush;
    applyPtr = GetXLogReplayRecPtr(NULL);

    resetStringInfo(&reply_message);
    pq_sendbyte(&reply_message, 'r');
    pq_sendint64(&reply_message, writePtr);
    pq_sendint64(&reply_message, flushPtr);
    pq_sendint64(&reply_message, applyPtr);
    pq_sendint64(&reply_message, GetCurrentTimestamp());
    pq_sendbyte(&reply_message, requestReply ? 1 : 0);

    walrcv_send(wrconn, reply_message.data, reply_message.len);
}
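The reply built at the end of XLogWalRcvSendReply is a fixed 34-byte message: the 'r' type byte, four 64-bit integers (write, flush, apply, send time) and a one-byte reply-request flag, matching the standby status update message of the streaming replication protocol. Below is a standalone encoder that mimics pq_sendbyte/pq_sendint64; put_be64 and build_reply are hypothetical helper names.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>

/* Store a 64-bit value in network byte order, as pq_sendint64() does. */
static size_t put_be64(unsigned char *p, uint64_t v)
{
    for (int i = 7; i >= 0; i--)
    {
        p[i] = (unsigned char) (v & 0xFF);
        v >>= 8;
    }
    return 8;
}

/* Build an 'r' standby status update into buf (at least 34 bytes).
 * Returns the message length. */
static size_t build_reply(unsigned char *buf, uint64_t write, uint64_t flush,
                          uint64_t apply, int64_t now, bool request_reply)
{
    size_t n = 0;

    buf[n++] = 'r';
    n += put_be64(buf + n, write);
    n += put_be64(buf + n, flush);
    n += put_be64(buf + n, apply);
    n += put_be64(buf + n, (uint64_t) now);
    buf[n++] = request_reply ? 1 : 0;
    return n;                       /* 1 + 4 * 8 + 1 = 34 */
}
```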
XLogWalRcvSendHSFeedback sends a hot standby feedback message to the primary, stamped with the current time. If the user has disabled feedback, one last message is sent to tell the walsender to forget this standby's xmin. The message is also sent right after connecting, because a previous connection may have set an xmin on the replication slot. (If no slot is used, explicitly sending InvalidTransactionId as feedback is harmless.)
static void
XLogWalRcvSendHSFeedback(bool immed)
{
    TimestampTz now;
    FullTransactionId nextFullXid;
    TransactionId nextXid,
                xmin,
                catalog_xmin;
    uint32      xmin_epoch,
                catalog_xmin_epoch;
    static TimestampTz sendTime = 0;
    /* initially true so we always send at least one feedback message */
    static bool primary_has_standby_xmin = true;

    /*
     * If the user doesn't want status to be reported to the primary, be sure
     * to exit before doing anything at all.
     */
    if ((wal_receiver_status_interval <= 0 || !hot_standby_feedback) &&
        !primary_has_standby_xmin)
        return;

    now = GetCurrentTimestamp();    /* Get current timestamp. */

    /* Unless asked to send immediately, respect wal_receiver_status_interval */
    if (!immed)
    {
        /* Send feedback at most once per wal_receiver_status_interval. */
        if (!TimestampDifferenceExceeds(sendTime, now,
                                        wal_receiver_status_interval * 1000))
            return;
        sendTime = now;
    }

    /*
     * If Hot Standby is not yet accepting connections there is nothing to
     * send. Check this after the interval has expired to reduce number of
     * calls.
     *
     * Bailing out here also ensures that we don't send feedback until we've
     * read our own replication slot state, so we don't tell the primary to
     * discard needed xmin or catalog_xmin from any slots that may exist on
     * this replica.
     */
    if (!HotStandbyActive())
        return;

    /*
     * Make the expensive call to get the oldest xmin once we are certain
     * everything else has been checked.
     */
    if (hot_standby_feedback)
    {
        GetReplicationHorizons(&xmin, &catalog_xmin);
    }
    else
    {
        /* feedback disabled: report InvalidTransactionId for both */
        xmin = InvalidTransactionId;
        catalog_xmin = InvalidTransactionId;
    }

    /*
     * Get epoch and adjust if nextXid and oldestXmin are different sides of
     * the epoch boundary.
     */
    nextFullXid = ReadNextFullTransactionId();
    nextXid = XidFromFullTransactionId(nextFullXid);
    xmin_epoch = EpochFromFullTransactionId(nextFullXid);
    catalog_xmin_epoch = xmin_epoch;
    if (nextXid < xmin)
        xmin_epoch--;
    if (nextXid < catalog_xmin)
        catalog_xmin_epoch--;

    /* Construct the message and send it. */
    resetStringInfo(&reply_message);
    pq_sendbyte(&reply_message, 'h');
    pq_sendint64(&reply_message, GetCurrentTimestamp());
    pq_sendint32(&reply_message, xmin);
    pq_sendint32(&reply_message, xmin_epoch);
    pq_sendint32(&reply_message, catalog_xmin);
    pq_sendint32(&reply_message, catalog_xmin_epoch);
    walrcv_send(wrconn, reply_message.data, reply_message.len);

    /* once an invalid xmin has been reported, stop sending while disabled */
    if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
        primary_has_standby_xmin = true;
    else
        primary_has_standby_xmin = false;
}
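Two pieces of XLogWalRcvSendHSFeedback are easy to check in isolation: the early exit driven by primary_has_standby_xmin, and the epoch adjustment. The sketch below reimplements both with plain integers; hs_feedback_skip and xid_epoch are hypothetical names, and the 64-bit next_full_xid plays the role of ReadNextFullTransactionId() (epoch in the high 32 bits, xid in the low 32 bits).

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

#define INVALID_XID 0u  /* InvalidTransactionId is 0 in PostgreSQL */

/* Early-exit rule at the top of XLogWalRcvSendHSFeedback(). */
static bool hs_feedback_skip(int status_interval, bool hot_standby_feedback,
                             bool primary_has_standby_xmin)
{
    return (status_interval <= 0 || !hot_standby_feedback) &&
           !primary_has_standby_xmin;
}

/* Epoch of a 32-bit xid relative to the next full xid: an xid numerically
 * larger than nextXid must belong to the previous epoch. */
static uint32_t xid_epoch(uint64_t next_full_xid, uint32_t xid)
{
    uint32_t next_xid = (uint32_t) next_full_xid;
    uint32_t epoch = (uint32_t) (next_full_xid >> 32);

    if (next_xid < xid)
        epoch--;
    return epoch;
}
```

With feedback disabled, the first call still goes out (primary_has_standby_xmin starts as true) and carries InvalidTransactionId; afterwards the flag is false and hs_feedback_skip returns true, which is the "one last message" behaviour described above.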
When a keepalive message is processed and the primary requests a reply (replyRequested is true), the walreceiver calls XLogWalRcvSendReply(true, false) to respond at once: it sends the primary the WAL positions the walreceiver has written and flushed, the position the startup process has applied, and the standby's current time, and marks the message as not requiring a reply from the primary.
When walrcv_receive in the inner loop returns a length of zero or less, the walreceiver leaves the loop and calls XLogWalRcvSendReply(false, false), which lets the primary know that the walreceiver has received some data, and XLogWalRcvFlush(false), which flushes the written WAL to disk and notifies the startup process and the primary of the progress.
XLogWalRcvSendReply(false, false) returns immediately without sending anything if status reporting is disabled (wal_receiver_status_interval, the minimum interval at which the standby reports replication status to the primary, is <= 0). It also returns without sending if no new WAL has been written or flushed (LogstreamResult.Write and LogstreamResult.Flush equal the values from the previous call) and less than wal_receiver_status_interval has passed since the last reply. Otherwise it sends the write and flush positions of the walreceiver, the apply position of the startup process and the current standby time, without asking the primary to reply. XLogWalRcvFlush(false), when it has actually flushed new WAL, calls XLogWalRcvSendReply(false, false) again under the same rules, and then calls XLogWalRcvSendHSFeedback(false) to send a hot standby feedback message.
static void
XLogWalRcvFlush(bool dying)
{
    if (LogstreamResult.Flush < LogstreamResult.Write)
    {
        WalRcvData *walrcv = WalRcv;

        issue_xlog_fsync(recvFile, recvSegNo);

        LogstreamResult.Flush = LogstreamResult.Write;

        /* Update shared-memory status */
        SpinLockAcquire(&walrcv->mutex);
        if (walrcv->flushedUpto < LogstreamResult.Flush)
        {
            walrcv->latestChunkStart = walrcv->flushedUpto;  /* where the previous flush ended */
            walrcv->flushedUpto = LogstreamResult.Flush;     /* WAL is now on disk up to here */
            walrcv->receivedTLI = ThisTimeLineID;
        }
        SpinLockRelease(&walrcv->mutex);

        /* Signal the startup process and walsender that new WAL has arrived */
        WakeupRecovery();               /* wake up the startup process */
        if (AllowCascadeReplication())
            WalSndWakeup();             /* cascading replication: wake our own walsenders */

        ...

        if (!dying)
        {
            /* Also let the primary know that we made some progress */
            XLogWalRcvSendReply(false, false);
            XLogWalRcvSendHSFeedback(false);
        }
    }
}
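The shared-memory bookkeeping in XLogWalRcvFlush can be modelled without locks or files. In this sketch FlushState and walrcv_flush are hypothetical, and counters stand in for issue_xlog_fsync, WakeupRecovery and the progress reply; it shows that nothing happens when Flush already equals Write, and that latestChunkStart records where the previous flush ended.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

/* Hypothetical, lock-free model of XLogWalRcvFlush()'s state. */
typedef struct
{
    uint64_t write;              /* LogstreamResult.Write */
    uint64_t flush;              /* LogstreamResult.Flush */
    uint64_t flushed_upto;       /* WalRcv->flushedUpto */
    uint64_t latest_chunk_start; /* WalRcv->latestChunkStart */
    int      fsyncs;             /* issue_xlog_fsync() calls */
    int      wakeups;            /* WakeupRecovery() calls */
    int      replies;            /* progress reports to the primary */
} FlushState;

/* Returns true if new WAL was flushed. */
static bool walrcv_flush(FlushState *s, bool dying)
{
    if (s->flush >= s->write)
        return false;                    /* nothing written since last flush */
    s->fsyncs++;
    s->flush = s->write;
    if (s->flushed_upto < s->flush)
    {
        s->latest_chunk_start = s->flushed_upto;
        s->flushed_upto = s->flush;
    }
    s->wakeups++;                        /* let the startup process replay it */
    if (!dying)
        s->replies++;                    /* SendReply + SendHSFeedback */
    return true;
}
```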
If WaitLatchOrSocket times out, nothing new has been received. If no message has arrived from the server for more than wal_receiver_timeout / 2, the walreceiver sends a packet to ping the server. In addition, if more than wal_receiver_status_interval has passed since the last status update, it sends a status update to the primary anyway, to report any progress in applying WAL.
bool        requestReply = false;

/*
 * Check if time since last receive from primary has reached the
 * configured limit.
 */
if (wal_receiver_timeout > 0)
{
    TimestampTz now = GetCurrentTimestamp();
    /* last_recv_timestamp is updated whenever data is received */
    TimestampTz timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
                                                      wal_receiver_timeout);

    if (now >= timeout)     /* nothing received for wal_receiver_timeout */
        ereport(ERROR,
                (errcode(ERRCODE_CONNECTION_FAILURE),
                 errmsg("terminating walreceiver due to timeout")));

    /*
     * We didn't receive anything new, for half of receiver replication
     * timeout. Ping the server.
     */
    if (!ping_sent)
    {
        /* ping_sent is reset when data arrives; true means we already pinged */
        timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
                                              (wal_receiver_timeout / 2));
        if (now >= timeout)
        {
            requestReply = true;
            ping_sent = true;
        }
    }
}

/* Send a status update (a ping if requestReply) and hot standby feedback */
XLogWalRcvSendReply(requestReply, requestReply);
XLogWalRcvSendHSFeedback(false);
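The timeout branch classifies each wakeup into one of three outcomes. A hypothetical pure-function version of the logic, with times in milliseconds and timeout_ms standing for wal_receiver_timeout; TimeoutAction and on_wait_timeout are illustrative names only.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdint.h>

typedef enum { TIMEOUT_NONE, TIMEOUT_PING, TIMEOUT_TERMINATE } TimeoutAction;

/* Restates the WL_TIMEOUT branch; *ping_sent plays the static flag. */
static TimeoutAction on_wait_timeout(int64_t now, int64_t last_recv,
                                     int timeout_ms, bool *ping_sent)
{
    if (timeout_ms <= 0)
        return TIMEOUT_NONE;                /* timeout checking disabled */
    if (now >= last_recv + timeout_ms)
        return TIMEOUT_TERMINATE;           /* ereport(ERROR, ...) */
    if (!*ping_sent && now >= last_recv + timeout_ms / 2)
    {
        *ping_sent = true;
        return TIMEOUT_PING;                /* XLogWalRcvSendReply(true, true) */
    }
    return TIMEOUT_NONE;                    /* XLogWalRcvSendReply(false, false) */
}
```

With wal_receiver_timeout = 60 s, a standby that hears nothing pings the primary after 30 s and terminates after 60 s. Because any received data resets last_recv_timestamp and ping_sent, a primary that answers the ping keeps the connection alive.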
XLogWalRcvSendReply(true, true) is the ping: it is used when no message has arrived from the server for more than wal_receiver_timeout / 2. It sends the write and flush positions of the walreceiver, the apply position of the startup process and the current standby time, and asks the primary to reply. When requestReply is false, the call is XLogWalRcvSendReply(false, false), which follows the same rules as after the receive loop: it returns without sending if status reporting is disabled, or if no new WAL has been written or flushed and wal_receiver_status_interval has not yet elapsed since the last reply; otherwise it sends the same positions without asking for a reply.
The walreceiver's latch (MyLatch) can also be set by the startup process. When it is, the walreceiver checks whether the startup process has asked it to send a reply to the primary immediately, i.e. apply feedback. Before sending the reply, it makes sure the force_reply flag in shared memory is really set to false, so that a new request arriving in the meantime is not missed. It then calls XLogWalRcvSendReply(true, false), which sends the write and flush positions of the walreceiver, the apply position of the startup process and the current standby time, without asking the primary to reply.
if (rc & WL_LATCH_SET)
{
    ResetLatch(MyLatch);
    ProcessWalRcvInterrupts();

    if (walrcv->force_reply)
    {
        /*
         * The recovery process has asked us to send apply feedback now.
         * Make sure the flag is really set to false in shared memory before
         * sending the reply, so we don't miss a new request for a reply.
         */
        walrcv->force_reply = false;
        pg_memory_barrier();
        XLogWalRcvSendReply(true, false);
    }
}
WalRcvForceReply sets WalRcv->force_reply to true and then sets the walreceiver's latch. It is called from WaitForWALToBecomeAvailable and StartupXLOG.
void
WalRcvForceReply(void)
{
    Latch      *latch;

    WalRcv->force_reply = true;
    /* fetching the latch pointer might not be atomic, so use spinlock */
    SpinLockAcquire(&WalRcv->mutex);
    latch = WalRcv->latch;
    SpinLockRelease(&WalRcv->mutex);
    if (latch)
        SetLatch(latch);
}
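The force_reply handshake relies on ordering: the startup process sets the flag before setting the latch, and the walreceiver clears the flag before building the reply. This sketch models it with C11 atomics instead of PostgreSQL's latch, spinlock and pg_memory_barrier(); ReplyHandshake, request_forced_reply and consume_force_reply are hypothetical names, and atomic_exchange folds the test-and-clear of the flag into one step rather than the separate read, write and barrier of the original.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical model: two atomics instead of a shared-memory flag and a latch. */
typedef struct
{
    atomic_bool force_reply;   /* WalRcv->force_reply */
    atomic_bool latch_set;     /* stands in for the walreceiver's latch */
} ReplyHandshake;

/* startup side, like WalRcvForceReply(): request first, then wake up */
static void request_forced_reply(ReplyHandshake *h)
{
    atomic_store(&h->force_reply, true);
    atomic_store(&h->latch_set, true);
}

/* walreceiver side: returns true if a forced reply must be sent now */
static bool consume_force_reply(ReplyHandshake *h)
{
    if (!atomic_exchange(&h->latch_set, false))   /* ResetLatch() */
        return false;
    /* clear the flag before replying, so a request that arrives while the
     * reply is being built sets it again and is not lost */
    return atomic_exchange(&h->force_reply, false);
}
```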
[Figure: interaction between the startup process and the walReceiver process. The startup process sets receiveStart and receiveStartTLI to request streaming replication, and walReceiver waits for them in WalRcvWaitForStartPosition; the startup process asks walReceiver to send apply feedback immediately; walReceiver wakes the startup process once XLogWalRcvFlush has flushed WAL; the walRcvStoppedCV condition variable links WalRcvDie, WalReceiverMain (walRcvState) and the startup process's ShutdownWalRcv (ConditionVariableBroadcast); WalRcvStreaming and WalRcvRunning are also shown.]