Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * walreceiver.c
4 : *
5 : * The WAL receiver process (walreceiver) is new as of Postgres 9.0. It
6 : * is the process in the standby server that takes charge of receiving
7 : * XLOG records from a primary server during streaming replication.
8 : *
9 : * When the startup process determines that it's time to start streaming,
10 : * it instructs postmaster to start walreceiver. Walreceiver first connects
11 : * to the primary server (it will be served by a walsender process
12 : * in the primary server), and then keeps receiving XLOG records and
13 : * writing them to the disk as long as the connection is alive. As XLOG
14 : * records are received and flushed to disk, it updates the
15 : * WalRcv->flushedUpto variable in shared memory, to inform the startup
16 : * process of how far it can proceed with XLOG replay.
17 : *
18 : * A WAL receiver cannot directly load GUC parameters used when establishing
19 : * its connection to the primary. Instead it relies on parameter values
20 : * that are passed down by the startup process when streaming is requested.
21 : * This applies, for example, to the replication slot and the connection
22 : * string to be used for the connection with the primary.
23 : *
24 : * If the primary server ends streaming, but doesn't disconnect, walreceiver
25 : * goes into "waiting" mode, and waits for the startup process to give new
26 : * instructions. The startup process will treat that the same as
27 : * disconnection, and will rescan the archive/pg_wal directory. But when the
28 : * startup process wants to try streaming replication again, it will just
29 : * nudge the existing walreceiver process that's waiting, instead of launching
30 : * a new one.
31 : *
32 : * Normal termination is by SIGTERM, which instructs the walreceiver to
33 : * exit(0). Emergency termination is by SIGQUIT; like any postmaster child
34 : * process, the walreceiver will simply abort and exit on SIGQUIT. A close
35 : * of the connection and a FATAL error are treated not as a crash but as
36 : * normal operation.
37 : *
38 : * This file contains the server-facing parts of walreceiver. The libpq-
39 : * specific parts are in the libpqwalreceiver module. It's loaded
40 : * dynamically to avoid linking the server with libpq.
41 : *
42 : * Portions Copyright (c) 2010-2024, PostgreSQL Global Development Group
43 : *
44 : *
45 : * IDENTIFICATION
46 : * src/backend/replication/walreceiver.c
47 : *
48 : *-------------------------------------------------------------------------
49 : */
50 : #include "postgres.h"
51 :
52 : #include <unistd.h>
53 :
54 : #include "access/htup_details.h"
55 : #include "access/timeline.h"
56 : #include "access/transam.h"
57 : #include "access/xlog_internal.h"
58 : #include "access/xlogarchive.h"
59 : #include "access/xlogrecovery.h"
60 : #include "catalog/pg_authid.h"
61 : #include "funcapi.h"
62 : #include "libpq/pqformat.h"
63 : #include "libpq/pqsignal.h"
64 : #include "miscadmin.h"
65 : #include "pgstat.h"
66 : #include "postmaster/auxprocess.h"
67 : #include "postmaster/interrupt.h"
68 : #include "replication/walreceiver.h"
69 : #include "replication/walsender.h"
70 : #include "storage/ipc.h"
71 : #include "storage/proc.h"
72 : #include "storage/procarray.h"
73 : #include "storage/procsignal.h"
74 : #include "utils/acl.h"
75 : #include "utils/builtins.h"
76 : #include "utils/guc.h"
77 : #include "utils/pg_lsn.h"
78 : #include "utils/ps_status.h"
79 : #include "utils/timestamp.h"
80 :
81 :
82 : /*
83 : * GUC variables. (Other variables that affect walreceiver are in xlog.c
84 : * because they're passed down from the startup process, for better
85 : * synchronization.)
86 : */
87 : int wal_receiver_status_interval; /* max time between status updates sent to the primary */
88 : int wal_receiver_timeout; /* silence from the primary tolerated before pinging (at half) and giving up */
89 : bool hot_standby_feedback; /* NOTE(review): presumably gates XLogWalRcvSendHSFeedback - confirm */
90 :
91 : /* libpqwalreceiver connection; set by walrcv_connect(), closed in WalRcvDie() */
92 : static WalReceiverConn *wrconn = NULL;
93 : WalReceiverFunctionsType *WalReceiverFunctions = NULL; /* must be non-NULL after loading libpqwalreceiver */
94 :
95 : /*
96 : * These variables are used similarly to openLogFile/SegNo,
97 : * but for walreceiver to write the XLOG. recvFileTLI is the TimeLineID
98 : * corresponding to the filename of recvFile. recvFile is -1 while no
 * segment file is open.
99 : */
100 : static int recvFile = -1;
101 : static TimeLineID recvFileTLI = 0;
102 : static XLogSegNo recvSegNo = 0;
103 :
104 : /*
105 : * LogstreamResult indicates the byte positions that we have already
106 : * written/fsynced.
107 : */
108 : static struct
109 : {
110 : XLogRecPtr Write; /* last byte + 1 written out in the standby */
111 : XLogRecPtr Flush; /* last byte + 1 flushed in the standby */
112 : } LogstreamResult;
113 :
114 : /*
115 : * Reasons to wake up and perform periodic tasks. The values double as
 * indexes into the wakeup[] array below.
116 : */
117 : typedef enum WalRcvWakeupReason
118 : {
119 : WALRCV_WAKEUP_TERMINATE, /* deadline after which walreceiver gives up with a timeout error */
120 : WALRCV_WAKEUP_PING, /* deadline after which a reply is requested from the primary */
121 : WALRCV_WAKEUP_REPLY, /* presumably the next periodic status reply - see WalRcvComputeNextWakeup */
122 : WALRCV_WAKEUP_HSFEEDBACK, /* presumably the next hot standby feedback - see WalRcvComputeNextWakeup */
123 : #define NUM_WALRCV_WAKEUPS (WALRCV_WAKEUP_HSFEEDBACK + 1)
124 : } WalRcvWakeupReason;
125 :
126 : /*
127 : * Wake up times for periodic tasks, indexed by WalRcvWakeupReason. The
 * main loop naps until the earliest of these.
128 : */
129 : static TimestampTz wakeup[NUM_WALRCV_WAKEUPS];
130 :
131 : static StringInfoData reply_message; /* (re)initialized each time streaming starts */
132 :
133 : /* Prototypes for private functions */
134 : static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last);
135 : static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI);
136 : static void WalRcvDie(int code, Datum arg);
137 : static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len,
138 : TimeLineID tli);
139 : static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr,
140 : TimeLineID tli);
141 : static void XLogWalRcvFlush(bool dying, TimeLineID tli);
142 : static void XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli);
143 : static void XLogWalRcvSendReply(bool force, bool requestReply);
144 : static void XLogWalRcvSendHSFeedback(bool immed);
145 : static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
146 : static void WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now);
147 :
148 : /*
149 : * Process any interrupts the walreceiver process may have received.
150 : * This should be called any time the process's latch has become set.
151 : *
152 : * Currently, only SIGTERM is of interest. We can't just exit(1) within the
153 : * SIGTERM signal handler, because the signal might arrive in the middle of
154 : * some critical operation, like while we're holding a spinlock. Instead, the
155 : * signal handler sets a flag variable as well as setting the process's latch.
156 : * We must check the flag (by calling ProcessWalRcvInterrupts) anytime the
157 : * latch has become set. Operations that could block for a long time, such as
158 : * reading from a remote server, must pay attention to the latch too; see
159 : * libpqrcv_PQgetResult for example.
160 : */
161 : void
162 93708 : ProcessWalRcvInterrupts(void)
163 : {
164 : /*
165 : * Although walreceiver interrupt handling doesn't use the same scheme as
166 : * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive
167 : * any incoming signals on Win32, and also to make sure we process any
168 : * barrier events.
169 : */
170 93708 : CHECK_FOR_INTERRUPTS();
171 :
172 93706 : if (ShutdownRequestPending)
173 : {
174 130 : ereport(FATAL,
175 : (errcode(ERRCODE_ADMIN_SHUTDOWN),
176 : errmsg("terminating walreceiver process due to administrator command")));
177 : }
178 93576 : }
179 :
180 :
181 : /*
 * Main entry point for walreceiver process.
 *
 * Marks the walreceiver as running in shared memory, installs signal
 * handlers, loads libpqwalreceiver and connects to the primary. It then
 * loops forever: identify the primary, fetch missing timeline history
 * files, start streaming and process incoming messages until the primary
 * ends the stream, close the current segment, and wait in
 * WalRcvWaitForStartPosition() for a new start position. Control never
 * falls out of the outer loop; the exits visible here are proc_exit()
 * and ERROR/FATAL/PANIC reports.
 *
 * startup_data is not referenced in this function, and startup_data_len
 * is asserted to be zero.
 */
182 : void
183 358 : WalReceiverMain(char *startup_data, size_t startup_data_len)
184 : {
185 : char conninfo[MAXCONNINFO];
186 : char *tmp_conninfo;
187 : char slotname[NAMEDATALEN];
188 : bool is_temp_slot;
189 : XLogRecPtr startpoint;
190 : TimeLineID startpointTLI;
191 : TimeLineID primaryTLI;
192 : bool first_stream;
193 : WalRcvData *walrcv;
194 : TimestampTz now;
195 : char *err;
196 358 : char *sender_host = NULL;
197 358 : int sender_port = 0;
198 :
199 : Assert(startup_data_len == 0);
200 :
201 358 : MyBackendType = B_WAL_RECEIVER;
202 358 : AuxiliaryProcessMainCommon();
203 :
204 : /*
205 : * WalRcv should be set up already (if we are a backend, we inherit this
206 : * by fork() or EXEC_BACKEND mechanism from the postmaster).
207 : */
208 358 : walrcv = WalRcv;
209 : Assert(walrcv != NULL);
210 :
211 : /*
212 : * Mark walreceiver as running in shared memory.
213 : *
214 : * Do this as early as possible, so that if we fail later on, we'll set
215 : * state to STOPPED. If we die before this, the startup process will keep
216 : * waiting for us to start up, until it times out.
217 : */
218 358 : SpinLockAcquire(&walrcv->mutex);
219 : Assert(walrcv->pid == 0);
220 358 : switch (walrcv->walRcvState)
221 : {
222 0 : case WALRCV_STOPPING:
223 : /* If we've already been requested to stop, don't start up. */
224 0 : walrcv->walRcvState = WALRCV_STOPPED;
225 : /* fall through */
226 :
227 10 : case WALRCV_STOPPED:
228 10 : SpinLockRelease(&walrcv->mutex);
229 10 : ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
230 10 : proc_exit(1);
231 : break;
232 :
233 348 : case WALRCV_STARTING:
234 : /* The usual case */
235 348 : break;
236 :
237 0 : case WALRCV_WAITING:
238 : case WALRCV_STREAMING:
239 : case WALRCV_RESTARTING:
240 : default:
241 : /* Shouldn't happen */
242 0 : SpinLockRelease(&walrcv->mutex);
243 0 : elog(PANIC, "walreceiver still running according to shared memory state");
244 : }
245 : /* Advertise our PID so that the startup process can kill us */
246 348 : walrcv->pid = MyProcPid;
247 348 : walrcv->walRcvState = WALRCV_STREAMING;
248 :
249 : /* Fetch information required to start streaming */
250 348 : walrcv->ready_to_display = false;
251 348 : strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
252 348 : strlcpy(slotname, (char *) walrcv->slotname, NAMEDATALEN);
253 348 : is_temp_slot = walrcv->is_temp_slot;
254 348 : startpoint = walrcv->receiveStart;
255 348 : startpointTLI = walrcv->receiveStartTLI;
256 :
257 : /*
258 : * At most one of is_temp_slot and slotname can be set; otherwise,
259 : * RequestXLogStreaming messed up.
260 : */
261 : Assert(!is_temp_slot || (slotname[0] == '\0'));
262 :
263 : /* Initialise to a sanish value */
264 348 : now = GetCurrentTimestamp();
265 348 : walrcv->lastMsgSendTime =
266 348 : walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now;
267 :
268 : /* Report the latch to use to awaken this process */
269 348 : walrcv->latch = &MyProc->procLatch;
270 :
271 348 : SpinLockRelease(&walrcv->mutex);
272 :
273 348 : pg_atomic_write_u64(&WalRcv->writtenUpto, 0);
274 :
275 : /*
 * Arrange to clean up at walreceiver exit. WalRcvDie is handed a
 * pointer to startpointTLI rather than its value, so it sees the
 * timeline in effect at exit time (the variable is updated through
 * WalRcvWaitForStartPosition when streaming is restarted).
 */
276 348 : on_shmem_exit(WalRcvDie, PointerGetDatum(&startpointTLI));
277 :
278 : /* Properly accept or ignore signals the postmaster might send us */
279 348 : pqsignal(SIGHUP, SignalHandlerForConfigReload); /* set flag to read config
280 : * file */
281 348 : pqsignal(SIGINT, SIG_IGN);
282 348 : pqsignal(SIGTERM, SignalHandlerForShutdownRequest); /* request shutdown */
283 : /* SIGQUIT handler was already set up by InitPostmasterChild */
284 348 : pqsignal(SIGALRM, SIG_IGN);
285 348 : pqsignal(SIGPIPE, SIG_IGN);
286 348 : pqsignal(SIGUSR1, procsignal_sigusr1_handler);
287 348 : pqsignal(SIGUSR2, SIG_IGN);
288 :
289 : /* Reset some signals that are accepted by postmaster but not here */
290 348 : pqsignal(SIGCHLD, SIG_DFL);
291 :
292 : /* Load the libpq-specific functions */
293 348 : load_file("libpqwalreceiver", false);
294 348 : if (WalReceiverFunctions == NULL)
295 0 : elog(ERROR, "libpqwalreceiver didn't initialize correctly");
296 :
297 : /* Unblock signals (they were blocked when the postmaster forked us) */
298 348 : sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
299 :
300 : /* Establish the connection to the primary for XLOG streaming */
301 348 : wrconn = walrcv_connect(conninfo, true, false, false,
302 : cluster_name[0] ? cluster_name : "walreceiver",
303 : &err);
304 340 : if (!wrconn)
305 114 : ereport(ERROR,
306 : (errcode(ERRCODE_CONNECTION_FAILURE),
307 : errmsg("could not connect to the primary server: %s", err)));
308 :
309 : /*
310 : * Save user-visible connection string. This clobbers the original
311 : * conninfo, for security. Also save host and port of the sender server
312 : * this walreceiver is connected to.
313 : */
314 226 : tmp_conninfo = walrcv_get_conninfo(wrconn);
315 226 : walrcv_get_senderinfo(wrconn, &sender_host, &sender_port);
316 226 : SpinLockAcquire(&walrcv->mutex);
317 226 : memset(walrcv->conninfo, 0, MAXCONNINFO);
318 226 : if (tmp_conninfo)
319 226 : strlcpy((char *) walrcv->conninfo, tmp_conninfo, MAXCONNINFO);
320 :
321 226 : memset(walrcv->sender_host, 0, NI_MAXHOST);
322 226 : if (sender_host)
323 226 : strlcpy((char *) walrcv->sender_host, sender_host, NI_MAXHOST);
324 :
325 226 : walrcv->sender_port = sender_port;
326 226 : walrcv->ready_to_display = true;
327 226 : SpinLockRelease(&walrcv->mutex);
328 :
329 226 : if (tmp_conninfo)
330 226 : pfree(tmp_conninfo);
331 :
332 226 : if (sender_host)
333 226 : pfree(sender_host);
334 :
335 226 : first_stream = true; /* only selects "started" vs "restarted" in the LOG message below */
336 : for (;;)
337 0 : {
338 : char *primary_sysid;
339 : char standby_sysid[32];
340 : WalRcvStreamOptions options;
341 :
342 : /*
343 : * Check that we're connected to a valid server using the
344 : * IDENTIFY_SYSTEM replication command.
345 : */
346 226 : primary_sysid = walrcv_identify_system(wrconn, &primaryTLI);
347 :
348 226 : snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
349 : GetSystemIdentifier());
350 226 : if (strcmp(primary_sysid, standby_sysid) != 0)
351 : {
352 0 : ereport(ERROR,
353 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
354 : errmsg("database system identifier differs between the primary and standby"),
355 : errdetail("The primary's identifier is %s, the standby's identifier is %s.",
356 : primary_sysid, standby_sysid)));
357 : }
358 :
359 : /*
360 : * Confirm that the current timeline of the primary is the same or
361 : * ahead of ours.
362 : */
363 226 : if (primaryTLI < startpointTLI)
364 0 : ereport(ERROR,
365 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
366 : errmsg("highest timeline %u of the primary is behind recovery timeline %u",
367 : primaryTLI, startpointTLI)));
368 :
369 : /*
370 : * Get any missing history files. We do this always, even when we're
371 : * not interested in that timeline, so that if we're promoted to
372 : * become the primary later on, we don't select the same timeline that
373 : * was already used in the current primary. This isn't bullet-proof -
374 : * you'll need some external software to manage your cluster if you
375 : * need to ensure that a unique timeline id is chosen in every case,
376 : * but let's avoid the confusion of timeline id collisions where we
377 : * can.
378 : */
379 226 : WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
380 :
381 : /*
382 : * Create temporary replication slot if requested, and update slot
383 : * name in shared memory. (Note the slot name cannot already be set
384 : * in this case.)
385 : */
386 226 : if (is_temp_slot)
387 : {
388 0 : snprintf(slotname, sizeof(slotname),
389 : "pg_walreceiver_%lld",
390 0 : (long long int) walrcv_get_backend_pid(wrconn));
391 :
392 0 : walrcv_create_slot(wrconn, slotname, true, false, false, 0, NULL);
393 :
394 0 : SpinLockAcquire(&walrcv->mutex);
395 0 : strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
396 0 : SpinLockRelease(&walrcv->mutex);
397 : }
398 :
399 : /*
400 : * Start streaming.
401 : *
402 : * We'll try to start at the requested starting point and timeline,
403 : * even if it's different from the server's latest timeline. In case
404 : * we've already reached the end of the old timeline, the server will
405 : * finish the streaming immediately, and we will go back to await
406 : * orders from the startup process. If recovery_target_timeline is
407 : * 'latest', the startup process will scan pg_wal and find the new
408 : * history file, bump recovery target timeline, and ask us to restart
409 : * on the new timeline.
410 : */
411 226 : options.logical = false;
412 226 : options.startpoint = startpoint;
413 226 : options.slotname = slotname[0] != '\0' ? slotname : NULL;
414 226 : options.proto.physical.startpointTLI = startpointTLI;
415 226 : if (walrcv_startstreaming(wrconn, &options))
416 : {
417 226 : if (first_stream)
418 226 : ereport(LOG,
419 : (errmsg("started streaming WAL from primary at %X/%X on timeline %u",
420 : LSN_FORMAT_ARGS(startpoint), startpointTLI)));
421 : else
422 0 : ereport(LOG,
423 : (errmsg("restarted WAL streaming at %X/%X on timeline %u",
424 : LSN_FORMAT_ARGS(startpoint), startpointTLI)));
425 226 : first_stream = false;
426 :
427 : /* Initialize LogstreamResult and buffers for processing messages */
428 226 : LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL);
429 226 : initStringInfo(&reply_message);
430 :
431 : /* Initialize nap wakeup times. */
432 226 : now = GetCurrentTimestamp();
433 1130 : for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
434 904 : WalRcvComputeNextWakeup(i, now);
435 :
436 : /* Send initial reply/feedback messages. */
437 226 : XLogWalRcvSendReply(true, false);
438 226 : XLogWalRcvSendHSFeedback(true);
439 :
440 : /* Loop until end-of-streaming or error */
441 : for (;;)
442 63874 : {
443 : char *buf;
444 : int len;
445 64100 : bool endofwal = false;
446 64100 : pgsocket wait_fd = PGINVALID_SOCKET;
447 : int rc;
448 : TimestampTz nextWakeup;
449 : long nap;
450 :
451 : /*
452 : * Exit walreceiver if we're not in recovery. This should not
453 : * happen, but cross-check the status here.
454 : */
455 64100 : if (!RecoveryInProgress())
456 0 : ereport(FATAL,
457 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
458 : errmsg("cannot continue WAL streaming, recovery has already ended")));
459 :
460 : /* Process any requests or signals received recently */
461 64100 : ProcessWalRcvInterrupts();
462 :
463 64100 : if (ConfigReloadPending)
464 : {
465 44 : ConfigReloadPending = false;
466 44 : ProcessConfigFile(PGC_SIGHUP);
467 : /* recompute wakeup times */
468 44 : now = GetCurrentTimestamp();
469 220 : for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
470 176 : WalRcvComputeNextWakeup(i, now);
471 44 : XLogWalRcvSendHSFeedback(true);
472 : }
473 :
474 : /*
 * See if we can read data immediately. As used below, a positive
 * length is a received message, zero means nothing more can be
 * read without waiting, and a negative length means the primary
 * has ended the stream.
 */
475 64100 : len = walrcv_receive(wrconn, &buf, &wait_fd);
476 64040 : if (len != 0)
477 : {
478 : /*
479 : * Process the received data, and any subsequent data we
480 : * can read without blocking.
481 : */
482 : for (;;)
483 : {
484 81872 : if (len > 0)
485 : {
486 : /*
487 : * Something was received from primary, so adjust
488 : * the ping and terminate wakeup times. The first
 * byte of the buffer is the message type; the
 * remainder is passed on as the message body.
489 : */
490 44130 : now = GetCurrentTimestamp();
491 44130 : WalRcvComputeNextWakeup(WALRCV_WAKEUP_TERMINATE,
492 : now);
493 44130 : WalRcvComputeNextWakeup(WALRCV_WAKEUP_PING, now);
494 44130 : XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1,
495 : startpointTLI);
496 : }
497 37742 : else if (len == 0)
498 37676 : break;
499 66 : else if (len < 0)
500 : {
501 66 : ereport(LOG,
502 : (errmsg("replication terminated by primary server"),
503 : errdetail("End of WAL reached on timeline %u at %X/%X.",
504 : startpointTLI,
505 : LSN_FORMAT_ARGS(LogstreamResult.Write))));
506 66 : endofwal = true;
507 66 : break;
508 : }
509 44130 : len = walrcv_receive(wrconn, &buf, &wait_fd);
510 : }
511 :
512 : /* Let the primary know that we received some data. */
513 37742 : XLogWalRcvSendReply(false, false);
514 :
515 : /*
516 : * If we've written some records, flush them to disk and
517 : * let the startup process and primary server know about
518 : * them.
519 : */
520 37742 : XLogWalRcvFlush(false, startpointTLI);
521 : }
522 :
523 : /* Check if we need to exit the streaming loop. */
524 64038 : if (endofwal)
525 66 : break;
526 :
527 : /* Find the soonest wakeup time, to limit our nap. */
528 63972 : nextWakeup = TIMESTAMP_INFINITY;
529 319860 : for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
530 255888 : nextWakeup = Min(wakeup[i], nextWakeup);
531 :
532 : /* Calculate the nap time, clamping as necessary. */
533 63972 : now = GetCurrentTimestamp();
534 63972 : nap = TimestampDifferenceMilliseconds(now, nextWakeup);
535 :
536 : /*
537 : * Ideally we would reuse a WaitEventSet object repeatedly
538 : * here to avoid the overheads of WaitLatchOrSocket on epoll
539 : * systems, but we can't be sure that libpq (or any other
540 : * walreceiver implementation) has the same socket (even if
541 : * the fd is the same number, it may have been closed and
542 : * reopened since the last time). In future, if there is a
543 : * function for removing sockets from WaitEventSet, then we
544 : * could add and remove just the socket each time, potentially
545 : * avoiding some system calls.
546 : */
547 : Assert(wait_fd != PGINVALID_SOCKET);
548 63972 : rc = WaitLatchOrSocket(MyLatch,
549 : WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
550 : WL_TIMEOUT | WL_LATCH_SET,
551 : wait_fd,
552 : nap,
553 : WAIT_EVENT_WAL_RECEIVER_MAIN);
554 63972 : if (rc & WL_LATCH_SET)
555 : {
556 26222 : ResetLatch(MyLatch);
557 26222 : ProcessWalRcvInterrupts();
558 :
559 26124 : if (walrcv->force_reply)
560 : {
561 : /*
562 : * The recovery process has asked us to send apply
563 : * feedback now. Make sure the flag is really set to
564 : * false in shared memory before sending the reply, so
565 : * we don't miss a new request for a reply.
566 : */
567 26044 : walrcv->force_reply = false;
568 26044 : pg_memory_barrier();
569 26044 : XLogWalRcvSendReply(true, false);
570 : }
571 : }
572 63874 : if (rc & WL_TIMEOUT)
573 : {
574 : /*
575 : * We didn't receive anything new. If we haven't heard
576 : * anything from the server for more than
577 : * wal_receiver_timeout / 2, ping the server. Also, if
578 : * it's been longer than wal_receiver_status_interval
579 : * since the last update we sent, send a status update to
580 : * the primary anyway, to report any progress in applying
581 : * WAL.
582 : */
583 8 : bool requestReply = false;
584 :
585 : /*
586 : * Check if time since last receive from primary has
587 : * reached the configured limit.
588 : */
589 8 : now = GetCurrentTimestamp();
590 8 : if (now >= wakeup[WALRCV_WAKEUP_TERMINATE])
591 0 : ereport(ERROR,
592 : (errcode(ERRCODE_CONNECTION_FAILURE),
593 : errmsg("terminating walreceiver due to timeout")));
594 :
595 : /*
596 : * If we didn't receive anything new for half of receiver
597 : * replication timeout, then ping the server. The ping
 * wakeup is then parked at infinity; it is recomputed
 * the next time something is received from the primary.
598 : */
599 8 : if (now >= wakeup[WALRCV_WAKEUP_PING])
600 : {
601 0 : requestReply = true;
602 0 : wakeup[WALRCV_WAKEUP_PING] = TIMESTAMP_INFINITY;
603 : }
604 :
605 8 : XLogWalRcvSendReply(requestReply, requestReply);
606 8 : XLogWalRcvSendHSFeedback(false);
607 : }
608 : }
609 :
610 : /*
611 : * The backend finished streaming. Exit streaming COPY-mode from
612 : * our side, too.
613 : */
614 66 : walrcv_endstreaming(wrconn, &primaryTLI);
615 :
616 : /*
617 : * If the server had switched to a new timeline that we didn't
618 : * know about when we began streaming, fetch its timeline history
619 : * file now.
620 : */
621 24 : WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
622 : }
623 : else
624 0 : ereport(LOG,
625 : (errmsg("primary server contains no more WAL on requested timeline %u",
626 : startpointTLI)));
627 :
628 : /*
629 : * End of WAL reached on the requested timeline. Close the last
630 : * segment, and await for new orders from the startup process.
631 : */
632 24 : if (recvFile >= 0)
633 : {
634 : char xlogfname[MAXFNAMELEN];
635 :
636 22 : XLogWalRcvFlush(false, startpointTLI);
637 22 : XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
638 22 : if (close(recvFile) != 0)
639 0 : ereport(PANIC,
640 : (errcode_for_file_access(),
641 : errmsg("could not close WAL segment %s: %m",
642 : xlogfname)));
643 :
644 : /*
645 : * Create .done file forcibly to prevent the streamed segment from
646 : * being archived later.
647 : */
648 22 : if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
649 22 : XLogArchiveForceDone(xlogfname);
650 : else
651 0 : XLogArchiveNotify(xlogfname);
652 : }
653 24 : recvFile = -1; /* no segment file is open from here on */
654 :
655 24 : elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
656 24 : WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
657 : }
658 : /* not reached */
659 : }
660 :
661 : /*
662 : * Wait for startup process to set receiveStart and receiveStartTLI.
663 : */
664 : static void
665 24 : WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
666 : {
667 24 : WalRcvData *walrcv = WalRcv;
668 : int state;
669 :
670 24 : SpinLockAcquire(&walrcv->mutex);
671 24 : state = walrcv->walRcvState;
672 24 : if (state != WALRCV_STREAMING)
673 : {
674 0 : SpinLockRelease(&walrcv->mutex);
675 0 : if (state == WALRCV_STOPPING)
676 0 : proc_exit(0);
677 : else
678 0 : elog(FATAL, "unexpected walreceiver state");
679 : }
680 24 : walrcv->walRcvState = WALRCV_WAITING;
681 24 : walrcv->receiveStart = InvalidXLogRecPtr;
682 24 : walrcv->receiveStartTLI = 0;
683 24 : SpinLockRelease(&walrcv->mutex);
684 :
685 24 : set_ps_display("idle");
686 :
687 : /*
688 : * nudge startup process to notice that we've stopped streaming and are
689 : * now waiting for instructions.
690 : */
691 24 : WakeupRecovery();
692 : for (;;)
693 : {
694 46 : ResetLatch(MyLatch);
695 :
696 46 : ProcessWalRcvInterrupts();
697 :
698 22 : SpinLockAcquire(&walrcv->mutex);
699 : Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
700 : walrcv->walRcvState == WALRCV_WAITING ||
701 : walrcv->walRcvState == WALRCV_STOPPING);
702 22 : if (walrcv->walRcvState == WALRCV_RESTARTING)
703 : {
704 : /*
705 : * No need to handle changes in primary_conninfo or
706 : * primary_slot_name here. Startup process will signal us to
707 : * terminate in case those change.
708 : */
709 0 : *startpoint = walrcv->receiveStart;
710 0 : *startpointTLI = walrcv->receiveStartTLI;
711 0 : walrcv->walRcvState = WALRCV_STREAMING;
712 0 : SpinLockRelease(&walrcv->mutex);
713 0 : break;
714 : }
715 22 : if (walrcv->walRcvState == WALRCV_STOPPING)
716 : {
717 : /*
718 : * We should've received SIGTERM if the startup process wants us
719 : * to die, but might as well check it here too.
720 : */
721 0 : SpinLockRelease(&walrcv->mutex);
722 0 : exit(1);
723 : }
724 22 : SpinLockRelease(&walrcv->mutex);
725 :
726 22 : (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
727 : WAIT_EVENT_WAL_RECEIVER_WAIT_START);
728 : }
729 :
730 0 : if (update_process_title)
731 : {
732 : char activitymsg[50];
733 :
734 0 : snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%X",
735 0 : LSN_FORMAT_ARGS(*startpoint));
736 0 : set_ps_display(activitymsg);
737 : }
738 0 : }
739 :
740 : /*
741 : * Fetch any missing timeline history files between 'first' and 'last'
742 : * (inclusive) from the server.
743 : */
744 : static void
745 250 : WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
746 : {
747 : TimeLineID tli;
748 :
749 546 : for (tli = first; tli <= last; tli++)
750 : {
751 : /* there's no history file for timeline 1 */
752 296 : if (tli != 1 && !existsTimeLineHistory(tli))
753 : {
754 : char *fname;
755 : char *content;
756 : int len;
757 : char expectedfname[MAXFNAMELEN];
758 :
759 22 : ereport(LOG,
760 : (errmsg("fetching timeline history file for timeline %u from primary server",
761 : tli)));
762 :
763 22 : walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len);
764 :
765 : /*
766 : * Check that the filename on the primary matches what we
767 : * calculated ourselves. This is just a sanity check, it should
768 : * always match.
769 : */
770 22 : TLHistoryFileName(expectedfname, tli);
771 22 : if (strcmp(fname, expectedfname) != 0)
772 0 : ereport(ERROR,
773 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
774 : errmsg_internal("primary reported unexpected file name for timeline history file of timeline %u",
775 : tli)));
776 :
777 : /*
778 : * Write the file to pg_wal.
779 : */
780 22 : writeTimeLineHistoryFile(tli, content, len);
781 :
782 : /*
783 : * Mark the streamed history file as ready for archiving if
784 : * archive_mode is always.
785 : */
786 22 : if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
787 22 : XLogArchiveForceDone(fname);
788 : else
789 0 : XLogArchiveNotify(fname);
790 :
791 22 : pfree(fname);
792 22 : pfree(content);
793 : }
794 : }
795 250 : }
796 :
797 : /*
798 : * Mark us as STOPPED in shared memory at exit.
799 : */
800 : static void
801 348 : WalRcvDie(int code, Datum arg)
802 : {
803 348 : WalRcvData *walrcv = WalRcv;
804 348 : TimeLineID *startpointTLI_p = (TimeLineID *) DatumGetPointer(arg);
805 :
806 : Assert(*startpointTLI_p != 0);
807 :
808 : /* Ensure that all WAL records received are flushed to disk */
809 348 : XLogWalRcvFlush(true, *startpointTLI_p);
810 :
811 : /* Mark ourselves inactive in shared memory */
812 348 : SpinLockAcquire(&walrcv->mutex);
813 : Assert(walrcv->walRcvState == WALRCV_STREAMING ||
814 : walrcv->walRcvState == WALRCV_RESTARTING ||
815 : walrcv->walRcvState == WALRCV_STARTING ||
816 : walrcv->walRcvState == WALRCV_WAITING ||
817 : walrcv->walRcvState == WALRCV_STOPPING);
818 : Assert(walrcv->pid == MyProcPid);
819 348 : walrcv->walRcvState = WALRCV_STOPPED;
820 348 : walrcv->pid = 0;
821 348 : walrcv->ready_to_display = false;
822 348 : walrcv->latch = NULL;
823 348 : SpinLockRelease(&walrcv->mutex);
824 :
825 348 : ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
826 :
827 : /* Terminate the connection gracefully. */
828 348 : if (wrconn != NULL)
829 226 : walrcv_disconnect(wrconn);
830 :
831 : /* Wake up the startup process to notice promptly that we're gone */
832 348 : WakeupRecovery();
833 348 : }
834 :
835 : /*
836 : * Accept the message from XLOG stream, and process it.
837 : */
838 : static void
839 44130 : XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
840 : {
841 : int hdrlen;
842 : XLogRecPtr dataStart;
843 : XLogRecPtr walEnd;
844 : TimestampTz sendTime;
845 : bool replyRequested;
846 :
847 44130 : switch (type)
848 : {
849 44130 : case 'w': /* WAL records */
850 : {
851 : StringInfoData incoming_message;
852 :
853 44130 : hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
854 44130 : if (len < hdrlen)
855 0 : ereport(ERROR,
856 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
857 : errmsg_internal("invalid WAL message received from primary")));
858 :
859 : /* initialize a StringInfo with the given buffer */
860 44130 : initReadOnlyStringInfo(&incoming_message, buf, hdrlen);
861 :
862 : /* read the fields */
863 44130 : dataStart = pq_getmsgint64(&incoming_message);
864 44130 : walEnd = pq_getmsgint64(&incoming_message);
865 44130 : sendTime = pq_getmsgint64(&incoming_message);
866 44130 : ProcessWalSndrMessage(walEnd, sendTime);
867 :
868 44130 : buf += hdrlen;
869 44130 : len -= hdrlen;
870 44130 : XLogWalRcvWrite(buf, len, dataStart, tli);
871 44130 : break;
872 : }
873 0 : case 'k': /* Keepalive */
874 : {
875 : StringInfoData incoming_message;
876 :
877 0 : hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
878 0 : if (len != hdrlen)
879 0 : ereport(ERROR,
880 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
881 : errmsg_internal("invalid keepalive message received from primary")));
882 :
883 : /* initialize a StringInfo with the given buffer */
884 0 : initReadOnlyStringInfo(&incoming_message, buf, hdrlen);
885 :
886 : /* read the fields */
887 0 : walEnd = pq_getmsgint64(&incoming_message);
888 0 : sendTime = pq_getmsgint64(&incoming_message);
889 0 : replyRequested = pq_getmsgbyte(&incoming_message);
890 :
891 0 : ProcessWalSndrMessage(walEnd, sendTime);
892 :
893 : /* If the primary requested a reply, send one immediately */
894 0 : if (replyRequested)
895 0 : XLogWalRcvSendReply(true, false);
896 0 : break;
897 : }
898 0 : default:
899 0 : ereport(ERROR,
900 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
901 : errmsg_internal("invalid replication message type %d",
902 : type)));
903 : }
904 44130 : }
905 :
906 : /*
907 : * Write XLOG data to disk.
908 : */
909 : static void
910 44130 : XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
911 : {
912 : int startoff;
913 : int byteswritten;
914 :
915 : Assert(tli != 0);
916 :
917 88292 : while (nbytes > 0)
918 : {
919 : int segbytes;
920 :
921 : /* Close the current segment if it's completed */
922 44162 : if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
923 32 : XLogWalRcvClose(recptr, tli);
924 :
925 44162 : if (recvFile < 0)
926 : {
927 : /* Create/use new log file */
928 296 : XLByteToSeg(recptr, recvSegNo, wal_segment_size);
929 296 : recvFile = XLogFileInit(recvSegNo, tli);
930 296 : recvFileTLI = tli;
931 : }
932 :
933 : /* Calculate the start offset of the received logs */
934 44162 : startoff = XLogSegmentOffset(recptr, wal_segment_size);
935 :
936 44162 : if (startoff + nbytes > wal_segment_size)
937 32 : segbytes = wal_segment_size - startoff;
938 : else
939 44130 : segbytes = nbytes;
940 :
941 : /* OK to write the logs */
942 44162 : errno = 0;
943 :
944 44162 : byteswritten = pg_pwrite(recvFile, buf, segbytes, (off_t) startoff);
945 44162 : if (byteswritten <= 0)
946 : {
947 : char xlogfname[MAXFNAMELEN];
948 : int save_errno;
949 :
950 : /* if write didn't set errno, assume no disk space */
951 0 : if (errno == 0)
952 0 : errno = ENOSPC;
953 :
954 0 : save_errno = errno;
955 0 : XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
956 0 : errno = save_errno;
957 0 : ereport(PANIC,
958 : (errcode_for_file_access(),
959 : errmsg("could not write to WAL segment %s "
960 : "at offset %d, length %lu: %m",
961 : xlogfname, startoff, (unsigned long) segbytes)));
962 : }
963 :
964 : /* Update state for write */
965 44162 : recptr += byteswritten;
966 :
967 44162 : nbytes -= byteswritten;
968 44162 : buf += byteswritten;
969 :
970 44162 : LogstreamResult.Write = recptr;
971 : }
972 :
973 : /* Update shared-memory status */
974 44130 : pg_atomic_write_u64(&WalRcv->writtenUpto, LogstreamResult.Write);
975 :
976 : /*
977 : * Close the current segment if it's fully written up in the last cycle of
978 : * the loop, to create its archive notification file soon. Otherwise WAL
979 : * archiving of the segment will be delayed until any data in the next
980 : * segment is received and written.
981 : */
982 44130 : if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
983 70 : XLogWalRcvClose(recptr, tli);
984 44130 : }
985 :
986 : /*
987 : * Flush the log to disk.
988 : *
989 : * If we're in the midst of dying, it's unwise to do anything that might throw
990 : * an error, so we skip sending a reply in that case.
991 : */
992 : static void
993 38214 : XLogWalRcvFlush(bool dying, TimeLineID tli)
994 : {
995 : Assert(tli != 0);
996 :
997 38214 : if (LogstreamResult.Flush < LogstreamResult.Write)
998 : {
999 37696 : WalRcvData *walrcv = WalRcv;
1000 :
1001 37696 : issue_xlog_fsync(recvFile, recvSegNo, tli);
1002 :
1003 37696 : LogstreamResult.Flush = LogstreamResult.Write;
1004 :
1005 : /* Update shared-memory status */
1006 37696 : SpinLockAcquire(&walrcv->mutex);
1007 37696 : if (walrcv->flushedUpto < LogstreamResult.Flush)
1008 : {
1009 37696 : walrcv->latestChunkStart = walrcv->flushedUpto;
1010 37696 : walrcv->flushedUpto = LogstreamResult.Flush;
1011 37696 : walrcv->receivedTLI = tli;
1012 : }
1013 37696 : SpinLockRelease(&walrcv->mutex);
1014 :
1015 : /* Signal the startup process and walsender that new WAL has arrived */
1016 37696 : WakeupRecovery();
1017 37696 : if (AllowCascadeReplication())
1018 37696 : WalSndWakeup(true, false);
1019 :
1020 : /* Report XLOG streaming progress in PS display */
1021 37696 : if (update_process_title)
1022 : {
1023 : char activitymsg[50];
1024 :
1025 37696 : snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
1026 37696 : LSN_FORMAT_ARGS(LogstreamResult.Write));
1027 37696 : set_ps_display(activitymsg);
1028 : }
1029 :
1030 : /* Also let the primary know that we made some progress */
1031 37696 : if (!dying)
1032 : {
1033 37694 : XLogWalRcvSendReply(false, false);
1034 37694 : XLogWalRcvSendHSFeedback(false);
1035 : }
1036 : }
1037 38214 : }
1038 :
1039 : /*
1040 : * Close the current segment.
1041 : *
1042 : * Flush the segment to disk before closing it. Otherwise we have to
1043 : * reopen and fsync it later.
1044 : *
1045 : * Create an archive notification file since the segment is known completed.
1046 : */
1047 : static void
1048 102 : XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli)
1049 : {
1050 : char xlogfname[MAXFNAMELEN];
1051 :
1052 : Assert(recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size));
1053 : Assert(tli != 0);
1054 :
1055 : /*
1056 : * fsync() and close current file before we switch to next one. We would
1057 : * otherwise have to reopen this file to fsync it later
1058 : */
1059 102 : XLogWalRcvFlush(false, tli);
1060 :
1061 102 : XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
1062 :
1063 : /*
1064 : * XLOG segment files will be re-read by recovery in startup process soon,
1065 : * so we don't advise the OS to release cache pages associated with the
1066 : * file like XLogFileClose() does.
1067 : */
1068 102 : if (close(recvFile) != 0)
1069 0 : ereport(PANIC,
1070 : (errcode_for_file_access(),
1071 : errmsg("could not close WAL segment %s: %m",
1072 : xlogfname)));
1073 :
1074 : /*
1075 : * Create .done file forcibly to prevent the streamed segment from being
1076 : * archived later.
1077 : */
1078 102 : if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
1079 102 : XLogArchiveForceDone(xlogfname);
1080 : else
1081 0 : XLogArchiveNotify(xlogfname);
1082 :
1083 102 : recvFile = -1;
1084 102 : }
1085 :
1086 : /*
1087 : * Send reply message to primary, indicating our current WAL locations, oldest
1088 : * xmin and the current time.
1089 : *
1090 : * If 'force' is not set, the message is only sent if enough time has
1091 : * passed since last status update to reach wal_receiver_status_interval.
1092 : * If wal_receiver_status_interval is disabled altogether and 'force' is
1093 : * false, this is a no-op.
1094 : *
1095 : * If 'requestReply' is true, requests the server to reply immediately upon
1096 : * receiving this message. This is used for heartbeats, when approaching
1097 : * wal_receiver_timeout.
1098 : */
1099 : static void
1100 101714 : XLogWalRcvSendReply(bool force, bool requestReply)
1101 : {
1102 : static XLogRecPtr writePtr = 0;
1103 : static XLogRecPtr flushPtr = 0;
1104 : XLogRecPtr applyPtr;
1105 : TimestampTz now;
1106 :
1107 : /*
1108 : * If the user doesn't want status to be reported to the primary, be sure
1109 : * to exit before doing anything at all.
1110 : */
1111 101714 : if (!force && wal_receiver_status_interval <= 0)
1112 0 : return;
1113 :
1114 : /* Get current timestamp. */
1115 101714 : now = GetCurrentTimestamp();
1116 :
1117 : /*
1118 : * We can compare the write and flush positions to the last message we
1119 : * sent without taking any lock, but the apply position requires a spin
1120 : * lock, so we don't check that unless something else has changed or 10
1121 : * seconds have passed. This means that the apply WAL location will
1122 : * appear, from the primary's point of view, to lag slightly, but since
1123 : * this is only for reporting purposes and only on idle systems, that's
1124 : * probably OK.
1125 : */
1126 101714 : if (!force
1127 75444 : && writePtr == LogstreamResult.Write
1128 37716 : && flushPtr == LogstreamResult.Flush
1129 124 : && now < wakeup[WALRCV_WAKEUP_REPLY])
1130 124 : return;
1131 :
1132 : /* Make sure we wake up when it's time to send another reply. */
1133 101590 : WalRcvComputeNextWakeup(WALRCV_WAKEUP_REPLY, now);
1134 :
1135 : /* Construct a new message */
1136 101590 : writePtr = LogstreamResult.Write;
1137 101590 : flushPtr = LogstreamResult.Flush;
1138 101590 : applyPtr = GetXLogReplayRecPtr(NULL);
1139 :
1140 101590 : resetStringInfo(&reply_message);
1141 101590 : pq_sendbyte(&reply_message, 'r');
1142 101590 : pq_sendint64(&reply_message, writePtr);
1143 101590 : pq_sendint64(&reply_message, flushPtr);
1144 101590 : pq_sendint64(&reply_message, applyPtr);
1145 101590 : pq_sendint64(&reply_message, GetCurrentTimestamp());
1146 101590 : pq_sendbyte(&reply_message, requestReply ? 1 : 0);
1147 :
1148 : /* Send it */
1149 101590 : elog(DEBUG2, "sending write %X/%X flush %X/%X apply %X/%X%s",
1150 : LSN_FORMAT_ARGS(writePtr),
1151 : LSN_FORMAT_ARGS(flushPtr),
1152 : LSN_FORMAT_ARGS(applyPtr),
1153 : requestReply ? " (reply requested)" : "");
1154 :
1155 101590 : walrcv_send(wrconn, reply_message.data, reply_message.len);
1156 : }
1157 :
1158 : /*
1159 : * Send hot standby feedback message to primary, plus the current time,
1160 : * in case they don't have a watch.
1161 : *
1162 : * If the user disables feedback, send one final message to tell sender
1163 : * to forget about the xmin on this standby. We also send this message
1164 : * on first connect because a previous connection might have set xmin
1165 : * on a replication slot. (If we're not using a slot it's harmless to
1166 : * send a feedback message explicitly setting InvalidTransactionId).
1167 : */
1168 : static void
1169 37972 : XLogWalRcvSendHSFeedback(bool immed)
1170 : {
1171 : TimestampTz now;
1172 : FullTransactionId nextFullXid;
1173 : TransactionId nextXid;
1174 : uint32 xmin_epoch,
1175 : catalog_xmin_epoch;
1176 : TransactionId xmin,
1177 : catalog_xmin;
1178 :
1179 : /* initially true so we always send at least one feedback message */
1180 : static bool primary_has_standby_xmin = true;
1181 :
1182 : /*
1183 : * If the user doesn't want status to be reported to the primary, be sure
1184 : * to exit before doing anything at all.
1185 : */
1186 37972 : if ((wal_receiver_status_interval <= 0 || !hot_standby_feedback) &&
1187 37576 : !primary_has_standby_xmin)
1188 37704 : return;
1189 :
1190 : /* Get current timestamp. */
1191 606 : now = GetCurrentTimestamp();
1192 :
1193 : /* Send feedback at most once per wal_receiver_status_interval. */
1194 606 : if (!immed && now < wakeup[WALRCV_WAKEUP_HSFEEDBACK])
1195 336 : return;
1196 :
1197 : /* Make sure we wake up when it's time to send feedback again. */
1198 270 : WalRcvComputeNextWakeup(WALRCV_WAKEUP_HSFEEDBACK, now);
1199 :
1200 : /*
1201 : * If Hot Standby is not yet accepting connections there is nothing to
1202 : * send. Check this after the interval has expired to reduce number of
1203 : * calls.
1204 : *
1205 : * Bailing out here also ensures that we don't send feedback until we've
1206 : * read our own replication slot state, so we don't tell the primary to
1207 : * discard needed xmin or catalog_xmin from any slots that may exist on
1208 : * this replica.
1209 : */
1210 270 : if (!HotStandbyActive())
1211 2 : return;
1212 :
1213 : /*
1214 : * Make the expensive call to get the oldest xmin once we are certain
1215 : * everything else has been checked.
1216 : */
1217 268 : if (hot_standby_feedback)
1218 : {
1219 62 : GetReplicationHorizons(&xmin, &catalog_xmin);
1220 : }
1221 : else
1222 : {
1223 206 : xmin = InvalidTransactionId;
1224 206 : catalog_xmin = InvalidTransactionId;
1225 : }
1226 :
1227 : /*
1228 : * Get epoch and adjust if nextXid and oldestXmin are different sides of
1229 : * the epoch boundary.
1230 : */
1231 268 : nextFullXid = ReadNextFullTransactionId();
1232 268 : nextXid = XidFromFullTransactionId(nextFullXid);
1233 268 : xmin_epoch = EpochFromFullTransactionId(nextFullXid);
1234 268 : catalog_xmin_epoch = xmin_epoch;
1235 268 : if (nextXid < xmin)
1236 0 : xmin_epoch--;
1237 268 : if (nextXid < catalog_xmin)
1238 0 : catalog_xmin_epoch--;
1239 :
1240 268 : elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
1241 : xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
1242 :
1243 : /* Construct the message and send it. */
1244 268 : resetStringInfo(&reply_message);
1245 268 : pq_sendbyte(&reply_message, 'h');
1246 268 : pq_sendint64(&reply_message, GetCurrentTimestamp());
1247 268 : pq_sendint32(&reply_message, xmin);
1248 268 : pq_sendint32(&reply_message, xmin_epoch);
1249 268 : pq_sendint32(&reply_message, catalog_xmin);
1250 268 : pq_sendint32(&reply_message, catalog_xmin_epoch);
1251 268 : walrcv_send(wrconn, reply_message.data, reply_message.len);
1252 268 : if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
1253 62 : primary_has_standby_xmin = true;
1254 : else
1255 206 : primary_has_standby_xmin = false;
1256 : }
1257 :
1258 : /*
1259 : * Update shared memory status upon receiving a message from primary.
1260 : *
1261 : * 'walEnd' and 'sendTime' are the end-of-WAL and timestamp of the latest
1262 : * message, reported by primary.
1263 : */
1264 : static void
1265 44130 : ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
1266 : {
1267 44130 : WalRcvData *walrcv = WalRcv;
1268 44130 : TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
1269 :
1270 : /* Update shared-memory status */
1271 44130 : SpinLockAcquire(&walrcv->mutex);
1272 44130 : if (walrcv->latestWalEnd < walEnd)
1273 37760 : walrcv->latestWalEndTime = sendTime;
1274 44130 : walrcv->latestWalEnd = walEnd;
1275 44130 : walrcv->lastMsgSendTime = sendTime;
1276 44130 : walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
1277 44130 : SpinLockRelease(&walrcv->mutex);
1278 :
1279 44130 : if (message_level_is_interesting(DEBUG2))
1280 : {
1281 : char *sendtime;
1282 : char *receipttime;
1283 : int applyDelay;
1284 :
1285 : /* Copy because timestamptz_to_str returns a static buffer */
1286 780 : sendtime = pstrdup(timestamptz_to_str(sendTime));
1287 780 : receipttime = pstrdup(timestamptz_to_str(lastMsgReceiptTime));
1288 780 : applyDelay = GetReplicationApplyDelay();
1289 :
1290 : /* apply delay is not available */
1291 780 : if (applyDelay == -1)
1292 2 : elog(DEBUG2, "sendtime %s receipttime %s replication apply delay (N/A) transfer latency %d ms",
1293 : sendtime,
1294 : receipttime,
1295 : GetReplicationTransferLatency());
1296 : else
1297 778 : elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms",
1298 : sendtime,
1299 : receipttime,
1300 : applyDelay,
1301 : GetReplicationTransferLatency());
1302 :
1303 780 : pfree(sendtime);
1304 780 : pfree(receipttime);
1305 : }
1306 44130 : }
1307 :
1308 : /*
1309 : * Compute the next wakeup time for a given wakeup reason. Can be called to
1310 : * initialize a wakeup time, to adjust it for the next wakeup, or to
1311 : * reinitialize it when GUCs have changed. We ask the caller to pass in the
1312 : * value of "now" because this frequently avoids multiple calls of
1313 : * GetCurrentTimestamp(). It had better be a reasonably up-to-date value
1314 : * though.
1315 : */
1316 : static void
1317 191200 : WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now)
1318 : {
1319 191200 : switch (reason)
1320 : {
1321 44400 : case WALRCV_WAKEUP_TERMINATE:
1322 44400 : if (wal_receiver_timeout <= 0)
1323 0 : wakeup[reason] = TIMESTAMP_INFINITY;
1324 : else
1325 44400 : wakeup[reason] = TimestampTzPlusMilliseconds(now, wal_receiver_timeout);
1326 44400 : break;
1327 44400 : case WALRCV_WAKEUP_PING:
1328 44400 : if (wal_receiver_timeout <= 0)
1329 0 : wakeup[reason] = TIMESTAMP_INFINITY;
1330 : else
1331 44400 : wakeup[reason] = TimestampTzPlusMilliseconds(now, wal_receiver_timeout / 2);
1332 44400 : break;
1333 540 : case WALRCV_WAKEUP_HSFEEDBACK:
1334 540 : if (!hot_standby_feedback || wal_receiver_status_interval <= 0)
1335 424 : wakeup[reason] = TIMESTAMP_INFINITY;
1336 : else
1337 116 : wakeup[reason] = TimestampTzPlusSeconds(now, wal_receiver_status_interval);
1338 540 : break;
1339 101860 : case WALRCV_WAKEUP_REPLY:
1340 101860 : if (wal_receiver_status_interval <= 0)
1341 0 : wakeup[reason] = TIMESTAMP_INFINITY;
1342 : else
1343 101860 : wakeup[reason] = TimestampTzPlusSeconds(now, wal_receiver_status_interval);
1344 101860 : break;
1345 : /* there's intentionally no default: here */
1346 : }
1347 191200 : }
1348 :
1349 : /*
1350 : * Wake up the walreceiver main loop.
1351 : *
1352 : * This is called by the startup process whenever interesting xlog records
1353 : * are applied, so that walreceiver can check if it needs to send an apply
1354 : * notification back to the primary which may be waiting in a COMMIT with
1355 : * synchronous_commit = remote_apply.
1356 : */
1357 : void
1358 25560 : WalRcvForceReply(void)
1359 : {
1360 : Latch *latch;
1361 :
1362 25560 : WalRcv->force_reply = true;
1363 : /* fetching the latch pointer might not be atomic, so use spinlock */
1364 25560 : SpinLockAcquire(&WalRcv->mutex);
1365 25560 : latch = WalRcv->latch;
1366 25560 : SpinLockRelease(&WalRcv->mutex);
1367 25560 : if (latch)
1368 25326 : SetLatch(latch);
1369 25560 : }
1370 :
1371 : /*
1372 : * Return a string constant representing the state. This is used
1373 : * in system functions and views, and should *not* be translated.
1374 : */
1375 : static const char *
1376 0 : WalRcvGetStateString(WalRcvState state)
1377 : {
1378 0 : switch (state)
1379 : {
1380 0 : case WALRCV_STOPPED:
1381 0 : return "stopped";
1382 0 : case WALRCV_STARTING:
1383 0 : return "starting";
1384 0 : case WALRCV_STREAMING:
1385 0 : return "streaming";
1386 0 : case WALRCV_WAITING:
1387 0 : return "waiting";
1388 0 : case WALRCV_RESTARTING:
1389 0 : return "restarting";
1390 0 : case WALRCV_STOPPING:
1391 0 : return "stopping";
1392 : }
1393 0 : return "UNKNOWN";
1394 : }
1395 :
1396 : /*
1397 : * Returns activity of WAL receiver, including pid, state and xlog locations
1398 : * received from the WAL sender of another server.
1399 : */
1400 : Datum
1401 6 : pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
1402 : {
1403 : TupleDesc tupdesc;
1404 : Datum *values;
1405 : bool *nulls;
1406 : int pid;
1407 : bool ready_to_display;
1408 : WalRcvState state;
1409 : XLogRecPtr receive_start_lsn;
1410 : TimeLineID receive_start_tli;
1411 : XLogRecPtr written_lsn;
1412 : XLogRecPtr flushed_lsn;
1413 : TimeLineID received_tli;
1414 : TimestampTz last_send_time;
1415 : TimestampTz last_receipt_time;
1416 : XLogRecPtr latest_end_lsn;
1417 : TimestampTz latest_end_time;
1418 : char sender_host[NI_MAXHOST];
1419 6 : int sender_port = 0;
1420 : char slotname[NAMEDATALEN];
1421 : char conninfo[MAXCONNINFO];
1422 :
1423 : /* Take a lock to ensure value consistency */
1424 6 : SpinLockAcquire(&WalRcv->mutex);
1425 6 : pid = (int) WalRcv->pid;
1426 6 : ready_to_display = WalRcv->ready_to_display;
1427 6 : state = WalRcv->walRcvState;
1428 6 : receive_start_lsn = WalRcv->receiveStart;
1429 6 : receive_start_tli = WalRcv->receiveStartTLI;
1430 6 : flushed_lsn = WalRcv->flushedUpto;
1431 6 : received_tli = WalRcv->receivedTLI;
1432 6 : last_send_time = WalRcv->lastMsgSendTime;
1433 6 : last_receipt_time = WalRcv->lastMsgReceiptTime;
1434 6 : latest_end_lsn = WalRcv->latestWalEnd;
1435 6 : latest_end_time = WalRcv->latestWalEndTime;
1436 6 : strlcpy(slotname, (char *) WalRcv->slotname, sizeof(slotname));
1437 6 : strlcpy(sender_host, (char *) WalRcv->sender_host, sizeof(sender_host));
1438 6 : sender_port = WalRcv->sender_port;
1439 6 : strlcpy(conninfo, (char *) WalRcv->conninfo, sizeof(conninfo));
1440 6 : SpinLockRelease(&WalRcv->mutex);
1441 :
1442 : /*
1443 : * No WAL receiver (or not ready yet), just return a tuple with NULL
1444 : * values
1445 : */
1446 6 : if (pid == 0 || !ready_to_display)
1447 6 : PG_RETURN_NULL();
1448 :
1449 : /*
1450 : * Read "writtenUpto" without holding a spinlock. Note that it may not be
1451 : * consistent with the other shared variables of the WAL receiver
1452 : * protected by a spinlock, but this should not be used for data integrity
1453 : * checks.
1454 : */
1455 0 : written_lsn = pg_atomic_read_u64(&WalRcv->writtenUpto);
1456 :
1457 : /* determine result type */
1458 0 : if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1459 0 : elog(ERROR, "return type must be a row type");
1460 :
1461 0 : values = palloc0(sizeof(Datum) * tupdesc->natts);
1462 0 : nulls = palloc0(sizeof(bool) * tupdesc->natts);
1463 :
1464 : /* Fetch values */
1465 0 : values[0] = Int32GetDatum(pid);
1466 :
1467 0 : if (!has_privs_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS))
1468 : {
1469 : /*
1470 : * Only superusers and roles with privileges of pg_read_all_stats can
1471 : * see details. Other users only get the pid value to know whether it
1472 : * is a WAL receiver, but no details.
1473 : */
1474 0 : memset(&nulls[1], true, sizeof(bool) * (tupdesc->natts - 1));
1475 : }
1476 : else
1477 : {
1478 0 : values[1] = CStringGetTextDatum(WalRcvGetStateString(state));
1479 :
1480 0 : if (XLogRecPtrIsInvalid(receive_start_lsn))
1481 0 : nulls[2] = true;
1482 : else
1483 0 : values[2] = LSNGetDatum(receive_start_lsn);
1484 0 : values[3] = Int32GetDatum(receive_start_tli);
1485 0 : if (XLogRecPtrIsInvalid(written_lsn))
1486 0 : nulls[4] = true;
1487 : else
1488 0 : values[4] = LSNGetDatum(written_lsn);
1489 0 : if (XLogRecPtrIsInvalid(flushed_lsn))
1490 0 : nulls[5] = true;
1491 : else
1492 0 : values[5] = LSNGetDatum(flushed_lsn);
1493 0 : values[6] = Int32GetDatum(received_tli);
1494 0 : if (last_send_time == 0)
1495 0 : nulls[7] = true;
1496 : else
1497 0 : values[7] = TimestampTzGetDatum(last_send_time);
1498 0 : if (last_receipt_time == 0)
1499 0 : nulls[8] = true;
1500 : else
1501 0 : values[8] = TimestampTzGetDatum(last_receipt_time);
1502 0 : if (XLogRecPtrIsInvalid(latest_end_lsn))
1503 0 : nulls[9] = true;
1504 : else
1505 0 : values[9] = LSNGetDatum(latest_end_lsn);
1506 0 : if (latest_end_time == 0)
1507 0 : nulls[10] = true;
1508 : else
1509 0 : values[10] = TimestampTzGetDatum(latest_end_time);
1510 0 : if (*slotname == '\0')
1511 0 : nulls[11] = true;
1512 : else
1513 0 : values[11] = CStringGetTextDatum(slotname);
1514 0 : if (*sender_host == '\0')
1515 0 : nulls[12] = true;
1516 : else
1517 0 : values[12] = CStringGetTextDatum(sender_host);
1518 0 : if (sender_port == 0)
1519 0 : nulls[13] = true;
1520 : else
1521 0 : values[13] = Int32GetDatum(sender_port);
1522 0 : if (*conninfo == '\0')
1523 0 : nulls[14] = true;
1524 : else
1525 0 : values[14] = CStringGetTextDatum(conninfo);
1526 : }
1527 :
1528 : /* Returns the record as Datum */
1529 0 : PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
1530 : }
|