Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * walreceiver.c
4 : *
5 : * The WAL receiver process (walreceiver) is new as of Postgres 9.0. It
6 : * is the process in the standby server that takes charge of receiving
7 : * XLOG records from a primary server during streaming replication.
8 : *
9 : * When the startup process determines that it's time to start streaming,
10 : * it instructs postmaster to start walreceiver. Walreceiver first connects
11 : * to the primary server (it will be served by a walsender process
12 : * in the primary server), and then keeps receiving XLOG records and
13 : * writing them to the disk as long as the connection is alive. As XLOG
14 : * records are received and flushed to disk, it updates the
15 : * WalRcv->flushedUpto variable in shared memory, to inform the startup
16 : * process of how far it can proceed with XLOG replay.
17 : *
18 : * A WAL receiver cannot directly load GUC parameters used when establishing
19 : * its connection to the primary. Instead it relies on parameter values
20 : * that are passed down by the startup process when streaming is requested.
21 : * This applies, for example, to the replication slot and the connection
22 : * string to be used for the connection with the primary.
23 : *
24 : * If the primary server ends streaming, but doesn't disconnect, walreceiver
25 : * goes into "waiting" mode, and waits for the startup process to give new
26 : * instructions. The startup process will treat that the same as
27 : * disconnection, and will rescan the archive/pg_wal directory. But when the
28 : * startup process wants to try streaming replication again, it will just
29 : * nudge the existing walreceiver process that's waiting, instead of launching
30 : * a new one.
31 : *
32 : * Normal termination is by SIGTERM, which instructs the walreceiver to
33 : * exit(0). Emergency termination is by SIGQUIT; like any postmaster child
34 : * process, the walreceiver will simply abort and exit on SIGQUIT. A close
35 : * of the connection and a FATAL error are treated not as a crash but as
36 : * normal operation.
37 : *
38 : * This file contains the server-facing parts of walreceiver. The libpq-
39 : * specific parts are in the libpqwalreceiver module. It's loaded
40 : * dynamically to avoid linking the server with libpq.
41 : *
42 : * Portions Copyright (c) 2010-2025, PostgreSQL Global Development Group
43 : *
44 : *
45 : * IDENTIFICATION
46 : * src/backend/replication/walreceiver.c
47 : *
48 : *-------------------------------------------------------------------------
49 : */
50 : #include "postgres.h"
51 :
52 : #include <unistd.h>
53 :
54 : #include "access/htup_details.h"
55 : #include "access/timeline.h"
56 : #include "access/transam.h"
57 : #include "access/xlog_internal.h"
58 : #include "access/xlogarchive.h"
59 : #include "access/xlogrecovery.h"
60 : #include "catalog/pg_authid.h"
61 : #include "funcapi.h"
62 : #include "libpq/pqformat.h"
63 : #include "libpq/pqsignal.h"
64 : #include "miscadmin.h"
65 : #include "pgstat.h"
66 : #include "postmaster/auxprocess.h"
67 : #include "postmaster/interrupt.h"
68 : #include "replication/walreceiver.h"
69 : #include "replication/walsender.h"
70 : #include "storage/ipc.h"
71 : #include "storage/proc.h"
72 : #include "storage/procarray.h"
73 : #include "storage/procsignal.h"
74 : #include "utils/acl.h"
75 : #include "utils/builtins.h"
76 : #include "utils/guc.h"
77 : #include "utils/pg_lsn.h"
78 : #include "utils/ps_status.h"
79 : #include "utils/timestamp.h"
80 :
81 :
82 : /*
83 : * GUC variables. (Other variables that affect walreceiver are in xlog.c
84 : * because they're passed down from the startup process, for better
85 : * synchronization.)
86 : */
87 : int wal_receiver_status_interval;
88 : int wal_receiver_timeout;
89 : bool hot_standby_feedback;
90 :
91 : /* libpqwalreceiver connection */
92 : static WalReceiverConn *wrconn = NULL;
/*
 * Function table of the walreceiver implementation. WalReceiverMain expects
 * this to be non-NULL once load_file("libpqwalreceiver") has returned and
 * raises an error otherwise.
 */
93 : WalReceiverFunctionsType *WalReceiverFunctions = NULL;
94 :
95 : /*
96 : * These variables are used similarly to openLogFile/SegNo,
97 : * but for walreceiver to write the XLOG. recvFileTLI is the TimeLineID
98 : * corresponding to the filename of recvFile. recvFile is -1 while no
 * segment file is open.
99 : */
100 : static int recvFile = -1;
101 : static TimeLineID recvFileTLI = 0;
102 : static XLogSegNo recvSegNo = 0;
103 :
104 : /*
105 : * LogstreamResult indicates the byte positions that we have already
106 : * written/fsynced.
107 : */
108 : static struct
109 : {
110 : XLogRecPtr Write; /* last byte + 1 written out in the standby */
111 : XLogRecPtr Flush; /* last byte + 1 flushed in the standby */
112 : } LogstreamResult;
113 :
114 : /*
115 : * Reasons to wake up and perform periodic tasks.
 *
 * The values double as indexes into the wakeup[] array below, so
 * NUM_WALRCV_WAKEUPS must always be derived from the last member.
116 : */
117 : typedef enum WalRcvWakeupReason
118 : {
119 : WALRCV_WAKEUP_TERMINATE, /* give up on the connection (receive timeout) */
120 : WALRCV_WAKEUP_PING, /* ask the primary for a reply */
121 : WALRCV_WAKEUP_REPLY, /* presumably: periodic status reply -- confirm in XLogWalRcvSendReply */
122 : WALRCV_WAKEUP_HSFEEDBACK, /* presumably: periodic hot standby feedback -- confirm in XLogWalRcvSendHSFeedback */
123 : #define NUM_WALRCV_WAKEUPS (WALRCV_WAKEUP_HSFEEDBACK + 1)
124 : } WalRcvWakeupReason;
125 :
126 : /*
127 : * Wake up times for periodic tasks.
 *
 * Indexed by WalRcvWakeupReason. An entry set to TIMESTAMP_INFINITY is
 * effectively disabled, since the main loop naps until the minimum entry.
128 : */
129 : static TimestampTz wakeup[NUM_WALRCV_WAKEUPS];
130 :
/*
 * Message buffer, (re)initialized with initStringInfo() each time streaming
 * starts. NOTE(review): presumably used to build outgoing reply/feedback
 * messages; the users are outside this part of the file -- confirm.
 */
131 : static StringInfoData reply_message;
132 :
133 : /* Prototypes for private functions */
134 : static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last);
135 : static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI);
136 : static void WalRcvDie(int code, Datum arg);
137 : static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len,
138 : TimeLineID tli);
139 : static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr,
140 : TimeLineID tli);
141 : static void XLogWalRcvFlush(bool dying, TimeLineID tli);
142 : static void XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli);
143 : static void XLogWalRcvSendReply(bool force, bool requestReply);
144 : static void XLogWalRcvSendHSFeedback(bool immed);
145 : static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
146 : static void WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now);
147 :
148 : /*
149 : * Process any interrupts the walreceiver process may have received.
150 : * This should be called any time the process's latch has become set.
151 : *
152 : * Currently, only SIGTERM is of interest. We can't just exit(1) within the
153 : * SIGTERM signal handler, because the signal might arrive in the middle of
154 : * some critical operation, like while we're holding a spinlock. Instead, the
155 : * signal handler sets a flag variable as well as setting the process's latch.
156 : * We must check the flag (by calling ProcessWalRcvInterrupts) anytime the
157 : * latch has become set. Operations that could block for a long time, such as
158 : * reading from a remote server, must pay attention to the latch too; see
159 : * libpqrcv_PQgetResult for example.
160 : */
161 : void
162 28320 : ProcessWalRcvInterrupts(void)
163 : {
164 : /*
165 : * Although walreceiver interrupt handling doesn't use the same scheme as
166 : * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive
167 : * any incoming signals on Win32, and also to make sure we process any
168 : * barrier events.
169 : */
170 28320 : CHECK_FOR_INTERRUPTS();
171 :
172 28318 : if (ShutdownRequestPending)
173 : {
174 152 : ereport(FATAL,
175 : (errcode(ERRCODE_ADMIN_SHUTDOWN),
176 : errmsg("terminating walreceiver process due to administrator command")));
177 : }
178 28166 : }
179 :
180 :
181 : /*
 * Main entry point for walreceiver process.
 *
 * startup_data is not examined here and startup_data_len is asserted to be
 * zero. Everything needed to start streaming (connection string, slot name,
 * start position and timeline) is copied out of the shared WalRcv struct
 * while holding its spinlock.
 *
 * Overall flow:
 *  1. Inspect the shared walRcvState (exiting right away if the state is
 *     STOPPING or STOPPED), advertise our PID and proc number, and register
 *     WalRcvDie as a shmem-exit callback.
 *  2. Install signal handlers, load libpqwalreceiver and connect to the
 *     primary.
 *  3. Loop forever: identify the primary, fetch missing timeline history
 *     files, optionally create a temporary slot, stream WAL until the
 *     primary ends the stream, then close the current segment and block in
 *     WalRcvWaitForStartPosition() until a new start position is supplied.
 *
 * The outer loop has no exit condition (see the "not reached" marker at the
 * end); the function is left only through proc_exit() or an error report.
 */
182 : void
183 404 : WalReceiverMain(char *startup_data, size_t startup_data_len)
184 : {
185 : char conninfo[MAXCONNINFO];
186 : char *tmp_conninfo;
187 : char slotname[NAMEDATALEN];
188 : bool is_temp_slot;
189 : XLogRecPtr startpoint;
190 : TimeLineID startpointTLI;
191 : TimeLineID primaryTLI;
192 : bool first_stream;
193 : WalRcvData *walrcv;
194 : TimestampTz now;
195 : char *err;
196 404 : char *sender_host = NULL;
197 404 : int sender_port = 0;
198 : char *appname;
199 :
200 : Assert(startup_data_len == 0);
201 :
202 404 : MyBackendType = B_WAL_RECEIVER;
203 404 : AuxiliaryProcessMainCommon();
204 :
205 : /*
206 : * WalRcv should be set up already (if we are a backend, we inherit this
207 : * by fork() or EXEC_BACKEND mechanism from the postmaster).
208 : */
209 404 : walrcv = WalRcv;
210 : Assert(walrcv != NULL);
211 :
212 : /*
213 : * Mark walreceiver as running in shared memory.
214 : *
215 : * Do this as early as possible, so that if we fail later on, we'll set
216 : * state to STOPPED. If we die before this, the startup process will keep
217 : * waiting for us to start up, until it times out.
218 : */
219 404 : SpinLockAcquire(&walrcv->mutex);
220 : Assert(walrcv->pid == 0);
221 404 : switch (walrcv->walRcvState)
222 : {
223 0 : case WALRCV_STOPPING:
224 : /* If we've already been requested to stop, don't start up. */
225 0 : walrcv->walRcvState = WALRCV_STOPPED;
226 : /* fall through */
227 :
228 14 : case WALRCV_STOPPED:
/* Release the spinlock before broadcasting and exiting. */
229 14 : SpinLockRelease(&walrcv->mutex);
230 14 : ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
231 14 : proc_exit(1);
232 : break;
233 :
234 390 : case WALRCV_STARTING:
235 : /* The usual case */
236 390 : break;
237 :
238 0 : case WALRCV_WAITING:
239 : case WALRCV_STREAMING:
240 : case WALRCV_RESTARTING:
241 : default:
242 : /* Shouldn't happen */
243 0 : SpinLockRelease(&walrcv->mutex);
244 0 : elog(PANIC, "walreceiver still running according to shared memory state");
245 : }
/* Only the WALRCV_STARTING case gets here, still holding the spinlock. */
246 : /* Advertise our PID so that the startup process can kill us */
247 390 : walrcv->pid = MyProcPid;
248 390 : walrcv->walRcvState = WALRCV_STREAMING;
249 :
250 : /* Fetch information required to start streaming */
251 390 : walrcv->ready_to_display = false;
252 390 : strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
253 390 : strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN);
254 390 : is_temp_slot = walrcv->is_temp_slot;
255 390 : startpoint = walrcv->receiveStart;
256 390 : startpointTLI = walrcv->receiveStartTLI;
257 :
258 : /*
259 : * At most one of is_temp_slot and slotname can be set; otherwise,
260 : * RequestXLogStreaming messed up.
261 : */
262 : Assert(!is_temp_slot || (slotname[0] == '\0'));
263 :
264 : /* Initialise to a sanish value */
265 390 : now = GetCurrentTimestamp();
266 390 : walrcv->lastMsgSendTime =
267 390 : walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now;
268 :
269 : /* Report our proc number so that others can wake us up */
270 390 : walrcv->procno = MyProcNumber;
271 :
272 390 : SpinLockRelease(&walrcv->mutex);
273 :
/* Reset the shared written-up-to position with an atomic write, after the spinlock is released. */
274 390 : pg_atomic_write_u64(&WalRcv->writtenUpto, 0);
275 :
276 : /* Arrange to clean up at walreceiver exit */
/*
 * WalRcvDie is handed the address of startpointTLI rather than a copy, so
 * at exit time it reads whatever timeline is current then.
 */
277 390 : on_shmem_exit(WalRcvDie, PointerGetDatum(&startpointTLI));
278 :
279 : /* Properly accept or ignore signals the postmaster might send us */
280 390 : pqsignal(SIGHUP, SignalHandlerForConfigReload); /* set flag to read config
281 : * file */
282 390 : pqsignal(SIGINT, SIG_IGN);
283 390 : pqsignal(SIGTERM, SignalHandlerForShutdownRequest); /* request shutdown */
284 : /* SIGQUIT handler was already set up by InitPostmasterChild */
285 390 : pqsignal(SIGALRM, SIG_IGN);
286 390 : pqsignal(SIGPIPE, SIG_IGN);
287 390 : pqsignal(SIGUSR1, procsignal_sigusr1_handler);
288 390 : pqsignal(SIGUSR2, SIG_IGN);
289 :
290 : /* Reset some signals that are accepted by postmaster but not here */
291 390 : pqsignal(SIGCHLD, SIG_DFL);
292 :
293 : /* Load the libpq-specific functions */
294 390 : load_file("libpqwalreceiver", false);
295 390 : if (WalReceiverFunctions == NULL)
296 0 : elog(ERROR, "libpqwalreceiver didn't initialize correctly");
297 :
298 : /* Unblock signals (they were blocked when the postmaster forked us) */
299 390 : sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
300 :
301 : /* Establish the connection to the primary for XLOG streaming */
/* Use cluster_name as the application name when it is non-empty. */
302 390 : appname = cluster_name[0] ? cluster_name : "walreceiver";
303 390 : wrconn = walrcv_connect(conninfo, true, false, false, appname, &err);
304 388 : if (!wrconn)
305 122 : ereport(ERROR,
306 : (errcode(ERRCODE_CONNECTION_FAILURE),
307 : errmsg("streaming replication receiver \"%s\" could not connect to the primary server: %s",
308 : appname, err)));
309 :
310 : /*
311 : * Save user-visible connection string. This clobbers the original
312 : * conninfo, for security. Also save host and port of the sender server
313 : * this walreceiver is connected to.
314 : */
315 266 : tmp_conninfo = walrcv_get_conninfo(wrconn);
316 266 : walrcv_get_senderinfo(wrconn, &sender_host, &sender_port);
317 266 : SpinLockAcquire(&walrcv->mutex);
318 266 : memset(walrcv->conninfo, 0, MAXCONNINFO);
319 266 : if (tmp_conninfo)
320 266 : strlcpy((char *) walrcv->conninfo, tmp_conninfo, MAXCONNINFO);
321 :
322 266 : memset(walrcv->sender_host, 0, NI_MAXHOST);
323 266 : if (sender_host)
324 266 : strlcpy((char *) walrcv->sender_host, sender_host, NI_MAXHOST);
325 :
326 266 : walrcv->sender_port = sender_port;
327 266 : walrcv->ready_to_display = true;
328 266 : SpinLockRelease(&walrcv->mutex);
329 :
/* The temporary strings are freed only after the spinlock has been released. */
330 266 : if (tmp_conninfo)
331 266 : pfree(tmp_conninfo);
332 :
333 266 : if (sender_host)
334 266 : pfree(sender_host);
335 :
/* first_stream only selects which LOG message is emitted when streaming begins. */
336 266 : first_stream = true;
337 : for (;;)
338 0 : {
339 : char *primary_sysid;
340 : char standby_sysid[32];
341 : WalRcvStreamOptions options;
342 :
343 : /*
344 : * Check that we're connected to a valid server using the
345 : * IDENTIFY_SYSTEM replication command.
346 : */
347 266 : primary_sysid = walrcv_identify_system(wrconn, &primaryTLI);
348 :
/* The identifiers are compared as strings, so format ours the same way. */
349 266 : snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
350 : GetSystemIdentifier());
351 266 : if (strcmp(primary_sysid, standby_sysid) != 0)
352 : {
353 0 : ereport(ERROR,
354 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
355 : errmsg("database system identifier differs between the primary and standby"),
356 : errdetail("The primary's identifier is %s, the standby's identifier is %s.",
357 : primary_sysid, standby_sysid)));
358 : }
359 :
360 : /*
361 : * Confirm that the current timeline of the primary is the same or
362 : * ahead of ours.
363 : */
364 266 : if (primaryTLI < startpointTLI)
365 0 : ereport(ERROR,
366 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
367 : errmsg("highest timeline %u of the primary is behind recovery timeline %u",
368 : primaryTLI, startpointTLI)));
369 :
370 : /*
371 : * Get any missing history files. We do this always, even when we're
372 : * not interested in that timeline, so that if we're promoted to
373 : * become the primary later on, we don't select the same timeline that
374 : * was already used in the current primary. This isn't bullet-proof -
375 : * you'll need some external software to manage your cluster if you
376 : * need to ensure that a unique timeline id is chosen in every case,
377 : * but let's avoid the confusion of timeline id collisions where we
378 : * can.
379 : */
380 266 : WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
381 :
382 : /*
383 : * Create temporary replication slot if requested, and update slot
384 : * name in shared memory. (Note the slot name cannot already be set
385 : * in this case.)
386 : */
387 266 : if (is_temp_slot)
388 : {
/* The slot name is derived from the value returned by walrcv_get_backend_pid(). */
389 0 : snprintf(slotname, sizeof(slotname),
390 : "pg_walreceiver_%lld",
391 0 : (long long int) walrcv_get_backend_pid(wrconn));
392 :
393 0 : walrcv_create_slot(wrconn, slotname, true, false, false, 0, NULL);
394 :
395 0 : SpinLockAcquire(&walrcv->mutex);
396 0 : strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
397 0 : SpinLockRelease(&walrcv->mutex);
398 : }
399 :
400 : /*
401 : * Start streaming.
402 : *
403 : * We'll try to start at the requested starting point and timeline,
404 : * even if it's different from the server's latest timeline. In case
405 : * we've already reached the end of the old timeline, the server will
406 : * finish the streaming immediately, and we will go back to await
407 : * orders from the startup process. If recovery_target_timeline is
408 : * 'latest', the startup process will scan pg_wal and find the new
409 : * history file, bump recovery target timeline, and ask us to restart
410 : * on the new timeline.
411 : */
412 266 : options.logical = false;
413 266 : options.startpoint = startpoint;
/* An empty slot name is passed on as NULL. */
414 266 : options.slotname = slotname[0] != '\0' ? slotname : NULL;
415 266 : options.proto.physical.startpointTLI = startpointTLI;
416 266 : if (walrcv_startstreaming(wrconn, &options))
417 : {
418 266 : if (first_stream)
419 266 : ereport(LOG,
420 : (errmsg("started streaming WAL from primary at %X/%X on timeline %u",
421 : LSN_FORMAT_ARGS(startpoint), startpointTLI)));
422 : else
423 0 : ereport(LOG,
424 : (errmsg("restarted WAL streaming at %X/%X on timeline %u",
425 : LSN_FORMAT_ARGS(startpoint), startpointTLI)));
426 266 : first_stream = false;
427 :
428 : /* Initialize LogstreamResult and buffers for processing messages */
429 266 : LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL);
430 266 : initStringInfo(&reply_message);
431 :
432 : /* Initialize nap wakeup times. */
433 266 : now = GetCurrentTimestamp();
434 1330 : for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
435 1064 : WalRcvComputeNextWakeup(i, now);
436 :
437 : /* Send initial reply/feedback messages. */
438 266 : XLogWalRcvSendReply(true, false);
439 266 : XLogWalRcvSendHSFeedback(true);
440 :
441 : /* Loop until end-of-streaming or error */
442 : for (;;)
443 18880 : {
444 : char *buf;
445 : int len;
446 19146 : bool endofwal = false;
447 19146 : pgsocket wait_fd = PGINVALID_SOCKET;
448 : int rc;
449 : TimestampTz nextWakeup;
450 : long nap;
451 :
452 : /*
453 : * Exit walreceiver if we're not in recovery. This should not
454 : * happen, but cross-check the status here.
455 : */
456 19146 : if (!RecoveryInProgress())
457 0 : ereport(FATAL,
458 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
459 : errmsg("cannot continue WAL streaming, recovery has already ended")));
460 :
461 : /* Process any requests or signals received recently */
462 19146 : ProcessWalRcvInterrupts();
463 :
464 19146 : if (ConfigReloadPending)
465 : {
466 48 : ConfigReloadPending = false;
467 48 : ProcessConfigFile(PGC_SIGHUP);
468 : /* recompute wakeup times */
469 48 : now = GetCurrentTimestamp();
470 240 : for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
471 192 : WalRcvComputeNextWakeup(i, now);
472 48 : XLogWalRcvSendHSFeedback(true);
473 : }
474 :
475 : /* See if we can read data immediately */
/*
 * As handled below: len > 0 is a message of that many bytes, len == 0
 * means nothing is available right now, and len < 0 means the primary
 * ended the stream. wait_fd is later passed to WaitLatchOrSocket().
 */
476 19146 : len = walrcv_receive(wrconn, &buf, &wait_fd);
477 19086 : if (len != 0)
478 : {
479 : /*
480 : * Process the received data, and any subsequent data we
481 : * can read without blocking.
482 : */
483 : for (;;)
484 : {
485 190524 : if (len > 0)
486 : {
487 : /*
488 : * Something was received from primary, so adjust
489 : * the ping and terminate wakeup times.
490 : */
491 177494 : now = GetCurrentTimestamp();
492 177494 : WalRcvComputeNextWakeup(WALRCV_WAKEUP_TERMINATE,
493 : now);
494 177494 : WalRcvComputeNextWakeup(WALRCV_WAKEUP_PING, now);
/* buf[0] is the message type byte; the remainder is the message body. */
495 177494 : XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1,
496 : startpointTLI);
497 : }
498 13030 : else if (len == 0)
499 12954 : break;
500 76 : else if (len < 0)
501 : {
502 76 : ereport(LOG,
503 : (errmsg("replication terminated by primary server"),
504 : errdetail("End of WAL reached on timeline %u at %X/%X.",
505 : startpointTLI,
506 : LSN_FORMAT_ARGS(LogstreamResult.Write))));
507 76 : endofwal = true;
508 76 : break;
509 : }
510 177494 : len = walrcv_receive(wrconn, &buf, &wait_fd);
511 : }
512 :
513 : /* Let the primary know that we received some data. */
514 13030 : XLogWalRcvSendReply(false, false);
515 :
516 : /*
517 : * If we've written some records, flush them to disk and
518 : * let the startup process and primary server know about
519 : * them.
520 : */
521 13030 : XLogWalRcvFlush(false, startpointTLI);
522 : }
523 :
524 : /* Check if we need to exit the streaming loop. */
525 19082 : if (endofwal)
526 76 : break;
527 :
528 : /* Find the soonest wakeup time, to limit our nap. */
529 19006 : nextWakeup = TIMESTAMP_INFINITY;
530 95030 : for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
531 76024 : nextWakeup = Min(wakeup[i], nextWakeup);
532 :
533 : /* Calculate the nap time, clamping as necessary. */
534 19006 : now = GetCurrentTimestamp();
535 19006 : nap = TimestampDifferenceMilliseconds(now, nextWakeup);
536 :
537 : /*
538 : * Ideally we would reuse a WaitEventSet object repeatedly
539 : * here to avoid the overheads of WaitLatchOrSocket on epoll
540 : * systems, but we can't be sure that libpq (or any other
541 : * walreceiver implementation) has the same socket (even if
542 : * the fd is the same number, it may have been closed and
543 : * reopened since the last time). In future, if there is a
544 : * function for removing sockets from WaitEventSet, then we
545 : * could add and remove just the socket each time, potentially
546 : * avoiding some system calls.
547 : */
548 : Assert(wait_fd != PGINVALID_SOCKET);
549 19006 : rc = WaitLatchOrSocket(MyLatch,
550 : WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
551 : WL_TIMEOUT | WL_LATCH_SET,
552 : wait_fd,
553 : nap,
554 : WAIT_EVENT_WAL_RECEIVER_MAIN);
555 19006 : if (rc & WL_LATCH_SET)
556 : {
557 5404 : ResetLatch(MyLatch);
558 5404 : ProcessWalRcvInterrupts();
559 :
560 5278 : if (walrcv->force_reply)
561 : {
562 : /*
563 : * The recovery process has asked us to send apply
564 : * feedback now. Make sure the flag is really set to
565 : * false in shared memory before sending the reply, so
566 : * we don't miss a new request for a reply.
567 : */
568 5196 : walrcv->force_reply = false;
569 5196 : pg_memory_barrier();
570 5196 : XLogWalRcvSendReply(true, false);
571 : }
572 : }
573 18880 : if (rc & WL_TIMEOUT)
574 : {
575 : /*
576 : * We didn't receive anything new. If we haven't heard
577 : * anything from the server for more than
578 : * wal_receiver_timeout / 2, ping the server. Also, if
579 : * it's been longer than wal_receiver_status_interval
580 : * since the last update we sent, send a status update to
581 : * the primary anyway, to report any progress in applying
582 : * WAL.
583 : */
584 8 : bool requestReply = false;
585 :
586 : /*
587 : * Check if time since last receive from primary has
588 : * reached the configured limit.
589 : */
590 8 : now = GetCurrentTimestamp();
591 8 : if (now >= wakeup[WALRCV_WAKEUP_TERMINATE])
592 0 : ereport(ERROR,
593 : (errcode(ERRCODE_CONNECTION_FAILURE),
594 : errmsg("terminating walreceiver due to timeout")));
595 :
596 : /*
597 : * If we didn't receive anything new for half of receiver
598 : * replication timeout, then ping the server.
599 : */
600 8 : if (now >= wakeup[WALRCV_WAKEUP_PING])
601 : {
602 0 : requestReply = true;
/*
 * Disarm the ping wakeup; it is recomputed the next time data arrives
 * from the primary (see the receive loop above).
 */
603 0 : wakeup[WALRCV_WAKEUP_PING] = TIMESTAMP_INFINITY;
604 : }
605 :
/* When a ping is due, the reply is both forced and asks the primary to answer. */
606 8 : XLogWalRcvSendReply(requestReply, requestReply);
607 8 : XLogWalRcvSendHSFeedback(false);
608 : }
609 : }
610 :
611 : /*
612 : * The backend finished streaming. Exit streaming COPY-mode from
613 : * our side, too.
614 : */
615 76 : walrcv_endstreaming(wrconn, &primaryTLI);
616 :
617 : /*
618 : * If the server had switched to a new timeline that we didn't
619 : * know about when we began streaming, fetch its timeline history
620 : * file now.
621 : */
622 24 : WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
623 : }
624 : else
625 0 : ereport(LOG,
626 : (errmsg("primary server contains no more WAL on requested timeline %u",
627 : startpointTLI)));
628 :
629 : /*
630 : * End of WAL reached on the requested timeline. Close the last
631 : * segment, and await for new orders from the startup process.
632 : */
633 24 : if (recvFile >= 0)
634 : {
635 : char xlogfname[MAXFNAMELEN];
636 :
637 22 : XLogWalRcvFlush(false, startpointTLI);
/* Build the segment file name before closing; it is needed for the error message and the archive status calls. */
638 22 : XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
639 22 : if (close(recvFile) != 0)
640 0 : ereport(PANIC,
641 : (errcode_for_file_access(),
642 : errmsg("could not close WAL segment %s: %m",
643 : xlogfname)));
644 :
645 : /*
646 : * Create .done file forcibly to prevent the streamed segment from
647 : * being archived later.
648 : */
649 22 : if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
650 22 : XLogArchiveForceDone(xlogfname);
651 : else
652 0 : XLogArchiveNotify(xlogfname);
653 : }
/* Record that no segment file is open, whether or not one was closed above. */
654 24 : recvFile = -1;
655 :
656 24 : elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
/* Blocks until a new start position is supplied; startpoint and startpointTLI are updated in place. */
657 24 : WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
658 : }
659 : /* not reached */
660 : }
661 :
662 : /*
663 : * Wait for startup process to set receiveStart and receiveStartTLI.
664 : */
665 : static void
666 24 : WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
667 : {
668 24 : WalRcvData *walrcv = WalRcv;
669 : int state;
670 :
671 24 : SpinLockAcquire(&walrcv->mutex);
672 24 : state = walrcv->walRcvState;
673 24 : if (state != WALRCV_STREAMING)
674 : {
675 0 : SpinLockRelease(&walrcv->mutex);
676 0 : if (state == WALRCV_STOPPING)
677 0 : proc_exit(0);
678 : else
679 0 : elog(FATAL, "unexpected walreceiver state");
680 : }
681 24 : walrcv->walRcvState = WALRCV_WAITING;
682 24 : walrcv->receiveStart = InvalidXLogRecPtr;
683 24 : walrcv->receiveStartTLI = 0;
684 24 : SpinLockRelease(&walrcv->mutex);
685 :
686 24 : set_ps_display("idle");
687 :
688 : /*
689 : * nudge startup process to notice that we've stopped streaming and are
690 : * now waiting for instructions.
691 : */
692 24 : WakeupRecovery();
693 : for (;;)
694 : {
695 48 : ResetLatch(MyLatch);
696 :
697 48 : ProcessWalRcvInterrupts();
698 :
699 24 : SpinLockAcquire(&walrcv->mutex);
700 : Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
701 : walrcv->walRcvState == WALRCV_WAITING ||
702 : walrcv->walRcvState == WALRCV_STOPPING);
703 24 : if (walrcv->walRcvState == WALRCV_RESTARTING)
704 : {
705 : /*
706 : * No need to handle changes in primary_conninfo or
707 : * primary_slot_name here. Startup process will signal us to
708 : * terminate in case those change.
709 : */
710 0 : *startpoint = walrcv->receiveStart;
711 0 : *startpointTLI = walrcv->receiveStartTLI;
712 0 : walrcv->walRcvState = WALRCV_STREAMING;
713 0 : SpinLockRelease(&walrcv->mutex);
714 0 : break;
715 : }
716 24 : if (walrcv->walRcvState == WALRCV_STOPPING)
717 : {
718 : /*
719 : * We should've received SIGTERM if the startup process wants us
720 : * to die, but might as well check it here too.
721 : */
722 0 : SpinLockRelease(&walrcv->mutex);
723 0 : exit(1);
724 : }
725 24 : SpinLockRelease(&walrcv->mutex);
726 :
727 24 : (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
728 : WAIT_EVENT_WAL_RECEIVER_WAIT_START);
729 : }
730 :
731 0 : if (update_process_title)
732 : {
733 : char activitymsg[50];
734 :
735 0 : snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%X",
736 0 : LSN_FORMAT_ARGS(*startpoint));
737 0 : set_ps_display(activitymsg);
738 : }
739 0 : }
740 :
741 : /*
742 : * Fetch any missing timeline history files between 'first' and 'last'
743 : * (inclusive) from the server.
744 : */
745 : static void
746 290 : WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
747 : {
748 : TimeLineID tli;
749 :
750 626 : for (tli = first; tli <= last; tli++)
751 : {
752 : /* there's no history file for timeline 1 */
753 336 : if (tli != 1 && !existsTimeLineHistory(tli))
754 : {
755 : char *fname;
756 : char *content;
757 : int len;
758 : char expectedfname[MAXFNAMELEN];
759 :
760 22 : ereport(LOG,
761 : (errmsg("fetching timeline history file for timeline %u from primary server",
762 : tli)));
763 :
764 22 : walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len);
765 :
766 : /*
767 : * Check that the filename on the primary matches what we
768 : * calculated ourselves. This is just a sanity check, it should
769 : * always match.
770 : */
771 22 : TLHistoryFileName(expectedfname, tli);
772 22 : if (strcmp(fname, expectedfname) != 0)
773 0 : ereport(ERROR,
774 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
775 : errmsg_internal("primary reported unexpected file name for timeline history file of timeline %u",
776 : tli)));
777 :
778 : /*
779 : * Write the file to pg_wal.
780 : */
781 22 : writeTimeLineHistoryFile(tli, content, len);
782 :
783 : /*
784 : * Mark the streamed history file as ready for archiving if
785 : * archive_mode is always.
786 : */
787 22 : if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
788 22 : XLogArchiveForceDone(fname);
789 : else
790 0 : XLogArchiveNotify(fname);
791 :
792 22 : pfree(fname);
793 22 : pfree(content);
794 : }
795 : }
796 290 : }
797 :
798 : /*
799 : * Mark us as STOPPED in shared memory at exit.
800 : */
801 : static void
802 390 : WalRcvDie(int code, Datum arg)
803 : {
804 390 : WalRcvData *walrcv = WalRcv;
805 390 : TimeLineID *startpointTLI_p = (TimeLineID *) DatumGetPointer(arg);
806 :
807 : Assert(*startpointTLI_p != 0);
808 :
809 : /* Ensure that all WAL records received are flushed to disk */
810 390 : XLogWalRcvFlush(true, *startpointTLI_p);
811 :
812 : /* Mark ourselves inactive in shared memory */
813 390 : SpinLockAcquire(&walrcv->mutex);
814 : Assert(walrcv->walRcvState == WALRCV_STREAMING ||
815 : walrcv->walRcvState == WALRCV_RESTARTING ||
816 : walrcv->walRcvState == WALRCV_STARTING ||
817 : walrcv->walRcvState == WALRCV_WAITING ||
818 : walrcv->walRcvState == WALRCV_STOPPING);
819 : Assert(walrcv->pid == MyProcPid);
820 390 : walrcv->walRcvState = WALRCV_STOPPED;
821 390 : walrcv->pid = 0;
822 390 : walrcv->procno = INVALID_PROC_NUMBER;
823 390 : walrcv->ready_to_display = false;
824 390 : SpinLockRelease(&walrcv->mutex);
825 :
826 390 : ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
827 :
828 : /* Terminate the connection gracefully. */
829 390 : if (wrconn != NULL)
830 266 : walrcv_disconnect(wrconn);
831 :
832 : /* Wake up the startup process to notice promptly that we're gone */
833 390 : WakeupRecovery();
834 390 : }
835 :
836 : /*
837 : * Accept the message from XLOG stream, and process it.
838 : */
839 : static void
840 177494 : XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
841 : {
842 : int hdrlen;
843 : XLogRecPtr dataStart;
844 : XLogRecPtr walEnd;
845 : TimestampTz sendTime;
846 : bool replyRequested;
847 :
848 177494 : switch (type)
849 : {
850 177494 : case 'w': /* WAL records */
851 : {
852 : StringInfoData incoming_message;
853 :
854 177494 : hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
855 177494 : if (len < hdrlen)
856 0 : ereport(ERROR,
857 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
858 : errmsg_internal("invalid WAL message received from primary")));
859 :
860 : /* initialize a StringInfo with the given buffer */
861 177494 : initReadOnlyStringInfo(&incoming_message, buf, hdrlen);
862 :
863 : /* read the fields */
864 177494 : dataStart = pq_getmsgint64(&incoming_message);
865 177494 : walEnd = pq_getmsgint64(&incoming_message);
866 177494 : sendTime = pq_getmsgint64(&incoming_message);
867 177494 : ProcessWalSndrMessage(walEnd, sendTime);
868 :
869 177494 : buf += hdrlen;
870 177494 : len -= hdrlen;
871 177494 : XLogWalRcvWrite(buf, len, dataStart, tli);
872 177494 : break;
873 : }
874 0 : case 'k': /* Keepalive */
875 : {
876 : StringInfoData incoming_message;
877 :
878 0 : hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
879 0 : if (len != hdrlen)
880 0 : ereport(ERROR,
881 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
882 : errmsg_internal("invalid keepalive message received from primary")));
883 :
884 : /* initialize a StringInfo with the given buffer */
885 0 : initReadOnlyStringInfo(&incoming_message, buf, hdrlen);
886 :
887 : /* read the fields */
888 0 : walEnd = pq_getmsgint64(&incoming_message);
889 0 : sendTime = pq_getmsgint64(&incoming_message);
890 0 : replyRequested = pq_getmsgbyte(&incoming_message);
891 :
892 0 : ProcessWalSndrMessage(walEnd, sendTime);
893 :
894 : /* If the primary requested a reply, send one immediately */
895 0 : if (replyRequested)
896 0 : XLogWalRcvSendReply(true, false);
897 0 : break;
898 : }
899 0 : default:
900 0 : ereport(ERROR,
901 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
902 : errmsg_internal("invalid replication message type %d",
903 : type)));
904 : }
905 177494 : }
906 :
907 : /*
908 : * Write XLOG data to disk.
909 : */
910 : static void
911 177494 : XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
912 : {
913 : int startoff;
914 : int byteswritten;
915 :
916 : Assert(tli != 0);
917 :
918 356260 : while (nbytes > 0)
919 : {
920 : int segbytes;
921 :
922 : /* Close the current segment if it's completed */
923 178766 : if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
924 1272 : XLogWalRcvClose(recptr, tli);
925 :
926 178766 : if (recvFile < 0)
927 : {
928 : /* Create/use new log file */
929 1598 : XLByteToSeg(recptr, recvSegNo, wal_segment_size);
930 1598 : recvFile = XLogFileInit(recvSegNo, tli);
931 1598 : recvFileTLI = tli;
932 : }
933 :
934 : /* Calculate the start offset of the received logs */
935 178766 : startoff = XLogSegmentOffset(recptr, wal_segment_size);
936 :
937 178766 : if (startoff + nbytes > wal_segment_size)
938 1272 : segbytes = wal_segment_size - startoff;
939 : else
940 177494 : segbytes = nbytes;
941 :
942 : /* OK to write the logs */
943 178766 : errno = 0;
944 :
945 178766 : byteswritten = pg_pwrite(recvFile, buf, segbytes, (off_t) startoff);
946 178766 : if (byteswritten <= 0)
947 : {
948 : char xlogfname[MAXFNAMELEN];
949 : int save_errno;
950 :
951 : /* if write didn't set errno, assume no disk space */
952 0 : if (errno == 0)
953 0 : errno = ENOSPC;
954 :
955 0 : save_errno = errno;
956 0 : XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
957 0 : errno = save_errno;
958 0 : ereport(PANIC,
959 : (errcode_for_file_access(),
960 : errmsg("could not write to WAL segment %s "
961 : "at offset %d, length %lu: %m",
962 : xlogfname, startoff, (unsigned long) segbytes)));
963 : }
964 :
965 : /* Update state for write */
966 178766 : recptr += byteswritten;
967 :
968 178766 : nbytes -= byteswritten;
969 178766 : buf += byteswritten;
970 :
971 178766 : LogstreamResult.Write = recptr;
972 : }
973 :
974 : /* Update shared-memory status */
975 177494 : pg_atomic_write_u64(&WalRcv->writtenUpto, LogstreamResult.Write);
976 :
977 : /*
978 : * Close the current segment if it's fully written up in the last cycle of
979 : * the loop, to create its archive notification file soon. Otherwise WAL
980 : * archiving of the segment will be delayed until any data in the next
981 : * segment is received and written.
982 : */
983 177494 : if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
984 100 : XLogWalRcvClose(recptr, tli);
985 177494 : }
986 :
987 : /*
988 : * Flush the log to disk.
989 : *
990 : * If we're in the midst of dying, it's unwise to do anything that might throw
991 : * an error, so we skip sending a reply in that case.
992 : */
993 : static void
994 14814 : XLogWalRcvFlush(bool dying, TimeLineID tli)
995 : {
996 : Assert(tli != 0);
997 :
998 14814 : if (LogstreamResult.Flush < LogstreamResult.Write)
999 : {
1000 14230 : WalRcvData *walrcv = WalRcv;
1001 :
1002 14230 : issue_xlog_fsync(recvFile, recvSegNo, tli);
1003 :
1004 14230 : LogstreamResult.Flush = LogstreamResult.Write;
1005 :
1006 : /* Update shared-memory status */
1007 14230 : SpinLockAcquire(&walrcv->mutex);
1008 14230 : if (walrcv->flushedUpto < LogstreamResult.Flush)
1009 : {
1010 14230 : walrcv->latestChunkStart = walrcv->flushedUpto;
1011 14230 : walrcv->flushedUpto = LogstreamResult.Flush;
1012 14230 : walrcv->receivedTLI = tli;
1013 : }
1014 14230 : SpinLockRelease(&walrcv->mutex);
1015 :
1016 : /* Signal the startup process and walsender that new WAL has arrived */
1017 14230 : WakeupRecovery();
1018 14230 : if (AllowCascadeReplication())
1019 14230 : WalSndWakeup(true, false);
1020 :
1021 : /* Report XLOG streaming progress in PS display */
1022 14230 : if (update_process_title)
1023 : {
1024 : char activitymsg[50];
1025 :
1026 14230 : snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
1027 14230 : LSN_FORMAT_ARGS(LogstreamResult.Write));
1028 14230 : set_ps_display(activitymsg);
1029 : }
1030 :
1031 : /* Also let the primary know that we made some progress */
1032 14230 : if (!dying)
1033 : {
1034 14226 : XLogWalRcvSendReply(false, false);
1035 14226 : XLogWalRcvSendHSFeedback(false);
1036 : }
1037 : }
1038 14814 : }
1039 :
1040 : /*
1041 : * Close the current segment.
1042 : *
1043 : * Flush the segment to disk before closing it. Otherwise we have to
1044 : * reopen and fsync it later.
1045 : *
1046 : * Create an archive notification file since the segment is known completed.
1047 : */
1048 : static void
1049 1372 : XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli)
1050 : {
1051 : char xlogfname[MAXFNAMELEN];
1052 :
1053 : Assert(recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size));
1054 : Assert(tli != 0);
1055 :
1056 : /*
1057 : * fsync() and close current file before we switch to next one. We would
1058 : * otherwise have to reopen this file to fsync it later
1059 : */
1060 1372 : XLogWalRcvFlush(false, tli);
1061 :
1062 1372 : XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
1063 :
1064 : /*
1065 : * XLOG segment files will be re-read by recovery in startup process soon,
1066 : * so we don't advise the OS to release cache pages associated with the
1067 : * file like XLogFileClose() does.
1068 : */
1069 1372 : if (close(recvFile) != 0)
1070 0 : ereport(PANIC,
1071 : (errcode_for_file_access(),
1072 : errmsg("could not close WAL segment %s: %m",
1073 : xlogfname)));
1074 :
1075 : /*
1076 : * Create .done file forcibly to prevent the streamed segment from being
1077 : * archived later.
1078 : */
1079 1372 : if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
1080 1372 : XLogArchiveForceDone(xlogfname);
1081 : else
1082 0 : XLogArchiveNotify(xlogfname);
1083 :
1084 1372 : recvFile = -1;
1085 1372 : }
1086 :
1087 : /*
1088 : * Send reply message to primary, indicating our current WAL locations, oldest
1089 : * xmin and the current time.
1090 : *
1091 : * If 'force' is not set, the message is only sent if enough time has
1092 : * passed since last status update to reach wal_receiver_status_interval.
1093 : * If wal_receiver_status_interval is disabled altogether and 'force' is
1094 : * false, this is a no-op.
1095 : *
1096 : * If 'requestReply' is true, requests the server to reply immediately upon
1097 : * receiving this message. This is used for heartbeats, when approaching
1098 : * wal_receiver_timeout.
1099 : */
1100 : static void
1101 32726 : XLogWalRcvSendReply(bool force, bool requestReply)
1102 : {
1103 : static XLogRecPtr writePtr = 0;
1104 : static XLogRecPtr flushPtr = 0;
1105 : XLogRecPtr applyPtr;
1106 : TimestampTz now;
1107 :
1108 : /*
1109 : * If the user doesn't want status to be reported to the primary, be sure
1110 : * to exit before doing anything at all.
1111 : */
1112 32726 : if (!force && wal_receiver_status_interval <= 0)
1113 0 : return;
1114 :
1115 : /* Get current timestamp. */
1116 32726 : now = GetCurrentTimestamp();
1117 :
1118 : /*
1119 : * We can compare the write and flush positions to the last message we
1120 : * sent without taking any lock, but the apply position requires a spin
1121 : * lock, so we don't check that unless something else has changed or 10
1122 : * seconds have passed. This means that the apply WAL location will
1123 : * appear, from the primary's point of view, to lag slightly, but since
1124 : * this is only for reporting purposes and only on idle systems, that's
1125 : * probably OK.
1126 : */
1127 32726 : if (!force
1128 27264 : && writePtr == LogstreamResult.Write
1129 13028 : && flushPtr == LogstreamResult.Flush
1130 174 : && now < wakeup[WALRCV_WAKEUP_REPLY])
1131 174 : return;
1132 :
1133 : /* Make sure we wake up when it's time to send another reply. */
1134 32552 : WalRcvComputeNextWakeup(WALRCV_WAKEUP_REPLY, now);
1135 :
1136 : /* Construct a new message */
1137 32552 : writePtr = LogstreamResult.Write;
1138 32552 : flushPtr = LogstreamResult.Flush;
1139 32552 : applyPtr = GetXLogReplayRecPtr(NULL);
1140 :
1141 32552 : resetStringInfo(&reply_message);
1142 32552 : pq_sendbyte(&reply_message, 'r');
1143 32552 : pq_sendint64(&reply_message, writePtr);
1144 32552 : pq_sendint64(&reply_message, flushPtr);
1145 32552 : pq_sendint64(&reply_message, applyPtr);
1146 32552 : pq_sendint64(&reply_message, GetCurrentTimestamp());
1147 32552 : pq_sendbyte(&reply_message, requestReply ? 1 : 0);
1148 :
1149 : /* Send it */
1150 32552 : elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X%s",
1151 : LSN_FORMAT_ARGS(writePtr),
1152 : LSN_FORMAT_ARGS(flushPtr),
1153 : LSN_FORMAT_ARGS(applyPtr),
1154 : requestReply ? " (reply requested)" : "");
1155 :
1156 32552 : walrcv_send(wrconn, reply_message.data, reply_message.len);
1157 : }
1158 :
1159 : /*
1160 : * Send hot standby feedback message to primary, plus the current time,
1161 : * in case they don't have a watch.
1162 : *
1163 : * If the user disables feedback, send one final message to tell sender
1164 : * to forget about the xmin on this standby. We also send this message
1165 : * on first connect because a previous connection might have set xmin
1166 : * on a replication slot. (If we're not using a slot it's harmless to
1167 : * send a feedback message explicitly setting InvalidTransactionId).
1168 : */
1169 : static void
1170 14548 : XLogWalRcvSendHSFeedback(bool immed)
1171 : {
1172 : TimestampTz now;
1173 : FullTransactionId nextFullXid;
1174 : TransactionId nextXid;
1175 : uint32 xmin_epoch,
1176 : catalog_xmin_epoch;
1177 : TransactionId xmin,
1178 : catalog_xmin;
1179 :
1180 : /* initially true so we always send at least one feedback message */
1181 : static bool primary_has_standby_xmin = true;
1182 :
1183 : /*
1184 : * If the user doesn't want status to be reported to the primary, be sure
1185 : * to exit before doing anything at all.
1186 : */
1187 14548 : if ((wal_receiver_status_interval <= 0 || !hot_standby_feedback) &&
1188 14084 : !primary_has_standby_xmin)
1189 14240 : return;
1190 :
1191 : /* Get current timestamp. */
1192 694 : now = GetCurrentTimestamp();
1193 :
1194 : /* Send feedback at most once per wal_receiver_status_interval. */
1195 694 : if (!immed && now < wakeup[WALRCV_WAKEUP_HSFEEDBACK])
1196 384 : return;
1197 :
1198 : /* Make sure we wake up when it's time to send feedback again. */
1199 310 : WalRcvComputeNextWakeup(WALRCV_WAKEUP_HSFEEDBACK, now);
1200 :
1201 : /*
1202 : * If Hot Standby is not yet accepting connections there is nothing to
1203 : * send. Check this after the interval has expired to reduce number of
1204 : * calls.
1205 : *
1206 : * Bailing out here also ensures that we don't send feedback until we've
1207 : * read our own replication slot state, so we don't tell the primary to
1208 : * discard needed xmin or catalog_xmin from any slots that may exist on
1209 : * this replica.
1210 : */
1211 310 : if (!HotStandbyActive())
1212 2 : return;
1213 :
1214 : /*
1215 : * Make the expensive call to get the oldest xmin once we are certain
1216 : * everything else has been checked.
1217 : */
1218 308 : if (hot_standby_feedback)
1219 : {
1220 86 : GetReplicationHorizons(&xmin, &catalog_xmin);
1221 : }
1222 : else
1223 : {
1224 222 : xmin = InvalidTransactionId;
1225 222 : catalog_xmin = InvalidTransactionId;
1226 : }
1227 :
1228 : /*
1229 : * Get epoch and adjust if nextXid and oldestXmin are different sides of
1230 : * the epoch boundary.
1231 : */
1232 308 : nextFullXid = ReadNextFullTransactionId();
1233 308 : nextXid = XidFromFullTransactionId(nextFullXid);
1234 308 : xmin_epoch = EpochFromFullTransactionId(nextFullXid);
1235 308 : catalog_xmin_epoch = xmin_epoch;
1236 308 : if (nextXid < xmin)
1237 0 : xmin_epoch--;
1238 308 : if (nextXid < catalog_xmin)
1239 0 : catalog_xmin_epoch--;
1240 :
1241 308 : elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
1242 : xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
1243 :
1244 : /* Construct the message and send it. */
1245 308 : resetStringInfo(&reply_message);
1246 308 : pq_sendbyte(&reply_message, 'h');
1247 308 : pq_sendint64(&reply_message, GetCurrentTimestamp());
1248 308 : pq_sendint32(&reply_message, xmin);
1249 308 : pq_sendint32(&reply_message, xmin_epoch);
1250 308 : pq_sendint32(&reply_message, catalog_xmin);
1251 308 : pq_sendint32(&reply_message, catalog_xmin_epoch);
1252 308 : walrcv_send(wrconn, reply_message.data, reply_message.len);
1253 308 : if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
1254 86 : primary_has_standby_xmin = true;
1255 : else
1256 222 : primary_has_standby_xmin = false;
1257 : }
1258 :
1259 : /*
1260 : * Update shared memory status upon receiving a message from primary.
1261 : *
1262 : * 'walEnd' and 'sendTime' are the end-of-WAL and timestamp of the latest
1263 : * message, reported by primary.
1264 : */
1265 : static void
1266 177494 : ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
1267 : {
1268 177494 : WalRcvData *walrcv = WalRcv;
1269 177494 : TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
1270 :
1271 : /* Update shared-memory status */
1272 177494 : SpinLockAcquire(&walrcv->mutex);
1273 177494 : if (walrcv->latestWalEnd < walEnd)
1274 10926 : walrcv->latestWalEndTime = sendTime;
1275 177494 : walrcv->latestWalEnd = walEnd;
1276 177494 : walrcv->lastMsgSendTime = sendTime;
1277 177494 : walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
1278 177494 : SpinLockRelease(&walrcv->mutex);
1279 :
1280 177494 : if (message_level_is_interesting(DEBUG2))
1281 : {
1282 : char *sendtime;
1283 : char *receipttime;
1284 : int applyDelay;
1285 :
1286 : /* Copy because timestamptz_to_str returns a static buffer */
1287 766 : sendtime = pstrdup(timestamptz_to_str(sendTime));
1288 766 : receipttime = pstrdup(timestamptz_to_str(lastMsgReceiptTime));
1289 766 : applyDelay = GetReplicationApplyDelay();
1290 :
1291 : /* apply delay is not available */
1292 766 : if (applyDelay == -1)
1293 2 : elog(DEBUG2, "sendtime %s receipttime %s replication apply delay (N/A) transfer latency %d ms",
1294 : sendtime,
1295 : receipttime,
1296 : GetReplicationTransferLatency());
1297 : else
1298 764 : elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms",
1299 : sendtime,
1300 : receipttime,
1301 : applyDelay,
1302 : GetReplicationTransferLatency());
1303 :
1304 766 : pfree(sendtime);
1305 766 : pfree(receipttime);
1306 : }
1307 177494 : }
1308 :
1309 : /*
1310 : * Compute the next wakeup time for a given wakeup reason. Can be called to
1311 : * initialize a wakeup time, to adjust it for the next wakeup, or to
1312 : * reinitialize it when GUCs have changed. We ask the caller to pass in the
1313 : * value of "now" because this frequently avoids multiple calls of
1314 : * GetCurrentTimestamp(). It had better be a reasonably up-to-date value
1315 : * though.
1316 : */
1317 : static void
1318 389106 : WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now)
1319 : {
1320 389106 : switch (reason)
1321 : {
1322 177808 : case WALRCV_WAKEUP_TERMINATE:
1323 177808 : if (wal_receiver_timeout <= 0)
1324 0 : wakeup[reason] = TIMESTAMP_INFINITY;
1325 : else
1326 177808 : wakeup[reason] = TimestampTzPlusMilliseconds(now, wal_receiver_timeout);
1327 177808 : break;
1328 177808 : case WALRCV_WAKEUP_PING:
1329 177808 : if (wal_receiver_timeout <= 0)
1330 0 : wakeup[reason] = TIMESTAMP_INFINITY;
1331 : else
1332 177808 : wakeup[reason] = TimestampTzPlusMilliseconds(now, wal_receiver_timeout / 2);
1333 177808 : break;
1334 624 : case WALRCV_WAKEUP_HSFEEDBACK:
1335 624 : if (!hot_standby_feedback || wal_receiver_status_interval <= 0)
1336 460 : wakeup[reason] = TIMESTAMP_INFINITY;
1337 : else
1338 164 : wakeup[reason] = TimestampTzPlusSeconds(now, wal_receiver_status_interval);
1339 624 : break;
1340 32866 : case WALRCV_WAKEUP_REPLY:
1341 32866 : if (wal_receiver_status_interval <= 0)
1342 0 : wakeup[reason] = TIMESTAMP_INFINITY;
1343 : else
1344 32866 : wakeup[reason] = TimestampTzPlusSeconds(now, wal_receiver_status_interval);
1345 32866 : break;
1346 : /* there's intentionally no default: here */
1347 : }
1348 389106 : }
1349 :
1350 : /*
1351 : * Wake up the walreceiver main loop.
1352 : *
1353 : * This is called by the startup process whenever interesting xlog records
1354 : * are applied, so that walreceiver can check if it needs to send an apply
1355 : * notification back to the primary which may be waiting in a COMMIT with
1356 : * synchronous_commit = remote_apply.
1357 : */
1358 : void
1359 5176 : WalRcvForceReply(void)
1360 : {
1361 : ProcNumber procno;
1362 :
1363 5176 : WalRcv->force_reply = true;
1364 : /* fetching the proc number is probably atomic, but don't rely on it */
1365 5176 : SpinLockAcquire(&WalRcv->mutex);
1366 5176 : procno = WalRcv->procno;
1367 5176 : SpinLockRelease(&WalRcv->mutex);
1368 5176 : if (procno != INVALID_PROC_NUMBER)
1369 4896 : SetLatch(&GetPGProcByNumber(procno)->procLatch);
1370 5176 : }
1371 :
1372 : /*
1373 : * Return a string constant representing the state. This is used
1374 : * in system functions and views, and should *not* be translated.
1375 : */
1376 : static const char *
1377 0 : WalRcvGetStateString(WalRcvState state)
1378 : {
1379 0 : switch (state)
1380 : {
1381 0 : case WALRCV_STOPPED:
1382 0 : return "stopped";
1383 0 : case WALRCV_STARTING:
1384 0 : return "starting";
1385 0 : case WALRCV_STREAMING:
1386 0 : return "streaming";
1387 0 : case WALRCV_WAITING:
1388 0 : return "waiting";
1389 0 : case WALRCV_RESTARTING:
1390 0 : return "restarting";
1391 0 : case WALRCV_STOPPING:
1392 0 : return "stopping";
1393 : }
1394 0 : return "UNKNOWN";
1395 : }
1396 :
1397 : /*
1398 : * Returns activity of WAL receiver, including pid, state and xlog locations
1399 : * received from the WAL sender of another server.
1400 : */
1401 : Datum
1402 6 : pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
1403 : {
1404 : TupleDesc tupdesc;
1405 : Datum *values;
1406 : bool *nulls;
1407 : int pid;
1408 : bool ready_to_display;
1409 : WalRcvState state;
1410 : XLogRecPtr receive_start_lsn;
1411 : TimeLineID receive_start_tli;
1412 : XLogRecPtr written_lsn;
1413 : XLogRecPtr flushed_lsn;
1414 : TimeLineID received_tli;
1415 : TimestampTz last_send_time;
1416 : TimestampTz last_receipt_time;
1417 : XLogRecPtr latest_end_lsn;
1418 : TimestampTz latest_end_time;
1419 : char sender_host[NI_MAXHOST];
1420 6 : int sender_port = 0;
1421 : char slotname[NAMEDATALEN];
1422 : char conninfo[MAXCONNINFO];
1423 :
1424 : /* Take a lock to ensure value consistency */
1425 6 : SpinLockAcquire(&WalRcv->mutex);
1426 6 : pid = (int) WalRcv->pid;
1427 6 : ready_to_display = WalRcv->ready_to_display;
1428 6 : state = WalRcv->walRcvState;
1429 6 : receive_start_lsn = WalRcv->receiveStart;
1430 6 : receive_start_tli = WalRcv->receiveStartTLI;
1431 6 : flushed_lsn = WalRcv->flushedUpto;
1432 6 : received_tli = WalRcv->receivedTLI;
1433 6 : last_send_time = WalRcv->lastMsgSendTime;
1434 6 : last_receipt_time = WalRcv->lastMsgReceiptTime;
1435 6 : latest_end_lsn = WalRcv->latestWalEnd;
1436 6 : latest_end_time = WalRcv->latestWalEndTime;
1437 6 : strlcpy(slotname, (char *) WalRcv->slotname, sizeof(slotname));
1438 6 : strlcpy(sender_host, (char *) WalRcv->sender_host, sizeof(sender_host));
1439 6 : sender_port = WalRcv->sender_port;
1440 6 : strlcpy(conninfo, (char *) WalRcv->conninfo, sizeof(conninfo));
1441 6 : SpinLockRelease(&WalRcv->mutex);
1442 :
1443 : /*
1444 : * No WAL receiver (or not ready yet), just return a tuple with NULL
1445 : * values
1446 : */
1447 6 : if (pid == 0 || !ready_to_display)
1448 6 : PG_RETURN_NULL();
1449 :
1450 : /*
1451 : * Read "writtenUpto" without holding a spinlock. Note that it may not be
1452 : * consistent with the other shared variables of the WAL receiver
1453 : * protected by a spinlock, but this should not be used for data integrity
1454 : * checks.
1455 : */
1456 0 : written_lsn = pg_atomic_read_u64(&WalRcv->writtenUpto);
1457 :
1458 : /* determine result type */
1459 0 : if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1460 0 : elog(ERROR, "return type must be a row type");
1461 :
1462 0 : values = palloc0(sizeof(Datum) * tupdesc->natts);
1463 0 : nulls = palloc0(sizeof(bool) * tupdesc->natts);
1464 :
1465 : /* Fetch values */
1466 0 : values[0] = Int32GetDatum(pid);
1467 :
1468 0 : if (!has_privs_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS))
1469 : {
1470 : /*
1471 : * Only superusers and roles with privileges of pg_read_all_stats can
1472 : * see details. Other users only get the pid value to know whether it
1473 : * is a WAL receiver, but no details.
1474 : */
1475 0 : memset(&nulls[1], true, sizeof(bool) * (tupdesc->natts - 1));
1476 : }
1477 : else
1478 : {
1479 0 : values[1] = CStringGetTextDatum(WalRcvGetStateString(state));
1480 :
1481 0 : if (XLogRecPtrIsInvalid(receive_start_lsn))
1482 0 : nulls[2] = true;
1483 : else
1484 0 : values[2] = LSNGetDatum(receive_start_lsn);
1485 0 : values[3] = Int32GetDatum(receive_start_tli);
1486 0 : if (XLogRecPtrIsInvalid(written_lsn))
1487 0 : nulls[4] = true;
1488 : else
1489 0 : values[4] = LSNGetDatum(written_lsn);
1490 0 : if (XLogRecPtrIsInvalid(flushed_lsn))
1491 0 : nulls[5] = true;
1492 : else
1493 0 : values[5] = LSNGetDatum(flushed_lsn);
1494 0 : values[6] = Int32GetDatum(received_tli);
1495 0 : if (last_send_time == 0)
1496 0 : nulls[7] = true;
1497 : else
1498 0 : values[7] = TimestampTzGetDatum(last_send_time);
1499 0 : if (last_receipt_time == 0)
1500 0 : nulls[8] = true;
1501 : else
1502 0 : values[8] = TimestampTzGetDatum(last_receipt_time);
1503 0 : if (XLogRecPtrIsInvalid(latest_end_lsn))
1504 0 : nulls[9] = true;
1505 : else
1506 0 : values[9] = LSNGetDatum(latest_end_lsn);
1507 0 : if (latest_end_time == 0)
1508 0 : nulls[10] = true;
1509 : else
1510 0 : values[10] = TimestampTzGetDatum(latest_end_time);
1511 0 : if (*slotname == '\0')
1512 0 : nulls[11] = true;
1513 : else
1514 0 : values[11] = CStringGetTextDatum(slotname);
1515 0 : if (*sender_host == '\0')
1516 0 : nulls[12] = true;
1517 : else
1518 0 : values[12] = CStringGetTextDatum(sender_host);
1519 0 : if (sender_port == 0)
1520 0 : nulls[13] = true;
1521 : else
1522 0 : values[13] = Int32GetDatum(sender_port);
1523 0 : if (*conninfo == '\0')
1524 0 : nulls[14] = true;
1525 : else
1526 0 : values[14] = CStringGetTextDatum(conninfo);
1527 : }
1528 :
1529 : /* Returns the record as Datum */
1530 0 : PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
1531 : }
|