Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * walreceiver.c
4 : *
5 : * The WAL receiver process (walreceiver) is new as of Postgres 9.0. It
6 : * is the process in the standby server that takes charge of receiving
7 : * XLOG records from a primary server during streaming replication.
8 : *
9 : * When the startup process determines that it's time to start streaming,
10 : * it instructs postmaster to start walreceiver. Walreceiver first connects
11 : * to the primary server (it will be served by a walsender process
12 : * in the primary server), and then keeps receiving XLOG records and
13 : * writing them to the disk as long as the connection is alive. As XLOG
14 : * records are received and flushed to disk, it updates the
15 : * WalRcv->flushedUpto variable in shared memory, to inform the startup
16 : * process of how far it can proceed with XLOG replay.
17 : *
18 : * A WAL receiver cannot directly load GUC parameters used when establishing
19 : * its connection to the primary. Instead it relies on parameter values
20 : * that are passed down by the startup process when streaming is requested.
21 : * This applies, for example, to the replication slot and the connection
22 : * string to be used for the connection with the primary.
23 : *
24 : * If the primary server ends streaming, but doesn't disconnect, walreceiver
25 : * goes into "waiting" mode, and waits for the startup process to give new
26 : * instructions. The startup process will treat that the same as
27 : * disconnection, and will rescan the archive/pg_wal directory. But when the
28 : * startup process wants to try streaming replication again, it will just
29 : * nudge the existing walreceiver process that's waiting, instead of launching
30 : * a new one.
31 : *
32 : * Normal termination is by SIGTERM, which instructs the walreceiver to
33 : * exit(0). Emergency termination is by SIGQUIT; like any postmaster child
34 : * process, the walreceiver will simply abort and exit on SIGQUIT. A close
35 : * of the connection and a FATAL error are treated not as a crash but as
36 : * normal operation.
37 : *
38 : * This file contains the server-facing parts of walreceiver. The libpq-
39 : * specific parts are in the libpqwalreceiver module. It's loaded
40 : * dynamically to avoid linking the server with libpq.
41 : *
42 : * Portions Copyright (c) 2010-2025, PostgreSQL Global Development Group
43 : *
44 : *
45 : * IDENTIFICATION
46 : * src/backend/replication/walreceiver.c
47 : *
48 : *-------------------------------------------------------------------------
49 : */
50 : #include "postgres.h"
51 :
52 : #include <unistd.h>
53 :
54 : #include "access/htup_details.h"
55 : #include "access/timeline.h"
56 : #include "access/transam.h"
57 : #include "access/xlog_internal.h"
58 : #include "access/xlogarchive.h"
59 : #include "access/xlogrecovery.h"
60 : #include "catalog/pg_authid.h"
61 : #include "funcapi.h"
62 : #include "libpq/pqformat.h"
63 : #include "libpq/pqsignal.h"
64 : #include "miscadmin.h"
65 : #include "pgstat.h"
66 : #include "postmaster/auxprocess.h"
67 : #include "postmaster/interrupt.h"
68 : #include "replication/walreceiver.h"
69 : #include "replication/walsender.h"
70 : #include "storage/ipc.h"
71 : #include "storage/proc.h"
72 : #include "storage/procarray.h"
73 : #include "storage/procsignal.h"
74 : #include "utils/acl.h"
75 : #include "utils/builtins.h"
76 : #include "utils/guc.h"
77 : #include "utils/pg_lsn.h"
78 : #include "utils/ps_status.h"
79 : #include "utils/timestamp.h"
80 :
81 :
82 : /*
83 : * GUC variables. (Other variables that affect walreceiver are in xlog.c
84 : * because they're passed down from the startup process, for better
85 : * synchronization.)
86 : */
/*
 * Consulted by the main loop's timeout branch: a status update is sent to
 * the primary once this much time has passed since the last one.
 */
87 : int wal_receiver_status_interval;
/*
 * The main loop pings the primary after half of this time has passed
 * without hearing anything from it.
 */
88 : int wal_receiver_timeout;
/* NOTE(review): presumably gates XLogWalRcvSendHSFeedback(); its use is outside this view - confirm. */
89 : bool hot_standby_feedback;
90 :
91 : /* libpqwalreceiver connection */
/* Set by walrcv_connect() in WalReceiverMain; disconnected in WalRcvDie if non-NULL. */
92 : static WalReceiverConn *wrconn = NULL;
/*
 * Must be non-NULL after load_file("libpqwalreceiver"); WalReceiverMain
 * raises an error otherwise.
 */
93 : WalReceiverFunctionsType *WalReceiverFunctions = NULL;
94 :
95 : /*
96 : * These variables are used similarly to openLogFile/SegNo,
97 : * but for walreceiver to write the XLOG. recvFileTLI is the TimeLineID
98 : * corresponding to the filename of recvFile.
 *
 * recvFile is -1 while no segment file is open; the code tests
 * "recvFile >= 0" before closing and "recvFile < 0" before opening one.
99 : */
100 : static int recvFile = -1;
101 : static TimeLineID recvFileTLI = 0;
102 : static XLogSegNo recvSegNo = 0;
103 :
104 : /*
105 : * LogstreamResult indicates the byte positions that we have already
106 : * written/fsynced.
 *
 * Both fields are set from GetXLogReplayRecPtr() each time streaming
 * starts in WalReceiverMain.
107 : */
108 : static struct
109 : {
110 : XLogRecPtr Write; /* last byte + 1 written out in the standby */
111 : XLogRecPtr Flush; /* last byte + 1 flushed in the standby */
112 : } LogstreamResult;
113 :
114 : /*
115 : * Reasons to wake up and perform periodic tasks.
116 : */
117 : typedef enum WalRcvWakeupReason
118 : {
/*
 * Deadline for hearing from the primary.  Once it has passed, the timeout
 * branch of the main loop reports "terminating walreceiver due to timeout".
 */
119 : WALRCV_WAKEUP_TERMINATE,
/* Once reached, the next status reply asks the primary to respond. */
120 : WALRCV_WAKEUP_PING,
/* NOTE(review): presumably paces XLogWalRcvSendReply(); the consumer is outside this view - confirm. */
121 : WALRCV_WAKEUP_REPLY,
/* NOTE(review): presumably paces XLogWalRcvSendHSFeedback(); the consumer is outside this view - confirm. */
122 : WALRCV_WAKEUP_HSFEEDBACK,
/* One more than the last enum value; used as the length of wakeup[]. */
123 : #define NUM_WALRCV_WAKEUPS (WALRCV_WAKEUP_HSFEEDBACK + 1)
124 : } WalRcvWakeupReason;
125 :
126 : /*
127 : * Wake up times for periodic tasks.
 *
 * Indexed by WalRcvWakeupReason.  Entries are filled in through
 * WalRcvComputeNextWakeup(); the main loop naps until the smallest one.
128 : */
129 : static TimestampTz wakeup[NUM_WALRCV_WAKEUPS];
130 :
/*
 * Initialized with initStringInfo() each time streaming starts.
 * NOTE(review): presumably the scratch buffer for outgoing reply and
 * feedback messages; the writers are outside this view - confirm.
 */
131 : static StringInfoData reply_message;
132 :
133 : /* Prototypes for private functions */
/* All of these are static, i.e. private to this file. */
134 : static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last);
135 : static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI);
136 : static void WalRcvDie(int code, Datum arg);
137 : static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len,
138 : TimeLineID tli);
139 : static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr,
140 : TimeLineID tli);
141 : static void XLogWalRcvFlush(bool dying, TimeLineID tli);
142 : static void XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli);
143 : static void XLogWalRcvSendReply(bool force, bool requestReply);
144 : static void XLogWalRcvSendHSFeedback(bool immed);
145 : static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
146 : static void WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now);
147 :
148 : /*
149 : * Process any interrupts the walreceiver process may have received.
150 : * This should be called any time the process's latch has become set.
151 : *
152 : * Currently, only SIGTERM is of interest. We can't just exit(1) within the
153 : * SIGTERM signal handler, because the signal might arrive in the middle of
154 : * some critical operation, like while we're holding a spinlock. Instead, the
155 : * signal handler sets a flag variable as well as setting the process's latch.
156 : * We must check the flag (by calling ProcessWalRcvInterrupts) anytime the
157 : * latch has become set. Operations that could block for a long time, such as
158 : * reading from a remote server, must pay attention to the latch too; see
159 : * libpqrcv_PQgetResult for example.
160 : */
161 : void
162 33752 : ProcessWalRcvInterrupts(void)
163 : {
164 : /*
165 : * Although walreceiver interrupt handling doesn't use the same scheme as
166 : * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive
167 : * any incoming signals on Win32, and also to make sure we process any
168 : * barrier events.
169 : */
170 33752 : CHECK_FOR_INTERRUPTS();
171 :
172 33746 : if (ShutdownRequestPending)
173 : {
174 164 : ereport(FATAL,
175 : (errcode(ERRCODE_ADMIN_SHUTDOWN),
176 : errmsg("terminating walreceiver process due to administrator command")));
177 : }
178 33582 : }
179 :
180 :
181 : /*
 * Main entry point for walreceiver process.
 *
 * startup_data / startup_data_len: no startup data is expected; the length
 * is asserted to be zero and the pointer is not used.
 *
 * Flow: mark the walreceiver as running in shared memory (exiting at once
 * if a stop was already requested), install signal handlers, load
 * libpqwalreceiver, connect to the primary, then loop forever over
 * streaming sessions.  Each session identifies the primary, fetches missing
 * timeline history files, streams WAL until the primary ends the stream,
 * closes the last segment and waits in WalRcvWaitForStartPosition() for a
 * new start point.
 *
 * The outer loop has no exit; control leaves this function only through
 * ereport()/elog() errors or proc_exit().  WalRcvDie() is registered with
 * on_shmem_exit() to clean up.
 */
182 : void
183 442 : WalReceiverMain(const void *startup_data, size_t startup_data_len)
184 : {
185 : char conninfo[MAXCONNINFO];
186 : char *tmp_conninfo;
187 : char slotname[NAMEDATALEN];
188 : bool is_temp_slot;
189 : XLogRecPtr startpoint;
190 : TimeLineID startpointTLI;
191 : TimeLineID primaryTLI;
192 : bool first_stream;
193 : WalRcvData *walrcv;
194 : TimestampTz now;
195 : char *err;
196 442 : char *sender_host = NULL;
197 442 : int sender_port = 0;
198 : char *appname;
199 :
200 : Assert(startup_data_len == 0);
201 :
202 442 : MyBackendType = B_WAL_RECEIVER;
203 442 : AuxiliaryProcessMainCommon();
204 :
205 : /*
206 : * WalRcv should be set up already (if we are a backend, we inherit this
207 : * by fork() or EXEC_BACKEND mechanism from the postmaster).
208 : */
209 442 : walrcv = WalRcv;
210 : Assert(walrcv != NULL);
211 :
212 : /*
213 : * Mark walreceiver as running in shared memory.
214 : *
215 : * Do this as early as possible, so that if we fail later on, we'll set
216 : * state to STOPPED. If we die before this, the startup process will keep
217 : * waiting for us to start up, until it times out.
218 : */
219 442 : SpinLockAcquire(&walrcv->mutex);
220 : Assert(walrcv->pid == 0);
221 442 : switch (walrcv->walRcvState)
222 : {
223 0 : case WALRCV_STOPPING:
224 : /* If we've already been requested to stop, don't start up. */
225 0 : walrcv->walRcvState = WALRCV_STOPPED;
226 : /* fall through */
227 :
228 6 : case WALRCV_STOPPED:
229 6 : SpinLockRelease(&walrcv->mutex);
230 6 : ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
231 6 : proc_exit(1);
232 : break;
233 :
234 436 : case WALRCV_STARTING:
235 : /* The usual case */
236 436 : break;
237 :
238 0 : case WALRCV_WAITING:
239 : case WALRCV_STREAMING:
240 : case WALRCV_RESTARTING:
241 : default:
242 : /* Shouldn't happen */
243 0 : SpinLockRelease(&walrcv->mutex);
244 0 : elog(PANIC, "walreceiver still running according to shared memory state");
245 : }
/*
 * Only the WALRCV_STARTING case reaches this point, and walrcv->mutex is
 * still held; it is released below once procno has been published.
 */
246 : /* Advertise our PID so that the startup process can kill us */
247 436 : walrcv->pid = MyProcPid;
248 436 : walrcv->walRcvState = WALRCV_STREAMING;
249 :
250 : /* Fetch information required to start streaming */
251 436 : walrcv->ready_to_display = false;
252 436 : strlcpy(conninfo, walrcv->conninfo, MAXCONNINFO);
253 436 : strlcpy(slotname, walrcv->slotname, NAMEDATALEN);
254 436 : is_temp_slot = walrcv->is_temp_slot;
255 436 : startpoint = walrcv->receiveStart;
256 436 : startpointTLI = walrcv->receiveStartTLI;
257 :
258 : /*
259 : * At most one of is_temp_slot and slotname can be set; otherwise,
260 : * RequestXLogStreaming messed up.
261 : */
262 : Assert(!is_temp_slot || (slotname[0] == '\0'));
263 :
264 : /* Initialise to a sanish value */
265 436 : now = GetCurrentTimestamp();
266 436 : walrcv->lastMsgSendTime =
267 436 : walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now;
268 :
269 : /* Report our proc number so that others can wake us up */
270 436 : walrcv->procno = MyProcNumber;
271 :
272 436 : SpinLockRelease(&walrcv->mutex);
273 :
274 436 : pg_atomic_write_u64(&WalRcv->writtenUpto, 0);
275 :
276 : /* Arrange to clean up at walreceiver exit */
/*
 * The callback receives the address of startpointTLI rather than its value,
 * so WalRcvDie reads whatever timeline is current when the process exits
 * (WalRcvWaitForStartPosition may change it between sessions).
 */
277 436 : on_shmem_exit(WalRcvDie, PointerGetDatum(&startpointTLI));
278 :
279 : /* Properly accept or ignore signals the postmaster might send us */
280 436 : pqsignal(SIGHUP, SignalHandlerForConfigReload); /* set flag to read config
281 : * file */
282 436 : pqsignal(SIGINT, SIG_IGN);
283 436 : pqsignal(SIGTERM, SignalHandlerForShutdownRequest); /* request shutdown */
284 : /* SIGQUIT handler was already set up by InitPostmasterChild */
285 436 : pqsignal(SIGALRM, SIG_IGN);
286 436 : pqsignal(SIGPIPE, SIG_IGN);
287 436 : pqsignal(SIGUSR1, procsignal_sigusr1_handler);
288 436 : pqsignal(SIGUSR2, SIG_IGN);
289 :
290 : /* Reset some signals that are accepted by postmaster but not here */
291 436 : pqsignal(SIGCHLD, SIG_DFL);
292 :
293 : /* Load the libpq-specific functions */
294 436 : load_file("libpqwalreceiver", false);
295 436 : if (WalReceiverFunctions == NULL)
296 0 : elog(ERROR, "libpqwalreceiver didn't initialize correctly");
297 :
298 : /* Unblock signals (they were blocked when the postmaster forked us) */
299 436 : sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
300 :
301 : /* Establish the connection to the primary for XLOG streaming */
/* The application name is cluster_name when that is non-empty, else "walreceiver". */
302 436 : appname = cluster_name[0] ? cluster_name : "walreceiver";
303 436 : wrconn = walrcv_connect(conninfo, true, false, false, appname, &err);
304 432 : if (!wrconn)
305 156 : ereport(ERROR,
306 : (errcode(ERRCODE_CONNECTION_FAILURE),
307 : errmsg("streaming replication receiver \"%s\" could not connect to the primary server: %s",
308 : appname, err)));
309 :
310 : /*
311 : * Save user-visible connection string. This clobbers the original
312 : * conninfo, for security. Also save host and port of the sender server
313 : * this walreceiver is connected to.
314 : */
315 276 : tmp_conninfo = walrcv_get_conninfo(wrconn);
316 276 : walrcv_get_senderinfo(wrconn, &sender_host, &sender_port);
317 276 : SpinLockAcquire(&walrcv->mutex);
318 276 : memset(walrcv->conninfo, 0, MAXCONNINFO);
319 276 : if (tmp_conninfo)
320 276 : strlcpy(walrcv->conninfo, tmp_conninfo, MAXCONNINFO);
321 :
322 276 : memset(walrcv->sender_host, 0, NI_MAXHOST);
323 276 : if (sender_host)
324 276 : strlcpy(walrcv->sender_host, sender_host, NI_MAXHOST);
325 :
326 276 : walrcv->sender_port = sender_port;
327 276 : walrcv->ready_to_display = true;
328 276 : SpinLockRelease(&walrcv->mutex);
329 :
330 276 : if (tmp_conninfo)
331 276 : pfree(tmp_conninfo);
332 :
333 276 : if (sender_host)
334 276 : pfree(sender_host);
335 :
/*
 * first_stream only selects between the "started streaming" and
 * "restarted WAL streaming" log messages below.
 */
336 276 : first_stream = true;
/* Each iteration of this outer loop is one streaming session. */
337 : for (;;)
338 0 : {
339 : char *primary_sysid;
340 : char standby_sysid[32];
341 : WalRcvStreamOptions options;
342 :
343 : /*
344 : * Check that we're connected to a valid server using the
345 : * IDENTIFY_SYSTEM replication command.
346 : */
347 276 : primary_sysid = walrcv_identify_system(wrconn, &primaryTLI);
348 :
349 276 : snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
350 : GetSystemIdentifier());
351 276 : if (strcmp(primary_sysid, standby_sysid) != 0)
352 : {
353 0 : ereport(ERROR,
354 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
355 : errmsg("database system identifier differs between the primary and standby"),
356 : errdetail("The primary's identifier is %s, the standby's identifier is %s.",
357 : primary_sysid, standby_sysid)));
358 : }
359 :
360 : /*
361 : * Confirm that the current timeline of the primary is the same or
362 : * ahead of ours.
363 : */
364 276 : if (primaryTLI < startpointTLI)
365 0 : ereport(ERROR,
366 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
367 : errmsg("highest timeline %u of the primary is behind recovery timeline %u",
368 : primaryTLI, startpointTLI)));
369 :
370 : /*
371 : * Get any missing history files. We do this always, even when we're
372 : * not interested in that timeline, so that if we're promoted to
373 : * become the primary later on, we don't select the same timeline that
374 : * was already used in the current primary. This isn't bullet-proof -
375 : * you'll need some external software to manage your cluster if you
376 : * need to ensure that a unique timeline id is chosen in every case,
377 : * but let's avoid the confusion of timeline id collisions where we
378 : * can.
379 : */
380 276 : WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
381 :
382 : /*
383 : * Create temporary replication slot if requested, and update slot
384 : * name in shared memory. (Note the slot name cannot already be set
385 : * in this case.)
386 : */
387 276 : if (is_temp_slot)
388 : {
/* The slot name is derived from the PID reported by walrcv_get_backend_pid(). */
389 0 : snprintf(slotname, sizeof(slotname),
390 : "pg_walreceiver_%lld",
391 0 : (long long int) walrcv_get_backend_pid(wrconn));
392 :
393 0 : walrcv_create_slot(wrconn, slotname, true, false, false, 0, NULL);
394 :
395 0 : SpinLockAcquire(&walrcv->mutex);
396 0 : strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
397 0 : SpinLockRelease(&walrcv->mutex);
398 : }
399 :
400 : /*
401 : * Start streaming.
402 : *
403 : * We'll try to start at the requested starting point and timeline,
404 : * even if it's different from the server's latest timeline. In case
405 : * we've already reached the end of the old timeline, the server will
406 : * finish the streaming immediately, and we will go back to await
407 : * orders from the startup process. If recovery_target_timeline is
408 : * 'latest', the startup process will scan pg_wal and find the new
409 : * history file, bump recovery target timeline, and ask us to restart
410 : * on the new timeline.
411 : */
412 276 : options.logical = false;
413 276 : options.startpoint = startpoint;
414 276 : options.slotname = slotname[0] != '\0' ? slotname : NULL;
415 276 : options.proto.physical.startpointTLI = startpointTLI;
416 276 : if (walrcv_startstreaming(wrconn, &options))
417 : {
418 274 : if (first_stream)
419 274 : ereport(LOG,
420 : (errmsg("started streaming WAL from primary at %X/%X on timeline %u",
421 : LSN_FORMAT_ARGS(startpoint), startpointTLI)));
422 : else
423 0 : ereport(LOG,
424 : (errmsg("restarted WAL streaming at %X/%X on timeline %u",
425 : LSN_FORMAT_ARGS(startpoint), startpointTLI)));
426 274 : first_stream = false;
427 :
428 : /* Initialize LogstreamResult and buffers for processing messages */
429 274 : LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL);
430 274 : initStringInfo(&reply_message);
431 :
432 : /* Initialize nap wakeup times. */
433 274 : now = GetCurrentTimestamp();
434 1370 : for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
435 1096 : WalRcvComputeNextWakeup(i, now);
436 :
437 : /* Send initial reply/feedback messages. */
438 274 : XLogWalRcvSendReply(true, false);
439 274 : XLogWalRcvSendHSFeedback(true);
440 :
441 : /* Loop until end-of-streaming or error */
442 : for (;;)
443 23136 : {
444 : char *buf;
445 : int len;
446 23410 : bool endofwal = false;
447 23410 : pgsocket wait_fd = PGINVALID_SOCKET;
448 : int rc;
449 : TimestampTz nextWakeup;
450 : long nap;
451 :
452 : /*
453 : * Exit walreceiver if we're not in recovery. This should not
454 : * happen, but cross-check the status here.
455 : */
456 23410 : if (!RecoveryInProgress())
457 0 : ereport(FATAL,
458 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
459 : errmsg("cannot continue WAL streaming, recovery has already ended")));
460 :
461 : /* Process any requests or signals received recently */
462 23410 : ProcessWalRcvInterrupts();
463 :
464 23410 : if (ConfigReloadPending)
465 : {
466 48 : ConfigReloadPending = false;
467 48 : ProcessConfigFile(PGC_SIGHUP);
468 : /* recompute wakeup times */
469 48 : now = GetCurrentTimestamp();
470 240 : for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
471 192 : WalRcvComputeNextWakeup(i, now);
472 48 : XLogWalRcvSendHSFeedback(true);
473 : }
474 :
475 : /* See if we can read data immediately */
/*
 * As handled below: len > 0 means a message was read, len == 0 means
 * nothing is available right now, and len < 0 means the primary ended
 * the stream.
 */
476 23410 : len = walrcv_receive(wrconn, &buf, &wait_fd);
477 23350 : if (len != 0)
478 : {
479 : /*
480 : * Process the received data, and any subsequent data we
481 : * can read without blocking.
482 : */
483 : for (;;)
484 : {
485 198994 : if (len > 0)
486 : {
487 : /*
488 : * Something was received from primary, so adjust
489 : * the ping and terminate wakeup times.
490 : */
491 182626 : now = GetCurrentTimestamp();
492 182626 : WalRcvComputeNextWakeup(WALRCV_WAKEUP_TERMINATE,
493 : now);
494 182626 : WalRcvComputeNextWakeup(WALRCV_WAKEUP_PING, now);
/* buf[0] is the message type byte; the remainder is the message body. */
495 182626 : XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1,
496 : startpointTLI);
497 : }
498 16368 : else if (len == 0)
499 16292 : break;
500 76 : else if (len < 0)
501 : {
502 76 : ereport(LOG,
503 : (errmsg("replication terminated by primary server"),
504 : errdetail("End of WAL reached on timeline %u at %X/%X.",
505 : startpointTLI,
506 : LSN_FORMAT_ARGS(LogstreamResult.Write))));
507 76 : endofwal = true;
508 76 : break;
509 : }
510 182626 : len = walrcv_receive(wrconn, &buf, &wait_fd);
511 : }
512 :
513 : /* Let the primary know that we received some data. */
514 16368 : XLogWalRcvSendReply(false, false);
515 :
516 : /*
517 : * If we've written some records, flush them to disk and
518 : * let the startup process and primary server know about
519 : * them.
520 : */
521 16368 : XLogWalRcvFlush(false, startpointTLI);
522 : }
523 :
524 : /* Check if we need to exit the streaming loop. */
525 23348 : if (endofwal)
526 76 : break;
527 :
528 : /* Find the soonest wakeup time, to limit our nap. */
529 23272 : nextWakeup = TIMESTAMP_INFINITY;
530 116360 : for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
531 93088 : nextWakeup = Min(wakeup[i], nextWakeup);
532 :
533 : /* Calculate the nap time, clamping as necessary. */
534 23272 : now = GetCurrentTimestamp();
535 23272 : nap = TimestampDifferenceMilliseconds(now, nextWakeup);
536 :
537 : /*
538 : * Ideally we would reuse a WaitEventSet object repeatedly
539 : * here to avoid the overheads of WaitLatchOrSocket on epoll
540 : * systems, but we can't be sure that libpq (or any other
541 : * walreceiver implementation) has the same socket (even if
542 : * the fd is the same number, it may have been closed and
543 : * reopened since the last time). In future, if there is a
544 : * function for removing sockets from WaitEventSet, then we
545 : * could add and remove just the socket each time, potentially
546 : * avoiding some system calls.
547 : */
548 : Assert(wait_fd != PGINVALID_SOCKET);
549 23272 : rc = WaitLatchOrSocket(MyLatch,
550 : WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
551 : WL_TIMEOUT | WL_LATCH_SET,
552 : wait_fd,
553 : nap,
554 : WAIT_EVENT_WAL_RECEIVER_MAIN);
555 23272 : if (rc & WL_LATCH_SET)
556 : {
557 6362 : ResetLatch(MyLatch);
558 6362 : ProcessWalRcvInterrupts();
559 :
560 6226 : if (walrcv->force_reply)
561 : {
562 : /*
563 : * The recovery process has asked us to send apply
564 : * feedback now. Make sure the flag is really set to
565 : * false in shared memory before sending the reply, so
566 : * we don't miss a new request for a reply.
567 : */
568 6146 : walrcv->force_reply = false;
569 6146 : pg_memory_barrier();
570 6146 : XLogWalRcvSendReply(true, false);
571 : }
572 : }
573 23136 : if (rc & WL_TIMEOUT)
574 : {
575 : /*
576 : * We didn't receive anything new. If we haven't heard
577 : * anything from the server for more than
578 : * wal_receiver_timeout / 2, ping the server. Also, if
579 : * it's been longer than wal_receiver_status_interval
580 : * since the last update we sent, send a status update to
581 : * the primary anyway, to report any progress in applying
582 : * WAL.
583 : */
584 8 : bool requestReply = false;
585 :
586 : /*
587 : * Report pending statistics to the cumulative stats
588 : * system. This location is useful for the report as it
589 : * is not within a tight loop in the WAL receiver, to
590 : * avoid bloating pgstats with requests, while also making
591 : * sure that the reports happen each time a status update
592 : * is sent.
593 : */
594 8 : pgstat_report_wal(false);
595 :
596 : /*
597 : * Check if time since last receive from primary has
598 : * reached the configured limit.
599 : */
600 8 : now = GetCurrentTimestamp();
601 8 : if (now >= wakeup[WALRCV_WAKEUP_TERMINATE])
602 0 : ereport(ERROR,
603 : (errcode(ERRCODE_CONNECTION_FAILURE),
604 : errmsg("terminating walreceiver due to timeout")));
605 :
606 : /*
607 : * If we didn't receive anything new for half of receiver
608 : * replication timeout, then ping the server.
609 : */
610 8 : if (now >= wakeup[WALRCV_WAKEUP_PING])
611 : {
612 0 : requestReply = true;
/*
 * Disarm the ping wakeup so it does not fire again; it is recomputed by
 * the receive loop above when something next arrives from the primary.
 */
613 0 : wakeup[WALRCV_WAKEUP_PING] = TIMESTAMP_INFINITY;
614 : }
615 :
616 8 : XLogWalRcvSendReply(requestReply, requestReply);
617 8 : XLogWalRcvSendHSFeedback(false);
618 : }
619 : }
620 :
621 : /*
622 : * The backend finished streaming. Exit streaming COPY-mode from
623 : * our side, too.
624 : */
625 76 : walrcv_endstreaming(wrconn, &primaryTLI);
626 :
627 : /*
628 : * If the server had switched to a new timeline that we didn't
629 : * know about when we began streaming, fetch its timeline history
630 : * file now.
631 : */
632 24 : WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
633 : }
634 : else
635 0 : ereport(LOG,
636 : (errmsg("primary server contains no more WAL on requested timeline %u",
637 : startpointTLI)));
638 :
639 : /*
640 : * End of WAL reached on the requested timeline. Close the last
641 : * segment, and wait for new orders from the startup process.
642 : */
643 24 : if (recvFile >= 0)
644 : {
645 : char xlogfname[MAXFNAMELEN];
646 :
647 22 : XLogWalRcvFlush(false, startpointTLI);
648 22 : XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
649 22 : if (close(recvFile) != 0)
650 0 : ereport(PANIC,
651 : (errcode_for_file_access(),
652 : errmsg("could not close WAL segment %s: %m",
653 : xlogfname)));
654 :
655 : /*
656 : * Create .done file forcibly to prevent the streamed segment from
657 : * being archived later.
658 : */
659 22 : if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
660 22 : XLogArchiveForceDone(xlogfname);
661 : else
662 0 : XLogArchiveNotify(xlogfname);
663 : }
/* Record that no segment file is open any more. */
664 24 : recvFile = -1;
665 :
666 24 : elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
/* Blocks until a restart is requested; updates startpoint and startpointTLI. */
667 24 : WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
668 : }
669 : /* not reached */
670 : }
671 :
672 : /*
673 : * Wait for startup process to set receiveStart and receiveStartTLI.
674 : */
675 : static void
676 24 : WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
677 : {
678 24 : WalRcvData *walrcv = WalRcv;
679 : int state;
680 :
681 24 : SpinLockAcquire(&walrcv->mutex);
682 24 : state = walrcv->walRcvState;
683 24 : if (state != WALRCV_STREAMING)
684 : {
685 0 : SpinLockRelease(&walrcv->mutex);
686 0 : if (state == WALRCV_STOPPING)
687 0 : proc_exit(0);
688 : else
689 0 : elog(FATAL, "unexpected walreceiver state");
690 : }
691 24 : walrcv->walRcvState = WALRCV_WAITING;
692 24 : walrcv->receiveStart = InvalidXLogRecPtr;
693 24 : walrcv->receiveStartTLI = 0;
694 24 : SpinLockRelease(&walrcv->mutex);
695 :
696 24 : set_ps_display("idle");
697 :
698 : /*
699 : * nudge startup process to notice that we've stopped streaming and are
700 : * now waiting for instructions.
701 : */
702 24 : WakeupRecovery();
703 : for (;;)
704 : {
705 48 : ResetLatch(MyLatch);
706 :
707 48 : ProcessWalRcvInterrupts();
708 :
709 24 : SpinLockAcquire(&walrcv->mutex);
710 : Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
711 : walrcv->walRcvState == WALRCV_WAITING ||
712 : walrcv->walRcvState == WALRCV_STOPPING);
713 24 : if (walrcv->walRcvState == WALRCV_RESTARTING)
714 : {
715 : /*
716 : * No need to handle changes in primary_conninfo or
717 : * primary_slot_name here. Startup process will signal us to
718 : * terminate in case those change.
719 : */
720 0 : *startpoint = walrcv->receiveStart;
721 0 : *startpointTLI = walrcv->receiveStartTLI;
722 0 : walrcv->walRcvState = WALRCV_STREAMING;
723 0 : SpinLockRelease(&walrcv->mutex);
724 0 : break;
725 : }
726 24 : if (walrcv->walRcvState == WALRCV_STOPPING)
727 : {
728 : /*
729 : * We should've received SIGTERM if the startup process wants us
730 : * to die, but might as well check it here too.
731 : */
732 0 : SpinLockRelease(&walrcv->mutex);
733 0 : exit(1);
734 : }
735 24 : SpinLockRelease(&walrcv->mutex);
736 :
737 24 : (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
738 : WAIT_EVENT_WAL_RECEIVER_WAIT_START);
739 : }
740 :
741 0 : if (update_process_title)
742 : {
743 : char activitymsg[50];
744 :
745 0 : snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%X",
746 0 : LSN_FORMAT_ARGS(*startpoint));
747 0 : set_ps_display(activitymsg);
748 : }
749 0 : }
750 :
751 : /*
752 : * Fetch any missing timeline history files between 'first' and 'last'
753 : * (inclusive) from the server.
754 : */
755 : static void
756 300 : WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
757 : {
758 : TimeLineID tli;
759 :
760 646 : for (tli = first; tli <= last; tli++)
761 : {
762 : /* there's no history file for timeline 1 */
763 346 : if (tli != 1 && !existsTimeLineHistory(tli))
764 : {
765 : char *fname;
766 : char *content;
767 : int len;
768 : char expectedfname[MAXFNAMELEN];
769 :
770 22 : ereport(LOG,
771 : (errmsg("fetching timeline history file for timeline %u from primary server",
772 : tli)));
773 :
774 22 : walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len);
775 :
776 : /*
777 : * Check that the filename on the primary matches what we
778 : * calculated ourselves. This is just a sanity check, it should
779 : * always match.
780 : */
781 22 : TLHistoryFileName(expectedfname, tli);
782 22 : if (strcmp(fname, expectedfname) != 0)
783 0 : ereport(ERROR,
784 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
785 : errmsg_internal("primary reported unexpected file name for timeline history file of timeline %u",
786 : tli)));
787 :
788 : /*
789 : * Write the file to pg_wal.
790 : */
791 22 : writeTimeLineHistoryFile(tli, content, len);
792 :
793 : /*
794 : * Mark the streamed history file as ready for archiving if
795 : * archive_mode is always.
796 : */
797 22 : if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
798 22 : XLogArchiveForceDone(fname);
799 : else
800 0 : XLogArchiveNotify(fname);
801 :
802 22 : pfree(fname);
803 22 : pfree(content);
804 : }
805 : }
806 300 : }
807 :
808 : /*
809 : * Mark us as STOPPED in shared memory at exit.
810 : */
811 : static void
812 436 : WalRcvDie(int code, Datum arg)
813 : {
814 436 : WalRcvData *walrcv = WalRcv;
815 436 : TimeLineID *startpointTLI_p = (TimeLineID *) DatumGetPointer(arg);
816 :
817 : Assert(*startpointTLI_p != 0);
818 :
819 : /* Ensure that all WAL records received are flushed to disk */
820 436 : XLogWalRcvFlush(true, *startpointTLI_p);
821 :
822 : /* Mark ourselves inactive in shared memory */
823 436 : SpinLockAcquire(&walrcv->mutex);
824 : Assert(walrcv->walRcvState == WALRCV_STREAMING ||
825 : walrcv->walRcvState == WALRCV_RESTARTING ||
826 : walrcv->walRcvState == WALRCV_STARTING ||
827 : walrcv->walRcvState == WALRCV_WAITING ||
828 : walrcv->walRcvState == WALRCV_STOPPING);
829 : Assert(walrcv->pid == MyProcPid);
830 436 : walrcv->walRcvState = WALRCV_STOPPED;
831 436 : walrcv->pid = 0;
832 436 : walrcv->procno = INVALID_PROC_NUMBER;
833 436 : walrcv->ready_to_display = false;
834 436 : SpinLockRelease(&walrcv->mutex);
835 :
836 436 : ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
837 :
838 : /* Terminate the connection gracefully. */
839 436 : if (wrconn != NULL)
840 276 : walrcv_disconnect(wrconn);
841 :
842 : /* Wake up the startup process to notice promptly that we're gone */
843 436 : WakeupRecovery();
844 436 : }
845 :
846 : /*
847 : * Accept the message from XLOG stream, and process it.
848 : */
849 : static void
850 182626 : XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
851 : {
852 : int hdrlen;
853 : XLogRecPtr dataStart;
854 : XLogRecPtr walEnd;
855 : TimestampTz sendTime;
856 : bool replyRequested;
857 :
858 182626 : switch (type)
859 : {
860 182594 : case 'w': /* WAL records */
861 : {
862 : StringInfoData incoming_message;
863 :
864 182594 : hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
865 182594 : if (len < hdrlen)
866 0 : ereport(ERROR,
867 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
868 : errmsg_internal("invalid WAL message received from primary")));
869 :
870 : /* initialize a StringInfo with the given buffer */
871 182594 : initReadOnlyStringInfo(&incoming_message, buf, hdrlen);
872 :
873 : /* read the fields */
874 182594 : dataStart = pq_getmsgint64(&incoming_message);
875 182594 : walEnd = pq_getmsgint64(&incoming_message);
876 182594 : sendTime = pq_getmsgint64(&incoming_message);
877 182594 : ProcessWalSndrMessage(walEnd, sendTime);
878 :
879 182594 : buf += hdrlen;
880 182594 : len -= hdrlen;
881 182594 : XLogWalRcvWrite(buf, len, dataStart, tli);
882 182594 : break;
883 : }
884 32 : case 'k': /* Keepalive */
885 : {
886 : StringInfoData incoming_message;
887 :
888 32 : hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
889 32 : if (len != hdrlen)
890 0 : ereport(ERROR,
891 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
892 : errmsg_internal("invalid keepalive message received from primary")));
893 :
894 : /* initialize a StringInfo with the given buffer */
895 32 : initReadOnlyStringInfo(&incoming_message, buf, hdrlen);
896 :
897 : /* read the fields */
898 32 : walEnd = pq_getmsgint64(&incoming_message);
899 32 : sendTime = pq_getmsgint64(&incoming_message);
900 32 : replyRequested = pq_getmsgbyte(&incoming_message);
901 :
902 32 : ProcessWalSndrMessage(walEnd, sendTime);
903 :
904 : /* If the primary requested a reply, send one immediately */
905 32 : if (replyRequested)
906 32 : XLogWalRcvSendReply(true, false);
907 32 : break;
908 : }
909 0 : default:
910 0 : ereport(ERROR,
911 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
912 : errmsg_internal("invalid replication message type %d",
913 : type)));
914 : }
915 182626 : }
916 :
917 : /*
918 : * Write XLOG data to disk.
919 : */
920 : static void
921 182594 : XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
922 : {
923 : int startoff;
924 : int byteswritten;
925 : instr_time start;
926 :
927 : Assert(tli != 0);
928 :
929 365432 : while (nbytes > 0)
930 : {
931 : int segbytes;
932 :
933 : /* Close the current segment if it's completed */
934 182838 : if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
935 244 : XLogWalRcvClose(recptr, tli);
936 :
937 182838 : if (recvFile < 0)
938 : {
939 : /* Create/use new log file */
940 1636 : XLByteToSeg(recptr, recvSegNo, wal_segment_size);
941 1636 : recvFile = XLogFileInit(recvSegNo, tli);
942 1636 : recvFileTLI = tli;
943 : }
944 :
945 : /* Calculate the start offset of the received logs */
946 182838 : startoff = XLogSegmentOffset(recptr, wal_segment_size);
947 :
948 182838 : if (startoff + nbytes > wal_segment_size)
949 244 : segbytes = wal_segment_size - startoff;
950 : else
951 182594 : segbytes = nbytes;
952 :
953 : /* OK to write the logs */
954 182838 : errno = 0;
955 :
956 : /*
957 : * Measure I/O timing to write WAL data, for pg_stat_io.
958 : */
959 182838 : start = pgstat_prepare_io_time(track_wal_io_timing);
960 :
961 182838 : pgstat_report_wait_start(WAIT_EVENT_WAL_WRITE);
962 182838 : byteswritten = pg_pwrite(recvFile, buf, segbytes, (off_t) startoff);
963 182838 : pgstat_report_wait_end();
964 :
965 182838 : pgstat_count_io_op_time(IOOBJECT_WAL, IOCONTEXT_NORMAL,
966 : IOOP_WRITE, start, 1, byteswritten);
967 :
968 182838 : if (byteswritten <= 0)
969 : {
970 : char xlogfname[MAXFNAMELEN];
971 : int save_errno;
972 :
973 : /* if write didn't set errno, assume no disk space */
974 0 : if (errno == 0)
975 0 : errno = ENOSPC;
976 :
977 0 : save_errno = errno;
978 0 : XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
979 0 : errno = save_errno;
980 0 : ereport(PANIC,
981 : (errcode_for_file_access(),
982 : errmsg("could not write to WAL segment %s "
983 : "at offset %d, length %lu: %m",
984 : xlogfname, startoff, (unsigned long) segbytes)));
985 : }
986 :
987 : /* Update state for write */
988 182838 : recptr += byteswritten;
989 :
990 182838 : nbytes -= byteswritten;
991 182838 : buf += byteswritten;
992 :
993 182838 : LogstreamResult.Write = recptr;
994 : }
995 :
996 : /* Update shared-memory status */
997 182594 : pg_atomic_write_u64(&WalRcv->writtenUpto, LogstreamResult.Write);
998 :
999 : /*
1000 : * Close the current segment if it's fully written up in the last cycle of
1001 : * the loop, to create its archive notification file soon. Otherwise WAL
1002 : * archiving of the segment will be delayed until any data in the next
1003 : * segment is received and written.
1004 : */
1005 182594 : if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
1006 1158 : XLogWalRcvClose(recptr, tli);
1007 182594 : }
1008 :
1009 : /*
1010 : * Flush the log to disk.
1011 : *
1012 : * If we're in the midst of dying, it's unwise to do anything that might throw
1013 : * an error, so we skip sending a reply in that case.
1014 : */
1015 : static void
1016 18228 : XLogWalRcvFlush(bool dying, TimeLineID tli)
1017 : {
1018 : Assert(tli != 0);
1019 :
1020 18228 : if (LogstreamResult.Flush < LogstreamResult.Write)
1021 : {
1022 17518 : WalRcvData *walrcv = WalRcv;
1023 :
1024 17518 : issue_xlog_fsync(recvFile, recvSegNo, tli);
1025 :
1026 17518 : LogstreamResult.Flush = LogstreamResult.Write;
1027 :
1028 : /* Update shared-memory status */
1029 17518 : SpinLockAcquire(&walrcv->mutex);
1030 17518 : if (walrcv->flushedUpto < LogstreamResult.Flush)
1031 : {
1032 17518 : walrcv->latestChunkStart = walrcv->flushedUpto;
1033 17518 : walrcv->flushedUpto = LogstreamResult.Flush;
1034 17518 : walrcv->receivedTLI = tli;
1035 : }
1036 17518 : SpinLockRelease(&walrcv->mutex);
1037 :
1038 : /* Signal the startup process and walsender that new WAL has arrived */
1039 17518 : WakeupRecovery();
1040 17518 : if (AllowCascadeReplication())
1041 17518 : WalSndWakeup(true, false);
1042 :
1043 : /* Report XLOG streaming progress in PS display */
1044 17518 : if (update_process_title)
1045 : {
1046 : char activitymsg[50];
1047 :
1048 17518 : snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
1049 17518 : LSN_FORMAT_ARGS(LogstreamResult.Write));
1050 17518 : set_ps_display(activitymsg);
1051 : }
1052 :
1053 : /* Also let the primary know that we made some progress */
1054 17518 : if (!dying)
1055 : {
1056 17516 : XLogWalRcvSendReply(false, false);
1057 17516 : XLogWalRcvSendHSFeedback(false);
1058 : }
1059 : }
1060 18228 : }
1061 :
1062 : /*
1063 : * Close the current segment.
1064 : *
1065 : * Flush the segment to disk before closing it. Otherwise we have to
1066 : * reopen and fsync it later.
1067 : *
1068 : * Create an archive notification file since the segment is known completed.
1069 : */
1070 : static void
1071 1402 : XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli)
1072 : {
1073 : char xlogfname[MAXFNAMELEN];
1074 :
1075 : Assert(recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size));
1076 : Assert(tli != 0);
1077 :
1078 : /*
1079 : * fsync() and close current file before we switch to next one. We would
1080 : * otherwise have to reopen this file to fsync it later
1081 : */
1082 1402 : XLogWalRcvFlush(false, tli);
1083 :
1084 1402 : XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
1085 :
1086 : /*
1087 : * XLOG segment files will be re-read by recovery in startup process soon,
1088 : * so we don't advise the OS to release cache pages associated with the
1089 : * file like XLogFileClose() does.
1090 : */
1091 1402 : if (close(recvFile) != 0)
1092 0 : ereport(PANIC,
1093 : (errcode_for_file_access(),
1094 : errmsg("could not close WAL segment %s: %m",
1095 : xlogfname)));
1096 :
1097 : /*
1098 : * Create .done file forcibly to prevent the streamed segment from being
1099 : * archived later.
1100 : */
1101 1402 : if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
1102 1402 : XLogArchiveForceDone(xlogfname);
1103 : else
1104 0 : XLogArchiveNotify(xlogfname);
1105 :
1106 1402 : recvFile = -1;
1107 1402 : }
1108 :
1109 : /*
1110 : * Send reply message to primary, indicating our current WAL locations, oldest
1111 : * xmin and the current time.
1112 : *
1113 : * If 'force' is not set, the message is only sent if enough time has
1114 : * passed since last status update to reach wal_receiver_status_interval.
1115 : * If wal_receiver_status_interval is disabled altogether and 'force' is
1116 : * false, this is a no-op.
1117 : *
1118 : * If 'requestReply' is true, requests the server to reply immediately upon
1119 : * receiving this message. This is used for heartbeats, when approaching
1120 : * wal_receiver_timeout.
1121 : */
1122 : static void
1123 40344 : XLogWalRcvSendReply(bool force, bool requestReply)
1124 : {
1125 : static XLogRecPtr writePtr = 0;
1126 : static XLogRecPtr flushPtr = 0;
1127 : XLogRecPtr applyPtr;
1128 : TimestampTz now;
1129 :
1130 : /*
1131 : * If the user doesn't want status to be reported to the primary, be sure
1132 : * to exit before doing anything at all.
1133 : */
1134 40344 : if (!force && wal_receiver_status_interval <= 0)
1135 0 : return;
1136 :
1137 : /* Get current timestamp. */
1138 40344 : now = GetCurrentTimestamp();
1139 :
1140 : /*
1141 : * We can compare the write and flush positions to the last message we
1142 : * sent without taking any lock, but the apply position requires a spin
1143 : * lock, so we don't check that unless something else has changed or 10
1144 : * seconds have passed. This means that the apply WAL location will
1145 : * appear, from the primary's point of view, to lag slightly, but since
1146 : * this is only for reporting purposes and only on idle systems, that's
1147 : * probably OK.
1148 : */
1149 40344 : if (!force
1150 33892 : && writePtr == LogstreamResult.Write
1151 16334 : && flushPtr == LogstreamResult.Flush
1152 220 : && now < wakeup[WALRCV_WAKEUP_REPLY])
1153 220 : return;
1154 :
1155 : /* Make sure we wake up when it's time to send another reply. */
1156 40124 : WalRcvComputeNextWakeup(WALRCV_WAKEUP_REPLY, now);
1157 :
1158 : /* Construct a new message */
1159 40124 : writePtr = LogstreamResult.Write;
1160 40124 : flushPtr = LogstreamResult.Flush;
1161 40124 : applyPtr = GetXLogReplayRecPtr(NULL);
1162 :
1163 40124 : resetStringInfo(&reply_message);
1164 40124 : pq_sendbyte(&reply_message, 'r');
1165 40124 : pq_sendint64(&reply_message, writePtr);
1166 40124 : pq_sendint64(&reply_message, flushPtr);
1167 40124 : pq_sendint64(&reply_message, applyPtr);
1168 40124 : pq_sendint64(&reply_message, GetCurrentTimestamp());
1169 40124 : pq_sendbyte(&reply_message, requestReply ? 1 : 0);
1170 :
1171 : /* Send it */
1172 40124 : elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X%s",
1173 : LSN_FORMAT_ARGS(writePtr),
1174 : LSN_FORMAT_ARGS(flushPtr),
1175 : LSN_FORMAT_ARGS(applyPtr),
1176 : requestReply ? " (reply requested)" : "");
1177 :
1178 40124 : walrcv_send(wrconn, reply_message.data, reply_message.len);
1179 : }
1180 :
1181 : /*
1182 : * Send hot standby feedback message to primary, plus the current time,
1183 : * in case they don't have a watch.
1184 : *
1185 : * If the user disables feedback, send one final message to tell sender
1186 : * to forget about the xmin on this standby. We also send this message
1187 : * on first connect because a previous connection might have set xmin
1188 : * on a replication slot. (If we're not using a slot it's harmless to
1189 : * send a feedback message explicitly setting InvalidTransactionId).
1190 : */
1191 : static void
1192 17846 : XLogWalRcvSendHSFeedback(bool immed)
1193 : {
1194 : TimestampTz now;
1195 : FullTransactionId nextFullXid;
1196 : TransactionId nextXid;
1197 : uint32 xmin_epoch,
1198 : catalog_xmin_epoch;
1199 : TransactionId xmin,
1200 : catalog_xmin;
1201 :
1202 : /* initially true so we always send at least one feedback message */
1203 : static bool primary_has_standby_xmin = true;
1204 :
1205 : /*
1206 : * If the user doesn't want status to be reported to the primary, be sure
1207 : * to exit before doing anything at all.
1208 : */
1209 17846 : if ((wal_receiver_status_interval <= 0 || !hot_standby_feedback) &&
1210 17376 : !primary_has_standby_xmin)
1211 17530 : return;
1212 :
1213 : /* Get current timestamp. */
1214 700 : now = GetCurrentTimestamp();
1215 :
1216 : /* Send feedback at most once per wal_receiver_status_interval. */
1217 700 : if (!immed && now < wakeup[WALRCV_WAKEUP_HSFEEDBACK])
1218 382 : return;
1219 :
1220 : /* Make sure we wake up when it's time to send feedback again. */
1221 318 : WalRcvComputeNextWakeup(WALRCV_WAKEUP_HSFEEDBACK, now);
1222 :
1223 : /*
1224 : * If Hot Standby is not yet accepting connections there is nothing to
1225 : * send. Check this after the interval has expired to reduce number of
1226 : * calls.
1227 : *
1228 : * Bailing out here also ensures that we don't send feedback until we've
1229 : * read our own replication slot state, so we don't tell the primary to
1230 : * discard needed xmin or catalog_xmin from any slots that may exist on
1231 : * this replica.
1232 : */
1233 318 : if (!HotStandbyActive())
1234 2 : return;
1235 :
1236 : /*
1237 : * Make the expensive call to get the oldest xmin once we are certain
1238 : * everything else has been checked.
1239 : */
1240 316 : if (hot_standby_feedback)
1241 : {
1242 90 : GetReplicationHorizons(&xmin, &catalog_xmin);
1243 : }
1244 : else
1245 : {
1246 226 : xmin = InvalidTransactionId;
1247 226 : catalog_xmin = InvalidTransactionId;
1248 : }
1249 :
1250 : /*
1251 : * Get epoch and adjust if nextXid and oldestXmin are different sides of
1252 : * the epoch boundary.
1253 : */
1254 316 : nextFullXid = ReadNextFullTransactionId();
1255 316 : nextXid = XidFromFullTransactionId(nextFullXid);
1256 316 : xmin_epoch = EpochFromFullTransactionId(nextFullXid);
1257 316 : catalog_xmin_epoch = xmin_epoch;
1258 316 : if (nextXid < xmin)
1259 0 : xmin_epoch--;
1260 316 : if (nextXid < catalog_xmin)
1261 0 : catalog_xmin_epoch--;
1262 :
1263 316 : elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
1264 : xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
1265 :
1266 : /* Construct the message and send it. */
1267 316 : resetStringInfo(&reply_message);
1268 316 : pq_sendbyte(&reply_message, 'h');
1269 316 : pq_sendint64(&reply_message, GetCurrentTimestamp());
1270 316 : pq_sendint32(&reply_message, xmin);
1271 316 : pq_sendint32(&reply_message, xmin_epoch);
1272 316 : pq_sendint32(&reply_message, catalog_xmin);
1273 316 : pq_sendint32(&reply_message, catalog_xmin_epoch);
1274 316 : walrcv_send(wrconn, reply_message.data, reply_message.len);
1275 316 : if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
1276 90 : primary_has_standby_xmin = true;
1277 : else
1278 226 : primary_has_standby_xmin = false;
1279 : }
1280 :
1281 : /*
1282 : * Update shared memory status upon receiving a message from primary.
1283 : *
1284 : * 'walEnd' and 'sendTime' are the end-of-WAL and timestamp of the latest
1285 : * message, reported by primary.
1286 : */
1287 : static void
1288 182626 : ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
1289 : {
1290 182626 : WalRcvData *walrcv = WalRcv;
1291 182626 : TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
1292 :
1293 : /* Update shared-memory status */
1294 182626 : SpinLockAcquire(&walrcv->mutex);
1295 182626 : if (walrcv->latestWalEnd < walEnd)
1296 13858 : walrcv->latestWalEndTime = sendTime;
1297 182626 : walrcv->latestWalEnd = walEnd;
1298 182626 : walrcv->lastMsgSendTime = sendTime;
1299 182626 : walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
1300 182626 : SpinLockRelease(&walrcv->mutex);
1301 :
1302 182626 : if (message_level_is_interesting(DEBUG2))
1303 : {
1304 : char *sendtime;
1305 : char *receipttime;
1306 : int applyDelay;
1307 :
1308 : /* Copy because timestamptz_to_str returns a static buffer */
1309 760 : sendtime = pstrdup(timestamptz_to_str(sendTime));
1310 760 : receipttime = pstrdup(timestamptz_to_str(lastMsgReceiptTime));
1311 760 : applyDelay = GetReplicationApplyDelay();
1312 :
1313 : /* apply delay is not available */
1314 760 : if (applyDelay == -1)
1315 2 : elog(DEBUG2, "sendtime %s receipttime %s replication apply delay (N/A) transfer latency %d ms",
1316 : sendtime,
1317 : receipttime,
1318 : GetReplicationTransferLatency());
1319 : else
1320 758 : elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms",
1321 : sendtime,
1322 : receipttime,
1323 : applyDelay,
1324 : GetReplicationTransferLatency());
1325 :
1326 760 : pfree(sendtime);
1327 760 : pfree(receipttime);
1328 : }
1329 182626 : }
1330 :
1331 : /*
1332 : * Compute the next wakeup time for a given wakeup reason. Can be called to
1333 : * initialize a wakeup time, to adjust it for the next wakeup, or to
1334 : * reinitialize it when GUCs have changed. We ask the caller to pass in the
1335 : * value of "now" because this frequently avoids multiple calls of
1336 : * GetCurrentTimestamp(). It had better be a reasonably up-to-date value
1337 : * though.
1338 : */
1339 : static void
1340 406982 : WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now)
1341 : {
1342 406982 : switch (reason)
1343 : {
1344 182948 : case WALRCV_WAKEUP_TERMINATE:
1345 182948 : if (wal_receiver_timeout <= 0)
1346 0 : wakeup[reason] = TIMESTAMP_INFINITY;
1347 : else
1348 182948 : wakeup[reason] = TimestampTzPlusMilliseconds(now, wal_receiver_timeout);
1349 182948 : break;
1350 182948 : case WALRCV_WAKEUP_PING:
1351 182948 : if (wal_receiver_timeout <= 0)
1352 0 : wakeup[reason] = TIMESTAMP_INFINITY;
1353 : else
1354 182948 : wakeup[reason] = TimestampTzPlusMilliseconds(now, wal_receiver_timeout / 2);
1355 182948 : break;
1356 640 : case WALRCV_WAKEUP_HSFEEDBACK:
1357 640 : if (!hot_standby_feedback || wal_receiver_status_interval <= 0)
1358 468 : wakeup[reason] = TIMESTAMP_INFINITY;
1359 : else
1360 172 : wakeup[reason] = TimestampTzPlusSeconds(now, wal_receiver_status_interval);
1361 640 : break;
1362 40446 : case WALRCV_WAKEUP_REPLY:
1363 40446 : if (wal_receiver_status_interval <= 0)
1364 0 : wakeup[reason] = TIMESTAMP_INFINITY;
1365 : else
1366 40446 : wakeup[reason] = TimestampTzPlusSeconds(now, wal_receiver_status_interval);
1367 40446 : break;
1368 : /* there's intentionally no default: here */
1369 : }
1370 406982 : }
1371 :
1372 : /*
1373 : * Wake up the walreceiver main loop.
1374 : *
1375 : * This is called by the startup process whenever interesting xlog records
1376 : * are applied, so that walreceiver can check if it needs to send an apply
1377 : * notification back to the primary which may be waiting in a COMMIT with
1378 : * synchronous_commit = remote_apply.
1379 : */
1380 : void
1381 6118 : WalRcvForceReply(void)
1382 : {
1383 : ProcNumber procno;
1384 :
1385 6118 : WalRcv->force_reply = true;
1386 : /* fetching the proc number is probably atomic, but don't rely on it */
1387 6118 : SpinLockAcquire(&WalRcv->mutex);
1388 6118 : procno = WalRcv->procno;
1389 6118 : SpinLockRelease(&WalRcv->mutex);
1390 6118 : if (procno != INVALID_PROC_NUMBER)
1391 5816 : SetLatch(&GetPGProcByNumber(procno)->procLatch);
1392 6118 : }
1393 :
1394 : /*
1395 : * Return a string constant representing the state. This is used
1396 : * in system functions and views, and should *not* be translated.
1397 : */
1398 : static const char *
1399 0 : WalRcvGetStateString(WalRcvState state)
1400 : {
1401 0 : switch (state)
1402 : {
1403 0 : case WALRCV_STOPPED:
1404 0 : return "stopped";
1405 0 : case WALRCV_STARTING:
1406 0 : return "starting";
1407 0 : case WALRCV_STREAMING:
1408 0 : return "streaming";
1409 0 : case WALRCV_WAITING:
1410 0 : return "waiting";
1411 0 : case WALRCV_RESTARTING:
1412 0 : return "restarting";
1413 0 : case WALRCV_STOPPING:
1414 0 : return "stopping";
1415 : }
1416 0 : return "UNKNOWN";
1417 : }
1418 :
1419 : /*
1420 : * Returns activity of WAL receiver, including pid, state and xlog locations
1421 : * received from the WAL sender of another server.
1422 : */
1423 : Datum
1424 6 : pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
1425 : {
1426 : TupleDesc tupdesc;
1427 : Datum *values;
1428 : bool *nulls;
1429 : int pid;
1430 : bool ready_to_display;
1431 : WalRcvState state;
1432 : XLogRecPtr receive_start_lsn;
1433 : TimeLineID receive_start_tli;
1434 : XLogRecPtr written_lsn;
1435 : XLogRecPtr flushed_lsn;
1436 : TimeLineID received_tli;
1437 : TimestampTz last_send_time;
1438 : TimestampTz last_receipt_time;
1439 : XLogRecPtr latest_end_lsn;
1440 : TimestampTz latest_end_time;
1441 : char sender_host[NI_MAXHOST];
1442 6 : int sender_port = 0;
1443 : char slotname[NAMEDATALEN];
1444 : char conninfo[MAXCONNINFO];
1445 :
1446 : /* Take a lock to ensure value consistency */
1447 6 : SpinLockAcquire(&WalRcv->mutex);
1448 6 : pid = (int) WalRcv->pid;
1449 6 : ready_to_display = WalRcv->ready_to_display;
1450 6 : state = WalRcv->walRcvState;
1451 6 : receive_start_lsn = WalRcv->receiveStart;
1452 6 : receive_start_tli = WalRcv->receiveStartTLI;
1453 6 : flushed_lsn = WalRcv->flushedUpto;
1454 6 : received_tli = WalRcv->receivedTLI;
1455 6 : last_send_time = WalRcv->lastMsgSendTime;
1456 6 : last_receipt_time = WalRcv->lastMsgReceiptTime;
1457 6 : latest_end_lsn = WalRcv->latestWalEnd;
1458 6 : latest_end_time = WalRcv->latestWalEndTime;
1459 6 : strlcpy(slotname, WalRcv->slotname, sizeof(slotname));
1460 6 : strlcpy(sender_host, WalRcv->sender_host, sizeof(sender_host));
1461 6 : sender_port = WalRcv->sender_port;
1462 6 : strlcpy(conninfo, WalRcv->conninfo, sizeof(conninfo));
1463 6 : SpinLockRelease(&WalRcv->mutex);
1464 :
1465 : /*
1466 : * No WAL receiver (or not ready yet), just return a tuple with NULL
1467 : * values
1468 : */
1469 6 : if (pid == 0 || !ready_to_display)
1470 6 : PG_RETURN_NULL();
1471 :
1472 : /*
1473 : * Read "writtenUpto" without holding a spinlock. Note that it may not be
1474 : * consistent with the other shared variables of the WAL receiver
1475 : * protected by a spinlock, but this should not be used for data integrity
1476 : * checks.
1477 : */
1478 0 : written_lsn = pg_atomic_read_u64(&WalRcv->writtenUpto);
1479 :
1480 : /* determine result type */
1481 0 : if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1482 0 : elog(ERROR, "return type must be a row type");
1483 :
1484 0 : values = palloc0(sizeof(Datum) * tupdesc->natts);
1485 0 : nulls = palloc0(sizeof(bool) * tupdesc->natts);
1486 :
1487 : /* Fetch values */
1488 0 : values[0] = Int32GetDatum(pid);
1489 :
1490 0 : if (!has_privs_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS))
1491 : {
1492 : /*
1493 : * Only superusers and roles with privileges of pg_read_all_stats can
1494 : * see details. Other users only get the pid value to know whether it
1495 : * is a WAL receiver, but no details.
1496 : */
1497 0 : memset(&nulls[1], true, sizeof(bool) * (tupdesc->natts - 1));
1498 : }
1499 : else
1500 : {
1501 0 : values[1] = CStringGetTextDatum(WalRcvGetStateString(state));
1502 :
1503 0 : if (XLogRecPtrIsInvalid(receive_start_lsn))
1504 0 : nulls[2] = true;
1505 : else
1506 0 : values[2] = LSNGetDatum(receive_start_lsn);
1507 0 : values[3] = Int32GetDatum(receive_start_tli);
1508 0 : if (XLogRecPtrIsInvalid(written_lsn))
1509 0 : nulls[4] = true;
1510 : else
1511 0 : values[4] = LSNGetDatum(written_lsn);
1512 0 : if (XLogRecPtrIsInvalid(flushed_lsn))
1513 0 : nulls[5] = true;
1514 : else
1515 0 : values[5] = LSNGetDatum(flushed_lsn);
1516 0 : values[6] = Int32GetDatum(received_tli);
1517 0 : if (last_send_time == 0)
1518 0 : nulls[7] = true;
1519 : else
1520 0 : values[7] = TimestampTzGetDatum(last_send_time);
1521 0 : if (last_receipt_time == 0)
1522 0 : nulls[8] = true;
1523 : else
1524 0 : values[8] = TimestampTzGetDatum(last_receipt_time);
1525 0 : if (XLogRecPtrIsInvalid(latest_end_lsn))
1526 0 : nulls[9] = true;
1527 : else
1528 0 : values[9] = LSNGetDatum(latest_end_lsn);
1529 0 : if (latest_end_time == 0)
1530 0 : nulls[10] = true;
1531 : else
1532 0 : values[10] = TimestampTzGetDatum(latest_end_time);
1533 0 : if (*slotname == '\0')
1534 0 : nulls[11] = true;
1535 : else
1536 0 : values[11] = CStringGetTextDatum(slotname);
1537 0 : if (*sender_host == '\0')
1538 0 : nulls[12] = true;
1539 : else
1540 0 : values[12] = CStringGetTextDatum(sender_host);
1541 0 : if (sender_port == 0)
1542 0 : nulls[13] = true;
1543 : else
1544 0 : values[13] = Int32GetDatum(sender_port);
1545 0 : if (*conninfo == '\0')
1546 0 : nulls[14] = true;
1547 : else
1548 0 : values[14] = CStringGetTextDatum(conninfo);
1549 : }
1550 :
1551 : /* Returns the record as Datum */
1552 0 : PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
1553 : }
|