Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * walreceiverfuncs.c
4 : *
5 : * This file contains functions used by the startup process to communicate
6 : * with the walreceiver process. Functions implementing walreceiver itself
7 : * are in walreceiver.c.
8 : *
9 : * Portions Copyright (c) 2010-2024, PostgreSQL Global Development Group
10 : *
11 : *
12 : * IDENTIFICATION
13 : * src/backend/replication/walreceiverfuncs.c
14 : *
15 : *-------------------------------------------------------------------------
16 : */
17 : #include "postgres.h"
18 :
19 : #include <sys/stat.h>
20 : #include <sys/time.h>
21 : #include <time.h>
22 : #include <unistd.h>
23 : #include <signal.h>
24 :
25 : #include "access/xlog_internal.h"
26 : #include "access/xlogrecovery.h"
27 : #include "pgstat.h"
28 : #include "replication/walreceiver.h"
29 : #include "storage/pmsignal.h"
30 : #include "storage/shmem.h"
31 : #include "utils/timestamp.h"
32 :
33 : WalRcvData *WalRcv = NULL;
34 :
35 : /*
36 : * How long to wait for walreceiver to start up after requesting
37 : * postmaster to launch it. In seconds.
38 : */
39 : #define WALRCV_STARTUP_TIMEOUT 10
40 :
41 : /* Report shared memory space needed by WalRcvShmemInit */
42 : Size
43 7002 : WalRcvShmemSize(void)
44 : {
45 7002 : Size size = 0;
46 :
47 7002 : size = add_size(size, sizeof(WalRcvData));
48 :
49 7002 : return size;
50 : }
51 :
52 : /* Allocate and initialize walreceiver-related shared memory */
53 : void
54 1810 : WalRcvShmemInit(void)
55 : {
56 : bool found;
57 :
58 1810 : WalRcv = (WalRcvData *)
59 1810 : ShmemInitStruct("Wal Receiver Ctl", WalRcvShmemSize(), &found);
60 :
61 1810 : if (!found)
62 : {
63 : /* First time through, so initialize */
64 1810 : MemSet(WalRcv, 0, WalRcvShmemSize());
65 1810 : WalRcv->walRcvState = WALRCV_STOPPED;
66 1810 : ConditionVariableInit(&WalRcv->walRcvStoppedCV);
67 1810 : SpinLockInit(&WalRcv->mutex);
68 1810 : pg_atomic_init_u64(&WalRcv->writtenUpto, 0);
69 1810 : WalRcv->latch = NULL;
70 : }
71 1810 : }
72 :
73 : /* Is walreceiver running (or starting up)? */
74 : bool
75 1808 : WalRcvRunning(void)
76 : {
77 1808 : WalRcvData *walrcv = WalRcv;
78 : WalRcvState state;
79 : pg_time_t startTime;
80 :
81 1808 : SpinLockAcquire(&walrcv->mutex);
82 :
83 1808 : state = walrcv->walRcvState;
84 1808 : startTime = walrcv->startTime;
85 :
86 1808 : SpinLockRelease(&walrcv->mutex);
87 :
88 : /*
89 : * If it has taken too long for walreceiver to start up, give up. Setting
90 : * the state to STOPPED ensures that if walreceiver later does start up
91 : * after all, it will see that it's not supposed to be running and die
92 : * without doing anything.
93 : */
94 1808 : if (state == WALRCV_STARTING)
95 : {
96 0 : pg_time_t now = (pg_time_t) time(NULL);
97 :
98 0 : if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
99 : {
100 0 : bool stopped = false;
101 :
102 0 : SpinLockAcquire(&walrcv->mutex);
103 0 : if (walrcv->walRcvState == WALRCV_STARTING)
104 : {
105 0 : state = walrcv->walRcvState = WALRCV_STOPPED;
106 0 : stopped = true;
107 : }
108 0 : SpinLockRelease(&walrcv->mutex);
109 :
110 0 : if (stopped)
111 0 : ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
112 : }
113 : }
114 :
115 1808 : if (state != WALRCV_STOPPED)
116 74 : return true;
117 : else
118 1734 : return false;
119 : }
120 :
121 : /*
122 : * Is walreceiver running and streaming (or at least attempting to connect,
123 : * or starting up)?
124 : */
125 : bool
126 60666 : WalRcvStreaming(void)
127 : {
128 60666 : WalRcvData *walrcv = WalRcv;
129 : WalRcvState state;
130 : pg_time_t startTime;
131 :
132 60666 : SpinLockAcquire(&walrcv->mutex);
133 :
134 60666 : state = walrcv->walRcvState;
135 60666 : startTime = walrcv->startTime;
136 :
137 60666 : SpinLockRelease(&walrcv->mutex);
138 :
139 : /*
140 : * If it has taken too long for walreceiver to start up, give up. Setting
141 : * the state to STOPPED ensures that if walreceiver later does start up
142 : * after all, it will see that it's not supposed to be running and die
143 : * without doing anything.
144 : */
145 60666 : if (state == WALRCV_STARTING)
146 : {
147 438 : pg_time_t now = (pg_time_t) time(NULL);
148 :
149 438 : if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
150 : {
151 0 : bool stopped = false;
152 :
153 0 : SpinLockAcquire(&walrcv->mutex);
154 0 : if (walrcv->walRcvState == WALRCV_STARTING)
155 : {
156 0 : state = walrcv->walRcvState = WALRCV_STOPPED;
157 0 : stopped = true;
158 : }
159 0 : SpinLockRelease(&walrcv->mutex);
160 :
161 0 : if (stopped)
162 0 : ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
163 : }
164 : }
165 :
166 60666 : if (state == WALRCV_STREAMING || state == WALRCV_STARTING ||
167 : state == WALRCV_RESTARTING)
168 60488 : return true;
169 : else
170 178 : return false;
171 : }
172 :
173 : /*
174 : * Stop walreceiver (if running) and wait for it to die.
175 : * Executed by the Startup process.
176 : */
177 : void
178 1734 : ShutdownWalRcv(void)
179 : {
180 1734 : WalRcvData *walrcv = WalRcv;
181 1734 : pid_t walrcvpid = 0;
182 1734 : bool stopped = false;
183 :
184 : /*
185 : * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
186 : * mode once it's finished, and will also request postmaster to not
187 : * restart itself.
188 : */
189 1734 : SpinLockAcquire(&walrcv->mutex);
190 1734 : switch (walrcv->walRcvState)
191 : {
192 1656 : case WALRCV_STOPPED:
193 1656 : break;
194 10 : case WALRCV_STARTING:
195 10 : walrcv->walRcvState = WALRCV_STOPPED;
196 10 : stopped = true;
197 10 : break;
198 :
199 68 : case WALRCV_STREAMING:
200 : case WALRCV_WAITING:
201 : case WALRCV_RESTARTING:
202 68 : walrcv->walRcvState = WALRCV_STOPPING;
203 : /* fall through */
204 68 : case WALRCV_STOPPING:
205 68 : walrcvpid = walrcv->pid;
206 68 : break;
207 : }
208 1734 : SpinLockRelease(&walrcv->mutex);
209 :
210 : /* Unnecessary but consistent. */
211 1734 : if (stopped)
212 10 : ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
213 :
214 : /*
215 : * Signal walreceiver process if it was still running.
216 : */
217 1734 : if (walrcvpid != 0)
218 68 : kill(walrcvpid, SIGTERM);
219 :
220 : /*
221 : * Wait for walreceiver to acknowledge its death by setting state to
222 : * WALRCV_STOPPED.
223 : */
224 1734 : ConditionVariablePrepareToSleep(&walrcv->walRcvStoppedCV);
225 1802 : while (WalRcvRunning())
226 68 : ConditionVariableSleep(&walrcv->walRcvStoppedCV,
227 : WAIT_EVENT_WAL_RECEIVER_EXIT);
228 1734 : ConditionVariableCancelSleep();
229 1734 : }
230 :
231 : /*
232 : * Request postmaster to start walreceiver.
233 : *
234 : * "recptr" indicates the position where streaming should begin. "conninfo"
235 : * is a libpq connection string to use. "slotname" is, optionally, the name
236 : * of a replication slot to acquire. "create_temp_slot" indicates to create
237 : * a temporary slot when no "slotname" is given.
238 : *
239 : * WAL receivers do not directly load GUC parameters used for the connection
240 : * to the primary, and rely on the values passed down by the caller of this
241 : * routine instead. Hence, the addition of any new parameters should happen
242 : * through this code path.
243 : */
244 : void
245 292 : RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
246 : const char *slotname, bool create_temp_slot)
247 : {
248 292 : WalRcvData *walrcv = WalRcv;
249 292 : bool launch = false;
250 292 : pg_time_t now = (pg_time_t) time(NULL);
251 : Latch *latch;
252 :
253 : /*
254 : * We always start at the beginning of the segment. That prevents a broken
255 : * segment (i.e., with no records in the first half of a segment) from
256 : * being created by XLOG streaming, which might cause trouble later on if
257 : * the segment is e.g archived.
258 : */
259 292 : if (XLogSegmentOffset(recptr, wal_segment_size) != 0)
260 292 : recptr -= XLogSegmentOffset(recptr, wal_segment_size);
261 :
262 292 : SpinLockAcquire(&walrcv->mutex);
263 :
264 : /* It better be stopped if we try to restart it */
265 : Assert(walrcv->walRcvState == WALRCV_STOPPED ||
266 : walrcv->walRcvState == WALRCV_WAITING);
267 :
268 292 : if (conninfo != NULL)
269 292 : strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO);
270 : else
271 0 : walrcv->conninfo[0] = '\0';
272 :
273 : /*
274 : * Use configured replication slot if present, and ignore the value of
275 : * create_temp_slot as the slot name should be persistent. Otherwise, use
276 : * create_temp_slot to determine whether this WAL receiver should create a
277 : * temporary slot by itself and use it, or not.
278 : */
279 292 : if (slotname != NULL && slotname[0] != '\0')
280 : {
281 90 : strlcpy((char *) walrcv->slotname, slotname, NAMEDATALEN);
282 90 : walrcv->is_temp_slot = false;
283 : }
284 : else
285 : {
286 202 : walrcv->slotname[0] = '\0';
287 202 : walrcv->is_temp_slot = create_temp_slot;
288 : }
289 :
290 292 : if (walrcv->walRcvState == WALRCV_STOPPED)
291 : {
292 292 : launch = true;
293 292 : walrcv->walRcvState = WALRCV_STARTING;
294 : }
295 : else
296 0 : walrcv->walRcvState = WALRCV_RESTARTING;
297 292 : walrcv->startTime = now;
298 :
299 : /*
300 : * If this is the first startup of walreceiver (on this timeline),
301 : * initialize flushedUpto and latestChunkStart to the starting point.
302 : */
303 292 : if (walrcv->receiveStart == 0 || walrcv->receivedTLI != tli)
304 : {
305 184 : walrcv->flushedUpto = recptr;
306 184 : walrcv->receivedTLI = tli;
307 184 : walrcv->latestChunkStart = recptr;
308 : }
309 292 : walrcv->receiveStart = recptr;
310 292 : walrcv->receiveStartTLI = tli;
311 :
312 292 : latch = walrcv->latch;
313 :
314 292 : SpinLockRelease(&walrcv->mutex);
315 :
316 292 : if (launch)
317 292 : SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
318 0 : else if (latch)
319 0 : SetLatch(latch);
320 292 : }
321 :
322 : /*
323 : * Returns the last+1 byte position that walreceiver has flushed.
324 : *
325 : * Optionally, returns the previous chunk start, that is the first byte
326 : * written in the most recent walreceiver flush cycle. Callers not
327 : * interested in that value may pass NULL for latestChunkStart. Same for
328 : * receiveTLI.
329 : */
330 : XLogRecPtr
331 62128 : GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
332 : {
333 62128 : WalRcvData *walrcv = WalRcv;
334 : XLogRecPtr recptr;
335 :
336 62128 : SpinLockAcquire(&walrcv->mutex);
337 62128 : recptr = walrcv->flushedUpto;
338 62128 : if (latestChunkStart)
339 60174 : *latestChunkStart = walrcv->latestChunkStart;
340 62128 : if (receiveTLI)
341 62042 : *receiveTLI = walrcv->receivedTLI;
342 62128 : SpinLockRelease(&walrcv->mutex);
343 :
344 62128 : return recptr;
345 : }
346 :
347 : /*
348 : * Returns the last+1 byte position that walreceiver has written.
349 : * This returns a recently written value without taking a lock.
350 : */
351 : XLogRecPtr
352 0 : GetWalRcvWriteRecPtr(void)
353 : {
354 0 : WalRcvData *walrcv = WalRcv;
355 :
356 0 : return pg_atomic_read_u64(&walrcv->writtenUpto);
357 : }
358 :
359 : /*
360 : * Returns the replication apply delay in ms or -1
361 : * if the apply delay info is not available
362 : */
363 : int
364 780 : GetReplicationApplyDelay(void)
365 : {
366 780 : WalRcvData *walrcv = WalRcv;
367 : XLogRecPtr receivePtr;
368 : XLogRecPtr replayPtr;
369 : TimestampTz chunkReplayStartTime;
370 :
371 780 : SpinLockAcquire(&walrcv->mutex);
372 780 : receivePtr = walrcv->flushedUpto;
373 780 : SpinLockRelease(&walrcv->mutex);
374 :
375 780 : replayPtr = GetXLogReplayRecPtr(NULL);
376 :
377 780 : if (receivePtr == replayPtr)
378 300 : return 0;
379 :
380 480 : chunkReplayStartTime = GetCurrentChunkReplayStartTime();
381 :
382 480 : if (chunkReplayStartTime == 0)
383 2 : return -1;
384 :
385 478 : return TimestampDifferenceMilliseconds(chunkReplayStartTime,
386 : GetCurrentTimestamp());
387 : }
388 :
389 : /*
390 : * Returns the network latency in ms, note that this includes any
391 : * difference in clock settings between the servers, as well as timezone.
392 : */
393 : int
394 780 : GetReplicationTransferLatency(void)
395 : {
396 780 : WalRcvData *walrcv = WalRcv;
397 : TimestampTz lastMsgSendTime;
398 : TimestampTz lastMsgReceiptTime;
399 :
400 780 : SpinLockAcquire(&walrcv->mutex);
401 780 : lastMsgSendTime = walrcv->lastMsgSendTime;
402 780 : lastMsgReceiptTime = walrcv->lastMsgReceiptTime;
403 780 : SpinLockRelease(&walrcv->mutex);
404 :
405 780 : return TimestampDifferenceMilliseconds(lastMsgSendTime,
406 : lastMsgReceiptTime);
407 : }
|