Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * walreceiverfuncs.c
4 : *
5 : * This file contains functions used by the startup process to communicate
6 : * with the walreceiver process. Functions implementing walreceiver itself
7 : * are in walreceiver.c.
8 : *
9 : * Portions Copyright (c) 2010-2026, PostgreSQL Global Development Group
10 : *
11 : *
12 : * IDENTIFICATION
13 : * src/backend/replication/walreceiverfuncs.c
14 : *
15 : *-------------------------------------------------------------------------
16 : */
17 : #include "postgres.h"
18 :
19 : #include <sys/stat.h>
20 : #include <sys/time.h>
21 : #include <time.h>
22 : #include <unistd.h>
23 : #include <signal.h>
24 :
25 : #include "access/xlog_internal.h"
26 : #include "access/xlogrecovery.h"
27 : #include "pgstat.h"
28 : #include "replication/walreceiver.h"
29 : #include "storage/pmsignal.h"
30 : #include "storage/proc.h"
31 : #include "storage/shmem.h"
32 : #include "storage/subsystems.h"
33 : #include "utils/timestamp.h"
34 : #include "utils/wait_event.h"
35 :
/*
 * Pointer to the walreceiver control structure.  It starts out NULL; its
 * address is handed to ShmemRequestStruct() in WalRcvShmemRequest(), and the
 * structure it points to is initialized in WalRcvShmemInit().
 */
WalRcvData *WalRcv = NULL;

/* Shared-memory callbacks, defined below. */
static void WalRcvShmemRequest(void *arg);
static void WalRcvShmemInit(void *arg);

/*
 * Callback table exposing this module's shared-memory request and
 * initialization routines.
 */
const ShmemCallbacks WalRcvShmemCallbacks = {
	.request_fn = WalRcvShmemRequest,
	.init_fn = WalRcvShmemInit,
};

/*
 * How long to wait for walreceiver to start up after requesting
 * postmaster to launch it. In seconds.
 *
 * The value is compared against a difference of time(NULL) readings, so the
 * effective granularity is whole seconds.
 */
#define WALRCV_STARTUP_TIMEOUT 10
51 :
52 : /* Register shared memory space needed by walreceiver */
53 : static void
54 1251 : WalRcvShmemRequest(void *arg)
55 : {
56 1251 : ShmemRequestStruct(.name = "Wal Receiver Ctl",
57 : .size = sizeof(WalRcvData),
58 : .ptr = (void **) &WalRcv,
59 : );
60 1251 : }
61 :
62 : /* Initialize walreceiver-related shared memory */
63 : static void
64 1248 : WalRcvShmemInit(void *arg)
65 : {
66 1248 : MemSet(WalRcv, 0, sizeof(WalRcvData));
67 1248 : WalRcv->walRcvState = WALRCV_STOPPED;
68 1248 : ConditionVariableInit(&WalRcv->walRcvStoppedCV);
69 1248 : SpinLockInit(&WalRcv->mutex);
70 1248 : pg_atomic_init_u64(&WalRcv->writtenUpto, 0);
71 1248 : WalRcv->procno = INVALID_PROC_NUMBER;
72 1248 : }
73 :
74 : /* Is walreceiver running (or starting up)? */
75 : bool
76 1135 : WalRcvRunning(void)
77 : {
78 1135 : WalRcvData *walrcv = WalRcv;
79 : WalRcvState state;
80 : pg_time_t startTime;
81 :
82 1135 : SpinLockAcquire(&walrcv->mutex);
83 :
84 1135 : state = walrcv->walRcvState;
85 1135 : startTime = walrcv->startTime;
86 :
87 1135 : SpinLockRelease(&walrcv->mutex);
88 :
89 : /*
90 : * If it has taken too long for walreceiver to start up, give up. Setting
91 : * the state to STOPPED ensures that if walreceiver later does start up
92 : * after all, it will see that it's not supposed to be running and die
93 : * without doing anything.
94 : */
95 1135 : if (state == WALRCV_STARTING)
96 : {
97 0 : pg_time_t now = (pg_time_t) time(NULL);
98 :
99 0 : if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
100 : {
101 0 : bool stopped = false;
102 :
103 0 : SpinLockAcquire(&walrcv->mutex);
104 0 : if (walrcv->walRcvState == WALRCV_STARTING)
105 : {
106 0 : state = walrcv->walRcvState = WALRCV_STOPPED;
107 0 : stopped = true;
108 : }
109 0 : SpinLockRelease(&walrcv->mutex);
110 :
111 0 : if (stopped)
112 0 : ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
113 : }
114 : }
115 :
116 1135 : if (state != WALRCV_STOPPED)
117 45 : return true;
118 : else
119 1090 : return false;
120 : }
121 :
122 : /* Return the state of the walreceiver. */
123 : WalRcvState
124 0 : WalRcvGetState(void)
125 : {
126 0 : WalRcvData *walrcv = WalRcv;
127 : WalRcvState state;
128 :
129 0 : SpinLockAcquire(&walrcv->mutex);
130 0 : state = walrcv->walRcvState;
131 0 : SpinLockRelease(&walrcv->mutex);
132 :
133 0 : return state;
134 : }
135 :
136 : /*
137 : * Is walreceiver running and streaming (or at least attempting to connect,
138 : * or starting up)?
139 : */
140 : bool
141 35552 : WalRcvStreaming(void)
142 : {
143 35552 : WalRcvData *walrcv = WalRcv;
144 : WalRcvState state;
145 : pg_time_t startTime;
146 :
147 35552 : SpinLockAcquire(&walrcv->mutex);
148 :
149 35552 : state = walrcv->walRcvState;
150 35552 : startTime = walrcv->startTime;
151 :
152 35552 : SpinLockRelease(&walrcv->mutex);
153 :
154 : /*
155 : * If it has taken too long for walreceiver to start up, give up. Setting
156 : * the state to STOPPED ensures that if walreceiver later does start up
157 : * after all, it will see that it's not supposed to be running and die
158 : * without doing anything.
159 : */
160 35552 : if (state == WALRCV_STARTING)
161 : {
162 321 : pg_time_t now = (pg_time_t) time(NULL);
163 :
164 321 : if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
165 : {
166 0 : bool stopped = false;
167 :
168 0 : SpinLockAcquire(&walrcv->mutex);
169 0 : if (walrcv->walRcvState == WALRCV_STARTING)
170 : {
171 0 : state = walrcv->walRcvState = WALRCV_STOPPED;
172 0 : stopped = true;
173 : }
174 0 : SpinLockRelease(&walrcv->mutex);
175 :
176 0 : if (stopped)
177 0 : ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
178 : }
179 : }
180 :
181 35552 : if (state == WALRCV_STREAMING || state == WALRCV_STARTING ||
182 349 : state == WALRCV_CONNECTING || state == WALRCV_RESTARTING)
183 35210 : return true;
184 : else
185 342 : return false;
186 : }
187 :
188 : /*
189 : * Stop walreceiver (if running) and wait for it to die.
190 : * Executed by the Startup process.
191 : */
192 : void
193 1085 : ShutdownWalRcv(void)
194 : {
195 1085 : WalRcvData *walrcv = WalRcv;
196 1085 : pid_t walrcvpid = 0;
197 1085 : bool stopped = false;
198 :
199 : /*
200 : * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
201 : * mode once it's finished, and will also request postmaster to not
202 : * restart itself.
203 : */
204 1085 : SpinLockAcquire(&walrcv->mutex);
205 1085 : switch (walrcv->walRcvState)
206 : {
207 1041 : case WALRCV_STOPPED:
208 1041 : break;
209 3 : case WALRCV_STARTING:
210 3 : walrcv->walRcvState = WALRCV_STOPPED;
211 3 : stopped = true;
212 3 : break;
213 :
214 41 : case WALRCV_CONNECTING:
215 : case WALRCV_STREAMING:
216 : case WALRCV_WAITING:
217 : case WALRCV_RESTARTING:
218 41 : walrcv->walRcvState = WALRCV_STOPPING;
219 : pg_fallthrough;
220 41 : case WALRCV_STOPPING:
221 41 : walrcvpid = walrcv->pid;
222 41 : break;
223 : }
224 1085 : SpinLockRelease(&walrcv->mutex);
225 :
226 : /* Unnecessary but consistent. */
227 1085 : if (stopped)
228 3 : ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
229 :
230 : /*
231 : * Signal walreceiver process if it was still running.
232 : */
233 1085 : if (walrcvpid != 0)
234 41 : kill(walrcvpid, SIGTERM);
235 :
236 : /*
237 : * Wait for walreceiver to acknowledge its death by setting state to
238 : * WALRCV_STOPPED.
239 : */
240 1085 : ConditionVariablePrepareToSleep(&walrcv->walRcvStoppedCV);
241 1122 : while (WalRcvRunning())
242 37 : ConditionVariableSleep(&walrcv->walRcvStoppedCV,
243 : WAIT_EVENT_WAL_RECEIVER_EXIT);
244 1085 : ConditionVariableCancelSleep();
245 1085 : }
246 :
247 : /*
248 : * Request postmaster to start walreceiver.
249 : *
250 : * "recptr" indicates the position where streaming should begin. "conninfo"
251 : * is a libpq connection string to use. "slotname" is, optionally, the name
252 : * of a replication slot to acquire. "create_temp_slot" indicates to create
253 : * a temporary slot when no "slotname" is given.
254 : *
255 : * WAL receivers do not directly load GUC parameters used for the connection
256 : * to the primary, and rely on the values passed down by the caller of this
257 : * routine instead. Hence, the addition of any new parameters should happen
258 : * through this code path.
259 : */
260 : void
261 208 : RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
262 : const char *slotname, bool create_temp_slot)
263 : {
264 208 : WalRcvData *walrcv = WalRcv;
265 208 : bool launch = false;
266 208 : pg_time_t now = (pg_time_t) time(NULL);
267 : ProcNumber walrcv_proc;
268 :
269 : /*
270 : * We always start at the beginning of the segment. That prevents a broken
271 : * segment (i.e., with no records in the first half of a segment) from
272 : * being created by XLOG streaming, which might cause trouble later on if
273 : * the segment is e.g archived.
274 : */
275 208 : if (XLogSegmentOffset(recptr, wal_segment_size) != 0)
276 208 : recptr -= XLogSegmentOffset(recptr, wal_segment_size);
277 :
278 208 : SpinLockAcquire(&walrcv->mutex);
279 :
280 : /* It better be stopped if we try to restart it */
281 : Assert(walrcv->walRcvState == WALRCV_STOPPED ||
282 : walrcv->walRcvState == WALRCV_WAITING);
283 :
284 : /*
285 : * Use configured replication slot if present, and ignore the value of
286 : * create_temp_slot as the slot name should be persistent. Otherwise, use
287 : * create_temp_slot to determine whether this WAL receiver should create a
288 : * temporary slot by itself and use it, or not.
289 : */
290 208 : if (slotname != NULL && slotname[0] != '\0')
291 : {
292 63 : strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
293 63 : walrcv->is_temp_slot = false;
294 : }
295 : else
296 : {
297 145 : walrcv->slotname[0] = '\0';
298 145 : walrcv->is_temp_slot = create_temp_slot;
299 : }
300 :
301 : /*
302 : * While waiting for instructions, the WAL receiver uses the same
303 : * connection, so do not clobber the user-visible conninfo already saved.
304 : */
305 208 : if (walrcv->walRcvState == WALRCV_STOPPED)
306 : {
307 200 : launch = true;
308 200 : walrcv->walRcvState = WALRCV_STARTING;
309 :
310 200 : if (conninfo != NULL)
311 200 : strlcpy(walrcv->conninfo, conninfo, MAXCONNINFO);
312 : else
313 0 : walrcv->conninfo[0] = '\0';
314 : }
315 : else
316 8 : walrcv->walRcvState = WALRCV_RESTARTING;
317 208 : walrcv->startTime = now;
318 :
319 : /*
320 : * If this is the first startup of walreceiver (on this timeline),
321 : * initialize flushedUpto and latestChunkStart to the starting point.
322 : */
323 208 : if (!XLogRecPtrIsValid(walrcv->receiveStart) || walrcv->receivedTLI != tli)
324 : {
325 114 : walrcv->flushedUpto = recptr;
326 114 : walrcv->receivedTLI = tli;
327 114 : walrcv->latestChunkStart = recptr;
328 :
329 : /*
330 : * Pairs with pg_atomic_read_membarrier_u64() in
331 : * GetWalRcvWriteRecPtr().
332 : */
333 114 : pg_atomic_write_membarrier_u64(&walrcv->writtenUpto, recptr);
334 : }
335 208 : walrcv->receiveStart = recptr;
336 208 : walrcv->receiveStartTLI = tli;
337 :
338 208 : walrcv_proc = walrcv->procno;
339 :
340 208 : SpinLockRelease(&walrcv->mutex);
341 :
342 208 : if (launch)
343 200 : SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
344 8 : else if (walrcv_proc != INVALID_PROC_NUMBER)
345 8 : SetLatch(&GetPGProcByNumber(walrcv_proc)->procLatch);
346 208 : }
347 :
348 : /*
349 : * Returns the last+1 byte position that walreceiver has flushed.
350 : *
351 : * Optionally, returns the previous chunk start, that is the first byte
352 : * written in the most recent walreceiver flush cycle. Callers not
353 : * interested in that value may pass NULL for latestChunkStart. Same for
354 : * receiveTLI.
355 : */
356 : XLogRecPtr
357 34750 : GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
358 : {
359 34750 : WalRcvData *walrcv = WalRcv;
360 : XLogRecPtr recptr;
361 :
362 34750 : SpinLockAcquire(&walrcv->mutex);
363 34750 : recptr = walrcv->flushedUpto;
364 34750 : if (latestChunkStart)
365 33393 : *latestChunkStart = walrcv->latestChunkStart;
366 34750 : if (receiveTLI)
367 34497 : *receiveTLI = walrcv->receivedTLI;
368 34750 : SpinLockRelease(&walrcv->mutex);
369 :
370 34750 : return recptr;
371 : }
372 :
373 : /*
374 : * Returns the last+1 byte position that walreceiver has written.
375 : *
376 : * Use pg_atomic_read_membarrier_u64() to ensure that callers see up-to-date
377 : * shared memory state, matching the barrier semantics provided by the
378 : * spinlock in GetWalRcvFlushRecPtr() and other LSN-position functions.
379 : */
380 : XLogRecPtr
381 47 : GetWalRcvWriteRecPtr(void)
382 : {
383 47 : WalRcvData *walrcv = WalRcv;
384 :
385 47 : return pg_atomic_read_membarrier_u64(&walrcv->writtenUpto);
386 : }
387 :
388 : /*
389 : * Returns the replication apply delay in ms or -1
390 : * if the apply delay info is not available
391 : */
392 : int
393 406 : GetReplicationApplyDelay(void)
394 : {
395 406 : WalRcvData *walrcv = WalRcv;
396 : XLogRecPtr receivePtr;
397 : XLogRecPtr replayPtr;
398 : TimestampTz chunkReplayStartTime;
399 :
400 406 : SpinLockAcquire(&walrcv->mutex);
401 406 : receivePtr = walrcv->flushedUpto;
402 406 : SpinLockRelease(&walrcv->mutex);
403 :
404 406 : replayPtr = GetXLogReplayRecPtr(NULL);
405 :
406 406 : if (receivePtr == replayPtr)
407 131 : return 0;
408 :
409 275 : chunkReplayStartTime = GetCurrentChunkReplayStartTime();
410 :
411 275 : if (chunkReplayStartTime == 0)
412 7 : return -1;
413 :
414 268 : return TimestampDifferenceMilliseconds(chunkReplayStartTime,
415 : GetCurrentTimestamp());
416 : }
417 :
418 : /*
419 : * Returns the network latency in ms, note that this includes any
420 : * difference in clock settings between the servers, as well as timezone.
421 : */
422 : int
423 406 : GetReplicationTransferLatency(void)
424 : {
425 406 : WalRcvData *walrcv = WalRcv;
426 : TimestampTz lastMsgSendTime;
427 : TimestampTz lastMsgReceiptTime;
428 :
429 406 : SpinLockAcquire(&walrcv->mutex);
430 406 : lastMsgSendTime = walrcv->lastMsgSendTime;
431 406 : lastMsgReceiptTime = walrcv->lastMsgReceiptTime;
432 406 : SpinLockRelease(&walrcv->mutex);
433 :
434 406 : return TimestampDifferenceMilliseconds(lastMsgSendTime,
435 : lastMsgReceiptTime);
436 : }
|