Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * walreceiver.c
4 : *
5 : * The WAL receiver process (walreceiver) is new as of Postgres 9.0. It
6 : * is the process in the standby server that takes charge of receiving
7 : * XLOG records from a primary server during streaming replication.
8 : *
9 : * When the startup process determines that it's time to start streaming,
10 : * it instructs postmaster to start walreceiver. Walreceiver first connects
11 : * to the primary server (it will be served by a walsender process
12 : * in the primary server), and then keeps receiving XLOG records and
13 : * writing them to the disk as long as the connection is alive. As XLOG
14 : * records are received and flushed to disk, it updates the
15 : * WalRcv->flushedUpto variable in shared memory, to inform the startup
16 : * process of how far it can proceed with XLOG replay.
17 : *
18 : * A WAL receiver cannot directly load GUC parameters used when establishing
19 : * its connection to the primary. Instead it relies on parameter values
20 : * that are passed down by the startup process when streaming is requested.
21 : * This applies, for example, to the replication slot and the connection
22 : * string to be used for the connection with the primary.
23 : *
24 : * If the primary server ends streaming, but doesn't disconnect, walreceiver
25 : * goes into "waiting" mode, and waits for the startup process to give new
26 : * instructions. The startup process will treat that the same as
27 : * disconnection, and will rescan the archive/pg_wal directory. But when the
28 : * startup process wants to try streaming replication again, it will just
29 : * nudge the existing walreceiver process that's waiting, instead of launching
30 : * a new one.
31 : *
32 : * Normal termination is by SIGTERM, which instructs the walreceiver to
33 : * exit(0). Emergency termination is by SIGQUIT; like any postmaster child
34 : * process, the walreceiver will simply abort and exit on SIGQUIT. A close
35 : * of the connection and a FATAL error are treated not as a crash but as
36 : * normal operation.
37 : *
38 : * This file contains the server-facing parts of walreceiver. The libpq-
39 : * specific parts are in the libpqwalreceiver module. It's loaded
40 : * dynamically to avoid linking the server with libpq.
41 : *
42 : * Portions Copyright (c) 2010-2026, PostgreSQL Global Development Group
43 : *
44 : *
45 : * IDENTIFICATION
46 : * src/backend/replication/walreceiver.c
47 : *
48 : *-------------------------------------------------------------------------
49 : */
50 : #include "postgres.h"
51 :
52 : #include <unistd.h>
53 :
54 : #include "access/htup_details.h"
55 : #include "access/timeline.h"
56 : #include "access/transam.h"
57 : #include "access/xlog_internal.h"
58 : #include "access/xlogarchive.h"
59 : #include "access/xlogrecovery.h"
60 : #include "access/xlogwait.h"
61 : #include "catalog/pg_authid.h"
62 : #include "funcapi.h"
63 : #include "libpq/pqformat.h"
64 : #include "libpq/pqsignal.h"
65 : #include "miscadmin.h"
66 : #include "pgstat.h"
67 : #include "postmaster/auxprocess.h"
68 : #include "postmaster/interrupt.h"
69 : #include "replication/walreceiver.h"
70 : #include "replication/walsender.h"
71 : #include "storage/ipc.h"
72 : #include "storage/proc.h"
73 : #include "storage/procarray.h"
74 : #include "storage/procsignal.h"
75 : #include "tcop/tcopprot.h"
76 : #include "utils/acl.h"
77 : #include "utils/builtins.h"
78 : #include "utils/guc.h"
79 : #include "utils/pg_lsn.h"
80 : #include "utils/ps_status.h"
81 : #include "utils/timestamp.h"
82 :
83 :
/*
 * GUC variables. (Other variables that affect walreceiver are in xlog.c
 * because they're passed down from the startup process, for better
 * synchronization.)
 */
int			wal_receiver_status_interval;
int			wal_receiver_timeout;
bool		hot_standby_feedback;

/*
 * libpqwalreceiver connection.  Established by walrcv_connect() in
 * WalReceiverMain() and closed by walrcv_disconnect() in WalRcvDie() when it
 * is non-NULL.
 */
static WalReceiverConn *wrconn = NULL;

/*
 * Function table provided by the dynamically loaded libpqwalreceiver module.
 * WalReceiverMain() raises an ERROR if this is still NULL after
 * load_file("libpqwalreceiver").  NOTE(review): presumably this table backs
 * the walrcv_* calls used in this file; confirm in walreceiver.h.
 */
WalReceiverFunctionsType *WalReceiverFunctions = NULL;

/*
 * These variables are used similarly to openLogFile/SegNo,
 * but for walreceiver to write the XLOG. recvFileTLI is the TimeLineID
 * corresponding to the filename of recvFile.  recvFile is -1 while no WAL
 * segment file is open.
 */
static int	recvFile = -1;
static TimeLineID recvFileTLI = 0;
static XLogSegNo recvSegNo = 0;

/*
 * LogstreamResult indicates the byte positions that we have already
 * written/fsynced.  Both are reset to the replay position each time
 * streaming starts.
 */
static struct
{
	XLogRecPtr	Write;			/* last byte + 1 written out in the standby */
	XLogRecPtr	Flush;			/* last byte + 1 flushed in the standby */
}			LogstreamResult;

/*
 * Reasons to wake up and perform periodic tasks.  Each reason has its own
 * deadline in wakeup[]; the streaming loop naps until the earliest one.
 */
typedef enum WalRcvWakeupReason
{
	WALRCV_WAKEUP_TERMINATE,	/* nothing received from the primary in time:
								 * error out with a timeout */
	WALRCV_WAKEUP_PING,			/* ask the primary to send a reply */
	WALRCV_WAKEUP_REPLY,		/* presumably: periodic status reply */
	WALRCV_WAKEUP_HSFEEDBACK,	/* presumably: periodic hot standby feedback */
#define NUM_WALRCV_WAKEUPS (WALRCV_WAKEUP_HSFEEDBACK + 1)
} WalRcvWakeupReason;

/*
 * Wake up times for periodic tasks, indexed by WalRcvWakeupReason.
 */
static TimestampTz wakeup[NUM_WALRCV_WAKEUPS];

/*
 * Buffer presumably used to build outgoing reply messages (see
 * XLogWalRcvSendReply); it is initialized when streaming starts.
 */
static StringInfoData reply_message;

/* Prototypes for private functions */
static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last);
static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI);
static void WalRcvDie(int code, Datum arg);
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len,
								 TimeLineID tli);
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr,
							TimeLineID tli);
static void XLogWalRcvFlush(bool dying, TimeLineID tli);
static void XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli);
static void XLogWalRcvSendReply(bool force, bool requestReply);
static void XLogWalRcvSendHSFeedback(bool immed);
static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
static void WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now);
149 :
150 :
151 : /* Main entry point for walreceiver process */
152 : void
153 490 : WalReceiverMain(const void *startup_data, size_t startup_data_len)
154 : {
155 : char conninfo[MAXCONNINFO];
156 : char *tmp_conninfo;
157 : char slotname[NAMEDATALEN];
158 : bool is_temp_slot;
159 : XLogRecPtr startpoint;
160 : TimeLineID startpointTLI;
161 : TimeLineID primaryTLI;
162 : bool first_stream;
163 : WalRcvData *walrcv;
164 : TimestampTz now;
165 : char *err;
166 490 : char *sender_host = NULL;
167 490 : int sender_port = 0;
168 : char *appname;
169 :
170 : Assert(startup_data_len == 0);
171 :
172 490 : AuxiliaryProcessMainCommon();
173 :
174 : /*
175 : * WalRcv should be set up already (if we are a backend, we inherit this
176 : * by fork() or EXEC_BACKEND mechanism from the postmaster).
177 : */
178 490 : walrcv = WalRcv;
179 : Assert(walrcv != NULL);
180 :
181 : /*
182 : * Mark walreceiver as running in shared memory.
183 : *
184 : * Do this as early as possible, so that if we fail later on, we'll set
185 : * state to STOPPED. If we die before this, the startup process will keep
186 : * waiting for us to start up, until it times out.
187 : */
188 490 : SpinLockAcquire(&walrcv->mutex);
189 : Assert(walrcv->pid == 0);
190 490 : switch (walrcv->walRcvState)
191 : {
192 0 : case WALRCV_STOPPING:
193 : /* If we've already been requested to stop, don't start up. */
194 0 : walrcv->walRcvState = WALRCV_STOPPED;
195 : /* fall through */
196 :
197 8 : case WALRCV_STOPPED:
198 8 : SpinLockRelease(&walrcv->mutex);
199 8 : ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
200 8 : proc_exit(1);
201 : break;
202 :
203 482 : case WALRCV_STARTING:
204 : /* The usual case */
205 482 : break;
206 :
207 0 : case WALRCV_CONNECTING:
208 : case WALRCV_WAITING:
209 : case WALRCV_STREAMING:
210 : case WALRCV_RESTARTING:
211 : default:
212 : /* Shouldn't happen */
213 0 : SpinLockRelease(&walrcv->mutex);
214 0 : elog(PANIC, "walreceiver still running according to shared memory state");
215 : }
216 : /* Advertise our PID so that the startup process can kill us */
217 482 : walrcv->pid = MyProcPid;
218 482 : walrcv->walRcvState = WALRCV_CONNECTING;
219 :
220 : /* Fetch information required to start streaming */
221 482 : walrcv->ready_to_display = false;
222 482 : strlcpy(conninfo, walrcv->conninfo, MAXCONNINFO);
223 482 : strlcpy(slotname, walrcv->slotname, NAMEDATALEN);
224 482 : is_temp_slot = walrcv->is_temp_slot;
225 482 : startpoint = walrcv->receiveStart;
226 482 : startpointTLI = walrcv->receiveStartTLI;
227 :
228 : /*
229 : * At most one of is_temp_slot and slotname can be set; otherwise,
230 : * RequestXLogStreaming messed up.
231 : */
232 : Assert(!is_temp_slot || (slotname[0] == '\0'));
233 :
234 : /* Initialise to a sanish value */
235 482 : now = GetCurrentTimestamp();
236 482 : walrcv->lastMsgSendTime =
237 482 : walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now;
238 :
239 : /* Report our proc number so that others can wake us up */
240 482 : walrcv->procno = MyProcNumber;
241 :
242 482 : SpinLockRelease(&walrcv->mutex);
243 :
244 482 : pg_atomic_write_u64(&WalRcv->writtenUpto, 0);
245 :
246 : /* Arrange to clean up at walreceiver exit */
247 482 : on_shmem_exit(WalRcvDie, PointerGetDatum(&startpointTLI));
248 :
249 : /* Properly accept or ignore signals the postmaster might send us */
250 482 : pqsignal(SIGHUP, SignalHandlerForConfigReload); /* set flag to read config
251 : * file */
252 482 : pqsignal(SIGINT, SIG_IGN);
253 482 : pqsignal(SIGTERM, die); /* request shutdown */
254 : /* SIGQUIT handler was already set up by InitPostmasterChild */
255 482 : pqsignal(SIGALRM, SIG_IGN);
256 482 : pqsignal(SIGPIPE, SIG_IGN);
257 482 : pqsignal(SIGUSR1, procsignal_sigusr1_handler);
258 482 : pqsignal(SIGUSR2, SIG_IGN);
259 :
260 : /* Reset some signals that are accepted by postmaster but not here */
261 482 : pqsignal(SIGCHLD, SIG_DFL);
262 :
263 : /* Load the libpq-specific functions */
264 482 : load_file("libpqwalreceiver", false);
265 482 : if (WalReceiverFunctions == NULL)
266 0 : elog(ERROR, "libpqwalreceiver didn't initialize correctly");
267 :
268 : /* Unblock signals (they were blocked when the postmaster forked us) */
269 482 : sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
270 :
271 : /* Establish the connection to the primary for XLOG streaming */
272 482 : appname = cluster_name[0] ? cluster_name : "walreceiver";
273 482 : wrconn = walrcv_connect(conninfo, true, false, false, appname, &err);
274 478 : if (!wrconn)
275 190 : ereport(ERROR,
276 : (errcode(ERRCODE_CONNECTION_FAILURE),
277 : errmsg("streaming replication receiver \"%s\" could not connect to the primary server: %s",
278 : appname, err)));
279 :
280 : /*
281 : * Save user-visible connection string. This clobbers the original
282 : * conninfo, for security. Also save host and port of the sender server
283 : * this walreceiver is connected to.
284 : */
285 288 : tmp_conninfo = walrcv_get_conninfo(wrconn);
286 288 : walrcv_get_senderinfo(wrconn, &sender_host, &sender_port);
287 288 : SpinLockAcquire(&walrcv->mutex);
288 288 : memset(walrcv->conninfo, 0, MAXCONNINFO);
289 288 : if (tmp_conninfo)
290 288 : strlcpy(walrcv->conninfo, tmp_conninfo, MAXCONNINFO);
291 :
292 288 : memset(walrcv->sender_host, 0, NI_MAXHOST);
293 288 : if (sender_host)
294 288 : strlcpy(walrcv->sender_host, sender_host, NI_MAXHOST);
295 :
296 288 : walrcv->sender_port = sender_port;
297 288 : walrcv->ready_to_display = true;
298 288 : SpinLockRelease(&walrcv->mutex);
299 :
300 288 : if (tmp_conninfo)
301 288 : pfree(tmp_conninfo);
302 :
303 288 : if (sender_host)
304 288 : pfree(sender_host);
305 :
306 288 : first_stream = true;
307 : for (;;)
308 24 : {
309 : char *primary_sysid;
310 : char standby_sysid[32];
311 : WalRcvStreamOptions options;
312 :
313 : /*
314 : * Check that we're connected to a valid server using the
315 : * IDENTIFY_SYSTEM replication command.
316 : */
317 312 : primary_sysid = walrcv_identify_system(wrconn, &primaryTLI);
318 :
319 312 : snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
320 : GetSystemIdentifier());
321 312 : if (strcmp(primary_sysid, standby_sysid) != 0)
322 : {
323 0 : ereport(ERROR,
324 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
325 : errmsg("database system identifier differs between the primary and standby"),
326 : errdetail("The primary's identifier is %s, the standby's identifier is %s.",
327 : primary_sysid, standby_sysid)));
328 : }
329 :
330 : /*
331 : * Confirm that the current timeline of the primary is the same or
332 : * ahead of ours.
333 : */
334 312 : if (primaryTLI < startpointTLI)
335 0 : ereport(ERROR,
336 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
337 : errmsg("highest timeline %u of the primary is behind recovery timeline %u",
338 : primaryTLI, startpointTLI)));
339 :
340 : /*
341 : * Get any missing history files. We do this always, even when we're
342 : * not interested in that timeline, so that if we're promoted to
343 : * become the primary later on, we don't select the same timeline that
344 : * was already used in the current primary. This isn't bullet-proof -
345 : * you'll need some external software to manage your cluster if you
346 : * need to ensure that a unique timeline id is chosen in every case,
347 : * but let's avoid the confusion of timeline id collisions where we
348 : * can.
349 : */
350 312 : WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
351 :
352 : /*
353 : * Create temporary replication slot if requested, and update slot
354 : * name in shared memory. (Note the slot name cannot already be set
355 : * in this case.)
356 : */
357 312 : if (is_temp_slot)
358 : {
359 0 : snprintf(slotname, sizeof(slotname),
360 : "pg_walreceiver_%lld",
361 0 : (long long int) walrcv_get_backend_pid(wrconn));
362 :
363 0 : walrcv_create_slot(wrconn, slotname, true, false, false, 0, NULL);
364 :
365 0 : SpinLockAcquire(&walrcv->mutex);
366 0 : strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
367 0 : SpinLockRelease(&walrcv->mutex);
368 : }
369 :
370 : /*
371 : * Start streaming.
372 : *
373 : * We'll try to start at the requested starting point and timeline,
374 : * even if it's different from the server's latest timeline. In case
375 : * we've already reached the end of the old timeline, the server will
376 : * finish the streaming immediately, and we will go back to await
377 : * orders from the startup process. If recovery_target_timeline is
378 : * 'latest', the startup process will scan pg_wal and find the new
379 : * history file, bump recovery target timeline, and ask us to restart
380 : * on the new timeline.
381 : */
382 312 : options.logical = false;
383 312 : options.startpoint = startpoint;
384 312 : options.slotname = slotname[0] != '\0' ? slotname : NULL;
385 312 : options.proto.physical.startpointTLI = startpointTLI;
386 312 : if (walrcv_startstreaming(wrconn, &options))
387 : {
388 310 : if (first_stream)
389 286 : ereport(LOG,
390 : errmsg("started streaming WAL from primary at %X/%08X on timeline %u",
391 : LSN_FORMAT_ARGS(startpoint), startpointTLI));
392 : else
393 24 : ereport(LOG,
394 : errmsg("restarted WAL streaming at %X/%08X on timeline %u",
395 : LSN_FORMAT_ARGS(startpoint), startpointTLI));
396 310 : first_stream = false;
397 :
398 : /*
399 : * Switch to STREAMING after a successful connection if current
400 : * state is CONNECTING. This switch happens after an initial
401 : * startup, or after a restart as determined by
402 : * WalRcvWaitForStartPosition().
403 : */
404 310 : SpinLockAcquire(&walrcv->mutex);
405 310 : if (walrcv->walRcvState == WALRCV_CONNECTING)
406 310 : walrcv->walRcvState = WALRCV_STREAMING;
407 310 : SpinLockRelease(&walrcv->mutex);
408 :
409 : /* Initialize LogstreamResult and buffers for processing messages */
410 310 : LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL);
411 310 : initStringInfo(&reply_message);
412 :
413 : /* Initialize nap wakeup times. */
414 310 : now = GetCurrentTimestamp();
415 1550 : for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
416 1240 : WalRcvComputeNextWakeup(i, now);
417 :
418 : /* Send initial reply/feedback messages. */
419 310 : XLogWalRcvSendReply(true, false);
420 310 : XLogWalRcvSendHSFeedback(true);
421 :
422 : /* Loop until end-of-streaming or error */
423 : for (;;)
424 79650 : {
425 : char *buf;
426 : int len;
427 79960 : bool endofwal = false;
428 79960 : pgsocket wait_fd = PGINVALID_SOCKET;
429 : int rc;
430 : TimestampTz nextWakeup;
431 : long nap;
432 :
433 : /*
434 : * Exit walreceiver if we're not in recovery. This should not
435 : * happen, but cross-check the status here.
436 : */
437 79960 : if (!RecoveryInProgress())
438 0 : ereport(FATAL,
439 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
440 : errmsg("cannot continue WAL streaming, recovery has already ended")));
441 :
442 : /* Process any requests or signals received recently */
443 79960 : CHECK_FOR_INTERRUPTS();
444 :
445 79960 : if (ConfigReloadPending)
446 : {
447 56 : ConfigReloadPending = false;
448 56 : ProcessConfigFile(PGC_SIGHUP);
449 : /* recompute wakeup times */
450 56 : now = GetCurrentTimestamp();
451 280 : for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
452 224 : WalRcvComputeNextWakeup(i, now);
453 56 : XLogWalRcvSendHSFeedback(true);
454 : }
455 :
456 : /* See if we can read data immediately */
457 79960 : len = walrcv_receive(wrconn, &buf, &wait_fd);
458 79898 : if (len != 0)
459 : {
460 : /*
461 : * Process the received data, and any subsequent data we
462 : * can read without blocking.
463 : */
464 : for (;;)
465 : {
466 268300 : if (len > 0)
467 : {
468 : /*
469 : * Something was received from primary, so adjust
470 : * the ping and terminate wakeup times.
471 : */
472 203954 : now = GetCurrentTimestamp();
473 203954 : WalRcvComputeNextWakeup(WALRCV_WAKEUP_TERMINATE,
474 : now);
475 203954 : WalRcvComputeNextWakeup(WALRCV_WAKEUP_PING, now);
476 203954 : XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1,
477 : startpointTLI);
478 : }
479 64346 : else if (len == 0)
480 64262 : break;
481 84 : else if (len < 0)
482 : {
483 84 : ereport(LOG,
484 : (errmsg("replication terminated by primary server"),
485 : errdetail("End of WAL reached on timeline %u at %X/%08X.",
486 : startpointTLI,
487 : LSN_FORMAT_ARGS(LogstreamResult.Write))));
488 84 : endofwal = true;
489 84 : break;
490 : }
491 203954 : len = walrcv_receive(wrconn, &buf, &wait_fd);
492 : }
493 :
494 : /* Let the primary know that we received some data. */
495 64346 : XLogWalRcvSendReply(false, false);
496 :
497 : /*
498 : * If we've written some records, flush them to disk and
499 : * let the startup process and primary server know about
500 : * them.
501 : */
502 64346 : XLogWalRcvFlush(false, startpointTLI);
503 : }
504 :
505 : /* Check if we need to exit the streaming loop. */
506 79894 : if (endofwal)
507 84 : break;
508 :
509 : /* Find the soonest wakeup time, to limit our nap. */
510 79810 : nextWakeup = TIMESTAMP_INFINITY;
511 399050 : for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
512 319240 : nextWakeup = Min(wakeup[i], nextWakeup);
513 :
514 : /* Calculate the nap time, clamping as necessary. */
515 79810 : now = GetCurrentTimestamp();
516 79810 : nap = TimestampDifferenceMilliseconds(now, nextWakeup);
517 :
518 : /*
519 : * Ideally we would reuse a WaitEventSet object repeatedly
520 : * here to avoid the overheads of WaitLatchOrSocket on epoll
521 : * systems, but we can't be sure that libpq (or any other
522 : * walreceiver implementation) has the same socket (even if
523 : * the fd is the same number, it may have been closed and
524 : * reopened since the last time). In future, if there is a
525 : * function for removing sockets from WaitEventSet, then we
526 : * could add and remove just the socket each time, potentially
527 : * avoiding some system calls.
528 : */
529 : Assert(wait_fd != PGINVALID_SOCKET);
530 79810 : rc = WaitLatchOrSocket(MyLatch,
531 : WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
532 : WL_TIMEOUT | WL_LATCH_SET,
533 : wait_fd,
534 : nap,
535 : WAIT_EVENT_WAL_RECEIVER_MAIN);
536 79810 : if (rc & WL_LATCH_SET)
537 : {
538 12364 : ResetLatch(MyLatch);
539 12364 : CHECK_FOR_INTERRUPTS();
540 :
541 12204 : if (walrcv->force_reply)
542 : {
543 : /*
544 : * The recovery process has asked us to send apply
545 : * feedback now. Make sure the flag is really set to
546 : * false in shared memory before sending the reply, so
547 : * we don't miss a new request for a reply.
548 : */
549 12100 : walrcv->force_reply = false;
550 12100 : pg_memory_barrier();
551 12100 : XLogWalRcvSendReply(true, false);
552 : }
553 : }
554 79650 : if (rc & WL_TIMEOUT)
555 : {
556 : /*
557 : * We didn't receive anything new. If we haven't heard
558 : * anything from the server for more than
559 : * wal_receiver_timeout / 2, ping the server. Also, if
560 : * it's been longer than wal_receiver_status_interval
561 : * since the last update we sent, send a status update to
562 : * the primary anyway, to report any progress in applying
563 : * WAL.
564 : */
565 10 : bool requestReply = false;
566 :
567 : /*
568 : * Report pending statistics to the cumulative stats
569 : * system. This location is useful for the report as it
570 : * is not within a tight loop in the WAL receiver, to
571 : * avoid bloating pgstats with requests, while also making
572 : * sure that the reports happen each time a status update
573 : * is sent.
574 : */
575 10 : pgstat_report_wal(false);
576 :
577 : /*
578 : * Check if time since last receive from primary has
579 : * reached the configured limit.
580 : */
581 10 : now = GetCurrentTimestamp();
582 10 : if (now >= wakeup[WALRCV_WAKEUP_TERMINATE])
583 0 : ereport(ERROR,
584 : (errcode(ERRCODE_CONNECTION_FAILURE),
585 : errmsg("terminating walreceiver due to timeout")));
586 :
587 : /*
588 : * If we didn't receive anything new for half of receiver
589 : * replication timeout, then ping the server.
590 : */
591 10 : if (now >= wakeup[WALRCV_WAKEUP_PING])
592 : {
593 0 : requestReply = true;
594 0 : wakeup[WALRCV_WAKEUP_PING] = TIMESTAMP_INFINITY;
595 : }
596 :
597 10 : XLogWalRcvSendReply(requestReply, requestReply);
598 10 : XLogWalRcvSendHSFeedback(false);
599 : }
600 : }
601 :
602 : /*
603 : * The backend finished streaming. Exit streaming COPY-mode from
604 : * our side, too.
605 : */
606 84 : walrcv_endstreaming(wrconn, &primaryTLI);
607 :
608 : /*
609 : * If the server had switched to a new timeline that we didn't
610 : * know about when we began streaming, fetch its timeline history
611 : * file now.
612 : */
613 24 : WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
614 : }
615 : else
616 0 : ereport(LOG,
617 : (errmsg("primary server contains no more WAL on requested timeline %u",
618 : startpointTLI)));
619 :
620 : /*
621 : * End of WAL reached on the requested timeline. Close the last
622 : * segment, and await for new orders from the startup process.
623 : */
624 24 : if (recvFile >= 0)
625 : {
626 : char xlogfname[MAXFNAMELEN];
627 :
628 22 : XLogWalRcvFlush(false, startpointTLI);
629 22 : XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
630 22 : if (close(recvFile) != 0)
631 0 : ereport(PANIC,
632 : (errcode_for_file_access(),
633 : errmsg("could not close WAL segment %s: %m",
634 : xlogfname)));
635 :
636 : /*
637 : * Create .done file forcibly to prevent the streamed segment from
638 : * being archived later.
639 : */
640 22 : if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
641 22 : XLogArchiveForceDone(xlogfname);
642 : else
643 0 : XLogArchiveNotify(xlogfname);
644 : }
645 24 : recvFile = -1;
646 :
647 24 : elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
648 24 : WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
649 : }
650 : /* not reached */
651 : }
652 :
653 : /*
654 : * Wait for startup process to set receiveStart and receiveStartTLI.
655 : */
656 : static void
657 24 : WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
658 : {
659 24 : WalRcvData *walrcv = WalRcv;
660 : int state;
661 :
662 24 : SpinLockAcquire(&walrcv->mutex);
663 24 : state = walrcv->walRcvState;
664 24 : if (state != WALRCV_STREAMING && state != WALRCV_CONNECTING)
665 : {
666 0 : SpinLockRelease(&walrcv->mutex);
667 0 : if (state == WALRCV_STOPPING)
668 0 : proc_exit(0);
669 : else
670 0 : elog(FATAL, "unexpected walreceiver state");
671 : }
672 24 : walrcv->walRcvState = WALRCV_WAITING;
673 24 : walrcv->receiveStart = InvalidXLogRecPtr;
674 24 : walrcv->receiveStartTLI = 0;
675 24 : SpinLockRelease(&walrcv->mutex);
676 :
677 24 : set_ps_display("idle");
678 :
679 : /*
680 : * nudge startup process to notice that we've stopped streaming and are
681 : * now waiting for instructions.
682 : */
683 24 : WakeupRecovery();
684 : for (;;)
685 : {
686 48 : ResetLatch(MyLatch);
687 :
688 48 : CHECK_FOR_INTERRUPTS();
689 :
690 48 : SpinLockAcquire(&walrcv->mutex);
691 : Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
692 : walrcv->walRcvState == WALRCV_WAITING ||
693 : walrcv->walRcvState == WALRCV_STOPPING);
694 48 : if (walrcv->walRcvState == WALRCV_RESTARTING)
695 : {
696 : /*
697 : * No need to handle changes in primary_conninfo or
698 : * primary_slot_name here. Startup process will signal us to
699 : * terminate in case those change.
700 : */
701 24 : *startpoint = walrcv->receiveStart;
702 24 : *startpointTLI = walrcv->receiveStartTLI;
703 24 : walrcv->walRcvState = WALRCV_CONNECTING;
704 24 : SpinLockRelease(&walrcv->mutex);
705 24 : break;
706 : }
707 24 : if (walrcv->walRcvState == WALRCV_STOPPING)
708 : {
709 : /*
710 : * We should've received SIGTERM if the startup process wants us
711 : * to die, but might as well check it here too.
712 : */
713 0 : SpinLockRelease(&walrcv->mutex);
714 0 : exit(1);
715 : }
716 24 : SpinLockRelease(&walrcv->mutex);
717 :
718 24 : (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
719 : WAIT_EVENT_WAL_RECEIVER_WAIT_START);
720 : }
721 :
722 24 : if (update_process_title)
723 : {
724 : char activitymsg[50];
725 :
726 24 : snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%08X",
727 24 : LSN_FORMAT_ARGS(*startpoint));
728 24 : set_ps_display(activitymsg);
729 : }
730 24 : }
731 :
732 : /*
733 : * Fetch any missing timeline history files between 'first' and 'last'
734 : * (inclusive) from the server.
735 : */
736 : static void
737 336 : WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
738 : {
739 : TimeLineID tli;
740 :
741 716 : for (tli = first; tli <= last; tli++)
742 : {
743 : /* there's no history file for timeline 1 */
744 380 : if (tli != 1 && !existsTimeLineHistory(tli))
745 : {
746 : char *fname;
747 : char *content;
748 : int len;
749 : char expectedfname[MAXFNAMELEN];
750 :
751 22 : ereport(LOG,
752 : (errmsg("fetching timeline history file for timeline %u from primary server",
753 : tli)));
754 :
755 22 : walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len);
756 :
757 : /*
758 : * Check that the filename on the primary matches what we
759 : * calculated ourselves. This is just a sanity check, it should
760 : * always match.
761 : */
762 22 : TLHistoryFileName(expectedfname, tli);
763 22 : if (strcmp(fname, expectedfname) != 0)
764 0 : ereport(ERROR,
765 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
766 : errmsg_internal("primary reported unexpected file name for timeline history file of timeline %u",
767 : tli)));
768 :
769 : /*
770 : * Write the file to pg_wal.
771 : */
772 22 : writeTimeLineHistoryFile(tli, content, len);
773 :
774 : /*
775 : * Mark the streamed history file as ready for archiving if
776 : * archive_mode is always.
777 : */
778 22 : if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
779 22 : XLogArchiveForceDone(fname);
780 : else
781 0 : XLogArchiveNotify(fname);
782 :
783 22 : pfree(fname);
784 22 : pfree(content);
785 : }
786 : }
787 336 : }
788 :
789 : /*
790 : * Mark us as STOPPED in shared memory at exit.
791 : */
792 : static void
793 482 : WalRcvDie(int code, Datum arg)
794 : {
795 482 : WalRcvData *walrcv = WalRcv;
796 482 : TimeLineID *startpointTLI_p = (TimeLineID *) DatumGetPointer(arg);
797 :
798 : Assert(*startpointTLI_p != 0);
799 :
800 : /* Ensure that all WAL records received are flushed to disk */
801 482 : XLogWalRcvFlush(true, *startpointTLI_p);
802 :
803 : /* Mark ourselves inactive in shared memory */
804 482 : SpinLockAcquire(&walrcv->mutex);
805 : Assert(walrcv->walRcvState == WALRCV_STREAMING ||
806 : walrcv->walRcvState == WALRCV_CONNECTING ||
807 : walrcv->walRcvState == WALRCV_RESTARTING ||
808 : walrcv->walRcvState == WALRCV_STARTING ||
809 : walrcv->walRcvState == WALRCV_WAITING ||
810 : walrcv->walRcvState == WALRCV_STOPPING);
811 : Assert(walrcv->pid == MyProcPid);
812 482 : walrcv->walRcvState = WALRCV_STOPPED;
813 482 : walrcv->pid = 0;
814 482 : walrcv->procno = INVALID_PROC_NUMBER;
815 482 : walrcv->ready_to_display = false;
816 482 : SpinLockRelease(&walrcv->mutex);
817 :
818 482 : ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
819 :
820 : /* Terminate the connection gracefully. */
821 482 : if (wrconn != NULL)
822 288 : walrcv_disconnect(wrconn);
823 :
824 : /* Wake up the startup process to notice promptly that we're gone */
825 482 : WakeupRecovery();
826 482 : }
827 :
828 : /*
829 : * Accept the message from XLOG stream, and process it.
830 : */
831 : static void
832 203954 : XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
833 : {
834 : int hdrlen;
835 : XLogRecPtr dataStart;
836 : XLogRecPtr walEnd;
837 : TimestampTz sendTime;
838 : bool replyRequested;
839 :
840 203954 : switch (type)
841 : {
842 197184 : case PqReplMsg_WALData:
843 : {
844 : StringInfoData incoming_message;
845 :
846 197184 : hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
847 197184 : if (len < hdrlen)
848 0 : ereport(ERROR,
849 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
850 : errmsg_internal("invalid WAL message received from primary")));
851 :
852 : /* initialize a StringInfo with the given buffer */
853 197184 : initReadOnlyStringInfo(&incoming_message, buf, hdrlen);
854 :
855 : /* read the fields */
856 197184 : dataStart = pq_getmsgint64(&incoming_message);
857 197184 : walEnd = pq_getmsgint64(&incoming_message);
858 197184 : sendTime = pq_getmsgint64(&incoming_message);
859 197184 : ProcessWalSndrMessage(walEnd, sendTime);
860 :
861 197184 : buf += hdrlen;
862 197184 : len -= hdrlen;
863 197184 : XLogWalRcvWrite(buf, len, dataStart, tli);
864 197184 : break;
865 : }
866 6770 : case PqReplMsg_Keepalive:
867 : {
868 : StringInfoData incoming_message;
869 :
870 6770 : hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
871 6770 : if (len != hdrlen)
872 0 : ereport(ERROR,
873 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
874 : errmsg_internal("invalid keepalive message received from primary")));
875 :
876 : /* initialize a StringInfo with the given buffer */
877 6770 : initReadOnlyStringInfo(&incoming_message, buf, hdrlen);
878 :
879 : /* read the fields */
880 6770 : walEnd = pq_getmsgint64(&incoming_message);
881 6770 : sendTime = pq_getmsgint64(&incoming_message);
882 6770 : replyRequested = pq_getmsgbyte(&incoming_message);
883 :
884 6770 : ProcessWalSndrMessage(walEnd, sendTime);
885 :
886 : /* If the primary requested a reply, send one immediately */
887 6770 : if (replyRequested)
888 6770 : XLogWalRcvSendReply(true, false);
889 6770 : break;
890 : }
891 0 : default:
892 0 : ereport(ERROR,
893 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
894 : errmsg_internal("invalid replication message type %d",
895 : type)));
896 : }
897 203954 : }
898 :
899 : /*
900 : * Write XLOG data to disk.
901 : */
902 : static void
903 197184 : XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
904 : {
905 : int startoff;
906 : int byteswritten;
907 : instr_time start;
908 :
909 : Assert(tli != 0);
910 :
911 395374 : while (nbytes > 0)
912 : {
913 : int segbytes;
914 :
915 : /* Close the current segment if it's completed */
916 198190 : if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
917 1006 : XLogWalRcvClose(recptr, tli);
918 :
919 198190 : if (recvFile < 0)
920 : {
921 : /* Create/use new log file */
922 1732 : XLByteToSeg(recptr, recvSegNo, wal_segment_size);
923 1732 : recvFile = XLogFileInit(recvSegNo, tli);
924 1732 : recvFileTLI = tli;
925 : }
926 :
927 : /* Calculate the start offset of the received logs */
928 198190 : startoff = XLogSegmentOffset(recptr, wal_segment_size);
929 :
930 198190 : if (startoff + nbytes > wal_segment_size)
931 1006 : segbytes = wal_segment_size - startoff;
932 : else
933 197184 : segbytes = nbytes;
934 :
935 : /* OK to write the logs */
936 198190 : errno = 0;
937 :
938 : /*
939 : * Measure I/O timing to write WAL data, for pg_stat_io.
940 : */
941 198190 : start = pgstat_prepare_io_time(track_wal_io_timing);
942 :
943 198190 : pgstat_report_wait_start(WAIT_EVENT_WAL_WRITE);
944 198190 : byteswritten = pg_pwrite(recvFile, buf, segbytes, (pgoff_t) startoff);
945 198190 : pgstat_report_wait_end();
946 :
947 198190 : pgstat_count_io_op_time(IOOBJECT_WAL, IOCONTEXT_NORMAL,
948 : IOOP_WRITE, start, 1, byteswritten);
949 :
950 198190 : if (byteswritten <= 0)
951 : {
952 : char xlogfname[MAXFNAMELEN];
953 : int save_errno;
954 :
955 : /* if write didn't set errno, assume no disk space */
956 0 : if (errno == 0)
957 0 : errno = ENOSPC;
958 :
959 0 : save_errno = errno;
960 0 : XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
961 0 : errno = save_errno;
962 0 : ereport(PANIC,
963 : (errcode_for_file_access(),
964 : errmsg("could not write to WAL segment %s "
965 : "at offset %d, length %d: %m",
966 : xlogfname, startoff, segbytes)));
967 : }
968 :
969 : /* Update state for write */
970 198190 : recptr += byteswritten;
971 :
972 198190 : nbytes -= byteswritten;
973 198190 : buf += byteswritten;
974 :
975 198190 : LogstreamResult.Write = recptr;
976 : }
977 :
978 : /* Update shared-memory status */
979 197184 : pg_atomic_write_u64(&WalRcv->writtenUpto, LogstreamResult.Write);
980 :
981 : /*
982 : * If we wrote an LSN that someone was waiting for, notify the waiters.
983 : */
984 394368 : if (waitLSNState &&
985 197184 : (LogstreamResult.Write >=
986 197184 : pg_atomic_read_u64(&waitLSNState->minWaitedLSN[WAIT_LSN_TYPE_STANDBY_WRITE])))
987 4 : WaitLSNWakeup(WAIT_LSN_TYPE_STANDBY_WRITE, LogstreamResult.Write);
988 :
989 : /*
990 : * Close the current segment if it's fully written up in the last cycle of
991 : * the loop, to create its archive notification file soon. Otherwise WAL
992 : * archiving of the segment will be delayed until any data in the next
993 : * segment is received and written.
994 : */
995 197184 : if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
996 456 : XLogWalRcvClose(recptr, tli);
997 197184 : }
998 :
999 : /*
1000 : * Flush the log to disk.
1001 : *
1002 : * If we're in the midst of dying, it's unwise to do anything that might throw
1003 : * an error, so we skip sending a reply in that case.
1004 : */
1005 : static void
1006 66312 : XLogWalRcvFlush(bool dying, TimeLineID tli)
1007 : {
1008 : Assert(tli != 0);
1009 :
1010 66312 : if (LogstreamResult.Flush < LogstreamResult.Write)
1011 : {
1012 64872 : WalRcvData *walrcv = WalRcv;
1013 :
1014 64872 : issue_xlog_fsync(recvFile, recvSegNo, tli);
1015 :
1016 64872 : LogstreamResult.Flush = LogstreamResult.Write;
1017 :
1018 : /* Update shared-memory status */
1019 64872 : SpinLockAcquire(&walrcv->mutex);
1020 64872 : if (walrcv->flushedUpto < LogstreamResult.Flush)
1021 : {
1022 64872 : walrcv->latestChunkStart = walrcv->flushedUpto;
1023 64872 : walrcv->flushedUpto = LogstreamResult.Flush;
1024 64872 : walrcv->receivedTLI = tli;
1025 : }
1026 64872 : SpinLockRelease(&walrcv->mutex);
1027 :
1028 : /*
1029 : * If we flushed an LSN that someone was waiting for, notify the
1030 : * waiters.
1031 : */
1032 129744 : if (waitLSNState &&
1033 64872 : (LogstreamResult.Flush >=
1034 64872 : pg_atomic_read_u64(&waitLSNState->minWaitedLSN[WAIT_LSN_TYPE_STANDBY_FLUSH])))
1035 4 : WaitLSNWakeup(WAIT_LSN_TYPE_STANDBY_FLUSH, LogstreamResult.Flush);
1036 :
1037 : /* Signal the startup process and walsender that new WAL has arrived */
1038 64872 : WakeupRecovery();
1039 64872 : if (AllowCascadeReplication())
1040 64872 : WalSndWakeup(true, false);
1041 :
1042 : /* Report XLOG streaming progress in PS display */
1043 64872 : if (update_process_title)
1044 : {
1045 : char activitymsg[50];
1046 :
1047 64872 : snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%08X",
1048 64872 : LSN_FORMAT_ARGS(LogstreamResult.Write));
1049 64872 : set_ps_display(activitymsg);
1050 : }
1051 :
1052 : /* Also let the primary know that we made some progress */
1053 64872 : if (!dying)
1054 : {
1055 64868 : XLogWalRcvSendReply(false, false);
1056 64868 : XLogWalRcvSendHSFeedback(false);
1057 : }
1058 : }
1059 66312 : }
1060 :
1061 : /*
1062 : * Close the current segment.
1063 : *
1064 : * Flush the segment to disk before closing it. Otherwise we have to
1065 : * reopen and fsync it later.
1066 : *
1067 : * Create an archive notification file since the segment is known completed.
1068 : */
1069 : static void
1070 1462 : XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli)
1071 : {
1072 : char xlogfname[MAXFNAMELEN];
1073 :
1074 : Assert(recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size));
1075 : Assert(tli != 0);
1076 :
1077 : /*
1078 : * fsync() and close current file before we switch to next one. We would
1079 : * otherwise have to reopen this file to fsync it later
1080 : */
1081 1462 : XLogWalRcvFlush(false, tli);
1082 :
1083 1462 : XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
1084 :
1085 : /*
1086 : * XLOG segment files will be re-read by recovery in startup process soon,
1087 : * so we don't advise the OS to release cache pages associated with the
1088 : * file like XLogFileClose() does.
1089 : */
1090 1462 : if (close(recvFile) != 0)
1091 0 : ereport(PANIC,
1092 : (errcode_for_file_access(),
1093 : errmsg("could not close WAL segment %s: %m",
1094 : xlogfname)));
1095 :
1096 : /*
1097 : * Create .done file forcibly to prevent the streamed segment from being
1098 : * archived later.
1099 : */
1100 1462 : if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
1101 1462 : XLogArchiveForceDone(xlogfname);
1102 : else
1103 0 : XLogArchiveNotify(xlogfname);
1104 :
1105 1462 : recvFile = -1;
1106 1462 : }
1107 :
1108 : /*
1109 : * Send reply message to primary, indicating our current WAL locations, oldest
1110 : * xmin and the current time.
1111 : *
1112 : * If 'force' is not set, the message is only sent if enough time has
1113 : * passed since last status update to reach wal_receiver_status_interval.
1114 : * If wal_receiver_status_interval is disabled altogether and 'force' is
1115 : * false, this is a no-op.
1116 : *
1117 : * If 'requestReply' is true, requests the server to reply immediately upon
1118 : * receiving this message. This is used for heartbeats, when approaching
1119 : * wal_receiver_timeout.
1120 : */
1121 : static void
1122 148404 : XLogWalRcvSendReply(bool force, bool requestReply)
1123 : {
1124 : static XLogRecPtr writePtr = InvalidXLogRecPtr;
1125 : static XLogRecPtr flushPtr = InvalidXLogRecPtr;
1126 : XLogRecPtr applyPtr;
1127 : TimestampTz now;
1128 :
1129 : /*
1130 : * If the user doesn't want status to be reported to the primary, be sure
1131 : * to exit before doing anything at all.
1132 : */
1133 148404 : if (!force && wal_receiver_status_interval <= 0)
1134 0 : return;
1135 :
1136 : /* Get current timestamp. */
1137 148404 : now = GetCurrentTimestamp();
1138 :
1139 : /*
1140 : * We can compare the write and flush positions to the last message we
1141 : * sent without taking any lock, but the apply position requires a spin
1142 : * lock, so we don't check that unless something else has changed or 10
1143 : * seconds have passed. This means that the apply WAL location will
1144 : * appear, from the primary's point of view, to lag slightly, but since
1145 : * this is only for reporting purposes and only on idle systems, that's
1146 : * probably OK.
1147 : */
1148 148404 : if (!force
1149 129224 : && writePtr == LogstreamResult.Write
1150 63752 : && flushPtr == LogstreamResult.Flush
1151 346 : && now < wakeup[WALRCV_WAKEUP_REPLY])
1152 346 : return;
1153 :
1154 : /* Make sure we wake up when it's time to send another reply. */
1155 148058 : WalRcvComputeNextWakeup(WALRCV_WAKEUP_REPLY, now);
1156 :
1157 : /* Construct a new message */
1158 148058 : writePtr = LogstreamResult.Write;
1159 148058 : flushPtr = LogstreamResult.Flush;
1160 148058 : applyPtr = GetXLogReplayRecPtr(NULL);
1161 :
1162 148058 : resetStringInfo(&reply_message);
1163 148058 : pq_sendbyte(&reply_message, PqReplMsg_StandbyStatusUpdate);
1164 148058 : pq_sendint64(&reply_message, writePtr);
1165 148058 : pq_sendint64(&reply_message, flushPtr);
1166 148058 : pq_sendint64(&reply_message, applyPtr);
1167 148058 : pq_sendint64(&reply_message, GetCurrentTimestamp());
1168 148058 : pq_sendbyte(&reply_message, requestReply ? 1 : 0);
1169 :
1170 : /* Send it */
1171 148058 : elog(DEBUG2, "sending write %X/%08X flush %X/%08X apply %X/%08X%s",
1172 : LSN_FORMAT_ARGS(writePtr),
1173 : LSN_FORMAT_ARGS(flushPtr),
1174 : LSN_FORMAT_ARGS(applyPtr),
1175 : requestReply ? " (reply requested)" : "");
1176 :
1177 148058 : walrcv_send(wrconn, reply_message.data, reply_message.len);
1178 : }
1179 :
1180 : /*
1181 : * Send hot standby feedback message to primary, plus the current time,
1182 : * in case they don't have a watch.
1183 : *
1184 : * If the user disables feedback, send one final message to tell sender
1185 : * to forget about the xmin on this standby. We also send this message
1186 : * on first connect because a previous connection might have set xmin
1187 : * on a replication slot. (If we're not using a slot it's harmless to
1188 : * send a feedback message explicitly setting InvalidTransactionId).
1189 : */
1190 : static void
1191 65244 : XLogWalRcvSendHSFeedback(bool immed)
1192 : {
1193 : TimestampTz now;
1194 : FullTransactionId nextFullXid;
1195 : TransactionId nextXid;
1196 : uint32 xmin_epoch,
1197 : catalog_xmin_epoch;
1198 : TransactionId xmin,
1199 : catalog_xmin;
1200 :
1201 : /* initially true so we always send at least one feedback message */
1202 : static bool primary_has_standby_xmin = true;
1203 :
1204 : /*
1205 : * If the user doesn't want status to be reported to the primary, be sure
1206 : * to exit before doing anything at all.
1207 : */
1208 65244 : if ((wal_receiver_status_interval <= 0 || !hot_standby_feedback) &&
1209 63974 : !primary_has_standby_xmin)
1210 64908 : return;
1211 :
1212 : /* Get current timestamp. */
1213 1534 : now = GetCurrentTimestamp();
1214 :
1215 : /* Send feedback at most once per wal_receiver_status_interval. */
1216 1534 : if (!immed && now < wakeup[WALRCV_WAKEUP_HSFEEDBACK])
1217 1196 : return;
1218 :
1219 : /* Make sure we wake up when it's time to send feedback again. */
1220 338 : WalRcvComputeNextWakeup(WALRCV_WAKEUP_HSFEEDBACK, now);
1221 :
1222 : /*
1223 : * If Hot Standby is not yet accepting connections there is nothing to
1224 : * send. Check this after the interval has expired to reduce number of
1225 : * calls.
1226 : *
1227 : * Bailing out here also ensures that we don't send feedback until we've
1228 : * read our own replication slot state, so we don't tell the primary to
1229 : * discard needed xmin or catalog_xmin from any slots that may exist on
1230 : * this replica.
1231 : */
1232 338 : if (!HotStandbyActive())
1233 2 : return;
1234 :
1235 : /*
1236 : * Make the expensive call to get the oldest xmin once we are certain
1237 : * everything else has been checked.
1238 : */
1239 336 : if (hot_standby_feedback)
1240 : {
1241 104 : GetReplicationHorizons(&xmin, &catalog_xmin);
1242 : }
1243 : else
1244 : {
1245 232 : xmin = InvalidTransactionId;
1246 232 : catalog_xmin = InvalidTransactionId;
1247 : }
1248 :
1249 : /*
1250 : * Get epoch and adjust if nextXid and oldestXmin are different sides of
1251 : * the epoch boundary.
1252 : */
1253 336 : nextFullXid = ReadNextFullTransactionId();
1254 336 : nextXid = XidFromFullTransactionId(nextFullXid);
1255 336 : xmin_epoch = EpochFromFullTransactionId(nextFullXid);
1256 336 : catalog_xmin_epoch = xmin_epoch;
1257 336 : if (nextXid < xmin)
1258 0 : xmin_epoch--;
1259 336 : if (nextXid < catalog_xmin)
1260 0 : catalog_xmin_epoch--;
1261 :
1262 336 : elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
1263 : xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
1264 :
1265 : /* Construct the message and send it. */
1266 336 : resetStringInfo(&reply_message);
1267 336 : pq_sendbyte(&reply_message, PqReplMsg_HotStandbyFeedback);
1268 336 : pq_sendint64(&reply_message, GetCurrentTimestamp());
1269 336 : pq_sendint32(&reply_message, xmin);
1270 336 : pq_sendint32(&reply_message, xmin_epoch);
1271 336 : pq_sendint32(&reply_message, catalog_xmin);
1272 336 : pq_sendint32(&reply_message, catalog_xmin_epoch);
1273 336 : walrcv_send(wrconn, reply_message.data, reply_message.len);
1274 336 : if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
1275 104 : primary_has_standby_xmin = true;
1276 : else
1277 232 : primary_has_standby_xmin = false;
1278 : }
1279 :
1280 : /*
1281 : * Update shared memory status upon receiving a message from primary.
1282 : *
1283 : * 'walEnd' and 'sendTime' are the end-of-WAL and timestamp of the latest
1284 : * message, reported by primary.
1285 : */
1286 : static void
1287 203954 : ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
1288 : {
1289 203954 : WalRcvData *walrcv = WalRcv;
1290 203954 : TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
1291 :
1292 : /* Update shared-memory status */
1293 203954 : SpinLockAcquire(&walrcv->mutex);
1294 203954 : if (walrcv->latestWalEnd < walEnd)
1295 25296 : walrcv->latestWalEndTime = sendTime;
1296 203954 : walrcv->latestWalEnd = walEnd;
1297 203954 : walrcv->lastMsgSendTime = sendTime;
1298 203954 : walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
1299 203954 : SpinLockRelease(&walrcv->mutex);
1300 :
1301 203954 : if (message_level_is_interesting(DEBUG2))
1302 : {
1303 : char *sendtime;
1304 : char *receipttime;
1305 : int applyDelay;
1306 :
1307 : /* Copy because timestamptz_to_str returns a static buffer */
1308 800 : sendtime = pstrdup(timestamptz_to_str(sendTime));
1309 800 : receipttime = pstrdup(timestamptz_to_str(lastMsgReceiptTime));
1310 800 : applyDelay = GetReplicationApplyDelay();
1311 :
1312 : /* apply delay is not available */
1313 800 : if (applyDelay == -1)
1314 2 : elog(DEBUG2, "sendtime %s receipttime %s replication apply delay (N/A) transfer latency %d ms",
1315 : sendtime,
1316 : receipttime,
1317 : GetReplicationTransferLatency());
1318 : else
1319 798 : elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms",
1320 : sendtime,
1321 : receipttime,
1322 : applyDelay,
1323 : GetReplicationTransferLatency());
1324 :
1325 800 : pfree(sendtime);
1326 800 : pfree(receipttime);
1327 : }
1328 203954 : }
1329 :
1330 : /*
1331 : * Compute the next wakeup time for a given wakeup reason. Can be called to
1332 : * initialize a wakeup time, to adjust it for the next wakeup, or to
1333 : * reinitialize it when GUCs have changed. We ask the caller to pass in the
1334 : * value of "now" because this frequently avoids multiple calls of
1335 : * GetCurrentTimestamp(). It had better be a reasonably up-to-date value
1336 : * though.
1337 : */
1338 : static void
1339 557768 : WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now)
1340 : {
1341 557768 : switch (reason)
1342 : {
1343 204320 : case WALRCV_WAKEUP_TERMINATE:
1344 204320 : if (wal_receiver_timeout <= 0)
1345 0 : wakeup[reason] = TIMESTAMP_INFINITY;
1346 : else
1347 204320 : wakeup[reason] = TimestampTzPlusMilliseconds(now, wal_receiver_timeout);
1348 204320 : break;
1349 204320 : case WALRCV_WAKEUP_PING:
1350 204320 : if (wal_receiver_timeout <= 0)
1351 0 : wakeup[reason] = TIMESTAMP_INFINITY;
1352 : else
1353 204320 : wakeup[reason] = TimestampTzPlusMilliseconds(now, wal_receiver_timeout / 2);
1354 204320 : break;
1355 704 : case WALRCV_WAKEUP_HSFEEDBACK:
1356 704 : if (!hot_standby_feedback || wal_receiver_status_interval <= 0)
1357 506 : wakeup[reason] = TIMESTAMP_INFINITY;
1358 : else
1359 198 : wakeup[reason] = TimestampTzPlusSeconds(now, wal_receiver_status_interval);
1360 704 : break;
1361 148424 : case WALRCV_WAKEUP_REPLY:
1362 148424 : if (wal_receiver_status_interval <= 0)
1363 0 : wakeup[reason] = TIMESTAMP_INFINITY;
1364 : else
1365 148424 : wakeup[reason] = TimestampTzPlusSeconds(now, wal_receiver_status_interval);
1366 148424 : break;
1367 : /* there's intentionally no default: here */
1368 : }
1369 557768 : }
1370 :
1371 : /*
1372 : * Wake up the walreceiver main loop.
1373 : *
1374 : * This is called by the startup process whenever interesting xlog records
1375 : * are applied, so that walreceiver can check if it needs to send an apply
1376 : * notification back to the primary which may be waiting in a COMMIT with
1377 : * synchronous_commit = remote_apply.
1378 : */
1379 : void
1380 11794 : WalRcvForceReply(void)
1381 : {
1382 : ProcNumber procno;
1383 :
1384 11794 : WalRcv->force_reply = true;
1385 : /* fetching the proc number is probably atomic, but don't rely on it */
1386 11794 : SpinLockAcquire(&WalRcv->mutex);
1387 11794 : procno = WalRcv->procno;
1388 11794 : SpinLockRelease(&WalRcv->mutex);
1389 11794 : if (procno != INVALID_PROC_NUMBER)
1390 11424 : SetLatch(&GetPGProcByNumber(procno)->procLatch);
1391 11794 : }
1392 :
1393 : /*
1394 : * Return a string constant representing the state. This is used
1395 : * in system functions and views, and should *not* be translated.
1396 : */
1397 : static const char *
1398 22 : WalRcvGetStateString(WalRcvState state)
1399 : {
1400 22 : switch (state)
1401 : {
1402 0 : case WALRCV_STOPPED:
1403 0 : return "stopped";
1404 0 : case WALRCV_STARTING:
1405 0 : return "starting";
1406 0 : case WALRCV_CONNECTING:
1407 0 : return "connecting";
1408 22 : case WALRCV_STREAMING:
1409 22 : return "streaming";
1410 0 : case WALRCV_WAITING:
1411 0 : return "waiting";
1412 0 : case WALRCV_RESTARTING:
1413 0 : return "restarting";
1414 0 : case WALRCV_STOPPING:
1415 0 : return "stopping";
1416 : }
1417 0 : return "UNKNOWN";
1418 : }
1419 :
1420 : /*
1421 : * Returns activity of WAL receiver, including pid, state and xlog locations
1422 : * received from the WAL sender of another server.
1423 : */
1424 : Datum
1425 42 : pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
1426 : {
1427 : TupleDesc tupdesc;
1428 : Datum *values;
1429 : bool *nulls;
1430 : int pid;
1431 : bool ready_to_display;
1432 : WalRcvState state;
1433 : XLogRecPtr receive_start_lsn;
1434 : TimeLineID receive_start_tli;
1435 : XLogRecPtr written_lsn;
1436 : XLogRecPtr flushed_lsn;
1437 : TimeLineID received_tli;
1438 : TimestampTz last_send_time;
1439 : TimestampTz last_receipt_time;
1440 : XLogRecPtr latest_end_lsn;
1441 : TimestampTz latest_end_time;
1442 : char sender_host[NI_MAXHOST];
1443 42 : int sender_port = 0;
1444 : char slotname[NAMEDATALEN];
1445 : char conninfo[MAXCONNINFO];
1446 :
1447 : /* Take a lock to ensure value consistency */
1448 42 : SpinLockAcquire(&WalRcv->mutex);
1449 42 : pid = (int) WalRcv->pid;
1450 42 : ready_to_display = WalRcv->ready_to_display;
1451 42 : state = WalRcv->walRcvState;
1452 42 : receive_start_lsn = WalRcv->receiveStart;
1453 42 : receive_start_tli = WalRcv->receiveStartTLI;
1454 42 : flushed_lsn = WalRcv->flushedUpto;
1455 42 : received_tli = WalRcv->receivedTLI;
1456 42 : last_send_time = WalRcv->lastMsgSendTime;
1457 42 : last_receipt_time = WalRcv->lastMsgReceiptTime;
1458 42 : latest_end_lsn = WalRcv->latestWalEnd;
1459 42 : latest_end_time = WalRcv->latestWalEndTime;
1460 42 : strlcpy(slotname, WalRcv->slotname, sizeof(slotname));
1461 42 : strlcpy(sender_host, WalRcv->sender_host, sizeof(sender_host));
1462 42 : sender_port = WalRcv->sender_port;
1463 42 : strlcpy(conninfo, WalRcv->conninfo, sizeof(conninfo));
1464 42 : SpinLockRelease(&WalRcv->mutex);
1465 :
1466 : /*
1467 : * No WAL receiver (or not ready yet), just return a tuple with NULL
1468 : * values
1469 : */
1470 42 : if (pid == 0 || !ready_to_display)
1471 20 : PG_RETURN_NULL();
1472 :
1473 : /*
1474 : * Read "writtenUpto" without holding a spinlock. Note that it may not be
1475 : * consistent with the other shared variables of the WAL receiver
1476 : * protected by a spinlock, but this should not be used for data integrity
1477 : * checks.
1478 : */
1479 22 : written_lsn = pg_atomic_read_u64(&WalRcv->writtenUpto);
1480 :
1481 : /* determine result type */
1482 22 : if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1483 0 : elog(ERROR, "return type must be a row type");
1484 :
1485 22 : values = palloc0_array(Datum, tupdesc->natts);
1486 22 : nulls = palloc0_array(bool, tupdesc->natts);
1487 :
1488 : /* Fetch values */
1489 22 : values[0] = Int32GetDatum(pid);
1490 :
1491 22 : if (!has_privs_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS))
1492 : {
1493 : /*
1494 : * Only superusers and roles with privileges of pg_read_all_stats can
1495 : * see details. Other users only get the pid value to know whether it
1496 : * is a WAL receiver, but no details.
1497 : */
1498 0 : memset(&nulls[1], true, sizeof(bool) * (tupdesc->natts - 1));
1499 : }
1500 : else
1501 : {
1502 22 : values[1] = CStringGetTextDatum(WalRcvGetStateString(state));
1503 :
1504 22 : if (!XLogRecPtrIsValid(receive_start_lsn))
1505 0 : nulls[2] = true;
1506 : else
1507 22 : values[2] = LSNGetDatum(receive_start_lsn);
1508 22 : values[3] = Int32GetDatum(receive_start_tli);
1509 22 : if (!XLogRecPtrIsValid(written_lsn))
1510 0 : nulls[4] = true;
1511 : else
1512 22 : values[4] = LSNGetDatum(written_lsn);
1513 22 : if (!XLogRecPtrIsValid(flushed_lsn))
1514 0 : nulls[5] = true;
1515 : else
1516 22 : values[5] = LSNGetDatum(flushed_lsn);
1517 22 : values[6] = Int32GetDatum(received_tli);
1518 22 : if (last_send_time == 0)
1519 0 : nulls[7] = true;
1520 : else
1521 22 : values[7] = TimestampTzGetDatum(last_send_time);
1522 22 : if (last_receipt_time == 0)
1523 0 : nulls[8] = true;
1524 : else
1525 22 : values[8] = TimestampTzGetDatum(last_receipt_time);
1526 22 : if (!XLogRecPtrIsValid(latest_end_lsn))
1527 0 : nulls[9] = true;
1528 : else
1529 22 : values[9] = LSNGetDatum(latest_end_lsn);
1530 22 : if (latest_end_time == 0)
1531 0 : nulls[10] = true;
1532 : else
1533 22 : values[10] = TimestampTzGetDatum(latest_end_time);
1534 22 : if (*slotname == '\0')
1535 22 : nulls[11] = true;
1536 : else
1537 0 : values[11] = CStringGetTextDatum(slotname);
1538 22 : if (*sender_host == '\0')
1539 0 : nulls[12] = true;
1540 : else
1541 22 : values[12] = CStringGetTextDatum(sender_host);
1542 22 : if (sender_port == 0)
1543 0 : nulls[13] = true;
1544 : else
1545 22 : values[13] = Int32GetDatum(sender_port);
1546 22 : if (*conninfo == '\0')
1547 0 : nulls[14] = true;
1548 : else
1549 22 : values[14] = CStringGetTextDatum(conninfo);
1550 : }
1551 :
1552 : /* Returns the record as Datum */
1553 22 : PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
1554 : }
|