Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * walreceiverfuncs.c
4 : *
5 : * This file contains functions used by the startup process to communicate
6 : * with the walreceiver process. Functions implementing walreceiver itself
7 : * are in walreceiver.c.
8 : *
9 : * Portions Copyright (c) 2010-2026, PostgreSQL Global Development Group
10 : *
11 : *
12 : * IDENTIFICATION
13 : * src/backend/replication/walreceiverfuncs.c
14 : *
15 : *-------------------------------------------------------------------------
16 : */
17 : #include "postgres.h"
18 :
19 : #include <sys/stat.h>
20 : #include <sys/time.h>
21 : #include <time.h>
22 : #include <unistd.h>
23 : #include <signal.h>
24 :
25 : #include "access/xlog_internal.h"
26 : #include "access/xlogrecovery.h"
27 : #include "pgstat.h"
28 : #include "replication/walreceiver.h"
29 : #include "storage/pmsignal.h"
30 : #include "storage/proc.h"
31 : #include "storage/shmem.h"
32 : #include "utils/timestamp.h"
33 : #include "utils/wait_event.h"
34 :
35 : WalRcvData *WalRcv = NULL;
36 :
37 : /*
38 : * How long to wait for walreceiver to start up after requesting
39 : * postmaster to launch it. In seconds.
40 : */
41 : #define WALRCV_STARTUP_TIMEOUT 10
42 :
43 : /* Report shared memory space needed by WalRcvShmemInit */
44 : Size
45 4563 : WalRcvShmemSize(void)
46 : {
47 4563 : Size size = 0;
48 :
49 4563 : size = add_size(size, sizeof(WalRcvData));
50 :
51 4563 : return size;
52 : }
53 :
54 : /* Allocate and initialize walreceiver-related shared memory */
55 : void
56 1179 : WalRcvShmemInit(void)
57 : {
58 : bool found;
59 :
60 1179 : WalRcv = (WalRcvData *)
61 1179 : ShmemInitStruct("Wal Receiver Ctl", WalRcvShmemSize(), &found);
62 :
63 1179 : if (!found)
64 : {
65 : /* First time through, so initialize */
66 1179 : MemSet(WalRcv, 0, WalRcvShmemSize());
67 1179 : WalRcv->walRcvState = WALRCV_STOPPED;
68 1179 : ConditionVariableInit(&WalRcv->walRcvStoppedCV);
69 1179 : SpinLockInit(&WalRcv->mutex);
70 1179 : pg_atomic_init_u64(&WalRcv->writtenUpto, 0);
71 1179 : WalRcv->procno = INVALID_PROC_NUMBER;
72 : }
73 1179 : }
74 :
75 : /* Is walreceiver running (or starting up)? */
76 : bool
77 1079 : WalRcvRunning(void)
78 : {
79 1079 : WalRcvData *walrcv = WalRcv;
80 : WalRcvState state;
81 : pg_time_t startTime;
82 :
83 1079 : SpinLockAcquire(&walrcv->mutex);
84 :
85 1079 : state = walrcv->walRcvState;
86 1079 : startTime = walrcv->startTime;
87 :
88 1079 : SpinLockRelease(&walrcv->mutex);
89 :
90 : /*
91 : * If it has taken too long for walreceiver to start up, give up. Setting
92 : * the state to STOPPED ensures that if walreceiver later does start up
93 : * after all, it will see that it's not supposed to be running and die
94 : * without doing anything.
95 : */
96 1079 : if (state == WALRCV_STARTING)
97 : {
98 0 : pg_time_t now = (pg_time_t) time(NULL);
99 :
100 0 : if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
101 : {
102 0 : bool stopped = false;
103 :
104 0 : SpinLockAcquire(&walrcv->mutex);
105 0 : if (walrcv->walRcvState == WALRCV_STARTING)
106 : {
107 0 : state = walrcv->walRcvState = WALRCV_STOPPED;
108 0 : stopped = true;
109 : }
110 0 : SpinLockRelease(&walrcv->mutex);
111 :
112 0 : if (stopped)
113 0 : ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
114 : }
115 : }
116 :
117 1079 : if (state != WALRCV_STOPPED)
118 43 : return true;
119 : else
120 1036 : return false;
121 : }
122 :
123 : /* Return the state of the walreceiver. */
124 : WalRcvState
125 0 : WalRcvGetState(void)
126 : {
127 0 : WalRcvData *walrcv = WalRcv;
128 : WalRcvState state;
129 :
130 0 : SpinLockAcquire(&walrcv->mutex);
131 0 : state = walrcv->walRcvState;
132 0 : SpinLockRelease(&walrcv->mutex);
133 :
134 0 : return state;
135 : }
136 :
137 : /*
138 : * Is walreceiver running and streaming (or at least attempting to connect,
139 : * or starting up)?
140 : */
141 : bool
142 33978 : WalRcvStreaming(void)
143 : {
144 33978 : WalRcvData *walrcv = WalRcv;
145 : WalRcvState state;
146 : pg_time_t startTime;
147 :
148 33978 : SpinLockAcquire(&walrcv->mutex);
149 :
150 33978 : state = walrcv->walRcvState;
151 33978 : startTime = walrcv->startTime;
152 :
153 33978 : SpinLockRelease(&walrcv->mutex);
154 :
155 : /*
156 : * If it has taken too long for walreceiver to start up, give up. Setting
157 : * the state to STOPPED ensures that if walreceiver later does start up
158 : * after all, it will see that it's not supposed to be running and die
159 : * without doing anything.
160 : */
161 33978 : if (state == WALRCV_STARTING)
162 : {
163 290 : pg_time_t now = (pg_time_t) time(NULL);
164 :
165 290 : if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
166 : {
167 0 : bool stopped = false;
168 :
169 0 : SpinLockAcquire(&walrcv->mutex);
170 0 : if (walrcv->walRcvState == WALRCV_STARTING)
171 : {
172 0 : state = walrcv->walRcvState = WALRCV_STOPPED;
173 0 : stopped = true;
174 : }
175 0 : SpinLockRelease(&walrcv->mutex);
176 :
177 0 : if (stopped)
178 0 : ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
179 : }
180 : }
181 :
182 33978 : if (state == WALRCV_STREAMING || state == WALRCV_STARTING ||
183 305 : state == WALRCV_CONNECTING || state == WALRCV_RESTARTING)
184 33680 : return true;
185 : else
186 298 : return false;
187 : }
188 :
189 : /*
190 : * Stop walreceiver (if running) and wait for it to die.
191 : * Executed by the Startup process.
192 : */
193 : void
194 1032 : ShutdownWalRcv(void)
195 : {
196 1032 : WalRcvData *walrcv = WalRcv;
197 1032 : pid_t walrcvpid = 0;
198 1032 : bool stopped = false;
199 :
200 : /*
201 : * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
202 : * mode once it's finished, and will also request postmaster to not
203 : * restart itself.
204 : */
205 1032 : SpinLockAcquire(&walrcv->mutex);
206 1032 : switch (walrcv->walRcvState)
207 : {
208 989 : case WALRCV_STOPPED:
209 989 : break;
210 4 : case WALRCV_STARTING:
211 4 : walrcv->walRcvState = WALRCV_STOPPED;
212 4 : stopped = true;
213 4 : break;
214 :
215 39 : case WALRCV_CONNECTING:
216 : case WALRCV_STREAMING:
217 : case WALRCV_WAITING:
218 : case WALRCV_RESTARTING:
219 39 : walrcv->walRcvState = WALRCV_STOPPING;
220 : pg_fallthrough;
221 39 : case WALRCV_STOPPING:
222 39 : walrcvpid = walrcv->pid;
223 39 : break;
224 : }
225 1032 : SpinLockRelease(&walrcv->mutex);
226 :
227 : /* Unnecessary but consistent. */
228 1032 : if (stopped)
229 4 : ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
230 :
231 : /*
232 : * Signal walreceiver process if it was still running.
233 : */
234 1032 : if (walrcvpid != 0)
235 39 : kill(walrcvpid, SIGTERM);
236 :
237 : /*
238 : * Wait for walreceiver to acknowledge its death by setting state to
239 : * WALRCV_STOPPED.
240 : */
241 1032 : ConditionVariablePrepareToSleep(&walrcv->walRcvStoppedCV);
242 1068 : while (WalRcvRunning())
243 36 : ConditionVariableSleep(&walrcv->walRcvStoppedCV,
244 : WAIT_EVENT_WAL_RECEIVER_EXIT);
245 1032 : ConditionVariableCancelSleep();
246 1032 : }
247 :
248 : /*
249 : * Request postmaster to start walreceiver.
250 : *
251 : * "recptr" indicates the position where streaming should begin. "conninfo"
252 : * is a libpq connection string to use. "slotname" is, optionally, the name
253 : * of a replication slot to acquire. "create_temp_slot" indicates to create
254 : * a temporary slot when no "slotname" is given.
255 : *
256 : * WAL receivers do not directly load GUC parameters used for the connection
257 : * to the primary, and rely on the values passed down by the caller of this
258 : * routine instead. Hence, the addition of any new parameters should happen
259 : * through this code path.
260 : */
261 : void
262 192 : RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
263 : const char *slotname, bool create_temp_slot)
264 : {
265 192 : WalRcvData *walrcv = WalRcv;
266 192 : bool launch = false;
267 192 : pg_time_t now = (pg_time_t) time(NULL);
268 : ProcNumber walrcv_proc;
269 :
270 : /*
271 : * We always start at the beginning of the segment. That prevents a broken
272 : * segment (i.e., with no records in the first half of a segment) from
273 : * being created by XLOG streaming, which might cause trouble later on if
274 : * the segment is e.g archived.
275 : */
276 192 : if (XLogSegmentOffset(recptr, wal_segment_size) != 0)
277 192 : recptr -= XLogSegmentOffset(recptr, wal_segment_size);
278 :
279 192 : SpinLockAcquire(&walrcv->mutex);
280 :
281 : /* It better be stopped if we try to restart it */
282 : Assert(walrcv->walRcvState == WALRCV_STOPPED ||
283 : walrcv->walRcvState == WALRCV_WAITING);
284 :
285 192 : if (conninfo != NULL)
286 192 : strlcpy(walrcv->conninfo, conninfo, MAXCONNINFO);
287 : else
288 0 : walrcv->conninfo[0] = '\0';
289 :
290 : /*
291 : * Use configured replication slot if present, and ignore the value of
292 : * create_temp_slot as the slot name should be persistent. Otherwise, use
293 : * create_temp_slot to determine whether this WAL receiver should create a
294 : * temporary slot by itself and use it, or not.
295 : */
296 192 : if (slotname != NULL && slotname[0] != '\0')
297 : {
298 54 : strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
299 54 : walrcv->is_temp_slot = false;
300 : }
301 : else
302 : {
303 138 : walrcv->slotname[0] = '\0';
304 138 : walrcv->is_temp_slot = create_temp_slot;
305 : }
306 :
307 192 : if (walrcv->walRcvState == WALRCV_STOPPED)
308 : {
309 185 : launch = true;
310 185 : walrcv->walRcvState = WALRCV_STARTING;
311 : }
312 : else
313 7 : walrcv->walRcvState = WALRCV_RESTARTING;
314 192 : walrcv->startTime = now;
315 :
316 : /*
317 : * If this is the first startup of walreceiver (on this timeline),
318 : * initialize flushedUpto and latestChunkStart to the starting point.
319 : */
320 192 : if (!XLogRecPtrIsValid(walrcv->receiveStart) || walrcv->receivedTLI != tli)
321 : {
322 107 : walrcv->flushedUpto = recptr;
323 107 : walrcv->receivedTLI = tli;
324 107 : walrcv->latestChunkStart = recptr;
325 : }
326 192 : walrcv->receiveStart = recptr;
327 192 : walrcv->receiveStartTLI = tli;
328 :
329 192 : walrcv_proc = walrcv->procno;
330 :
331 192 : SpinLockRelease(&walrcv->mutex);
332 :
333 192 : if (launch)
334 185 : SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
335 7 : else if (walrcv_proc != INVALID_PROC_NUMBER)
336 7 : SetLatch(&GetPGProcByNumber(walrcv_proc)->procLatch);
337 192 : }
338 :
339 : /*
340 : * Returns the last+1 byte position that walreceiver has flushed.
341 : *
342 : * Optionally, returns the previous chunk start, that is the first byte
343 : * written in the most recent walreceiver flush cycle. Callers not
344 : * interested in that value may pass NULL for latestChunkStart. Same for
345 : * receiveTLI.
346 : */
347 : XLogRecPtr
348 33150 : GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
349 : {
350 33150 : WalRcvData *walrcv = WalRcv;
351 : XLogRecPtr recptr;
352 :
353 33150 : SpinLockAcquire(&walrcv->mutex);
354 33150 : recptr = walrcv->flushedUpto;
355 33150 : if (latestChunkStart)
356 31929 : *latestChunkStart = walrcv->latestChunkStart;
357 33150 : if (receiveTLI)
358 32912 : *receiveTLI = walrcv->receivedTLI;
359 33150 : SpinLockRelease(&walrcv->mutex);
360 :
361 33150 : return recptr;
362 : }
363 :
364 : /*
365 : * Returns the last+1 byte position that walreceiver has written.
366 : * This returns a recently written value without taking a lock.
367 : */
368 : XLogRecPtr
369 26 : GetWalRcvWriteRecPtr(void)
370 : {
371 26 : WalRcvData *walrcv = WalRcv;
372 :
373 26 : return pg_atomic_read_u64(&walrcv->writtenUpto);
374 : }
375 :
376 : /*
377 : * Returns the replication apply delay in ms or -1
378 : * if the apply delay info is not available
379 : */
380 : int
381 407 : GetReplicationApplyDelay(void)
382 : {
383 407 : WalRcvData *walrcv = WalRcv;
384 : XLogRecPtr receivePtr;
385 : XLogRecPtr replayPtr;
386 : TimestampTz chunkReplayStartTime;
387 :
388 407 : SpinLockAcquire(&walrcv->mutex);
389 407 : receivePtr = walrcv->flushedUpto;
390 407 : SpinLockRelease(&walrcv->mutex);
391 :
392 407 : replayPtr = GetXLogReplayRecPtr(NULL);
393 :
394 407 : if (receivePtr == replayPtr)
395 138 : return 0;
396 :
397 269 : chunkReplayStartTime = GetCurrentChunkReplayStartTime();
398 :
399 269 : if (chunkReplayStartTime == 0)
400 1 : return -1;
401 :
402 268 : return TimestampDifferenceMilliseconds(chunkReplayStartTime,
403 : GetCurrentTimestamp());
404 : }
405 :
406 : /*
407 : * Returns the network latency in ms, note that this includes any
408 : * difference in clock settings between the servers, as well as timezone.
409 : */
410 : int
411 407 : GetReplicationTransferLatency(void)
412 : {
413 407 : WalRcvData *walrcv = WalRcv;
414 : TimestampTz lastMsgSendTime;
415 : TimestampTz lastMsgReceiptTime;
416 :
417 407 : SpinLockAcquire(&walrcv->mutex);
418 407 : lastMsgSendTime = walrcv->lastMsgSendTime;
419 407 : lastMsgReceiptTime = walrcv->lastMsgReceiptTime;
420 407 : SpinLockRelease(&walrcv->mutex);
421 :
422 407 : return TimestampDifferenceMilliseconds(lastMsgSendTime,
423 : lastMsgReceiptTime);
424 : }
|