Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * walreceiverfuncs.c
4 : *
5 : * This file contains functions used by the startup process to communicate
6 : * with the walreceiver process. Functions implementing walreceiver itself
7 : * are in walreceiver.c.
8 : *
9 : * Portions Copyright (c) 2010-2026, PostgreSQL Global Development Group
10 : *
11 : *
12 : * IDENTIFICATION
13 : * src/backend/replication/walreceiverfuncs.c
14 : *
15 : *-------------------------------------------------------------------------
16 : */
17 : #include "postgres.h"
18 :
19 : #include <sys/stat.h>
20 : #include <sys/time.h>
21 : #include <time.h>
22 : #include <unistd.h>
23 : #include <signal.h>
24 :
25 : #include "access/xlog_internal.h"
26 : #include "access/xlogrecovery.h"
27 : #include "pgstat.h"
28 : #include "replication/walreceiver.h"
29 : #include "storage/pmsignal.h"
30 : #include "storage/proc.h"
31 : #include "storage/shmem.h"
32 : #include "utils/timestamp.h"
33 : #include "utils/wait_event.h"
34 :
35 : WalRcvData *WalRcv = NULL; /* walreceiver control struct; pointed at shared memory by WalRcvShmemInit() */
36 :
37 : /*
38 : * How long to wait, in seconds, for walreceiver to start up after
39 : * requesting postmaster to launch it; checked against startTime in WalRcvRunning().
40 : */
41 : #define WALRCV_STARTUP_TIMEOUT 10
42 :
43 : /* Report shared memory space needed by WalRcvShmemInit */
44 : Size
45 4479 : WalRcvShmemSize(void)
46 : {
47 4479 : Size size = 0;
48 :
49 4479 : size = add_size(size, sizeof(WalRcvData));
50 :
51 4479 : return size;
52 : }
53 :
54 : /* Allocate and initialize walreceiver-related shared memory */
55 : void
56 1158 : WalRcvShmemInit(void)
57 : {
58 : bool found;
59 :
60 1158 : WalRcv = (WalRcvData *)
61 1158 : ShmemInitStruct("Wal Receiver Ctl", WalRcvShmemSize(), &found);
62 :
63 1158 : if (!found)
64 : {
65 : /* First time through, so initialize */
66 1158 : MemSet(WalRcv, 0, WalRcvShmemSize());
67 1158 : WalRcv->walRcvState = WALRCV_STOPPED;
68 1158 : ConditionVariableInit(&WalRcv->walRcvStoppedCV);
69 1158 : SpinLockInit(&WalRcv->mutex);
70 1158 : pg_atomic_init_u64(&WalRcv->writtenUpto, 0);
71 1158 : WalRcv->procno = INVALID_PROC_NUMBER;
72 : }
73 1158 : }
74 :
75 : /* Is walreceiver running (or starting up)? */
76 : bool
77 1056 : WalRcvRunning(void)
78 : {
79 1056 : WalRcvData *walrcv = WalRcv;
80 : WalRcvState state;
81 : pg_time_t startTime;
82 :
83 1056 : SpinLockAcquire(&walrcv->mutex);
84 :
85 1056 : state = walrcv->walRcvState;
86 1056 : startTime = walrcv->startTime;
87 :
88 1056 : SpinLockRelease(&walrcv->mutex);
89 :
90 : /*
91 : * If it has taken too long for walreceiver to start up, give up. Setting
92 : * the state to STOPPED ensures that if walreceiver later does start up
93 : * after all, it will see that it's not supposed to be running and die
94 : * without doing anything.
95 : */
96 1056 : if (state == WALRCV_STARTING)
97 : {
98 1 : pg_time_t now = (pg_time_t) time(NULL);
99 :
100 1 : if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
101 : {
102 0 : bool stopped = false;
103 :
104 0 : SpinLockAcquire(&walrcv->mutex);
105 0 : if (walrcv->walRcvState == WALRCV_STARTING)
106 : {
107 0 : state = walrcv->walRcvState = WALRCV_STOPPED;
108 0 : stopped = true;
109 : }
110 0 : SpinLockRelease(&walrcv->mutex);
111 :
112 0 : if (stopped)
113 0 : ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
114 : }
115 : }
116 :
117 1056 : if (state != WALRCV_STOPPED)
118 42 : return true;
119 : else
120 1014 : return false;
121 : }
122 :
123 : /* Return the state of the walreceiver. */
124 : WalRcvState
125 0 : WalRcvGetState(void)
126 : {
127 0 : WalRcvData *walrcv = WalRcv;
128 : WalRcvState state;
129 :
130 0 : SpinLockAcquire(&walrcv->mutex);
131 0 : state = walrcv->walRcvState;
132 0 : SpinLockRelease(&walrcv->mutex);
133 :
134 0 : return state;
135 : }
136 :
137 : /*
138 : * Is walreceiver running and streaming (or at least attempting to connect,
139 : * or starting up)?
140 : */
141 : bool
142 25718 : WalRcvStreaming(void)
143 : {
144 25718 : WalRcvData *walrcv = WalRcv;
145 : WalRcvState state;
146 : pg_time_t startTime;
147 :
148 25718 : SpinLockAcquire(&walrcv->mutex);
149 :
150 25718 : state = walrcv->walRcvState;
151 25718 : startTime = walrcv->startTime;
152 :
153 25718 : SpinLockRelease(&walrcv->mutex);
154 :
155 : /*
156 : * If it has taken too long for walreceiver to start up, give up. Setting
157 : * the state to STOPPED ensures that if walreceiver later does start up
158 : * after all, it will see that it's not supposed to be running and die
159 : * without doing anything.
160 : */
161 25718 : if (state == WALRCV_STARTING)
162 : {
163 284 : pg_time_t now = (pg_time_t) time(NULL);
164 :
165 284 : if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
166 : {
167 0 : bool stopped = false;
168 :
169 0 : SpinLockAcquire(&walrcv->mutex);
170 0 : if (walrcv->walRcvState == WALRCV_STARTING)
171 : {
172 0 : state = walrcv->walRcvState = WALRCV_STOPPED;
173 0 : stopped = true;
174 : }
175 0 : SpinLockRelease(&walrcv->mutex);
176 :
177 0 : if (stopped)
178 0 : ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
179 : }
180 : }
181 :
182 25718 : if (state == WALRCV_STREAMING || state == WALRCV_STARTING ||
183 293 : state == WALRCV_CONNECTING || state == WALRCV_RESTARTING)
184 25430 : return true;
185 : else
186 288 : return false;
187 : }
188 :
189 : /*
190 : * Stop walreceiver (if running) and wait for it to die.
191 : * Executed by the Startup process.
192 : */
193 : void
194 1010 : ShutdownWalRcv(void)
195 : {
196 1010 : WalRcvData *walrcv = WalRcv;
197 1010 : pid_t walrcvpid = 0;
198 1010 : bool stopped = false;
199 :
200 : /*
201 : * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
202 : * mode once it's finished, and will also request postmaster to not
203 : * restart itself.
204 : */
205 1010 : SpinLockAcquire(&walrcv->mutex);
206 1010 : switch (walrcv->walRcvState)
207 : {
208 968 : case WALRCV_STOPPED:
209 968 : break;
210 4 : case WALRCV_STARTING:
211 4 : walrcv->walRcvState = WALRCV_STOPPED;
212 4 : stopped = true;
213 4 : break;
214 :
215 38 : case WALRCV_CONNECTING:
216 : case WALRCV_STREAMING:
217 : case WALRCV_WAITING:
218 : case WALRCV_RESTARTING:
219 38 : walrcv->walRcvState = WALRCV_STOPPING;
220 : pg_fallthrough;
221 38 : case WALRCV_STOPPING:
222 38 : walrcvpid = walrcv->pid;
223 38 : break;
224 : }
225 1010 : SpinLockRelease(&walrcv->mutex);
226 :
227 : /* Unnecessary but consistent. */
228 1010 : if (stopped)
229 4 : ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
230 :
231 : /*
232 : * Signal walreceiver process if it was still running.
233 : */
234 1010 : if (walrcvpid != 0)
235 38 : kill(walrcvpid, SIGTERM);
236 :
237 : /*
238 : * Wait for walreceiver to acknowledge its death by setting state to
239 : * WALRCV_STOPPED.
240 : */
241 1010 : ConditionVariablePrepareToSleep(&walrcv->walRcvStoppedCV);
242 1045 : while (WalRcvRunning())
243 35 : ConditionVariableSleep(&walrcv->walRcvStoppedCV,
244 : WAIT_EVENT_WAL_RECEIVER_EXIT);
245 1010 : ConditionVariableCancelSleep();
246 1010 : }
247 :
248 : /*
249 : * Request postmaster to start walreceiver.
250 : *
251 : * "recptr" indicates the position where streaming should begin. "conninfo"
252 : * is a libpq connection string to use. "slotname" is, optionally, the name
253 : * of a replication slot to acquire. "create_temp_slot" indicates to create
254 : * a temporary slot when no "slotname" is given.
255 : *
256 : * WAL receivers do not directly load GUC parameters used for the connection
257 : * to the primary, and rely on the values passed down by the caller of this
258 : * routine instead. Hence, the addition of any new parameters should happen
259 : * through this code path.
260 : */
261 : void
262 190 : RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
263 : const char *slotname, bool create_temp_slot)
264 : {
265 190 : WalRcvData *walrcv = WalRcv;
266 190 : bool launch = false;
267 190 : pg_time_t now = (pg_time_t) time(NULL);
268 : ProcNumber walrcv_proc;
269 :
270 : /*
271 : * We always start at the beginning of the segment. That prevents a broken
272 : * segment (i.e., with no records in the first half of a segment) from
273 : * being created by XLOG streaming, which might cause trouble later on if
274 : * the segment is e.g archived.
275 : */
276 190 : if (XLogSegmentOffset(recptr, wal_segment_size) != 0)
277 190 : recptr -= XLogSegmentOffset(recptr, wal_segment_size);
278 :
279 190 : SpinLockAcquire(&walrcv->mutex);
280 :
281 : /* It better be stopped if we try to restart it */
282 : Assert(walrcv->walRcvState == WALRCV_STOPPED ||
283 : walrcv->walRcvState == WALRCV_WAITING);
284 :
285 190 : if (conninfo != NULL)
286 190 : strlcpy(walrcv->conninfo, conninfo, MAXCONNINFO);
287 : else
288 0 : walrcv->conninfo[0] = '\0';
289 :
290 : /*
291 : * Use configured replication slot if present, and ignore the value of
292 : * create_temp_slot as the slot name should be persistent. Otherwise, use
293 : * create_temp_slot to determine whether this WAL receiver should create a
294 : * temporary slot by itself and use it, or not.
295 : */
296 190 : if (slotname != NULL && slotname[0] != '\0')
297 : {
298 55 : strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
299 55 : walrcv->is_temp_slot = false;
300 : }
301 : else
302 : {
303 135 : walrcv->slotname[0] = '\0';
304 135 : walrcv->is_temp_slot = create_temp_slot;
305 : }
306 :
307 190 : if (walrcv->walRcvState == WALRCV_STOPPED)
308 : {
309 183 : launch = true;
310 183 : walrcv->walRcvState = WALRCV_STARTING;
311 : }
312 : else
313 7 : walrcv->walRcvState = WALRCV_RESTARTING;
314 190 : walrcv->startTime = now;
315 :
316 : /*
317 : * If this is the first startup of walreceiver (on this timeline),
318 : * initialize flushedUpto and latestChunkStart to the starting point.
319 : */
320 190 : if (!XLogRecPtrIsValid(walrcv->receiveStart) || walrcv->receivedTLI != tli)
321 : {
322 107 : walrcv->flushedUpto = recptr;
323 107 : walrcv->receivedTLI = tli;
324 107 : walrcv->latestChunkStart = recptr;
325 : }
326 190 : walrcv->receiveStart = recptr;
327 190 : walrcv->receiveStartTLI = tli;
328 :
329 190 : walrcv_proc = walrcv->procno;
330 :
331 190 : SpinLockRelease(&walrcv->mutex);
332 :
333 190 : if (launch)
334 183 : SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
335 7 : else if (walrcv_proc != INVALID_PROC_NUMBER)
336 7 : SetLatch(&GetPGProcByNumber(walrcv_proc)->procLatch);
337 190 : }
338 :
339 : /*
340 : * Returns the last+1 byte position that walreceiver has flushed.
341 : *
342 : * Optionally, returns the previous chunk start, that is the first byte
343 : * written in the most recent walreceiver flush cycle. Callers not
344 : * interested in that value may pass NULL for latestChunkStart. Same for
345 : * receiveTLI.
346 : */
347 : XLogRecPtr
348 24905 : GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
349 : {
350 24905 : WalRcvData *walrcv = WalRcv;
351 : XLogRecPtr recptr;
352 :
353 24905 : SpinLockAcquire(&walrcv->mutex);
354 24905 : recptr = walrcv->flushedUpto;
355 24905 : if (latestChunkStart)
356 23684 : *latestChunkStart = walrcv->latestChunkStart;
357 24905 : if (receiveTLI)
358 24672 : *receiveTLI = walrcv->receivedTLI;
359 24905 : SpinLockRelease(&walrcv->mutex);
360 :
361 24905 : return recptr;
362 : }
363 :
364 : /*
365 : * Returns the last+1 byte position that walreceiver has written.
366 : * This returns a recently written value without taking a lock.
367 : */
368 : XLogRecPtr
369 26 : GetWalRcvWriteRecPtr(void)
370 : {
371 26 : WalRcvData *walrcv = WalRcv;
372 :
373 26 : return pg_atomic_read_u64(&walrcv->writtenUpto);
374 : }
375 :
376 : /*
377 : * Returns the replication apply delay in ms or -1
378 : * if the apply delay info is not available
379 : */
380 : int
381 402 : GetReplicationApplyDelay(void)
382 : {
383 402 : WalRcvData *walrcv = WalRcv;
384 : XLogRecPtr receivePtr;
385 : XLogRecPtr replayPtr;
386 : TimestampTz chunkReplayStartTime;
387 :
388 402 : SpinLockAcquire(&walrcv->mutex);
389 402 : receivePtr = walrcv->flushedUpto;
390 402 : SpinLockRelease(&walrcv->mutex);
391 :
392 402 : replayPtr = GetXLogReplayRecPtr(NULL);
393 :
394 402 : if (receivePtr == replayPtr)
395 127 : return 0;
396 :
397 275 : chunkReplayStartTime = GetCurrentChunkReplayStartTime();
398 :
399 275 : if (chunkReplayStartTime == 0)
400 17 : return -1;
401 :
402 258 : return TimestampDifferenceMilliseconds(chunkReplayStartTime,
403 : GetCurrentTimestamp());
404 : }
405 :
406 : /*
407 : * Returns the network latency in ms, note that this includes any
408 : * difference in clock settings between the servers, as well as timezone.
409 : */
410 : int
411 402 : GetReplicationTransferLatency(void)
412 : {
413 402 : WalRcvData *walrcv = WalRcv;
414 : TimestampTz lastMsgSendTime;
415 : TimestampTz lastMsgReceiptTime;
416 :
417 402 : SpinLockAcquire(&walrcv->mutex);
418 402 : lastMsgSendTime = walrcv->lastMsgSendTime;
419 402 : lastMsgReceiptTime = walrcv->lastMsgReceiptTime;
420 402 : SpinLockRelease(&walrcv->mutex);
421 :
422 402 : return TimestampDifferenceMilliseconds(lastMsgSendTime,
423 : lastMsgReceiptTime);
424 : }
|