Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * walreceiver.c
4 : *
5 : * The WAL receiver process (walreceiver) is new as of Postgres 9.0. It
6 : * is the process in the standby server that takes charge of receiving
7 : * XLOG records from a primary server during streaming replication.
8 : *
9 : * When the startup process determines that it's time to start streaming,
10 : * it instructs postmaster to start walreceiver. Walreceiver first connects
11 : * to the primary server (it will be served by a walsender process
12 : * in the primary server), and then keeps receiving XLOG records and
13 : * writing them to the disk as long as the connection is alive. As XLOG
14 : * records are received and flushed to disk, it updates the
15 : * WalRcv->flushedUpto variable in shared memory, to inform the startup
16 : * process of how far it can proceed with XLOG replay.
17 : *
18 : * A WAL receiver cannot directly load GUC parameters used when establishing
19 : * its connection to the primary. Instead it relies on parameter values
20 : * that are passed down by the startup process when streaming is requested.
21 : * This applies, for example, to the replication slot and the connection
22 : * string to be used for the connection with the primary.
23 : *
24 : * If the primary server ends streaming, but doesn't disconnect, walreceiver
25 : * goes into "waiting" mode, and waits for the startup process to give new
26 : * instructions. The startup process will treat that the same as
27 : * disconnection, and will rescan the archive/pg_wal directory. But when the
28 : * startup process wants to try streaming replication again, it will just
29 : * nudge the existing walreceiver process that's waiting, instead of launching
30 : * a new one.
31 : *
32 : * Normal termination is by SIGTERM, which instructs the walreceiver to
33 : * exit(0). Emergency termination is by SIGQUIT; like any postmaster child
34 : * process, the walreceiver will simply abort and exit on SIGQUIT. A close
35 : * of the connection and a FATAL error are treated not as a crash but as
36 : * normal operation.
37 : *
38 : * This file contains the server-facing parts of walreceiver. The libpq-
39 : * specific parts are in the libpqwalreceiver module. It's loaded
40 : * dynamically to avoid linking the server with libpq.
41 : *
42 : * Portions Copyright (c) 2010-2026, PostgreSQL Global Development Group
43 : *
44 : *
45 : * IDENTIFICATION
46 : * src/backend/replication/walreceiver.c
47 : *
48 : *-------------------------------------------------------------------------
49 : */
50 : #include "postgres.h"
51 :
52 : #include <unistd.h>
53 :
54 : #include "access/htup_details.h"
55 : #include "access/timeline.h"
56 : #include "access/transam.h"
57 : #include "access/xlog_internal.h"
58 : #include "access/xlogarchive.h"
59 : #include "access/xlogrecovery.h"
60 : #include "access/xlogwait.h"
61 : #include "catalog/pg_authid.h"
62 : #include "funcapi.h"
63 : #include "libpq/pqformat.h"
64 : #include "libpq/pqsignal.h"
65 : #include "miscadmin.h"
66 : #include "pgstat.h"
67 : #include "postmaster/auxprocess.h"
68 : #include "postmaster/interrupt.h"
69 : #include "replication/walreceiver.h"
70 : #include "replication/walsender.h"
71 : #include "storage/ipc.h"
72 : #include "storage/proc.h"
73 : #include "storage/procarray.h"
74 : #include "storage/procsignal.h"
75 : #include "tcop/tcopprot.h"
76 : #include "utils/acl.h"
77 : #include "utils/builtins.h"
78 : #include "utils/guc.h"
79 : #include "utils/pg_lsn.h"
80 : #include "utils/ps_status.h"
81 : #include "utils/timestamp.h"
82 :
83 :
84 : /*
85 : * GUC variables. (Other variables that affect walreceiver are in xlog.c
86 : * because they're passed down from the startup process, for better
87 : * synchronization.)
88 : */
89 : int wal_receiver_status_interval;
90 : int wal_receiver_timeout;
91 : bool hot_standby_feedback;
92 :
93 : /*
 * libpqwalreceiver connection.
 *
 * wrconn is opened in WalReceiverMain() and closed in WalRcvDie().
 * WalReceiverFunctions must be non-NULL once libpqwalreceiver has been
 * loaded; WalReceiverMain() raises an error otherwise.
 */
94 : static WalReceiverConn *wrconn = NULL;
95 : WalReceiverFunctionsType *WalReceiverFunctions = NULL;
96 :
97 : /*
98 : * These variables are used similarly to openLogFile/SegNo,
99 : * but for walreceiver to write the XLOG. recvFileTLI is the TimeLineID
100 : * corresponding to the filename of recvFile.
 *
 * recvFile is -1 while no segment file is open.
101 : */
102 : static int recvFile = -1;
103 : static TimeLineID recvFileTLI = 0;
104 : static XLogSegNo recvSegNo = 0;
105 :
106 : /*
107 : * LogstreamResult indicates the byte positions that we have already
108 : * written/fsynced.
 *
 * Both fields are (re)initialized from GetXLogReplayRecPtr() each time
 * streaming starts in WalReceiverMain().
109 : */
110 : static struct
111 : {
112 : XLogRecPtr Write; /* last byte + 1 written out in the standby */
113 : XLogRecPtr Flush; /* last byte + 1 flushed in the standby */
114 : } LogstreamResult;
115 :
116 : /*
117 : * Reasons to wake up and perform periodic tasks.
 *
 * The values double as indexes into wakeup[] below, so
 * NUM_WALRCV_WAKEUPS must stay one past the last member.
118 : */
119 : typedef enum WalRcvWakeupReason
120 : {
121 : WALRCV_WAKEUP_TERMINATE, /* give up with a timeout error once reached */
122 : WALRCV_WAKEUP_PING, /* request a reply from the primary once reached */
123 : WALRCV_WAKEUP_REPLY, /* presumably next periodic status reply - confirm in XLogWalRcvSendReply */
124 : WALRCV_WAKEUP_HSFEEDBACK, /* presumably next hot standby feedback - confirm in XLogWalRcvSendHSFeedback */
125 : #define NUM_WALRCV_WAKEUPS (WALRCV_WAKEUP_HSFEEDBACK + 1)
126 : } WalRcvWakeupReason;
127 :
128 : /*
129 : * Wake up times for periodic tasks.
 *
 * Indexed by WalRcvWakeupReason. The main loop naps until the earliest
 * entry; an entry set to TIMESTAMP_INFINITY therefore never limits the nap.
130 : */
131 : static TimestampTz wakeup[NUM_WALRCV_WAKEUPS];
132 :
/* Message buffer; initialized with initStringInfo() when streaming starts */
133 : static StringInfoData reply_message;
134 :
135 : /* Prototypes for private functions */
136 : static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last);
137 : static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI);
138 : static void WalRcvDie(int code, Datum arg);
139 : static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len,
140 : TimeLineID tli);
141 : static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr,
142 : TimeLineID tli);
143 : static void XLogWalRcvFlush(bool dying, TimeLineID tli);
144 : static void XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli);
145 : static void XLogWalRcvSendReply(bool force, bool requestReply);
146 : static void XLogWalRcvSendHSFeedback(bool immed);
147 : static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
148 : static void WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now);
149 :
150 :
151 : /*
 * Main entry point for walreceiver process.
 *
 * startup_data / startup_data_len: no startup data is expected; the length
 * is asserted to be zero and the pointer is not used.
 *
 * Outline:
 *  1. Claim the WalRcv shared-memory area: move the state from STARTING to
 *     CONNECTING, publish our PID and proc number, and copy the connection
 *     string, slot name and start position into local variables. If the
 *     state is already STOPPING or STOPPED, exit right away instead.
 *  2. Register WalRcvDie() as an on_shmem_exit callback, set up signal
 *     handling, load libpqwalreceiver and connect to the primary.
 *  3. Overwrite the connection string in shared memory with the
 *     user-visible one and publish the sender's host and port.
 *  4. Loop: identify the primary and check its timeline, fetch missing
 *     timeline history files, optionally create a temporary slot, and
 *     start streaming. Once the primary ends the stream, close the
 *     current segment and block in WalRcvWaitForStartPosition() until
 *     asked to restart.
 *
 * The outer loop has no break, so this function is not expected to return.
 */
152 : void
153 480 : WalReceiverMain(const void *startup_data, size_t startup_data_len)
154 : {
155 : char conninfo[MAXCONNINFO];
156 : char *tmp_conninfo;
157 : char slotname[NAMEDATALEN];
158 : bool is_temp_slot;
159 : XLogRecPtr startpoint;
160 : TimeLineID startpointTLI;
161 : TimeLineID primaryTLI;
162 : bool first_stream;
163 : WalRcvData *walrcv;
164 : TimestampTz now;
165 : char *err;
166 480 : char *sender_host = NULL;
167 480 : int sender_port = 0;
168 : char *appname;
169 :
170 : Assert(startup_data_len == 0);
171 :
172 480 : AuxiliaryProcessMainCommon();
173 :
174 : /*
175 : * WalRcv should be set up already (if we are a backend, we inherit this
176 : * by fork() or EXEC_BACKEND mechanism from the postmaster).
177 : */
178 480 : walrcv = WalRcv;
179 : Assert(walrcv != NULL);
180 :
181 : /*
182 : * Mark walreceiver as running in shared memory.
183 : *
184 : * Do this as early as possible, so that if we fail later on, we'll set
185 : * state to STOPPED. If we die before this, the startup process will keep
186 : * waiting for us to start up, until it times out.
187 : */
188 480 : SpinLockAcquire(&walrcv->mutex);
189 : Assert(walrcv->pid == 0);
190 480 : switch (walrcv->walRcvState)
191 : {
192 0 : case WALRCV_STOPPING:
193 : /* If we've already been requested to stop, don't start up. */
194 0 : walrcv->walRcvState = WALRCV_STOPPED;
195 : /* fall through */
196 :
197 14 : case WALRCV_STOPPED:
198 14 : SpinLockRelease(&walrcv->mutex);
199 14 : ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
200 14 : proc_exit(1);
201 : break;
202 :
203 466 : case WALRCV_STARTING:
204 : /* The usual case */
205 466 : break;
206 :
207 0 : case WALRCV_CONNECTING:
208 : case WALRCV_WAITING:
209 : case WALRCV_STREAMING:
210 : case WALRCV_RESTARTING:
211 : default:
212 : /* Shouldn't happen */
213 0 : SpinLockRelease(&walrcv->mutex);
214 0 : elog(PANIC, "walreceiver still running according to shared memory state");
215 : }
216 : /* Advertise our PID so that the startup process can kill us */
217 466 : walrcv->pid = MyProcPid;
218 466 : walrcv->walRcvState = WALRCV_CONNECTING;
219 :
220 : /* Fetch information required to start streaming */
221 466 : walrcv->ready_to_display = false;
222 466 : strlcpy(conninfo, walrcv->conninfo, MAXCONNINFO);
223 466 : strlcpy(slotname, walrcv->slotname, NAMEDATALEN);
224 466 : is_temp_slot = walrcv->is_temp_slot;
225 466 : startpoint = walrcv->receiveStart;
226 466 : startpointTLI = walrcv->receiveStartTLI;
227 :
228 : /*
229 : * At most one of is_temp_slot and slotname can be set; otherwise,
230 : * RequestXLogStreaming messed up.
231 : */
232 : Assert(!is_temp_slot || (slotname[0] == '\0'));
233 :
234 : /* Initialise to a sanish value */
235 466 : now = GetCurrentTimestamp();
236 466 : walrcv->lastMsgSendTime =
237 466 : walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = now;
238 :
239 : /* Report our proc number so that others can wake us up */
240 466 : walrcv->procno = MyProcNumber;
241 :
242 466 : SpinLockRelease(&walrcv->mutex);
243 :
/* Start with writtenUpto cleared in shared memory. */
244 466 : pg_atomic_write_u64(&WalRcv->writtenUpto, 0);
245 :
246 : /*
 * Arrange to clean up at walreceiver exit.
 *
 * The callback gets the address of startpointTLI rather than its value:
 * the variable is updated by WalRcvWaitForStartPosition() on every
 * restart, and WalRcvDie() dereferences the pointer when it runs.
 */
247 466 : on_shmem_exit(WalRcvDie, PointerGetDatum(&startpointTLI));
248 :
249 : /* Properly accept or ignore signals the postmaster might send us */
250 466 : pqsignal(SIGHUP, SignalHandlerForConfigReload); /* set flag to read config
251 : * file */
252 466 : pqsignal(SIGINT, SIG_IGN);
253 466 : pqsignal(SIGTERM, die); /* request shutdown */
254 : /* SIGQUIT handler was already set up by InitPostmasterChild */
255 466 : pqsignal(SIGALRM, SIG_IGN);
256 466 : pqsignal(SIGPIPE, SIG_IGN);
257 466 : pqsignal(SIGUSR1, procsignal_sigusr1_handler);
258 466 : pqsignal(SIGUSR2, SIG_IGN);
259 :
260 : /* Reset some signals that are accepted by postmaster but not here */
261 466 : pqsignal(SIGCHLD, SIG_DFL);
262 :
263 : /* Load the libpq-specific functions */
264 466 : load_file("libpqwalreceiver", false);
265 466 : if (WalReceiverFunctions == NULL)
266 0 : elog(ERROR, "libpqwalreceiver didn't initialize correctly");
267 :
268 : /* Unblock signals (they were blocked when the postmaster forked us) */
269 466 : sigprocmask(SIG_SETMASK, &UnBlockSig, NULL);
270 :
271 : /* Establish the connection to the primary for XLOG streaming */
272 466 : appname = cluster_name[0] ? cluster_name : "walreceiver";
273 466 : wrconn = walrcv_connect(conninfo, true, false, false, appname, &err);
274 466 : if (!wrconn)
275 178 : ereport(ERROR,
276 : (errcode(ERRCODE_CONNECTION_FAILURE),
277 : errmsg("streaming replication receiver \"%s\" could not connect to the primary server: %s",
278 : appname, err)));
279 :
280 : /*
281 : * Save user-visible connection string. This clobbers the original
282 : * conninfo, for security. Also save host and port of the sender server
283 : * this walreceiver is connected to.
284 : */
285 288 : tmp_conninfo = walrcv_get_conninfo(wrconn);
286 288 : walrcv_get_senderinfo(wrconn, &sender_host, &sender_port);
287 288 : SpinLockAcquire(&walrcv->mutex);
288 288 : memset(walrcv->conninfo, 0, MAXCONNINFO);
289 288 : if (tmp_conninfo)
290 288 : strlcpy(walrcv->conninfo, tmp_conninfo, MAXCONNINFO);
291 :
292 288 : memset(walrcv->sender_host, 0, NI_MAXHOST);
293 288 : if (sender_host)
294 288 : strlcpy(walrcv->sender_host, sender_host, NI_MAXHOST);
295 :
296 288 : walrcv->sender_port = sender_port;
297 288 : walrcv->ready_to_display = true;
298 288 : SpinLockRelease(&walrcv->mutex);
299 :
300 288 : if (tmp_conninfo)
301 288 : pfree(tmp_conninfo);
302 :
303 288 : if (sender_host)
304 288 : pfree(sender_host);
305 :
306 288 : first_stream = true;
307 : for (;;)
308 26 : {
309 : char *primary_sysid;
310 : char standby_sysid[32];
311 : WalRcvStreamOptions options;
312 :
313 : /*
314 : * Check that we're connected to a valid server using the
315 : * IDENTIFY_SYSTEM replication command.
316 : */
317 314 : primary_sysid = walrcv_identify_system(wrconn, &primaryTLI);
318 :
319 314 : snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
320 : GetSystemIdentifier());
321 314 : if (strcmp(primary_sysid, standby_sysid) != 0)
322 : {
323 0 : ereport(ERROR,
324 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
325 : errmsg("database system identifier differs between the primary and standby"),
326 : errdetail("The primary's identifier is %s, the standby's identifier is %s.",
327 : primary_sysid, standby_sysid)));
328 : }
329 :
330 : /*
331 : * Confirm that the current timeline of the primary is the same or
332 : * ahead of ours.
333 : */
334 314 : if (primaryTLI < startpointTLI)
335 0 : ereport(ERROR,
336 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
337 : errmsg("highest timeline %u of the primary is behind recovery timeline %u",
338 : primaryTLI, startpointTLI)));
339 :
340 : /*
341 : * Get any missing history files. We do this always, even when we're
342 : * not interested in that timeline, so that if we're promoted to
343 : * become the primary later on, we don't select the same timeline that
344 : * was already used in the current primary. This isn't bullet-proof -
345 : * you'll need some external software to manage your cluster if you
346 : * need to ensure that a unique timeline id is chosen in every case,
347 : * but let's avoid the confusion of timeline id collisions where we
348 : * can.
349 : */
350 314 : WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
351 :
352 : /*
353 : * Create temporary replication slot if requested, and update slot
354 : * name in shared memory. (Note the slot name cannot already be set
355 : * in this case.)
356 : */
357 314 : if (is_temp_slot)
358 : {
359 0 : snprintf(slotname, sizeof(slotname),
360 : "pg_walreceiver_%lld",
361 0 : (long long int) walrcv_get_backend_pid(wrconn));
362 :
363 0 : walrcv_create_slot(wrconn, slotname, true, false, false, 0, NULL);
364 :
365 0 : SpinLockAcquire(&walrcv->mutex);
366 0 : strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
367 0 : SpinLockRelease(&walrcv->mutex);
368 : }
369 :
370 : /*
371 : * Start streaming.
372 : *
373 : * We'll try to start at the requested starting point and timeline,
374 : * even if it's different from the server's latest timeline. In case
375 : * we've already reached the end of the old timeline, the server will
376 : * finish the streaming immediately, and we will go back to await
377 : * orders from the startup process. If recovery_target_timeline is
378 : * 'latest', the startup process will scan pg_wal and find the new
379 : * history file, bump recovery target timeline, and ask us to restart
380 : * on the new timeline.
381 : */
382 314 : options.logical = false;
383 314 : options.startpoint = startpoint;
384 314 : options.slotname = slotname[0] != '\0' ? slotname : NULL;
385 314 : options.proto.physical.startpointTLI = startpointTLI;
386 314 : if (walrcv_startstreaming(wrconn, &options))
387 : {
388 312 : if (first_stream)
389 286 : ereport(LOG,
390 : errmsg("started streaming WAL from primary at %X/%08X on timeline %u",
391 : LSN_FORMAT_ARGS(startpoint), startpointTLI));
392 : else
393 26 : ereport(LOG,
394 : errmsg("restarted WAL streaming at %X/%08X on timeline %u",
395 : LSN_FORMAT_ARGS(startpoint), startpointTLI));
396 312 : first_stream = false;
397 :
398 : /*
399 : * Switch to STREAMING after a successful connection if current
400 : * state is CONNECTING. This switch happens after an initial
401 : * startup, or after a restart as determined by
402 : * WalRcvWaitForStartPosition().
403 : */
404 312 : SpinLockAcquire(&walrcv->mutex);
405 312 : if (walrcv->walRcvState == WALRCV_CONNECTING)
406 312 : walrcv->walRcvState = WALRCV_STREAMING;
407 312 : SpinLockRelease(&walrcv->mutex);
408 :
409 : /* Initialize LogstreamResult and buffers for processing messages */
410 312 : LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL);
411 312 : initStringInfo(&reply_message);
412 :
413 : /* Initialize nap wakeup times. */
414 312 : now = GetCurrentTimestamp();
415 1560 : for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
416 1248 : WalRcvComputeNextWakeup(i, now);
417 :
418 : /* Send initial reply/feedback messages. */
419 312 : XLogWalRcvSendReply(true, false);
420 312 : XLogWalRcvSendHSFeedback(true);
421 :
422 : /* Loop until end-of-streaming or error */
423 : for (;;)
424 79014 : {
425 : char *buf;
426 : int len;
427 79326 : bool endofwal = false;
428 79326 : pgsocket wait_fd = PGINVALID_SOCKET;
429 : int rc;
430 : TimestampTz nextWakeup;
431 : long nap;
432 :
433 : /*
434 : * Exit walreceiver if we're not in recovery. This should not
435 : * happen, but cross-check the status here.
436 : */
437 79326 : if (!RecoveryInProgress())
438 0 : ereport(FATAL,
439 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
440 : errmsg("cannot continue WAL streaming, recovery has already ended")));
441 :
442 : /* Process any requests or signals received recently */
443 79326 : CHECK_FOR_INTERRUPTS();
444 :
445 79324 : if (ConfigReloadPending)
446 : {
447 56 : ConfigReloadPending = false;
448 56 : ProcessConfigFile(PGC_SIGHUP);
449 : /* recompute wakeup times */
450 56 : now = GetCurrentTimestamp();
451 280 : for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
452 224 : WalRcvComputeNextWakeup(i, now);
453 56 : XLogWalRcvSendHSFeedback(true);
454 : }
455 :
456 : /* See if we can read data immediately */
457 79324 : len = walrcv_receive(wrconn, &buf, &wait_fd);
458 79262 : if (len != 0)
459 : {
460 : /*
461 : * Process the received data, and any subsequent data we
462 : * can read without blocking.
 *
 * In the loop below len > 0 is a message to process, len == 0
 * means nothing more is available right now, and len < 0 is
 * treated as the end of the WAL stream.
463 : */
464 : for (;;)
465 : {
466 265836 : if (len > 0)
467 : {
468 : /*
469 : * Something was received from primary, so adjust
470 : * the ping and terminate wakeup times.
471 : */
472 205920 : now = GetCurrentTimestamp();
473 205920 : WalRcvComputeNextWakeup(WALRCV_WAKEUP_TERMINATE,
474 : now);
475 205920 : WalRcvComputeNextWakeup(WALRCV_WAKEUP_PING, now);
476 205920 : XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1,
477 : startpointTLI);
478 : }
479 59916 : else if (len == 0)
480 59832 : break;
481 84 : else if (len < 0)
482 : {
483 84 : ereport(LOG,
484 : (errmsg("replication terminated by primary server"),
485 : errdetail("End of WAL reached on timeline %u at %X/%08X.",
486 : startpointTLI,
487 : LSN_FORMAT_ARGS(LogstreamResult.Write))));
488 84 : endofwal = true;
489 84 : break;
490 : }
491 205920 : len = walrcv_receive(wrconn, &buf, &wait_fd);
492 : }
493 :
494 : /* Let the primary know that we received some data. */
495 59916 : XLogWalRcvSendReply(false, false);
496 :
497 : /*
498 : * If we've written some records, flush them to disk and
499 : * let the startup process and primary server know about
500 : * them.
501 : */
502 59916 : XLogWalRcvFlush(false, startpointTLI);
503 : }
504 :
505 : /* Check if we need to exit the streaming loop. */
506 79260 : if (endofwal)
507 84 : break;
508 :
509 : /* Find the soonest wakeup time, to limit our nap. */
510 79176 : nextWakeup = TIMESTAMP_INFINITY;
511 395880 : for (int i = 0; i < NUM_WALRCV_WAKEUPS; ++i)
512 316704 : nextWakeup = Min(wakeup[i], nextWakeup);
513 :
514 : /* Calculate the nap time, clamping as necessary. */
515 79176 : now = GetCurrentTimestamp();
516 79176 : nap = TimestampDifferenceMilliseconds(now, nextWakeup);
517 :
518 : /*
519 : * Ideally we would reuse a WaitEventSet object repeatedly
520 : * here to avoid the overheads of WaitLatchOrSocket on epoll
521 : * systems, but we can't be sure that libpq (or any other
522 : * walreceiver implementation) has the same socket (even if
523 : * the fd is the same number, it may have been closed and
524 : * reopened since the last time). In future, if there is a
525 : * function for removing sockets from WaitEventSet, then we
526 : * could add and remove just the socket each time, potentially
527 : * avoiding some system calls.
528 : */
529 : Assert(wait_fd != PGINVALID_SOCKET);
530 79176 : rc = WaitLatchOrSocket(MyLatch,
531 : WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
532 : WL_TIMEOUT | WL_LATCH_SET,
533 : wait_fd,
534 : nap,
535 : WAIT_EVENT_WAL_RECEIVER_MAIN);
536 79176 : if (rc & WL_LATCH_SET)
537 : {
538 15566 : ResetLatch(MyLatch);
539 15566 : CHECK_FOR_INTERRUPTS();
540 :
541 15404 : if (walrcv->force_reply)
542 : {
543 : /*
544 : * The recovery process has asked us to send apply
545 : * feedback now. Make sure the flag is really set to
546 : * false in shared memory before sending the reply, so
547 : * we don't miss a new request for a reply.
548 : */
549 15300 : walrcv->force_reply = false;
550 15300 : pg_memory_barrier();
551 15300 : XLogWalRcvSendReply(true, false);
552 : }
553 : }
554 79014 : if (rc & WL_TIMEOUT)
555 : {
556 : /*
557 : * We didn't receive anything new. If we haven't heard
558 : * anything from the server for more than
559 : * wal_receiver_timeout / 2, ping the server. Also, if
560 : * it's been longer than wal_receiver_status_interval
561 : * since the last update we sent, send a status update to
562 : * the primary anyway, to report any progress in applying
563 : * WAL.
564 : */
565 8 : bool requestReply = false;
566 :
567 : /*
568 : * Report pending statistics to the cumulative stats
569 : * system. This location is useful for the report as it
570 : * is not within a tight loop in the WAL receiver, to
571 : * avoid bloating pgstats with requests, while also making
572 : * sure that the reports happen each time a status update
573 : * is sent.
574 : */
575 8 : pgstat_report_wal(false);
576 :
577 : /*
578 : * Check if time since last receive from primary has
579 : * reached the configured limit.
580 : */
581 8 : now = GetCurrentTimestamp();
582 8 : if (now >= wakeup[WALRCV_WAKEUP_TERMINATE])
583 0 : ereport(ERROR,
584 : (errcode(ERRCODE_CONNECTION_FAILURE),
585 : errmsg("terminating walreceiver due to timeout")));
586 :
587 : /*
588 : * If we didn't receive anything new for half of receiver
589 : * replication timeout, then ping the server.
 *
 * The ping wakeup is then set to TIMESTAMP_INFINITY so that
 * it no longer limits the nap; it is recomputed when the
 * next message arrives from the primary.
590 : */
591 8 : if (now >= wakeup[WALRCV_WAKEUP_PING])
592 : {
593 0 : requestReply = true;
594 0 : wakeup[WALRCV_WAKEUP_PING] = TIMESTAMP_INFINITY;
595 : }
596 :
597 8 : XLogWalRcvSendReply(requestReply, requestReply);
598 8 : XLogWalRcvSendHSFeedback(false);
599 : }
600 : }
601 :
602 : /*
603 : * The backend finished streaming. Exit streaming COPY-mode from
604 : * our side, too.
605 : */
606 84 : walrcv_endstreaming(wrconn, &primaryTLI);
607 :
608 : /*
609 : * If the server had switched to a new timeline that we didn't
610 : * know about when we began streaming, fetch its timeline history
611 : * file now.
612 : */
613 26 : WalRcvFetchTimeLineHistoryFiles(startpointTLI, primaryTLI);
614 : }
615 : else
616 0 : ereport(LOG,
617 : (errmsg("primary server contains no more WAL on requested timeline %u",
618 : startpointTLI)));
619 :
620 : /*
621 : * End of WAL reached on the requested timeline. Close the last
622 : * segment, and wait for new orders from the startup process.
623 : */
624 26 : if (recvFile >= 0)
625 : {
626 : char xlogfname[MAXFNAMELEN];
627 :
628 24 : XLogWalRcvFlush(false, startpointTLI);
629 24 : XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
630 24 : if (close(recvFile) != 0)
631 0 : ereport(PANIC,
632 : (errcode_for_file_access(),
633 : errmsg("could not close WAL segment %s: %m",
634 : xlogfname)));
635 :
636 : /*
637 : * Create .done file forcibly to prevent the streamed segment from
638 : * being archived later.
639 : */
640 24 : if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
641 24 : XLogArchiveForceDone(xlogfname);
642 : else
643 0 : XLogArchiveNotify(xlogfname);
644 : }
645 26 : recvFile = -1;
646 :
647 26 : elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
648 26 : WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
649 : }
650 : /* not reached */
651 : }
652 :
653 : /*
654 : * Wait for startup process to set receiveStart and receiveStartTLI.
 *
 * Switches the shared state to WALRCV_WAITING, clears the shared start
 * position, wakes the startup process, and then sleeps on our latch until
 * the shared state becomes WALRCV_RESTARTING.
 *
 * startpoint, startpointTLI: output parameters. On return they hold the
 * newly requested receiveStart / receiveStartTLI, and the shared state has
 * been moved to WALRCV_CONNECTING.
 *
 * Does not return if the shared state is (or becomes) WALRCV_STOPPING: the
 * process exits instead. Any other entry state besides STREAMING or
 * CONNECTING is reported as a FATAL error.
655 : */
656 : static void
657 26 : WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
658 : {
659 26 : WalRcvData *walrcv = WalRcv;
660 : int state;
661 :
662 26 : SpinLockAcquire(&walrcv->mutex);
663 26 : state = walrcv->walRcvState;
664 26 : if (state != WALRCV_STREAMING && state != WALRCV_CONNECTING)
665 : {
/* Release the spinlock before exiting or raising the error. */
666 0 : SpinLockRelease(&walrcv->mutex);
667 0 : if (state == WALRCV_STOPPING)
668 0 : proc_exit(0);
669 : else
670 0 : elog(FATAL, "unexpected walreceiver state");
671 : }
672 26 : walrcv->walRcvState = WALRCV_WAITING;
673 26 : walrcv->receiveStart = InvalidXLogRecPtr;
674 26 : walrcv->receiveStartTLI = 0;
675 26 : SpinLockRelease(&walrcv->mutex);
676 :
677 26 : set_ps_display("idle");
678 :
679 : /*
680 : * nudge startup process to notice that we've stopped streaming and are
681 : * now waiting for instructions.
682 : */
683 26 : WakeupRecovery();
684 : for (;;)
685 : {
/*
 * Reset the latch before inspecting the shared state, so that a
 * wakeup arriving after the check is not lost.
 */
686 46 : ResetLatch(MyLatch);
687 :
688 46 : CHECK_FOR_INTERRUPTS();
689 :
690 46 : SpinLockAcquire(&walrcv->mutex);
691 : Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
692 : walrcv->walRcvState == WALRCV_WAITING ||
693 : walrcv->walRcvState == WALRCV_STOPPING);
694 46 : if (walrcv->walRcvState == WALRCV_RESTARTING)
695 : {
696 : /*
697 : * No need to handle changes in primary_conninfo or
698 : * primary_slot_name here. Startup process will signal us to
699 : * terminate in case those change.
700 : */
701 26 : *startpoint = walrcv->receiveStart;
702 26 : *startpointTLI = walrcv->receiveStartTLI;
703 26 : walrcv->walRcvState = WALRCV_CONNECTING;
704 26 : SpinLockRelease(&walrcv->mutex);
705 26 : break;
706 : }
707 20 : if (walrcv->walRcvState == WALRCV_STOPPING)
708 : {
709 : /*
710 : * We should've received SIGTERM if the startup process wants us
711 : * to die, but might as well check it here too.
 *
 * NOTE(review): this path calls plain exit(1), whereas the
 * STOPPING check at the top of this function calls
 * proc_exit(0) -- confirm the difference is intentional.
712 : */
713 0 : SpinLockRelease(&walrcv->mutex);
714 0 : exit(1);
715 : }
716 20 : SpinLockRelease(&walrcv->mutex);
717 :
/*
 * WL_TIMEOUT is not requested, so presumably the timeout argument of 0
 * is ignored and we sleep until the latch is set or the postmaster
 * dies -- confirm against WaitLatch().
 */
718 20 : (void) WaitLatch(MyLatch, WL_LATCH_SET | WL_EXIT_ON_PM_DEATH, 0,
719 : WAIT_EVENT_WAL_RECEIVER_WAIT_START);
720 : }
721 :
722 26 : if (update_process_title)
723 : {
724 : char activitymsg[50];
725 :
726 26 : snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%08X",
727 26 : LSN_FORMAT_ARGS(*startpoint));
728 26 : set_ps_display(activitymsg);
729 : }
730 26 : }
731 :
732 : /*
733 : * Fetch any missing timeline history files between 'first' and 'last'
734 : * (inclusive) from the server.
 *
 * first, last: range of timeline IDs to check. Nothing is done when
 * first > last.
 *
 * Timeline 1 is skipped, as is any timeline for which
 * existsTimeLineHistory() reports a file. Each remaining timeline's file
 * is read over the wrconn connection, its name is checked against the
 * locally computed one, and its content is written with
 * writeTimeLineHistoryFile(). A name mismatch raises an ERROR.
735 : */
736 : static void
737 340 : WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
738 : {
739 : TimeLineID tli;
740 :
741 728 : for (tli = first; tli <= last; tli++)
742 : {
743 : /* there's no history file for timeline 1 */
744 388 : if (tli != 1 && !existsTimeLineHistory(tli))
745 : {
746 : char *fname;
747 : char *content;
748 : int len;
749 : char expectedfname[MAXFNAMELEN];
750 :
751 24 : ereport(LOG,
752 : (errmsg("fetching timeline history file for timeline %u from primary server",
753 : tli)));
754 :
/* fname and content are returned to us and pfree'd at the end of this block */
755 24 : walrcv_readtimelinehistoryfile(wrconn, tli, &fname, &content, &len);
756 :
757 : /*
758 : * Check that the filename on the primary matches what we
759 : * calculated ourselves. This is just a sanity check, it should
760 : * always match.
761 : */
762 24 : TLHistoryFileName(expectedfname, tli);
763 24 : if (strcmp(fname, expectedfname) != 0)
764 0 : ereport(ERROR,
765 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
766 : errmsg_internal("primary reported unexpected file name for timeline history file of timeline %u",
767 : tli)));
768 :
769 : /*
770 : * Write the file to pg_wal.
771 : */
772 24 : writeTimeLineHistoryFile(tli, content, len);
773 :
774 : /*
775 : * Mark the streamed history file as ready for archiving if
776 : * archive_mode is always. In every other archive mode, call
 * XLogArchiveForceDone() on it instead.
777 : */
778 24 : if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
779 24 : XLogArchiveForceDone(fname);
780 : else
781 0 : XLogArchiveNotify(fname);
782 :
783 24 : pfree(fname);
784 24 : pfree(content);
785 : }
786 : }
787 340 : }
788 :
789 : /*
790 : * Mark us as STOPPED in shared memory at exit.
 *
 * Registered by WalReceiverMain() as an on_shmem_exit callback.
 *
 * code: not used in this function.
 * arg: pointer Datum referring to a TimeLineID (WalReceiverMain passes the
 * address of its startpointTLI); it is dereferenced here, so it reflects
 * the value current at exit time.
 *
 * Steps, in order: flush received WAL, reset the shared state (state, pid,
 * procno, ready_to_display) under the spinlock, broadcast walRcvStoppedCV,
 * close the connection to the primary if one was opened, and wake the
 * startup process.
791 : */
792 : static void
793 466 : WalRcvDie(int code, Datum arg)
794 : {
795 466 : WalRcvData *walrcv = WalRcv;
796 466 : TimeLineID *startpointTLI_p = (TimeLineID *) DatumGetPointer(arg);
797 :
798 : Assert(*startpointTLI_p != 0);
799 :
800 : /* Ensure that all WAL records received are flushed to disk */
801 466 : XLogWalRcvFlush(true, *startpointTLI_p);
802 :
803 : /* Mark ourselves inactive in shared memory */
804 466 : SpinLockAcquire(&walrcv->mutex);
805 : Assert(walrcv->walRcvState == WALRCV_STREAMING ||
806 : walrcv->walRcvState == WALRCV_CONNECTING ||
807 : walrcv->walRcvState == WALRCV_RESTARTING ||
808 : walrcv->walRcvState == WALRCV_STARTING ||
809 : walrcv->walRcvState == WALRCV_WAITING ||
810 : walrcv->walRcvState == WALRCV_STOPPING);
811 : Assert(walrcv->pid == MyProcPid);
812 466 : walrcv->walRcvState = WALRCV_STOPPED;
813 466 : walrcv->pid = 0;
814 466 : walrcv->procno = INVALID_PROC_NUMBER;
815 466 : walrcv->ready_to_display = false;
816 466 : SpinLockRelease(&walrcv->mutex);
817 :
/* Notify anyone waiting on walRcvStoppedCV, after the spinlock is released. */
818 466 : ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
819 :
820 : /* Terminate the connection gracefully. */
821 466 : if (wrconn != NULL)
822 288 : walrcv_disconnect(wrconn);
823 :
824 : /* Wake up the startup process to notice promptly that we're gone */
825 466 : WakeupRecovery();
826 466 : }
827 :
828 : /*
829 : * Accept the message from XLOG stream, and process it.
 *
 * type: message type byte.
 * buf, len: message contents following the type byte (the caller in
 * WalReceiverMain passes &buf[1] and len - 1).
 * tli: timeline passed through to XLogWalRcvWrite() for WAL data.
 *
 * PqReplMsg_WALData: a header of three int64 fields (dataStart, walEnd,
 * sendTime) followed by WAL data, which is written at dataStart. A message
 * shorter than the header is rejected.
 *
 * PqReplMsg_Keepalive: exactly two int64 fields (walEnd, sendTime) and one
 * byte (replyRequested); any other length is rejected. A reply is sent
 * immediately when replyRequested is nonzero.
 *
 * Any other type, and any rejected message, raises an ERROR with
 * ERRCODE_PROTOCOL_VIOLATION.
830 : */
831 : static void
832 205920 : XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len, TimeLineID tli)
833 : {
834 : int hdrlen;
835 : XLogRecPtr dataStart;
836 : XLogRecPtr walEnd;
837 : TimestampTz sendTime;
838 : bool replyRequested;
839 :
840 205920 : switch (type)
841 : {
842 198594 : case PqReplMsg_WALData:
843 : {
844 : StringInfoData incoming_message;
845 :
846 198594 : hdrlen = sizeof(int64) + sizeof(int64) + sizeof(int64);
847 198594 : if (len < hdrlen)
848 0 : ereport(ERROR,
849 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
850 : errmsg_internal("invalid WAL message received from primary")));
851 :
852 : /*
 * initialize a StringInfo with the given buffer; only the
 * fixed-size header is exposed to it, the WAL data after the
 * header is handed to XLogWalRcvWrite() directly below
 */
853 198594 : initReadOnlyStringInfo(&incoming_message, buf, hdrlen);
854 :
855 : /* read the fields */
856 198594 : dataStart = pq_getmsgint64(&incoming_message);
857 198594 : walEnd = pq_getmsgint64(&incoming_message);
858 198594 : sendTime = pq_getmsgint64(&incoming_message);
859 198594 : ProcessWalSndrMessage(walEnd, sendTime);
860 :
/* skip past the header to reach the WAL data */
861 198594 : buf += hdrlen;
862 198594 : len -= hdrlen;
863 198594 : XLogWalRcvWrite(buf, len, dataStart, tli);
864 198594 : break;
865 : }
866 7326 : case PqReplMsg_Keepalive:
867 : {
868 : StringInfoData incoming_message;
869 :
870 7326 : hdrlen = sizeof(int64) + sizeof(int64) + sizeof(char);
871 7326 : if (len != hdrlen)
872 0 : ereport(ERROR,
873 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
874 : errmsg_internal("invalid keepalive message received from primary")));
875 :
876 : /* initialize a StringInfo with the given buffer */
877 7326 : initReadOnlyStringInfo(&incoming_message, buf, hdrlen);
878 :
879 : /* read the fields */
880 7326 : walEnd = pq_getmsgint64(&incoming_message);
881 7326 : sendTime = pq_getmsgint64(&incoming_message);
882 7326 : replyRequested = pq_getmsgbyte(&incoming_message);
883 :
884 7326 : ProcessWalSndrMessage(walEnd, sendTime);
885 :
886 : /* If the primary requested a reply, send one immediately */
887 7326 : if (replyRequested)
888 7326 : XLogWalRcvSendReply(true, false);
889 7326 : break;
890 : }
891 0 : default:
892 0 : ereport(ERROR,
893 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
894 : errmsg_internal("invalid replication message type %d",
895 : type)));
896 : }
897 205920 : }
898 :
899 : /*
900 : * Write XLOG data to disk.
901 : */
902 : static void
903 198594 : XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr, TimeLineID tli)
904 : {
905 : int startoff;
906 : int byteswritten;
907 : instr_time start;
908 :
909 : Assert(tli != 0);
910 :
911 398172 : while (nbytes > 0)
912 : {
913 : int segbytes;
914 :
915 : /* Close the current segment if it's completed */
916 199578 : if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
917 984 : XLogWalRcvClose(recptr, tli);
918 :
919 199578 : if (recvFile < 0)
920 : {
921 : /* Create/use new log file */
922 1728 : XLByteToSeg(recptr, recvSegNo, wal_segment_size);
923 1728 : recvFile = XLogFileInit(recvSegNo, tli);
924 1728 : recvFileTLI = tli;
925 : }
926 :
927 : /* Calculate the start offset of the received logs */
928 199578 : startoff = XLogSegmentOffset(recptr, wal_segment_size);
929 :
930 199578 : if (startoff + nbytes > wal_segment_size)
931 984 : segbytes = wal_segment_size - startoff;
932 : else
933 198594 : segbytes = nbytes;
934 :
935 : /* OK to write the logs */
936 199578 : errno = 0;
937 :
938 : /*
939 : * Measure I/O timing to write WAL data, for pg_stat_io.
940 : */
941 199578 : start = pgstat_prepare_io_time(track_wal_io_timing);
942 :
943 199578 : pgstat_report_wait_start(WAIT_EVENT_WAL_WRITE);
944 199578 : byteswritten = pg_pwrite(recvFile, buf, segbytes, (pgoff_t) startoff);
945 199578 : pgstat_report_wait_end();
946 :
947 199578 : pgstat_count_io_op_time(IOOBJECT_WAL, IOCONTEXT_NORMAL,
948 : IOOP_WRITE, start, 1, byteswritten);
949 :
950 199578 : if (byteswritten <= 0)
951 : {
952 : char xlogfname[MAXFNAMELEN];
953 : int save_errno;
954 :
955 : /* if write didn't set errno, assume no disk space */
956 0 : if (errno == 0)
957 0 : errno = ENOSPC;
958 :
959 0 : save_errno = errno;
960 0 : XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
961 0 : errno = save_errno;
962 0 : ereport(PANIC,
963 : (errcode_for_file_access(),
964 : errmsg("could not write to WAL segment %s "
965 : "at offset %d, length %d: %m",
966 : xlogfname, startoff, segbytes)));
967 : }
968 :
969 : /* Update state for write */
970 199578 : recptr += byteswritten;
971 :
972 199578 : nbytes -= byteswritten;
973 199578 : buf += byteswritten;
974 :
975 199578 : LogstreamResult.Write = recptr;
976 : }
977 :
978 : /* Update shared-memory status */
979 198594 : pg_atomic_write_u64(&WalRcv->writtenUpto, LogstreamResult.Write);
980 :
981 : /*
982 : * If we wrote an LSN that someone was waiting for, notify the waiters.
983 : */
984 397188 : if (waitLSNState &&
985 198594 : (LogstreamResult.Write >=
986 198594 : pg_atomic_read_u64(&waitLSNState->minWaitedLSN[WAIT_LSN_TYPE_STANDBY_WRITE])))
987 4 : WaitLSNWakeup(WAIT_LSN_TYPE_STANDBY_WRITE, LogstreamResult.Write);
988 :
989 : /*
990 : * Close the current segment if it's fully written up in the last cycle of
991 : * the loop, to create its archive notification file soon. Otherwise WAL
992 : * archiving of the segment will be delayed until any data in the next
993 : * segment is received and written.
994 : */
995 198594 : if (recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size))
996 478 : XLogWalRcvClose(recptr, tli);
997 198594 : }
998 :
999 : /*
1000 : * Flush the log to disk.
1001 : *
1002 : * If we're in the midst of dying, it's unwise to do anything that might throw
1003 : * an error, so we skip sending a reply in that case.
1004 : */
1005 : static void
1006 61868 : XLogWalRcvFlush(bool dying, TimeLineID tli)
1007 : {
1008 : Assert(tli != 0);
1009 :
1010 61868 : if (LogstreamResult.Flush < LogstreamResult.Write)
1011 : {
1012 60406 : WalRcvData *walrcv = WalRcv;
1013 :
1014 60406 : issue_xlog_fsync(recvFile, recvSegNo, tli);
1015 :
1016 60406 : LogstreamResult.Flush = LogstreamResult.Write;
1017 :
1018 : /* Update shared-memory status */
1019 60406 : SpinLockAcquire(&walrcv->mutex);
1020 60406 : if (walrcv->flushedUpto < LogstreamResult.Flush)
1021 : {
1022 60406 : walrcv->latestChunkStart = walrcv->flushedUpto;
1023 60406 : walrcv->flushedUpto = LogstreamResult.Flush;
1024 60406 : walrcv->receivedTLI = tli;
1025 : }
1026 60406 : SpinLockRelease(&walrcv->mutex);
1027 :
1028 : /*
1029 : * If we flushed an LSN that someone was waiting for, notify the
1030 : * waiters.
1031 : */
1032 120812 : if (waitLSNState &&
1033 60406 : (LogstreamResult.Flush >=
1034 60406 : pg_atomic_read_u64(&waitLSNState->minWaitedLSN[WAIT_LSN_TYPE_STANDBY_FLUSH])))
1035 4 : WaitLSNWakeup(WAIT_LSN_TYPE_STANDBY_FLUSH, LogstreamResult.Flush);
1036 :
1037 : /* Signal the startup process and walsender that new WAL has arrived */
1038 60406 : WakeupRecovery();
1039 60406 : if (AllowCascadeReplication())
1040 60406 : WalSndWakeup(true, false);
1041 :
1042 : /* Report XLOG streaming progress in PS display */
1043 60406 : if (update_process_title)
1044 : {
1045 : char activitymsg[50];
1046 :
1047 60406 : snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%08X",
1048 60406 : LSN_FORMAT_ARGS(LogstreamResult.Write));
1049 60406 : set_ps_display(activitymsg);
1050 : }
1051 :
1052 : /* Also let the primary know that we made some progress */
1053 60406 : if (!dying)
1054 : {
1055 60404 : XLogWalRcvSendReply(false, false);
1056 60404 : XLogWalRcvSendHSFeedback(false);
1057 : }
1058 : }
1059 61868 : }
1060 :
1061 : /*
1062 : * Close the current segment.
1063 : *
1064 : * Flush the segment to disk before closing it. Otherwise we have to
1065 : * reopen and fsync it later.
1066 : *
1067 : * Create an archive notification file since the segment is known completed.
1068 : */
1069 : static void
1070 1462 : XLogWalRcvClose(XLogRecPtr recptr, TimeLineID tli)
1071 : {
1072 : char xlogfname[MAXFNAMELEN];
1073 :
1074 : Assert(recvFile >= 0 && !XLByteInSeg(recptr, recvSegNo, wal_segment_size));
1075 : Assert(tli != 0);
1076 :
1077 : /*
1078 : * fsync() and close current file before we switch to next one. We would
1079 : * otherwise have to reopen this file to fsync it later
1080 : */
1081 1462 : XLogWalRcvFlush(false, tli);
1082 :
1083 1462 : XLogFileName(xlogfname, recvFileTLI, recvSegNo, wal_segment_size);
1084 :
1085 : /*
1086 : * XLOG segment files will be re-read by recovery in startup process soon,
1087 : * so we don't advise the OS to release cache pages associated with the
1088 : * file like XLogFileClose() does.
1089 : */
1090 1462 : if (close(recvFile) != 0)
1091 0 : ereport(PANIC,
1092 : (errcode_for_file_access(),
1093 : errmsg("could not close WAL segment %s: %m",
1094 : xlogfname)));
1095 :
1096 : /*
1097 : * Create .done file forcibly to prevent the streamed segment from being
1098 : * archived later.
1099 : */
1100 1462 : if (XLogArchiveMode != ARCHIVE_MODE_ALWAYS)
1101 1462 : XLogArchiveForceDone(xlogfname);
1102 : else
1103 0 : XLogArchiveNotify(xlogfname);
1104 :
1105 1462 : recvFile = -1;
1106 1462 : }
1107 :
1108 : /*
1109 : * Send reply message to primary, indicating our current WAL locations, oldest
1110 : * xmin and the current time.
1111 : *
1112 : * If 'force' is not set, the message is only sent if enough time has
1113 : * passed since last status update to reach wal_receiver_status_interval.
1114 : * If wal_receiver_status_interval is disabled altogether and 'force' is
1115 : * false, this is a no-op.
1116 : *
1117 : * If 'requestReply' is true, requests the server to reply immediately upon
1118 : * receiving this message. This is used for heartbeats, when approaching
1119 : * wal_receiver_timeout.
1120 : */
1121 : static void
1122 143266 : XLogWalRcvSendReply(bool force, bool requestReply)
1123 : {
1124 : static XLogRecPtr writePtr = InvalidXLogRecPtr;
1125 : static XLogRecPtr flushPtr = InvalidXLogRecPtr;
1126 : XLogRecPtr applyPtr;
1127 : TimestampTz now;
1128 :
1129 : /*
1130 : * If the user doesn't want status to be reported to the primary, be sure
1131 : * to exit before doing anything at all.
1132 : */
1133 143266 : if (!force && wal_receiver_status_interval <= 0)
1134 0 : return;
1135 :
1136 : /* Get current timestamp. */
1137 143266 : now = GetCurrentTimestamp();
1138 :
1139 : /*
1140 : * We can compare the write and flush positions to the last message we
1141 : * sent without taking any lock, but the apply position requires a spin
1142 : * lock, so we don't check that unless something else has changed or 10
1143 : * seconds have passed. This means that the apply WAL location will
1144 : * appear, from the primary's point of view, to lag slightly, but since
1145 : * this is only for reporting purposes and only on idle systems, that's
1146 : * probably OK.
1147 : */
1148 143266 : if (!force
1149 120328 : && writePtr == LogstreamResult.Write
1150 59328 : && flushPtr == LogstreamResult.Flush
1151 386 : && now < wakeup[WALRCV_WAKEUP_REPLY])
1152 386 : return;
1153 :
1154 : /* Make sure we wake up when it's time to send another reply. */
1155 142880 : WalRcvComputeNextWakeup(WALRCV_WAKEUP_REPLY, now);
1156 :
1157 : /* Construct a new message */
1158 142880 : writePtr = LogstreamResult.Write;
1159 142880 : flushPtr = LogstreamResult.Flush;
1160 142880 : applyPtr = GetXLogReplayRecPtr(NULL);
1161 :
1162 142880 : resetStringInfo(&reply_message);
1163 142880 : pq_sendbyte(&reply_message, PqReplMsg_StandbyStatusUpdate);
1164 142880 : pq_sendint64(&reply_message, writePtr);
1165 142880 : pq_sendint64(&reply_message, flushPtr);
1166 142880 : pq_sendint64(&reply_message, applyPtr);
1167 142880 : pq_sendint64(&reply_message, GetCurrentTimestamp());
1168 142880 : pq_sendbyte(&reply_message, requestReply ? 1 : 0);
1169 :
1170 : /* Send it */
1171 142880 : elog(DEBUG2, "sending write %X/%08X flush %X/%08X apply %X/%08X%s",
1172 : LSN_FORMAT_ARGS(writePtr),
1173 : LSN_FORMAT_ARGS(flushPtr),
1174 : LSN_FORMAT_ARGS(applyPtr),
1175 : requestReply ? " (reply requested)" : "");
1176 :
1177 142880 : walrcv_send(wrconn, reply_message.data, reply_message.len);
1178 : }
1179 :
1180 : /*
1181 : * Send hot standby feedback message to primary, plus the current time,
1182 : * in case they don't have a watch.
1183 : *
1184 : * If the user disables feedback, send one final message to tell sender
1185 : * to forget about the xmin on this standby. We also send this message
1186 : * on first connect because a previous connection might have set xmin
1187 : * on a replication slot. (If we're not using a slot it's harmless to
1188 : * send a feedback message explicitly setting InvalidTransactionId).
1189 : */
1190 : static void
1191 60780 : XLogWalRcvSendHSFeedback(bool immed)
1192 : {
1193 : TimestampTz now;
1194 : FullTransactionId nextFullXid;
1195 : TransactionId nextXid;
1196 : uint32 xmin_epoch,
1197 : catalog_xmin_epoch;
1198 : TransactionId xmin,
1199 : catalog_xmin;
1200 :
1201 : /* initially true so we always send at least one feedback message */
1202 : static bool primary_has_standby_xmin = true;
1203 :
1204 : /*
1205 : * If the user doesn't want status to be reported to the primary, be sure
1206 : * to exit before doing anything at all.
1207 : */
1208 60780 : if ((wal_receiver_status_interval <= 0 || !hot_standby_feedback) &&
1209 59320 : !primary_has_standby_xmin)
1210 60448 : return;
1211 :
1212 : /* Get current timestamp. */
1213 1800 : now = GetCurrentTimestamp();
1214 :
1215 : /* Send feedback at most once per wal_receiver_status_interval. */
1216 1800 : if (!immed && now < wakeup[WALRCV_WAKEUP_HSFEEDBACK])
1217 1466 : return;
1218 :
1219 : /* Make sure we wake up when it's time to send feedback again. */
1220 334 : WalRcvComputeNextWakeup(WALRCV_WAKEUP_HSFEEDBACK, now);
1221 :
1222 : /*
1223 : * If Hot Standby is not yet accepting connections there is nothing to
1224 : * send. Check this after the interval has expired to reduce number of
1225 : * calls.
1226 : *
1227 : * Bailing out here also ensures that we don't send feedback until we've
1228 : * read our own replication slot state, so we don't tell the primary to
1229 : * discard needed xmin or catalog_xmin from any slots that may exist on
1230 : * this replica.
1231 : */
1232 334 : if (!HotStandbyActive())
1233 2 : return;
1234 :
1235 : /*
1236 : * Make the expensive call to get the oldest xmin once we are certain
1237 : * everything else has been checked.
1238 : */
1239 332 : if (hot_standby_feedback)
1240 : {
1241 100 : GetReplicationHorizons(&xmin, &catalog_xmin);
1242 : }
1243 : else
1244 : {
1245 232 : xmin = InvalidTransactionId;
1246 232 : catalog_xmin = InvalidTransactionId;
1247 : }
1248 :
1249 : /*
1250 : * Get epoch and adjust if nextXid and oldestXmin are different sides of
1251 : * the epoch boundary.
1252 : */
1253 332 : nextFullXid = ReadNextFullTransactionId();
1254 332 : nextXid = XidFromFullTransactionId(nextFullXid);
1255 332 : xmin_epoch = EpochFromFullTransactionId(nextFullXid);
1256 332 : catalog_xmin_epoch = xmin_epoch;
1257 332 : if (nextXid < xmin)
1258 0 : xmin_epoch--;
1259 332 : if (nextXid < catalog_xmin)
1260 0 : catalog_xmin_epoch--;
1261 :
1262 332 : elog(DEBUG2, "sending hot standby feedback xmin %u epoch %u catalog_xmin %u catalog_xmin_epoch %u",
1263 : xmin, xmin_epoch, catalog_xmin, catalog_xmin_epoch);
1264 :
1265 : /* Construct the message and send it. */
1266 332 : resetStringInfo(&reply_message);
1267 332 : pq_sendbyte(&reply_message, PqReplMsg_HotStandbyFeedback);
1268 332 : pq_sendint64(&reply_message, GetCurrentTimestamp());
1269 332 : pq_sendint32(&reply_message, xmin);
1270 332 : pq_sendint32(&reply_message, xmin_epoch);
1271 332 : pq_sendint32(&reply_message, catalog_xmin);
1272 332 : pq_sendint32(&reply_message, catalog_xmin_epoch);
1273 332 : walrcv_send(wrconn, reply_message.data, reply_message.len);
1274 332 : if (TransactionIdIsValid(xmin) || TransactionIdIsValid(catalog_xmin))
1275 100 : primary_has_standby_xmin = true;
1276 : else
1277 232 : primary_has_standby_xmin = false;
1278 : }
1279 :
1280 : /*
1281 : * Update shared memory status upon receiving a message from primary.
1282 : *
1283 : * 'walEnd' and 'sendTime' are the end-of-WAL and timestamp of the latest
1284 : * message, reported by primary.
1285 : */
1286 : static void
1287 205920 : ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime)
1288 : {
1289 205920 : WalRcvData *walrcv = WalRcv;
1290 205920 : TimestampTz lastMsgReceiptTime = GetCurrentTimestamp();
1291 :
1292 : /* Update shared-memory status */
1293 205920 : SpinLockAcquire(&walrcv->mutex);
1294 205920 : if (walrcv->latestWalEnd < walEnd)
1295 26440 : walrcv->latestWalEndTime = sendTime;
1296 205920 : walrcv->latestWalEnd = walEnd;
1297 205920 : walrcv->lastMsgSendTime = sendTime;
1298 205920 : walrcv->lastMsgReceiptTime = lastMsgReceiptTime;
1299 205920 : SpinLockRelease(&walrcv->mutex);
1300 :
1301 205920 : if (message_level_is_interesting(DEBUG2))
1302 : {
1303 : char *sendtime;
1304 : char *receipttime;
1305 : int applyDelay;
1306 :
1307 : /* Copy because timestamptz_to_str returns a static buffer */
1308 792 : sendtime = pstrdup(timestamptz_to_str(sendTime));
1309 792 : receipttime = pstrdup(timestamptz_to_str(lastMsgReceiptTime));
1310 792 : applyDelay = GetReplicationApplyDelay();
1311 :
1312 : /* apply delay is not available */
1313 792 : if (applyDelay == -1)
1314 2 : elog(DEBUG2, "sendtime %s receipttime %s replication apply delay (N/A) transfer latency %d ms",
1315 : sendtime,
1316 : receipttime,
1317 : GetReplicationTransferLatency());
1318 : else
1319 790 : elog(DEBUG2, "sendtime %s receipttime %s replication apply delay %d ms transfer latency %d ms",
1320 : sendtime,
1321 : receipttime,
1322 : applyDelay,
1323 : GetReplicationTransferLatency());
1324 :
1325 792 : pfree(sendtime);
1326 792 : pfree(receipttime);
1327 : }
1328 205920 : }
1329 :
1330 : /*
1331 : * Compute the next wakeup time for a given wakeup reason. Can be called to
1332 : * initialize a wakeup time, to adjust it for the next wakeup, or to
1333 : * reinitialize it when GUCs have changed. We ask the caller to pass in the
1334 : * value of "now" because this frequently avoids multiple calls of
1335 : * GetCurrentTimestamp(). It had better be a reasonably up-to-date value
1336 : * though.
1337 : */
1338 : static void
1339 556526 : WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now)
1340 : {
1341 556526 : switch (reason)
1342 : {
1343 206288 : case WALRCV_WAKEUP_TERMINATE:
1344 206288 : if (wal_receiver_timeout <= 0)
1345 0 : wakeup[reason] = TIMESTAMP_INFINITY;
1346 : else
1347 206288 : wakeup[reason] = TimestampTzPlusMilliseconds(now, wal_receiver_timeout);
1348 206288 : break;
1349 206288 : case WALRCV_WAKEUP_PING:
1350 206288 : if (wal_receiver_timeout <= 0)
1351 0 : wakeup[reason] = TIMESTAMP_INFINITY;
1352 : else
1353 206288 : wakeup[reason] = TimestampTzPlusMilliseconds(now, wal_receiver_timeout / 2);
1354 206288 : break;
1355 702 : case WALRCV_WAKEUP_HSFEEDBACK:
1356 702 : if (!hot_standby_feedback || wal_receiver_status_interval <= 0)
1357 510 : wakeup[reason] = TIMESTAMP_INFINITY;
1358 : else
1359 192 : wakeup[reason] = TimestampTzPlusSeconds(now, wal_receiver_status_interval);
1360 702 : break;
1361 143248 : case WALRCV_WAKEUP_REPLY:
1362 143248 : if (wal_receiver_status_interval <= 0)
1363 0 : wakeup[reason] = TIMESTAMP_INFINITY;
1364 : else
1365 143248 : wakeup[reason] = TimestampTzPlusSeconds(now, wal_receiver_status_interval);
1366 143248 : break;
1367 : /* there's intentionally no default: here */
1368 : }
1369 556526 : }
1370 :
1371 : /*
1372 : * Wake up the walreceiver main loop.
1373 : *
1374 : * This is called by the startup process whenever interesting xlog records
1375 : * are applied, so that walreceiver can check if it needs to send an apply
1376 : * notification back to the primary which may be waiting in a COMMIT with
1377 : * synchronous_commit = remote_apply.
1378 : */
1379 : void
1380 14944 : WalRcvForceReply(void)
1381 : {
1382 : ProcNumber procno;
1383 :
1384 14944 : WalRcv->force_reply = true;
1385 : /* fetching the proc number is probably atomic, but don't rely on it */
1386 14944 : SpinLockAcquire(&WalRcv->mutex);
1387 14944 : procno = WalRcv->procno;
1388 14944 : SpinLockRelease(&WalRcv->mutex);
1389 14944 : if (procno != INVALID_PROC_NUMBER)
1390 14574 : SetLatch(&GetPGProcByNumber(procno)->procLatch);
1391 14944 : }
1392 :
1393 : /*
1394 : * Return a string constant representing the state. This is used
1395 : * in system functions and views, and should *not* be translated.
1396 : */
1397 : static const char *
1398 24 : WalRcvGetStateString(WalRcvState state)
1399 : {
1400 24 : switch (state)
1401 : {
1402 0 : case WALRCV_STOPPED:
1403 0 : return "stopped";
1404 0 : case WALRCV_STARTING:
1405 0 : return "starting";
1406 0 : case WALRCV_CONNECTING:
1407 0 : return "connecting";
1408 24 : case WALRCV_STREAMING:
1409 24 : return "streaming";
1410 0 : case WALRCV_WAITING:
1411 0 : return "waiting";
1412 0 : case WALRCV_RESTARTING:
1413 0 : return "restarting";
1414 0 : case WALRCV_STOPPING:
1415 0 : return "stopping";
1416 : }
1417 0 : return "UNKNOWN";
1418 : }
1419 :
1420 : /*
1421 : * Returns activity of WAL receiver, including pid, state and xlog locations
1422 : * received from the WAL sender of another server.
1423 : */
1424 : Datum
1425 44 : pg_stat_get_wal_receiver(PG_FUNCTION_ARGS)
1426 : {
1427 : TupleDesc tupdesc;
1428 : Datum *values;
1429 : bool *nulls;
1430 : int pid;
1431 : bool ready_to_display;
1432 : WalRcvState state;
1433 : XLogRecPtr receive_start_lsn;
1434 : TimeLineID receive_start_tli;
1435 : XLogRecPtr written_lsn;
1436 : XLogRecPtr flushed_lsn;
1437 : TimeLineID received_tli;
1438 : TimestampTz last_send_time;
1439 : TimestampTz last_receipt_time;
1440 : XLogRecPtr latest_end_lsn;
1441 : TimestampTz latest_end_time;
1442 : char sender_host[NI_MAXHOST];
1443 44 : int sender_port = 0;
1444 : char slotname[NAMEDATALEN];
1445 : char conninfo[MAXCONNINFO];
1446 :
1447 : /* Take a lock to ensure value consistency */
1448 44 : SpinLockAcquire(&WalRcv->mutex);
1449 44 : pid = (int) WalRcv->pid;
1450 44 : ready_to_display = WalRcv->ready_to_display;
1451 44 : state = WalRcv->walRcvState;
1452 44 : receive_start_lsn = WalRcv->receiveStart;
1453 44 : receive_start_tli = WalRcv->receiveStartTLI;
1454 44 : flushed_lsn = WalRcv->flushedUpto;
1455 44 : received_tli = WalRcv->receivedTLI;
1456 44 : last_send_time = WalRcv->lastMsgSendTime;
1457 44 : last_receipt_time = WalRcv->lastMsgReceiptTime;
1458 44 : latest_end_lsn = WalRcv->latestWalEnd;
1459 44 : latest_end_time = WalRcv->latestWalEndTime;
1460 44 : strlcpy(slotname, WalRcv->slotname, sizeof(slotname));
1461 44 : strlcpy(sender_host, WalRcv->sender_host, sizeof(sender_host));
1462 44 : sender_port = WalRcv->sender_port;
1463 44 : strlcpy(conninfo, WalRcv->conninfo, sizeof(conninfo));
1464 44 : SpinLockRelease(&WalRcv->mutex);
1465 :
1466 : /*
1467 : * No WAL receiver (or not ready yet), just return a tuple with NULL
1468 : * values
1469 : */
1470 44 : if (pid == 0 || !ready_to_display)
1471 20 : PG_RETURN_NULL();
1472 :
1473 : /*
1474 : * Read "writtenUpto" without holding a spinlock. Note that it may not be
1475 : * consistent with the other shared variables of the WAL receiver
1476 : * protected by a spinlock, but this should not be used for data integrity
1477 : * checks.
1478 : */
1479 24 : written_lsn = pg_atomic_read_u64(&WalRcv->writtenUpto);
1480 :
1481 : /* determine result type */
1482 24 : if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
1483 0 : elog(ERROR, "return type must be a row type");
1484 :
1485 24 : values = palloc0_array(Datum, tupdesc->natts);
1486 24 : nulls = palloc0_array(bool, tupdesc->natts);
1487 :
1488 : /* Fetch values */
1489 24 : values[0] = Int32GetDatum(pid);
1490 :
1491 24 : if (!has_privs_of_role(GetUserId(), ROLE_PG_READ_ALL_STATS))
1492 : {
1493 : /*
1494 : * Only superusers and roles with privileges of pg_read_all_stats can
1495 : * see details. Other users only get the pid value to know whether it
1496 : * is a WAL receiver, but no details.
1497 : */
1498 0 : memset(&nulls[1], true, sizeof(bool) * (tupdesc->natts - 1));
1499 : }
1500 : else
1501 : {
1502 24 : values[1] = CStringGetTextDatum(WalRcvGetStateString(state));
1503 :
1504 24 : if (!XLogRecPtrIsValid(receive_start_lsn))
1505 0 : nulls[2] = true;
1506 : else
1507 24 : values[2] = LSNGetDatum(receive_start_lsn);
1508 24 : values[3] = Int32GetDatum(receive_start_tli);
1509 24 : if (!XLogRecPtrIsValid(written_lsn))
1510 0 : nulls[4] = true;
1511 : else
1512 24 : values[4] = LSNGetDatum(written_lsn);
1513 24 : if (!XLogRecPtrIsValid(flushed_lsn))
1514 0 : nulls[5] = true;
1515 : else
1516 24 : values[5] = LSNGetDatum(flushed_lsn);
1517 24 : values[6] = Int32GetDatum(received_tli);
1518 24 : if (last_send_time == 0)
1519 0 : nulls[7] = true;
1520 : else
1521 24 : values[7] = TimestampTzGetDatum(last_send_time);
1522 24 : if (last_receipt_time == 0)
1523 0 : nulls[8] = true;
1524 : else
1525 24 : values[8] = TimestampTzGetDatum(last_receipt_time);
1526 24 : if (!XLogRecPtrIsValid(latest_end_lsn))
1527 0 : nulls[9] = true;
1528 : else
1529 24 : values[9] = LSNGetDatum(latest_end_lsn);
1530 24 : if (latest_end_time == 0)
1531 0 : nulls[10] = true;
1532 : else
1533 24 : values[10] = TimestampTzGetDatum(latest_end_time);
1534 24 : if (*slotname == '\0')
1535 24 : nulls[11] = true;
1536 : else
1537 0 : values[11] = CStringGetTextDatum(slotname);
1538 24 : if (*sender_host == '\0')
1539 0 : nulls[12] = true;
1540 : else
1541 24 : values[12] = CStringGetTextDatum(sender_host);
1542 24 : if (sender_port == 0)
1543 0 : nulls[13] = true;
1544 : else
1545 24 : values[13] = Int32GetDatum(sender_port);
1546 24 : if (*conninfo == '\0')
1547 0 : nulls[14] = true;
1548 : else
1549 24 : values[14] = CStringGetTextDatum(conninfo);
1550 : }
1551 :
1552 : /* Returns the record as Datum */
1553 24 : PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
1554 : }
|