Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * xlogwait.c
4 : * Implements waiting for WAL operations to reach specific LSNs.
5 : *
6 : * Copyright (c) 2025, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/backend/access/transam/xlogwait.c
10 : *
11 : * NOTES
12 : * This file implements waiting for WAL operations to reach specific LSNs
13 : * on both physical standby and primary servers. The core idea is simple:
14 : * every process that wants to wait publishes the LSN it needs to the
15 : * shared memory, and the appropriate process (startup on standby, or
16 : * WAL writer/backend on primary) wakes it once that LSN has been reached.
17 : *
18 : * The shared memory used by this module comprises a procInfos
19 : * per-backend array with the information of the awaited LSN for each
20 : * of the backend processes. The elements of that array are organized
21 : * into a pairing heap waitersHeap, which allows for very fast finding
22 : * of the least awaited LSN.
23 : *
 * In addition, the least-awaited LSN is cached as minWaitedLSN. The
 * waiter process publishes information about itself to shared memory
 * and waits on its latch until one of four things happens: it is woken
 * up by the appropriate process, the standby is promoted, the timeout
 * expires, or the postmaster dies. Then, it removes the information
 * about itself from shared memory.
29 : *
 * On standby servers: After replaying a WAL record, the startup process
 * first performs a fast-path check of minWaitedLSN > replayLSN. If that
 * condition is false, it scans waitersHeap and wakes up the backends
 * whose awaited LSNs have been reached.
34 : *
35 : * On primary servers: After flushing WAL, the WAL writer or backend
36 : * process performs a similar check against the flush LSN and wakes up
37 : * waiters whose target flush LSNs have been reached.
38 : *
39 : *-------------------------------------------------------------------------
40 : */
41 :
42 : #include "postgres.h"
43 :
44 : #include <float.h>
45 : #include <math.h>
46 :
47 : #include "access/xlog.h"
48 : #include "access/xlogrecovery.h"
49 : #include "access/xlogwait.h"
50 : #include "miscadmin.h"
51 : #include "pgstat.h"
52 : #include "storage/latch.h"
53 : #include "storage/proc.h"
54 : #include "storage/shmem.h"
55 : #include "utils/fmgrprotos.h"
56 : #include "utils/pg_lsn.h"
57 : #include "utils/snapmgr.h"
58 :
59 :
static int	waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b,
						void *arg);

/*
 * This module's state in shared memory.  NULL until WaitLSNShmemInit() has
 * run in this process; WaitLSNCleanup() checks for that.
 */
struct WaitLSNState *waitLSNState = NULL;
64 :
65 : /* Report the amount of shared memory space needed for WaitLSNState. */
66 : Size
67 6300 : WaitLSNShmemSize(void)
68 : {
69 : Size size;
70 :
71 6300 : size = offsetof(WaitLSNState, procInfos);
72 6300 : size = add_size(size, mul_size(MaxBackends + NUM_AUXILIARY_PROCS, sizeof(WaitLSNProcInfo)));
73 6300 : return size;
74 : }
75 :
76 : /* Initialize the WaitLSNState in the shared memory. */
77 : void
78 2200 : WaitLSNShmemInit(void)
79 : {
80 : bool found;
81 :
82 2200 : waitLSNState = (WaitLSNState *) ShmemInitStruct("WaitLSNState",
83 : WaitLSNShmemSize(),
84 : &found);
85 2200 : if (!found)
86 : {
87 : int i;
88 :
89 : /* Initialize heaps and tracking */
90 6600 : for (i = 0; i < WAIT_LSN_TYPE_COUNT; i++)
91 : {
92 4400 : pg_atomic_init_u64(&waitLSNState->minWaitedLSN[i], PG_UINT64_MAX);
93 4400 : pairingheap_initialize(&waitLSNState->waitersHeap[i], waitlsn_cmp, (void *) (uintptr_t) i);
94 : }
95 :
96 : /* Initialize process info array */
97 2200 : memset(&waitLSNState->procInfos, 0,
98 2200 : (MaxBackends + NUM_AUXILIARY_PROCS) * sizeof(WaitLSNProcInfo));
99 : }
100 2200 : }
101 :
102 : /*
103 : * Comparison function for LSN waiters heaps. Waiting processes are ordered by
104 : * LSN, so that the waiter with smallest LSN is at the top.
105 : */
106 : static int
107 16 : waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
108 : {
109 16 : int i = (uintptr_t) arg;
110 16 : const WaitLSNProcInfo *aproc = pairingheap_const_container(WaitLSNProcInfo, heapNode[i], a);
111 16 : const WaitLSNProcInfo *bproc = pairingheap_const_container(WaitLSNProcInfo, heapNode[i], b);
112 :
113 16 : if (aproc->waitLSN < bproc->waitLSN)
114 10 : return 1;
115 6 : else if (aproc->waitLSN > bproc->waitLSN)
116 6 : return -1;
117 : else
118 0 : return 0;
119 : }
120 :
121 : /*
122 : * Update minimum waited LSN for the specified LSN type
123 : */
124 : static void
125 1864 : updateMinWaitedLSN(WaitLSNType lsnType)
126 : {
127 1864 : XLogRecPtr minWaitedLSN = PG_UINT64_MAX;
128 1864 : int i = (int) lsnType;
129 :
130 : Assert(i >= 0 && i < (int) WAIT_LSN_TYPE_COUNT);
131 :
132 1864 : if (!pairingheap_is_empty(&waitLSNState->waitersHeap[i]))
133 : {
134 42 : pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap[i]);
135 42 : WaitLSNProcInfo *procInfo = pairingheap_container(WaitLSNProcInfo, heapNode[i], node);
136 :
137 42 : minWaitedLSN = procInfo->waitLSN;
138 : }
139 1864 : pg_atomic_write_u64(&waitLSNState->minWaitedLSN[i], minWaitedLSN);
140 1864 : }
141 :
142 : /*
143 : * Add current process to appropriate waiters heap based on LSN type
144 : */
145 : static void
146 34 : addLSNWaiter(XLogRecPtr lsn, WaitLSNType lsnType)
147 : {
148 34 : WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber];
149 34 : int i = (int) lsnType;
150 :
151 : Assert(i >= 0 && i < (int) WAIT_LSN_TYPE_COUNT);
152 :
153 34 : LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
154 :
155 34 : procInfo->procno = MyProcNumber;
156 34 : procInfo->waitLSN = lsn;
157 :
158 : Assert(!procInfo->inHeap[i]);
159 34 : pairingheap_add(&waitLSNState->waitersHeap[i], &procInfo->heapNode[i]);
160 34 : procInfo->inHeap[i] = true;
161 34 : updateMinWaitedLSN(lsnType);
162 :
163 34 : LWLockRelease(WaitLSNLock);
164 34 : }
165 :
166 : /*
167 : * Remove current process from appropriate waiters heap based on LSN type
168 : */
169 : static void
170 34 : deleteLSNWaiter(WaitLSNType lsnType)
171 : {
172 34 : WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber];
173 34 : int i = (int) lsnType;
174 :
175 : Assert(i >= 0 && i < (int) WAIT_LSN_TYPE_COUNT);
176 :
177 34 : LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
178 :
179 34 : if (procInfo->inHeap[i])
180 : {
181 18 : pairingheap_remove(&waitLSNState->waitersHeap[i], &procInfo->heapNode[i]);
182 18 : procInfo->inHeap[i] = false;
183 18 : updateMinWaitedLSN(lsnType);
184 : }
185 :
186 34 : LWLockRelease(WaitLSNLock);
187 34 : }
188 :
/*
 * Capacity of the on-stack array of process numbers that wakeupWaiters()
 * (called by WaitLSNWakeup()) collects while holding WaitLSNLock.  It should
 * be enough to wake all waiters in a single pass in most cases.  When the
 * array fills up, wakeupWaiters() loops and handles the rest in further chunks.
 */
#define WAKEUP_PROC_STATIC_ARRAY_SIZE (16)
194 :
195 : /*
196 : * Remove waiters whose LSN has been reached from the heap and set their
197 : * latches. If InvalidXLogRecPtr is given, remove all waiters from the heap
198 : * and set latches for all waiters.
199 : *
200 : * This function first accumulates waiters to wake up into an array, then
201 : * wakes them up without holding a WaitLSNLock. The array size is static and
202 : * equal to WAKEUP_PROC_STATIC_ARRAY_SIZE. That should be more than enough
203 : * to wake up all the waiters at once in the vast majority of cases. However,
204 : * if there are more waiters, this function will loop to process them in
205 : * multiple chunks.
206 : */
207 : static void
208 1812 : wakeupWaiters(WaitLSNType lsnType, XLogRecPtr currentLSN)
209 : {
210 : ProcNumber wakeUpProcs[WAKEUP_PROC_STATIC_ARRAY_SIZE];
211 : int numWakeUpProcs;
212 1812 : int i = (int) lsnType;
213 :
214 : Assert(i >= 0 && i < (int) WAIT_LSN_TYPE_COUNT);
215 :
216 : do
217 : {
218 1812 : numWakeUpProcs = 0;
219 1812 : LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
220 :
221 : /*
222 : * Iterate the waiters heap until we find LSN not yet reached. Record
223 : * process numbers to wake up, but send wakeups after releasing lock.
224 : */
225 1828 : while (!pairingheap_is_empty(&waitLSNState->waitersHeap[i]))
226 : {
227 24 : pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap[i]);
228 : WaitLSNProcInfo *procInfo;
229 :
230 : /* Get procInfo using appropriate heap node */
231 24 : procInfo = pairingheap_container(WaitLSNProcInfo, heapNode[i], node);
232 :
233 24 : if (XLogRecPtrIsValid(currentLSN) && procInfo->waitLSN > currentLSN)
234 8 : break;
235 :
236 : Assert(numWakeUpProcs < WAKEUP_PROC_STATIC_ARRAY_SIZE);
237 16 : wakeUpProcs[numWakeUpProcs++] = procInfo->procno;
238 16 : (void) pairingheap_remove_first(&waitLSNState->waitersHeap[i]);
239 :
240 : /* Update appropriate flag */
241 16 : procInfo->inHeap[i] = false;
242 :
243 16 : if (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE)
244 0 : break;
245 : }
246 :
247 1812 : updateMinWaitedLSN(lsnType);
248 1812 : LWLockRelease(WaitLSNLock);
249 :
250 : /*
251 : * Set latches for processes whose waited LSNs have been reached.
252 : * Since SetLatch() is a time-consuming operation, we do this outside
253 : * of WaitLSNLock. This is safe because procLatch is never freed, so
254 : * at worst we may set a latch for the wrong process or for no process
255 : * at all, which is harmless.
256 : */
257 1828 : for (i = 0; i < numWakeUpProcs; i++)
258 16 : SetLatch(&GetPGProcByNumber(wakeUpProcs[i])->procLatch);
259 :
260 1812 : } while (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE);
261 1812 : }
262 :
263 : /*
264 : * Wake up processes waiting for LSN to reach currentLSN
265 : */
266 : void
267 1812 : WaitLSNWakeup(WaitLSNType lsnType, XLogRecPtr currentLSN)
268 : {
269 1812 : int i = (int) lsnType;
270 :
271 : Assert(i >= 0 && i < (int) WAIT_LSN_TYPE_COUNT);
272 :
273 : /*
274 : * Fast path check. Skip if currentLSN is InvalidXLogRecPtr, which means
275 : * "wake all waiters" (e.g., during promotion when recovery ends).
276 : */
277 1826 : if (XLogRecPtrIsValid(currentLSN) &&
278 14 : pg_atomic_read_u64(&waitLSNState->minWaitedLSN[i]) > currentLSN)
279 0 : return;
280 :
281 1812 : wakeupWaiters(lsnType, currentLSN);
282 : }
283 :
284 : /*
285 : * Clean up LSN waiters for exiting process
286 : */
287 : void
288 85938 : WaitLSNCleanup(void)
289 : {
290 85938 : if (waitLSNState)
291 : {
292 : int i;
293 :
294 : /*
295 : * We do a fast-path check of the heap flags without the lock. These
296 : * flags are set to true only by the process itself. So, it's only
297 : * possible to get a false positive. But that will be eliminated by a
298 : * recheck inside deleteLSNWaiter().
299 : */
300 :
301 257814 : for (i = 0; i < (int) WAIT_LSN_TYPE_COUNT; i++)
302 : {
303 171876 : if (waitLSNState->procInfos[MyProcNumber].inHeap[i])
304 0 : deleteLSNWaiter((WaitLSNType) i);
305 : }
306 : }
307 85938 : }
308 :
/*
 * Wait using MyLatch until the LSN of kind 'lsnType' reaches 'targetLSN'.
 *
 * For WAIT_LSN_TYPE_REPLAY, the position checked is the replay position
 * (GetXLogReplayRecPtr()).  For any other type, it is the flush position
 * (GetFlushRecPtr()).
 *
 * 'timeout' is in milliseconds and is added to the current timestamp via
 * TimestampTzPlusMilliseconds().  A value <= 0 means wait without a timeout.
 *
 * Returns:
 *	WAIT_LSN_RESULT_SUCCESS if the target LSN was reached.
 *	WAIT_LSN_RESULT_NOT_IN_RECOVERY (REPLAY type only) if recovery is not in
 *		progress, i.e. the replica was promoted before the target LSN was
 *		replayed.
 *	WAIT_LSN_RESULT_TIMEOUT if the timeout expired first.
 *
 * Raises FATAL if the postmaster dies.  CHECK_FOR_INTERRUPTS() may also
 * throw.  In that case this function does not remove us from the heap
 * itself.  NOTE(review): presumably the caller's abort/exit path runs
 * WaitLSNCleanup() -- confirm.
 */
WaitLSNResult
WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN, int64 timeout)
{
	XLogRecPtr	currentLSN;
	TimestampTz endtime = 0;
	int			wake_events = WL_LATCH_SET | WL_POSTMASTER_DEATH;

	/* Shouldn't be called when shmem isn't initialized */
	Assert(waitLSNState);

	/*
	 * Should have a valid proc number.  This is stricter than the
	 * procInfos[] sizing (MaxBackends + NUM_AUXILIARY_PROCS): only regular
	 * backends are expected to wait here.
	 */
	Assert(MyProcNumber >= 0 && MyProcNumber < MaxBackends);

	if (timeout > 0)
	{
		endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), timeout);
		wake_events |= WL_TIMEOUT;
	}

	/*
	 * Add our process to the waiters heap.  The target LSN may already have
	 * been reached before we got here.  The check at the top of the loop
	 * below handles that race.  Any later wakeup sets our latch.
	 */
	addLSNWaiter(targetLSN, lsnType);

	for (;;)
	{
		int			rc;
		long		delay_ms = -1;	/* -1: no timeout for WaitLatch() */

		if (lsnType == WAIT_LSN_TYPE_REPLAY)
			currentLSN = GetXLogReplayRecPtr(NULL);
		else
			currentLSN = GetFlushRecPtr(NULL);

		/* Check that recovery is still in-progress */
		if (lsnType == WAIT_LSN_TYPE_REPLAY && !RecoveryInProgress())
		{
			/*
			 * Recovery was ended, but check if target LSN was already
			 * reached.  Note that currentLSN was read before this check.
			 */
			deleteLSNWaiter(lsnType);

			if (PromoteIsTriggered() && targetLSN <= currentLSN)
				return WAIT_LSN_RESULT_SUCCESS;
			return WAIT_LSN_RESULT_NOT_IN_RECOVERY;
		}
		else
		{
			/* Check if the waited LSN has been reached */
			if (targetLSN <= currentLSN)
				break;
		}

		if (timeout > 0)
		{
			/* Remaining time; a non-positive value means we timed out. */
			delay_ms = TimestampDifferenceMilliseconds(GetCurrentTimestamp(), endtime);
			if (delay_ms <= 0)
				break;
		}

		CHECK_FOR_INTERRUPTS();

		rc = WaitLatch(MyLatch, wake_events, delay_ms,
					   (lsnType == WAIT_LSN_TYPE_REPLAY) ? WAIT_EVENT_WAIT_FOR_WAL_REPLAY : WAIT_EVENT_WAIT_FOR_WAL_FLUSH);

		/*
		 * Emergency bailout if postmaster has died. This is to avoid the
		 * necessity for manual cleanup of all postmaster children.
		 */
		if (rc & WL_POSTMASTER_DEATH)
			ereport(FATAL,
					errcode(ERRCODE_ADMIN_SHUTDOWN),
					errmsg("terminating connection due to unexpected postmaster exit"),
					errcontext("while waiting for LSN"));

		if (rc & WL_LATCH_SET)
			ResetLatch(MyLatch);
	}

	/*
	 * Delete our process from the shared memory heap.  A waker
	 * (wakeupWaiters()) may already have removed us.  The 'inHeap' flags
	 * prevent a double deletion.
	 */
	deleteLSNWaiter(lsnType);

	/*
	 * The loop exits only on success or on timeout.  So if we didn't reach
	 * the target LSN, the timeout must have expired.
	 */
	if (targetLSN > currentLSN)
		return WAIT_LSN_RESULT_TIMEOUT;

	return WAIT_LSN_RESULT_SUCCESS;
}
|