Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * walreceiverfuncs.c
4 : *
5 : * This file contains functions used by the startup process to communicate
6 : * with the walreceiver process. Functions implementing walreceiver itself
7 : * are in walreceiver.c.
8 : *
9 : * Portions Copyright (c) 2010-2026, PostgreSQL Global Development Group
10 : *
11 : *
12 : * IDENTIFICATION
13 : * src/backend/replication/walreceiverfuncs.c
14 : *
15 : *-------------------------------------------------------------------------
16 : */
17 : #include "postgres.h"
18 :
19 : #include <sys/stat.h>
20 : #include <sys/time.h>
21 : #include <time.h>
22 : #include <unistd.h>
23 : #include <signal.h>
24 :
25 : #include "access/xlog_internal.h"
26 : #include "access/xlogrecovery.h"
27 : #include "pgstat.h"
28 : #include "replication/walreceiver.h"
29 : #include "storage/pmsignal.h"
30 : #include "storage/proc.h"
31 : #include "storage/shmem.h"
32 : #include "utils/timestamp.h"
33 : #include "utils/wait_event.h"
34 :
/*
 * Pointer to the walreceiver control struct in shared memory.  Set by
 * WalRcvShmemInit(); all functions in this file access shared walreceiver
 * state through it.
 */
WalRcvData *WalRcv = NULL;

/*
 * How long to wait for walreceiver to start up after requesting
 * postmaster to launch it, in seconds.  If the state is still
 * WALRCV_STARTING after this long (measured from walrcv->startTime),
 * WalRcvRunning() and WalRcvStreaming() give up and force the state to
 * WALRCV_STOPPED.
 */
#define WALRCV_STARTUP_TIMEOUT 10
42 :
43 : /* Report shared memory space needed by WalRcvShmemInit */
44 : Size
45 4483 : WalRcvShmemSize(void)
46 : {
47 4483 : Size size = 0;
48 :
49 4483 : size = add_size(size, sizeof(WalRcvData));
50 :
51 4483 : return size;
52 : }
53 :
54 : /* Allocate and initialize walreceiver-related shared memory */
55 : void
56 1159 : WalRcvShmemInit(void)
57 : {
58 : bool found;
59 :
60 1159 : WalRcv = (WalRcvData *)
61 1159 : ShmemInitStruct("Wal Receiver Ctl", WalRcvShmemSize(), &found);
62 :
63 1159 : if (!found)
64 : {
65 : /* First time through, so initialize */
66 1159 : MemSet(WalRcv, 0, WalRcvShmemSize());
67 1159 : WalRcv->walRcvState = WALRCV_STOPPED;
68 1159 : ConditionVariableInit(&WalRcv->walRcvStoppedCV);
69 1159 : SpinLockInit(&WalRcv->mutex);
70 1159 : pg_atomic_init_u64(&WalRcv->writtenUpto, 0);
71 1159 : WalRcv->procno = INVALID_PROC_NUMBER;
72 : }
73 1159 : }
74 :
75 : /* Is walreceiver running (or starting up)? */
76 : bool
77 1055 : WalRcvRunning(void)
78 : {
79 1055 : WalRcvData *walrcv = WalRcv;
80 : WalRcvState state;
81 : pg_time_t startTime;
82 :
83 1055 : SpinLockAcquire(&walrcv->mutex);
84 :
85 1055 : state = walrcv->walRcvState;
86 1055 : startTime = walrcv->startTime;
87 :
88 1055 : SpinLockRelease(&walrcv->mutex);
89 :
90 : /*
91 : * If it has taken too long for walreceiver to start up, give up. Setting
92 : * the state to STOPPED ensures that if walreceiver later does start up
93 : * after all, it will see that it's not supposed to be running and die
94 : * without doing anything.
95 : */
96 1055 : if (state == WALRCV_STARTING)
97 : {
98 0 : pg_time_t now = (pg_time_t) time(NULL);
99 :
100 0 : if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
101 : {
102 0 : bool stopped = false;
103 :
104 0 : SpinLockAcquire(&walrcv->mutex);
105 0 : if (walrcv->walRcvState == WALRCV_STARTING)
106 : {
107 0 : state = walrcv->walRcvState = WALRCV_STOPPED;
108 0 : stopped = true;
109 : }
110 0 : SpinLockRelease(&walrcv->mutex);
111 :
112 0 : if (stopped)
113 0 : ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
114 : }
115 : }
116 :
117 1055 : if (state != WALRCV_STOPPED)
118 41 : return true;
119 : else
120 1014 : return false;
121 : }
122 :
123 : /* Return the state of the walreceiver. */
124 : WalRcvState
125 0 : WalRcvGetState(void)
126 : {
127 0 : WalRcvData *walrcv = WalRcv;
128 : WalRcvState state;
129 :
130 0 : SpinLockAcquire(&walrcv->mutex);
131 0 : state = walrcv->walRcvState;
132 0 : SpinLockRelease(&walrcv->mutex);
133 :
134 0 : return state;
135 : }
136 :
137 : /*
138 : * Is walreceiver running and streaming (or at least attempting to connect,
139 : * or starting up)?
140 : */
141 : bool
142 32027 : WalRcvStreaming(void)
143 : {
144 32027 : WalRcvData *walrcv = WalRcv;
145 : WalRcvState state;
146 : pg_time_t startTime;
147 :
148 32027 : SpinLockAcquire(&walrcv->mutex);
149 :
150 32027 : state = walrcv->walRcvState;
151 32027 : startTime = walrcv->startTime;
152 :
153 32027 : SpinLockRelease(&walrcv->mutex);
154 :
155 : /*
156 : * If it has taken too long for walreceiver to start up, give up. Setting
157 : * the state to STOPPED ensures that if walreceiver later does start up
158 : * after all, it will see that it's not supposed to be running and die
159 : * without doing anything.
160 : */
161 32027 : if (state == WALRCV_STARTING)
162 : {
163 252 : pg_time_t now = (pg_time_t) time(NULL);
164 :
165 252 : if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
166 : {
167 0 : bool stopped = false;
168 :
169 0 : SpinLockAcquire(&walrcv->mutex);
170 0 : if (walrcv->walRcvState == WALRCV_STARTING)
171 : {
172 0 : state = walrcv->walRcvState = WALRCV_STOPPED;
173 0 : stopped = true;
174 : }
175 0 : SpinLockRelease(&walrcv->mutex);
176 :
177 0 : if (stopped)
178 0 : ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
179 : }
180 : }
181 :
182 32027 : if (state == WALRCV_STREAMING || state == WALRCV_STARTING ||
183 276 : state == WALRCV_CONNECTING || state == WALRCV_RESTARTING)
184 31757 : return true;
185 : else
186 270 : return false;
187 : }
188 :
/*
 * Stop walreceiver (if running) and wait for it to die.
 * Executed by the Startup process.
 *
 * Behavior depends on the current walreceiver state:
 *   - WALRCV_STARTING (only a launch request is pending): the state is reset
 *     to WALRCV_STOPPED directly, without signalling anything.
 *   - WALRCV_CONNECTING, WALRCV_STREAMING, WALRCV_WAITING or
 *     WALRCV_RESTARTING: the state is set to WALRCV_STOPPING and the
 *     recorded pid is sent SIGTERM.
 *   - WALRCV_STOPPING: the recorded pid is sent SIGTERM.
 * In all cases, the function then sleeps on walRcvStoppedCV until
 * WalRcvRunning() returns false.
 */
void
ShutdownWalRcv(void)
{
	WalRcvData *walrcv = WalRcv;
	pid_t		walrcvpid = 0;	/* nonzero => walreceiver must be signalled */
	bool		stopped = false;	/* did we reset STARTING to STOPPED? */

	/*
	 * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
	 * mode once it's finished, and will also request postmaster to not
	 * restart itself.
	 */
	SpinLockAcquire(&walrcv->mutex);
	switch (walrcv->walRcvState)
	{
		case WALRCV_STOPPED:
			break;
		case WALRCV_STARTING:
			/* Only a launch request is pending; no pid is signalled. */
			walrcv->walRcvState = WALRCV_STOPPED;
			stopped = true;
			break;

		case WALRCV_CONNECTING:
		case WALRCV_STREAMING:
		case WALRCV_WAITING:
		case WALRCV_RESTARTING:
			walrcv->walRcvState = WALRCV_STOPPING;
			pg_fallthrough;
		case WALRCV_STOPPING:
			/* Read the pid under the lock; it is signalled after release. */
			walrcvpid = walrcv->pid;
			break;
	}
	SpinLockRelease(&walrcv->mutex);

	/*
	 * Unnecessary but consistent: WalRcvRunning() also broadcasts whenever it
	 * moves the state from STARTING to STOPPED.
	 */
	if (stopped)
		ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);

	/*
	 * Signal walreceiver process if it was still running.
	 */
	if (walrcvpid != 0)
		kill(walrcvpid, SIGTERM);

	/*
	 * Wait for walreceiver to acknowledge its death by setting state to
	 * WALRCV_STOPPED.  We prepare to sleep before the first WalRcvRunning()
	 * check (the usual ConditionVariable pattern -- presumably so a broadcast
	 * between the check and the sleep is not missed).  Note that
	 * WalRcvRunning() can itself force STARTING -> STOPPED on timeout.
	 */
	ConditionVariablePrepareToSleep(&walrcv->walRcvStoppedCV);
	while (WalRcvRunning())
		ConditionVariableSleep(&walrcv->walRcvStoppedCV,
							   WAIT_EVENT_WAL_RECEIVER_EXIT);
	ConditionVariableCancelSleep();
}
247 :
248 : /*
249 : * Request postmaster to start walreceiver.
250 : *
251 : * "recptr" indicates the position where streaming should begin. "conninfo"
252 : * is a libpq connection string to use. "slotname" is, optionally, the name
253 : * of a replication slot to acquire. "create_temp_slot" indicates to create
254 : * a temporary slot when no "slotname" is given.
255 : *
256 : * WAL receivers do not directly load GUC parameters used for the connection
257 : * to the primary, and rely on the values passed down by the caller of this
258 : * routine instead. Hence, the addition of any new parameters should happen
259 : * through this code path.
260 : */
261 : void
262 178 : RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
263 : const char *slotname, bool create_temp_slot)
264 : {
265 178 : WalRcvData *walrcv = WalRcv;
266 178 : bool launch = false;
267 178 : pg_time_t now = (pg_time_t) time(NULL);
268 : ProcNumber walrcv_proc;
269 :
270 : /*
271 : * We always start at the beginning of the segment. That prevents a broken
272 : * segment (i.e., with no records in the first half of a segment) from
273 : * being created by XLOG streaming, which might cause trouble later on if
274 : * the segment is e.g archived.
275 : */
276 178 : if (XLogSegmentOffset(recptr, wal_segment_size) != 0)
277 178 : recptr -= XLogSegmentOffset(recptr, wal_segment_size);
278 :
279 178 : SpinLockAcquire(&walrcv->mutex);
280 :
281 : /* It better be stopped if we try to restart it */
282 : Assert(walrcv->walRcvState == WALRCV_STOPPED ||
283 : walrcv->walRcvState == WALRCV_WAITING);
284 :
285 178 : if (conninfo != NULL)
286 178 : strlcpy(walrcv->conninfo, conninfo, MAXCONNINFO);
287 : else
288 0 : walrcv->conninfo[0] = '\0';
289 :
290 : /*
291 : * Use configured replication slot if present, and ignore the value of
292 : * create_temp_slot as the slot name should be persistent. Otherwise, use
293 : * create_temp_slot to determine whether this WAL receiver should create a
294 : * temporary slot by itself and use it, or not.
295 : */
296 178 : if (slotname != NULL && slotname[0] != '\0')
297 : {
298 51 : strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
299 51 : walrcv->is_temp_slot = false;
300 : }
301 : else
302 : {
303 127 : walrcv->slotname[0] = '\0';
304 127 : walrcv->is_temp_slot = create_temp_slot;
305 : }
306 :
307 178 : if (walrcv->walRcvState == WALRCV_STOPPED)
308 : {
309 171 : launch = true;
310 171 : walrcv->walRcvState = WALRCV_STARTING;
311 : }
312 : else
313 7 : walrcv->walRcvState = WALRCV_RESTARTING;
314 178 : walrcv->startTime = now;
315 :
316 : /*
317 : * If this is the first startup of walreceiver (on this timeline),
318 : * initialize flushedUpto and latestChunkStart to the starting point.
319 : */
320 178 : if (!XLogRecPtrIsValid(walrcv->receiveStart) || walrcv->receivedTLI != tli)
321 : {
322 107 : walrcv->flushedUpto = recptr;
323 107 : walrcv->receivedTLI = tli;
324 107 : walrcv->latestChunkStart = recptr;
325 : }
326 178 : walrcv->receiveStart = recptr;
327 178 : walrcv->receiveStartTLI = tli;
328 :
329 178 : walrcv_proc = walrcv->procno;
330 :
331 178 : SpinLockRelease(&walrcv->mutex);
332 :
333 178 : if (launch)
334 171 : SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
335 7 : else if (walrcv_proc != INVALID_PROC_NUMBER)
336 7 : SetLatch(&GetPGProcByNumber(walrcv_proc)->procLatch);
337 178 : }
338 :
339 : /*
340 : * Returns the last+1 byte position that walreceiver has flushed.
341 : *
342 : * Optionally, returns the previous chunk start, that is the first byte
343 : * written in the most recent walreceiver flush cycle. Callers not
344 : * interested in that value may pass NULL for latestChunkStart. Same for
345 : * receiveTLI.
346 : */
347 : XLogRecPtr
348 31235 : GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
349 : {
350 31235 : WalRcvData *walrcv = WalRcv;
351 : XLogRecPtr recptr;
352 :
353 31235 : SpinLockAcquire(&walrcv->mutex);
354 31235 : recptr = walrcv->flushedUpto;
355 31235 : if (latestChunkStart)
356 30005 : *latestChunkStart = walrcv->latestChunkStart;
357 31235 : if (receiveTLI)
358 31000 : *receiveTLI = walrcv->receivedTLI;
359 31235 : SpinLockRelease(&walrcv->mutex);
360 :
361 31235 : return recptr;
362 : }
363 :
364 : /*
365 : * Returns the last+1 byte position that walreceiver has written.
366 : * This returns a recently written value without taking a lock.
367 : */
368 : XLogRecPtr
369 26 : GetWalRcvWriteRecPtr(void)
370 : {
371 26 : WalRcvData *walrcv = WalRcv;
372 :
373 26 : return pg_atomic_read_u64(&walrcv->writtenUpto);
374 : }
375 :
376 : /*
377 : * Returns the replication apply delay in ms or -1
378 : * if the apply delay info is not available
379 : */
380 : int
381 396 : GetReplicationApplyDelay(void)
382 : {
383 396 : WalRcvData *walrcv = WalRcv;
384 : XLogRecPtr receivePtr;
385 : XLogRecPtr replayPtr;
386 : TimestampTz chunkReplayStartTime;
387 :
388 396 : SpinLockAcquire(&walrcv->mutex);
389 396 : receivePtr = walrcv->flushedUpto;
390 396 : SpinLockRelease(&walrcv->mutex);
391 :
392 396 : replayPtr = GetXLogReplayRecPtr(NULL);
393 :
394 396 : if (receivePtr == replayPtr)
395 103 : return 0;
396 :
397 293 : chunkReplayStartTime = GetCurrentChunkReplayStartTime();
398 :
399 293 : if (chunkReplayStartTime == 0)
400 12 : return -1;
401 :
402 281 : return TimestampDifferenceMilliseconds(chunkReplayStartTime,
403 : GetCurrentTimestamp());
404 : }
405 :
406 : /*
407 : * Returns the network latency in ms, note that this includes any
408 : * difference in clock settings between the servers, as well as timezone.
409 : */
410 : int
411 396 : GetReplicationTransferLatency(void)
412 : {
413 396 : WalRcvData *walrcv = WalRcv;
414 : TimestampTz lastMsgSendTime;
415 : TimestampTz lastMsgReceiptTime;
416 :
417 396 : SpinLockAcquire(&walrcv->mutex);
418 396 : lastMsgSendTime = walrcv->lastMsgSendTime;
419 396 : lastMsgReceiptTime = walrcv->lastMsgReceiptTime;
420 396 : SpinLockRelease(&walrcv->mutex);
421 :
422 396 : return TimestampDifferenceMilliseconds(lastMsgSendTime,
423 : lastMsgReceiptTime);
424 : }
|