Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * walreceiverfuncs.c
4 : : *
5 : : * This file contains functions used by the startup process to communicate
6 : : * with the walreceiver process. Functions implementing walreceiver itself
7 : : * are in walreceiver.c.
8 : : *
9 : : * Portions Copyright (c) 2010-2026, PostgreSQL Global Development Group
10 : : *
11 : : *
12 : : * IDENTIFICATION
13 : : * src/backend/replication/walreceiverfuncs.c
14 : : *
15 : : *-------------------------------------------------------------------------
16 : : */
17 : : #include "postgres.h"
18 : :
19 : : #include <sys/stat.h>
20 : : #include <sys/time.h>
21 : : #include <time.h>
22 : : #include <unistd.h>
23 : : #include <signal.h>
24 : :
25 : : #include "access/xlog_internal.h"
26 : : #include "access/xlogrecovery.h"
27 : : #include "pgstat.h"
28 : : #include "replication/walreceiver.h"
29 : : #include "storage/pmsignal.h"
30 : : #include "storage/proc.h"
31 : : #include "storage/shmem.h"
32 : : #include "storage/subsystems.h"
33 : : #include "utils/timestamp.h"
34 : : #include "utils/wait_event.h"
35 : :
36 : : WalRcvData *WalRcv = NULL;
37 : :
38 : : static void WalRcvShmemRequest(void *arg);
39 : : static void WalRcvShmemInit(void *arg);
40 : :
41 : : const ShmemCallbacks WalRcvShmemCallbacks = {
42 : : .request_fn = WalRcvShmemRequest,
43 : : .init_fn = WalRcvShmemInit,
44 : : };
45 : :
46 : : /*
47 : : * How long to wait for walreceiver to start up after requesting
48 : : * postmaster to launch it. In seconds.
49 : : */
50 : : #define WALRCV_STARTUP_TIMEOUT 10
51 : :
52 : : /* Register shared memory space needed by walreceiver */
53 : : static void
85 heikki.linnakangas@i 54 :GNC 1212 : WalRcvShmemRequest(void *arg)
55 : : {
56 : 1212 : ShmemRequestStruct(.name = "Wal Receiver Ctl",
57 : : .size = sizeof(WalRcvData),
58 : : .ptr = (void **) &WalRcv,
59 : : );
6010 heikki.linnakangas@i 60 :GIC 1212 : }
61 : :
62 : : /* Initialize walreceiver-related shared memory */
63 : : static void
85 heikki.linnakangas@i 64 :GNC 1209 : WalRcvShmemInit(void *arg)
65 : : {
66 [ + - + - : 1209 : MemSet(WalRcv, 0, sizeof(WalRcvData));
+ - - + -
- ]
67 : 1209 : WalRcv->walRcvState = WALRCV_STOPPED;
68 : 1209 : ConditionVariableInit(&WalRcv->walRcvStoppedCV);
69 : 1209 : SpinLockInit(&WalRcv->mutex);
70 : 1209 : pg_atomic_init_u64(&WalRcv->writtenUpto, 0);
71 : 1209 : WalRcv->procno = INVALID_PROC_NUMBER;
6010 heikki.linnakangas@i 72 :CBC 1209 : }
73 : :
74 : : /* Is walreceiver running (or starting up)? */
75 : : bool
4947 76 : 1102 : WalRcvRunning(void)
77 : : {
3920 rhaas@postgresql.org 78 : 1102 : WalRcvData *walrcv = WalRcv;
79 : : WalRcvState state;
80 : : pg_time_t startTime;
81 : :
6010 heikki.linnakangas@i 82 [ - + ]: 1102 : SpinLockAcquire(&walrcv->mutex);
83 : :
5998 84 : 1102 : state = walrcv->walRcvState;
85 : 1102 : startTime = walrcv->startTime;
86 : :
87 : 1102 : SpinLockRelease(&walrcv->mutex);
88 : :
89 : : /*
90 : : * If it has taken too long for walreceiver to start up, give up. Setting
91 : : * the state to STOPPED ensures that if walreceiver later does start up
92 : : * after all, it will see that it's not supposed to be running and die
93 : : * without doing anything.
94 : : */
95 [ - + ]: 1102 : if (state == WALRCV_STARTING)
96 : : {
5968 bruce@momjian.us 97 :LBC (1) : pg_time_t now = (pg_time_t) time(NULL);
98 : :
5998 heikki.linnakangas@i 99 [ # # ]: (1) : if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
100 : : {
1936 tmunro@postgresql.or 101 :UBC 0 : bool stopped = false;
102 : :
103 [ # # ]: 0 : SpinLockAcquire(&walrcv->mutex);
5998 heikki.linnakangas@i 104 [ # # ]: 0 : if (walrcv->walRcvState == WALRCV_STARTING)
105 : : {
106 : 0 : state = walrcv->walRcvState = WALRCV_STOPPED;
1936 tmunro@postgresql.or 107 : 0 : stopped = true;
108 : : }
5998 heikki.linnakangas@i 109 : 0 : SpinLockRelease(&walrcv->mutex);
110 : :
1936 tmunro@postgresql.or 111 [ # # ]: 0 : if (stopped)
112 : 0 : ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
113 : : }
114 : : }
115 : :
5998 heikki.linnakangas@i 116 [ + + ]:CBC 1102 : if (state != WALRCV_STOPPED)
117 : 46 : return true;
118 : : else
119 : 1056 : return false;
120 : : }
121 : :
122 : : /* Return the state of the walreceiver. */
123 : : WalRcvState
238 michael@paquier.xyz 124 :GNC 170 : WalRcvGetState(void)
125 : : {
126 : 170 : WalRcvData *walrcv = WalRcv;
127 : : WalRcvState state;
128 : :
129 : 170 : SpinLockAcquire(&walrcv->mutex);
130 : 170 : state = walrcv->walRcvState;
131 : 170 : SpinLockRelease(&walrcv->mutex);
132 : :
133 : 170 : return state;
134 : : }
135 : :
136 : : /*
137 : : * Is walreceiver running and streaming (or at least attempting to connect,
138 : : * or starting up)?
139 : : */
140 : : bool
4947 heikki.linnakangas@i 141 :CBC 35722 : WalRcvStreaming(void)
142 : : {
3920 rhaas@postgresql.org 143 : 35722 : WalRcvData *walrcv = WalRcv;
144 : : WalRcvState state;
145 : : pg_time_t startTime;
146 : :
4947 heikki.linnakangas@i 147 [ - + ]: 35722 : SpinLockAcquire(&walrcv->mutex);
148 : :
149 : 35722 : state = walrcv->walRcvState;
150 : 35722 : startTime = walrcv->startTime;
151 : :
152 : 35722 : SpinLockRelease(&walrcv->mutex);
153 : :
154 : : /*
155 : : * If it has taken too long for walreceiver to start up, give up. Setting
156 : : * the state to STOPPED ensures that if walreceiver later does start up
157 : : * after all, it will see that it's not supposed to be running and die
158 : : * without doing anything.
159 : : */
160 [ + + ]: 35722 : if (state == WALRCV_STARTING)
161 : : {
162 : 325 : pg_time_t now = (pg_time_t) time(NULL);
163 : :
164 [ - + ]: 325 : if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
165 : : {
1936 tmunro@postgresql.or 166 :UBC 0 : bool stopped = false;
167 : :
168 [ # # ]: 0 : SpinLockAcquire(&walrcv->mutex);
4947 heikki.linnakangas@i 169 [ # # ]: 0 : if (walrcv->walRcvState == WALRCV_STARTING)
170 : : {
171 : 0 : state = walrcv->walRcvState = WALRCV_STOPPED;
1936 tmunro@postgresql.or 172 : 0 : stopped = true;
173 : : }
4947 heikki.linnakangas@i 174 : 0 : SpinLockRelease(&walrcv->mutex);
175 : :
1936 tmunro@postgresql.or 176 [ # # ]: 0 : if (stopped)
177 : 0 : ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
178 : : }
179 : : }
180 : :
4947 heikki.linnakangas@i 181 [ + + + + :CBC 35722 : if (state == WALRCV_STREAMING || state == WALRCV_STARTING ||
+ + ]
158 michael@paquier.xyz 182 [ + + ]:GNC 3269 : state == WALRCV_CONNECTING || state == WALRCV_RESTARTING)
4947 heikki.linnakangas@i 183 :CBC 32461 : return true;
184 : : else
185 : 3261 : return false;
186 : : }
187 : :
188 : : /*
189 : : * Stop walreceiver (if running) and wait for it to die.
190 : : * Executed by the Startup process.
191 : : */
192 : : void
6010 193 : 1051 : ShutdownWalRcv(void)
194 : : {
3920 rhaas@postgresql.org 195 : 1051 : WalRcvData *walrcv = WalRcv;
5968 bruce@momjian.us 196 : 1051 : pid_t walrcvpid = 0;
1936 tmunro@postgresql.or 197 : 1051 : bool stopped = false;
198 : :
199 : : /*
200 : : * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
201 : : * mode once it's finished, and will also request postmaster to not
202 : : * restart itself.
203 : : */
6010 heikki.linnakangas@i 204 [ - + ]: 1051 : SpinLockAcquire(&walrcv->mutex);
5968 bruce@momjian.us 205 [ + + + - : 1051 : switch (walrcv->walRcvState)
- ]
206 : : {
5998 heikki.linnakangas@i 207 : 1004 : case WALRCV_STOPPED:
208 : 1004 : break;
209 : 5 : case WALRCV_STARTING:
210 : 5 : walrcv->walRcvState = WALRCV_STOPPED;
1936 tmunro@postgresql.or 211 : 5 : stopped = true;
5998 heikki.linnakangas@i 212 : 5 : break;
213 : :
158 michael@paquier.xyz 214 :GNC 42 : case WALRCV_CONNECTING:
4947 heikki.linnakangas@i 215 :ECB (30) : case WALRCV_STREAMING:
216 : : case WALRCV_WAITING:
217 : : case WALRCV_RESTARTING:
5998 heikki.linnakangas@i 218 :CBC 42 : walrcv->walRcvState = WALRCV_STOPPING;
219 : : pg_fallthrough;
220 : 42 : case WALRCV_STOPPING:
221 : 42 : walrcvpid = walrcv->pid;
222 : 42 : break;
223 : : }
6010 224 : 1051 : SpinLockRelease(&walrcv->mutex);
225 : :
226 : : /* Unnecessary but consistent. */
1936 tmunro@postgresql.or 227 [ + + ]: 1051 : if (stopped)
228 : 5 : ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
229 : :
230 : : /*
231 : : * Signal walreceiver process if it was still running.
232 : : */
6010 heikki.linnakangas@i 233 [ + + ]: 1051 : if (walrcvpid != 0)
234 : 42 : kill(walrcvpid, SIGTERM);
235 : :
236 : : /*
237 : : * Wait for walreceiver to acknowledge its death by setting state to
238 : : * WALRCV_STOPPED.
239 : : */
1936 tmunro@postgresql.or 240 : 1051 : ConditionVariablePrepareToSleep(&walrcv->walRcvStoppedCV);
4947 heikki.linnakangas@i 241 [ + + ]: 1089 : while (WalRcvRunning())
1936 tmunro@postgresql.or 242 : 38 : ConditionVariableSleep(&walrcv->walRcvStoppedCV,
243 : : WAIT_EVENT_WAL_RECEIVER_EXIT);
244 : 1051 : ConditionVariableCancelSleep();
6010 heikki.linnakangas@i 245 : 1051 : }
246 : :
247 : : /*
248 : : * Request postmaster to start walreceiver.
249 : : *
250 : : * "recptr" indicates the position where streaming should begin. "conninfo"
251 : : * is a libpq connection string to use. "slotname" is, optionally, the name
252 : : * of a replication slot to acquire. "create_temp_slot" indicates to create
253 : : * a temporary slot when no "slotname" is given.
254 : : *
255 : : * WAL receivers do not directly load GUC parameters used for the connection
256 : : * to the primary, and rely on the values passed down by the caller of this
257 : : * routine instead. Hence, the addition of any new parameters should happen
258 : : * through this code path.
259 : : */
260 : : void
4533 rhaas@postgresql.org 261 : 212 : RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
262 : : const char *slotname, bool create_temp_slot)
263 : : {
3920 264 : 212 : WalRcvData *walrcv = WalRcv;
4947 heikki.linnakangas@i 265 : 212 : bool launch = false;
5968 bruce@momjian.us 266 : 212 : pg_time_t now = (pg_time_t) time(NULL);
267 : : ProcNumber walrcv_proc;
268 : :
269 : : /*
270 : : * We always start at the beginning of the segment. That prevents a broken
271 : : * segment (i.e., with no records in the first half of a segment) from
272 : : * being created by XLOG streaming, which might cause trouble later on if
273 : : * the segment is e.g archived.
274 : : */
3206 andres@anarazel.de 275 [ + - ]: 212 : if (XLogSegmentOffset(recptr, wal_segment_size) != 0)
276 : 212 : recptr -= XLogSegmentOffset(recptr, wal_segment_size);
277 : :
5841 tgl@sss.pgh.pa.us 278 [ - + ]: 212 : SpinLockAcquire(&walrcv->mutex);
279 : :
280 : : /* It better be stopped if we try to restart it */
4947 heikki.linnakangas@i 281 [ + + - + ]: 212 : Assert(walrcv->walRcvState == WALRCV_STOPPED ||
282 : : walrcv->walRcvState == WALRCV_WAITING);
283 : :
284 : : /*
285 : : * Use configured replication slot if present, and ignore the value of
286 : : * create_temp_slot as the slot name should be persistent. Otherwise, use
287 : : * create_temp_slot to determine whether this WAL receiver should create a
288 : : * temporary slot by itself and use it, or not.
289 : : */
2286 alvherre@alvh.no-ip. 290 [ + - + + ]: 212 : if (slotname != NULL && slotname[0] != '\0')
291 : : {
503 peter@eisentraut.org 292 : 62 : strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
2286 alvherre@alvh.no-ip. 293 : 62 : walrcv->is_temp_slot = false;
294 : : }
295 : : else
296 : : {
4533 rhaas@postgresql.org 297 : 150 : walrcv->slotname[0] = '\0';
2286 alvherre@alvh.no-ip. 298 : 150 : walrcv->is_temp_slot = create_temp_slot;
299 : : }
300 : :
301 : : /*
302 : : * While waiting for instructions, the WAL receiver uses the same
303 : : * connection, so do not clobber the user-visible conninfo already saved.
304 : : */
4947 heikki.linnakangas@i 305 [ + + ]: 212 : if (walrcv->walRcvState == WALRCV_STOPPED)
306 : : {
307 : 203 : launch = true;
308 : 203 : walrcv->walRcvState = WALRCV_STARTING;
309 : :
38 michael@paquier.xyz 310 [ + - ]: 203 : if (conninfo != NULL)
311 : 203 : strlcpy(walrcv->conninfo, conninfo, MAXCONNINFO);
312 : : else
38 michael@paquier.xyz 313 :UBC 0 : walrcv->conninfo[0] = '\0';
314 : : }
315 : : else
4947 heikki.linnakangas@i 316 :CBC 9 : walrcv->walRcvState = WALRCV_RESTARTING;
5998 317 : 212 : walrcv->startTime = now;
318 : :
319 : : /*
320 : : * If this is the first startup of walreceiver (on this timeline),
321 : : * initialize flushedUpto and latestChunkStart to the starting point.
322 : : */
236 alvherre@kurilemu.de 323 [ + + - + ]:GNC 212 : if (!XLogRecPtrIsValid(walrcv->receiveStart) || walrcv->receivedTLI != tli)
324 : : {
2274 tmunro@postgresql.or 325 :CBC 117 : walrcv->flushedUpto = recptr;
4801 heikki.linnakangas@i 326 : 117 : walrcv->receivedTLI = tli;
5600 327 : 117 : walrcv->latestChunkStart = recptr;
328 : :
329 : : /*
330 : : * Pairs with pg_atomic_read_membarrier_u64() in
331 : : * GetWalRcvWriteRecPtr().
332 : : */
58 akorotkov@postgresql 333 :GNC 117 : pg_atomic_write_membarrier_u64(&walrcv->writtenUpto, recptr);
334 : : }
5600 heikki.linnakangas@i 335 :CBC 212 : walrcv->receiveStart = recptr;
4947 336 : 212 : walrcv->receiveStartTLI = tli;
337 : :
606 338 : 212 : walrcv_proc = walrcv->procno;
339 : :
6010 340 : 212 : SpinLockRelease(&walrcv->mutex);
341 : :
4947 342 [ + + ]: 212 : if (launch)
343 : 203 : SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
606 344 [ + - ]: 9 : else if (walrcv_proc != INVALID_PROC_NUMBER)
345 : 9 : SetLatch(&GetPGProcByNumber(walrcv_proc)->procLatch);
6010 346 : 212 : }
347 : :
348 : : /*
349 : : * Returns the last+1 byte position that walreceiver has flushed.
350 : : *
351 : : * Optionally, returns the previous chunk start, that is the first byte
352 : : * written in the most recent walreceiver flush cycle. Callers not
353 : : * interested in that value may pass NULL for latestChunkStart. Same for
354 : : * receiveTLI.
355 : : */
356 : : XLogRecPtr
2274 tmunro@postgresql.or 357 : 32105 : GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
358 : : {
3920 rhaas@postgresql.org 359 : 32105 : WalRcvData *walrcv = WalRcv;
360 : : XLogRecPtr recptr;
361 : :
6010 heikki.linnakangas@i 362 [ - + ]: 32105 : SpinLockAcquire(&walrcv->mutex);
2274 tmunro@postgresql.or 363 : 32105 : recptr = walrcv->flushedUpto;
5841 tgl@sss.pgh.pa.us 364 [ + + ]: 32105 : if (latestChunkStart)
365 : 30688 : *latestChunkStart = walrcv->latestChunkStart;
4947 heikki.linnakangas@i 366 [ + + ]: 32105 : if (receiveTLI)
367 : 31843 : *receiveTLI = walrcv->receivedTLI;
6010 368 : 32105 : SpinLockRelease(&walrcv->mutex);
369 : :
370 : 32105 : return recptr;
371 : : }
372 : :
373 : : /*
374 : : * Returns the last+1 byte position that walreceiver has written.
375 : : *
376 : : * Use pg_atomic_read_membarrier_u64() to ensure that callers see up-to-date
377 : : * shared memory state, matching the barrier semantics provided by the
378 : : * spinlock in GetWalRcvFlushRecPtr() and other LSN-position functions.
379 : : */
380 : : XLogRecPtr
2274 tmunro@postgresql.or 381 :GBC 46 : GetWalRcvWriteRecPtr(void)
382 : : {
383 : 46 : WalRcvData *walrcv = WalRcv;
384 : :
58 akorotkov@postgresql 385 :GNC 46 : return pg_atomic_read_membarrier_u64(&walrcv->writtenUpto);
386 : : }
387 : :
388 : : /*
389 : : * Returns the replication apply delay in ms or -1
390 : : * if the apply delay info is not available
391 : : */
392 : : int
5295 simon@2ndQuadrant.co 393 :CBC 422 : GetReplicationApplyDelay(void)
394 : : {
3920 rhaas@postgresql.org 395 : 422 : WalRcvData *walrcv = WalRcv;
396 : : XLogRecPtr receivePtr;
397 : : XLogRecPtr replayPtr;
398 : : TimestampTz chunkReplayStartTime;
399 : :
5295 simon@2ndQuadrant.co 400 [ - + ]: 422 : SpinLockAcquire(&walrcv->mutex);
2274 tmunro@postgresql.or 401 : 422 : receivePtr = walrcv->flushedUpto;
5295 simon@2ndQuadrant.co 402 : 422 : SpinLockRelease(&walrcv->mutex);
403 : :
4940 heikki.linnakangas@i 404 : 422 : replayPtr = GetXLogReplayRecPtr(NULL);
405 : :
4932 alvherre@alvh.no-ip. 406 [ + + ]: 422 : if (receivePtr == replayPtr)
5295 simon@2ndQuadrant.co 407 : 149 : return 0;
408 : :
3395 peter_e@gmx.net 409 : 273 : chunkReplayStartTime = GetCurrentChunkReplayStartTime();
410 : :
411 [ + + ]: 273 : if (chunkReplayStartTime == 0)
4126 ishii@postgresql.org 412 : 5 : return -1;
413 : :
2058 tgl@sss.pgh.pa.us 414 : 268 : return TimestampDifferenceMilliseconds(chunkReplayStartTime,
415 : : GetCurrentTimestamp());
416 : : }
417 : :
418 : : /*
419 : : * Returns the network latency in ms, note that this includes any
420 : : * difference in clock settings between the servers, as well as timezone.
421 : : */
422 : : int
5295 simon@2ndQuadrant.co 423 : 422 : GetReplicationTransferLatency(void)
424 : : {
3920 rhaas@postgresql.org 425 : 422 : WalRcvData *walrcv = WalRcv;
426 : : TimestampTz lastMsgSendTime;
427 : : TimestampTz lastMsgReceiptTime;
428 : :
5295 simon@2ndQuadrant.co 429 [ - + ]: 422 : SpinLockAcquire(&walrcv->mutex);
430 : 422 : lastMsgSendTime = walrcv->lastMsgSendTime;
431 : 422 : lastMsgReceiptTime = walrcv->lastMsgReceiptTime;
432 : 422 : SpinLockRelease(&walrcv->mutex);
433 : :
2058 tgl@sss.pgh.pa.us 434 : 422 : return TimestampDifferenceMilliseconds(lastMsgSendTime,
435 : : lastMsgReceiptTime);
436 : : }
|