Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * walreceiverfuncs.c
4 : *
5 : * This file contains functions used by the startup process to communicate
6 : * with the walreceiver process. Functions implementing walreceiver itself
7 : * are in walreceiver.c.
8 : *
9 : * Portions Copyright (c) 2010-2025, PostgreSQL Global Development Group
10 : *
11 : *
12 : * IDENTIFICATION
13 : * src/backend/replication/walreceiverfuncs.c
14 : *
15 : *-------------------------------------------------------------------------
16 : */
17 : #include "postgres.h"
18 :
19 : #include <sys/stat.h>
20 : #include <sys/time.h>
21 : #include <time.h>
22 : #include <unistd.h>
23 : #include <signal.h>
24 :
25 : #include "access/xlog_internal.h"
26 : #include "access/xlogrecovery.h"
27 : #include "pgstat.h"
28 : #include "replication/walreceiver.h"
29 : #include "storage/pmsignal.h"
30 : #include "storage/proc.h"
31 : #include "storage/shmem.h"
32 : #include "utils/timestamp.h"
33 :
34 : WalRcvData *WalRcv = NULL;
35 :
36 : /*
37 : * How long to wait for walreceiver to start up after requesting
38 : * postmaster to launch it. In seconds.
39 : */
40 : #define WALRCV_STARTUP_TIMEOUT 10
41 :
42 : /* Report shared memory space needed by WalRcvShmemInit */
43 : Size
44 8500 : WalRcvShmemSize(void)
45 : {
46 8500 : Size size = 0;
47 :
48 8500 : size = add_size(size, sizeof(WalRcvData));
49 :
50 8500 : return size;
51 : }
52 :
53 : /* Allocate and initialize walreceiver-related shared memory */
54 : void
55 2200 : WalRcvShmemInit(void)
56 : {
57 : bool found;
58 :
59 2200 : WalRcv = (WalRcvData *)
60 2200 : ShmemInitStruct("Wal Receiver Ctl", WalRcvShmemSize(), &found);
61 :
62 2200 : if (!found)
63 : {
64 : /* First time through, so initialize */
65 2200 : MemSet(WalRcv, 0, WalRcvShmemSize());
66 2200 : WalRcv->walRcvState = WALRCV_STOPPED;
67 2200 : ConditionVariableInit(&WalRcv->walRcvStoppedCV);
68 2200 : SpinLockInit(&WalRcv->mutex);
69 2200 : pg_atomic_init_u64(&WalRcv->writtenUpto, 0);
70 2200 : WalRcv->procno = INVALID_PROC_NUMBER;
71 : }
72 2200 : }
73 :
74 : /* Is walreceiver running (or starting up)? */
75 : bool
76 1976 : WalRcvRunning(void)
77 : {
78 1976 : WalRcvData *walrcv = WalRcv;
79 : WalRcvState state;
80 : pg_time_t startTime;
81 :
82 1976 : SpinLockAcquire(&walrcv->mutex);
83 :
84 1976 : state = walrcv->walRcvState;
85 1976 : startTime = walrcv->startTime;
86 :
87 1976 : SpinLockRelease(&walrcv->mutex);
88 :
89 : /*
90 : * If it has taken too long for walreceiver to start up, give up. Setting
91 : * the state to STOPPED ensures that if walreceiver later does start up
92 : * after all, it will see that it's not supposed to be running and die
93 : * without doing anything.
94 : */
95 1976 : if (state == WALRCV_STARTING)
96 : {
97 2 : pg_time_t now = (pg_time_t) time(NULL);
98 :
99 2 : if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
100 : {
101 0 : bool stopped = false;
102 :
103 0 : SpinLockAcquire(&walrcv->mutex);
104 0 : if (walrcv->walRcvState == WALRCV_STARTING)
105 : {
106 0 : state = walrcv->walRcvState = WALRCV_STOPPED;
107 0 : stopped = true;
108 : }
109 0 : SpinLockRelease(&walrcv->mutex);
110 :
111 0 : if (stopped)
112 0 : ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
113 : }
114 : }
115 :
116 1976 : if (state != WALRCV_STOPPED)
117 72 : return true;
118 : else
119 1904 : return false;
120 : }
121 :
122 : /* Return the state of the walreceiver. */
123 : WalRcvState
124 0 : WalRcvGetState(void)
125 : {
126 0 : WalRcvData *walrcv = WalRcv;
127 : WalRcvState state;
128 :
129 0 : SpinLockAcquire(&walrcv->mutex);
130 0 : state = walrcv->walRcvState;
131 0 : SpinLockRelease(&walrcv->mutex);
132 :
133 0 : return state;
134 : }
135 :
136 : /*
137 : * Is walreceiver running and streaming (or at least attempting to connect,
138 : * or starting up)?
139 : */
140 : bool
141 20220 : WalRcvStreaming(void)
142 : {
143 20220 : WalRcvData *walrcv = WalRcv;
144 : WalRcvState state;
145 : pg_time_t startTime;
146 :
147 20220 : SpinLockAcquire(&walrcv->mutex);
148 :
149 20220 : state = walrcv->walRcvState;
150 20220 : startTime = walrcv->startTime;
151 :
152 20220 : SpinLockRelease(&walrcv->mutex);
153 :
154 : /*
155 : * If it has taken too long for walreceiver to start up, give up. Setting
156 : * the state to STOPPED ensures that if walreceiver later does start up
157 : * after all, it will see that it's not supposed to be running and die
158 : * without doing anything.
159 : */
160 20220 : if (state == WALRCV_STARTING)
161 : {
162 584 : pg_time_t now = (pg_time_t) time(NULL);
163 :
164 584 : if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
165 : {
166 0 : bool stopped = false;
167 :
168 0 : SpinLockAcquire(&walrcv->mutex);
169 0 : if (walrcv->walRcvState == WALRCV_STARTING)
170 : {
171 0 : state = walrcv->walRcvState = WALRCV_STOPPED;
172 0 : stopped = true;
173 : }
174 0 : SpinLockRelease(&walrcv->mutex);
175 :
176 0 : if (stopped)
177 0 : ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
178 : }
179 : }
180 :
181 20220 : if (state == WALRCV_STREAMING || state == WALRCV_STARTING ||
182 : state == WALRCV_RESTARTING)
183 19644 : return true;
184 : else
185 576 : return false;
186 : }
187 :
188 : /*
189 : * Stop walreceiver (if running) and wait for it to die.
190 : * Executed by the Startup process.
191 : */
192 : void
193 1902 : ShutdownWalRcv(void)
194 : {
195 1902 : WalRcvData *walrcv = WalRcv;
196 1902 : pid_t walrcvpid = 0;
197 1902 : bool stopped = false;
198 :
199 : /*
200 : * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
201 : * mode once it's finished, and will also request postmaster to not
202 : * restart itself.
203 : */
204 1902 : SpinLockAcquire(&walrcv->mutex);
205 1902 : switch (walrcv->walRcvState)
206 : {
207 1826 : case WALRCV_STOPPED:
208 1826 : break;
209 12 : case WALRCV_STARTING:
210 12 : walrcv->walRcvState = WALRCV_STOPPED;
211 12 : stopped = true;
212 12 : break;
213 :
214 64 : case WALRCV_STREAMING:
215 : case WALRCV_WAITING:
216 : case WALRCV_RESTARTING:
217 64 : walrcv->walRcvState = WALRCV_STOPPING;
218 : /* fall through */
219 64 : case WALRCV_STOPPING:
220 64 : walrcvpid = walrcv->pid;
221 64 : break;
222 : }
223 1902 : SpinLockRelease(&walrcv->mutex);
224 :
225 : /* Unnecessary but consistent. */
226 1902 : if (stopped)
227 12 : ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
228 :
229 : /*
230 : * Signal walreceiver process if it was still running.
231 : */
232 1902 : if (walrcvpid != 0)
233 64 : kill(walrcvpid, SIGTERM);
234 :
235 : /*
236 : * Wait for walreceiver to acknowledge its death by setting state to
237 : * WALRCV_STOPPED.
238 : */
239 1902 : ConditionVariablePrepareToSleep(&walrcv->walRcvStoppedCV);
240 1966 : while (WalRcvRunning())
241 64 : ConditionVariableSleep(&walrcv->walRcvStoppedCV,
242 : WAIT_EVENT_WAL_RECEIVER_EXIT);
243 1902 : ConditionVariableCancelSleep();
244 1902 : }
245 :
246 : /*
247 : * Request postmaster to start walreceiver.
248 : *
249 : * "recptr" indicates the position where streaming should begin. "conninfo"
250 : * is a libpq connection string to use. "slotname" is, optionally, the name
251 : * of a replication slot to acquire. "create_temp_slot" indicates to create
252 : * a temporary slot when no "slotname" is given.
253 : *
254 : * WAL receivers do not directly load GUC parameters used for the connection
255 : * to the primary, and rely on the values passed down by the caller of this
256 : * routine instead. Hence, the addition of any new parameters should happen
257 : * through this code path.
258 : */
259 : void
260 370 : RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
261 : const char *slotname, bool create_temp_slot)
262 : {
263 370 : WalRcvData *walrcv = WalRcv;
264 370 : bool launch = false;
265 370 : pg_time_t now = (pg_time_t) time(NULL);
266 : ProcNumber walrcv_proc;
267 :
268 : /*
269 : * We always start at the beginning of the segment. That prevents a broken
270 : * segment (i.e., with no records in the first half of a segment) from
271 : * being created by XLOG streaming, which might cause trouble later on if
272 : * the segment is e.g archived.
273 : */
274 370 : if (XLogSegmentOffset(recptr, wal_segment_size) != 0)
275 370 : recptr -= XLogSegmentOffset(recptr, wal_segment_size);
276 :
277 370 : SpinLockAcquire(&walrcv->mutex);
278 :
279 : /* It better be stopped if we try to restart it */
280 : Assert(walrcv->walRcvState == WALRCV_STOPPED ||
281 : walrcv->walRcvState == WALRCV_WAITING);
282 :
283 370 : if (conninfo != NULL)
284 370 : strlcpy(walrcv->conninfo, conninfo, MAXCONNINFO);
285 : else
286 0 : walrcv->conninfo[0] = '\0';
287 :
288 : /*
289 : * Use configured replication slot if present, and ignore the value of
290 : * create_temp_slot as the slot name should be persistent. Otherwise, use
291 : * create_temp_slot to determine whether this WAL receiver should create a
292 : * temporary slot by itself and use it, or not.
293 : */
294 370 : if (slotname != NULL && slotname[0] != '\0')
295 : {
296 106 : strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
297 106 : walrcv->is_temp_slot = false;
298 : }
299 : else
300 : {
301 264 : walrcv->slotname[0] = '\0';
302 264 : walrcv->is_temp_slot = create_temp_slot;
303 : }
304 :
305 370 : if (walrcv->walRcvState == WALRCV_STOPPED)
306 : {
307 358 : launch = true;
308 358 : walrcv->walRcvState = WALRCV_STARTING;
309 : }
310 : else
311 12 : walrcv->walRcvState = WALRCV_RESTARTING;
312 370 : walrcv->startTime = now;
313 :
314 : /*
315 : * If this is the first startup of walreceiver (on this timeline),
316 : * initialize flushedUpto and latestChunkStart to the starting point.
317 : */
318 370 : if (!XLogRecPtrIsValid(walrcv->receiveStart) || walrcv->receivedTLI != tli)
319 : {
320 198 : walrcv->flushedUpto = recptr;
321 198 : walrcv->receivedTLI = tli;
322 198 : walrcv->latestChunkStart = recptr;
323 : }
324 370 : walrcv->receiveStart = recptr;
325 370 : walrcv->receiveStartTLI = tli;
326 :
327 370 : walrcv_proc = walrcv->procno;
328 :
329 370 : SpinLockRelease(&walrcv->mutex);
330 :
331 370 : if (launch)
332 358 : SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
333 12 : else if (walrcv_proc != INVALID_PROC_NUMBER)
334 12 : SetLatch(&GetPGProcByNumber(walrcv_proc)->procLatch);
335 370 : }
336 :
337 : /*
338 : * Returns the last+1 byte position that walreceiver has flushed.
339 : *
340 : * Optionally, returns the previous chunk start, that is the first byte
341 : * written in the most recent walreceiver flush cycle. Callers not
342 : * interested in that value may pass NULL for latestChunkStart. Same for
343 : * receiveTLI.
344 : */
345 : XLogRecPtr
346 17730 : GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
347 : {
348 17730 : WalRcvData *walrcv = WalRcv;
349 : XLogRecPtr recptr;
350 :
351 17730 : SpinLockAcquire(&walrcv->mutex);
352 17730 : recptr = walrcv->flushedUpto;
353 17730 : if (latestChunkStart)
354 15696 : *latestChunkStart = walrcv->latestChunkStart;
355 17730 : if (receiveTLI)
356 17354 : *receiveTLI = walrcv->receivedTLI;
357 17730 : SpinLockRelease(&walrcv->mutex);
358 :
359 17730 : return recptr;
360 : }
361 :
362 : /*
363 : * Returns the last+1 byte position that walreceiver has written.
364 : * This returns a recently written value without taking a lock.
365 : */
366 : XLogRecPtr
367 0 : GetWalRcvWriteRecPtr(void)
368 : {
369 0 : WalRcvData *walrcv = WalRcv;
370 :
371 0 : return pg_atomic_read_u64(&walrcv->writtenUpto);
372 : }
373 :
374 : /*
375 : * Returns the replication apply delay in ms or -1
376 : * if the apply delay info is not available
377 : */
378 : int
379 746 : GetReplicationApplyDelay(void)
380 : {
381 746 : WalRcvData *walrcv = WalRcv;
382 : XLogRecPtr receivePtr;
383 : XLogRecPtr replayPtr;
384 : TimestampTz chunkReplayStartTime;
385 :
386 746 : SpinLockAcquire(&walrcv->mutex);
387 746 : receivePtr = walrcv->flushedUpto;
388 746 : SpinLockRelease(&walrcv->mutex);
389 :
390 746 : replayPtr = GetXLogReplayRecPtr(NULL);
391 :
392 746 : if (receivePtr == replayPtr)
393 124 : return 0;
394 :
395 622 : chunkReplayStartTime = GetCurrentChunkReplayStartTime();
396 :
397 622 : if (chunkReplayStartTime == 0)
398 2 : return -1;
399 :
400 620 : return TimestampDifferenceMilliseconds(chunkReplayStartTime,
401 : GetCurrentTimestamp());
402 : }
403 :
404 : /*
405 : * Returns the network latency in ms, note that this includes any
406 : * difference in clock settings between the servers, as well as timezone.
407 : */
408 : int
409 746 : GetReplicationTransferLatency(void)
410 : {
411 746 : WalRcvData *walrcv = WalRcv;
412 : TimestampTz lastMsgSendTime;
413 : TimestampTz lastMsgReceiptTime;
414 :
415 746 : SpinLockAcquire(&walrcv->mutex);
416 746 : lastMsgSendTime = walrcv->lastMsgSendTime;
417 746 : lastMsgReceiptTime = walrcv->lastMsgReceiptTime;
418 746 : SpinLockRelease(&walrcv->mutex);
419 :
420 746 : return TimestampDifferenceMilliseconds(lastMsgSendTime,
421 : lastMsgReceiptTime);
422 : }
|