Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * xlogwait.c
4 : * Implements waiting for WAL operations to reach specific LSNs.
5 : *
6 : * Copyright (c) 2025-2026, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/backend/access/transam/xlogwait.c
10 : *
11 : * NOTES
12 : * This file implements waiting for WAL operations to reach specific LSNs
13 : * on both physical standby and primary servers. The core idea is simple:
14 : * every process that wants to wait publishes the LSN it needs to the
15 : * shared memory, and the appropriate process (startup on standby,
16 : * walreceiver on standby, or WAL writer/backend on primary) wakes it
17 : * once that LSN has been reached.
18 : *
19 : * The shared memory used by this module comprises a procInfos
20 : * per-backend array with the information of the awaited LSN for each
21 : * of the backend processes. The elements of that array are organized
22 : * into pairing heaps (waitersHeap), one for each WaitLSNType, which
23 : * allows for very fast finding of the least awaited LSN for each type.
24 : *
25 : * In addition, the least-awaited LSN for each type is cached in the
26 : * minWaitedLSN array. The waiter process publishes information about
27 : * itself to the shared memory and waits on the latch until it is woken
28 : * up by the appropriate process, standby is promoted, or the postmaster
29 : * dies. Then, it cleans information about itself in the shared memory.
30 : *
31 : * On standby servers:
32 : * - After replaying a WAL record, the startup process performs a fast
33 : * path check minWaitedLSN[REPLAY] > replayLSN. If this check is
34 : * negative, it checks waitersHeap[REPLAY] and wakes up the backends
35 : * whose awaited LSNs are reached.
36 : * - After receiving WAL, the walreceiver process performs similar checks
37 : * against the flush and write LSNs, waking up waiters in the FLUSH
38 : * and WRITE heaps, respectively.
39 : *
40 : * On primary servers: After flushing WAL, the WAL writer or backend
41 : * process performs a similar check against the flush LSN and wakes up
42 : * waiters whose target flush LSNs have been reached.
43 : *
44 : *-------------------------------------------------------------------------
45 : */
46 :
47 : #include "postgres.h"
48 :
49 : #include <float.h>
50 :
51 : #include "access/xlog.h"
52 : #include "access/xlogrecovery.h"
53 : #include "access/xlogwait.h"
54 : #include "miscadmin.h"
55 : #include "pgstat.h"
56 : #include "replication/walreceiver.h"
57 : #include "storage/latch.h"
58 : #include "storage/proc.h"
59 : #include "storage/shmem.h"
60 : #include "storage/subsystems.h"
61 : #include "utils/fmgrprotos.h"
62 : #include "utils/pg_lsn.h"
63 : #include "utils/snapmgr.h"
64 : #include "utils/wait_event.h"
65 :
66 :
67 : static int waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b,
68 : void *arg);
69 :
70 : struct WaitLSNState *waitLSNState = NULL;
71 :
72 : static void WaitLSNShmemRequest(void *arg);
73 : static void WaitLSNShmemInit(void *arg);
74 :
75 : const ShmemCallbacks WaitLSNShmemCallbacks = {
76 : .request_fn = WaitLSNShmemRequest,
77 : .init_fn = WaitLSNShmemInit,
78 : };
79 :
80 : /*
81 : * Wait event for each WaitLSNType, used with WaitLatch() to report
82 : * the wait in pg_stat_activity.
83 : */
84 : static const uint32 WaitLSNWaitEvents[] = {
85 : [WAIT_LSN_TYPE_STANDBY_REPLAY] = WAIT_EVENT_WAIT_FOR_WAL_REPLAY,
86 : [WAIT_LSN_TYPE_STANDBY_WRITE] = WAIT_EVENT_WAIT_FOR_WAL_WRITE,
87 : [WAIT_LSN_TYPE_STANDBY_FLUSH] = WAIT_EVENT_WAIT_FOR_WAL_FLUSH,
88 : [WAIT_LSN_TYPE_PRIMARY_FLUSH] = WAIT_EVENT_WAIT_FOR_WAL_FLUSH,
89 : };
90 :
91 : StaticAssertDecl(lengthof(WaitLSNWaitEvents) == WAIT_LSN_TYPE_COUNT,
92 : "WaitLSNWaitEvents must match WaitLSNType enum");
93 :
94 : /*
95 : * Get the current LSN for the specified wait type. Provide memory
96 : * barrier semantics before getting the value.
97 : */
98 : XLogRecPtr
99 11614 : GetCurrentLSNForWaitType(WaitLSNType lsnType)
100 : {
101 : Assert(lsnType >= 0 && lsnType < WAIT_LSN_TYPE_COUNT);
102 :
103 : /*
104 : * All of the cases below provide memory barrier semantics:
105 : * GetWalRcvWriteRecPtr() and GetFlushRecPtr() have explicit barriers,
106 : * while GetXLogReplayRecPtr() and GetWalRcvFlushRecPtr() use spinlocks.
107 : */
108 11614 : switch (lsnType)
109 : {
110 212 : case WAIT_LSN_TYPE_STANDBY_REPLAY:
111 212 : return GetXLogReplayRecPtr(NULL);
112 :
113 47 : case WAIT_LSN_TYPE_STANDBY_WRITE:
114 : {
115 47 : XLogRecPtr recptr = GetWalRcvWriteRecPtr();
116 47 : XLogRecPtr replay = GetXLogReplayRecPtr(NULL);
117 :
118 : /*
119 : * Use the replay position as a floor. WAL up to the replay
120 : * point is already on disk from a base backup, archive
121 : * restore, or prior streaming, so there is no reason to wait
122 : * for the walreceiver to re-receive it.
123 : */
124 47 : return Max(recptr, replay);
125 : }
126 :
127 35 : case WAIT_LSN_TYPE_STANDBY_FLUSH:
128 : {
129 35 : XLogRecPtr recptr = GetWalRcvFlushRecPtr(NULL, NULL);
130 35 : XLogRecPtr replay = GetXLogReplayRecPtr(NULL);
131 :
132 : /* Same floor as standby_write; see comment above. */
133 35 : return Max(recptr, replay);
134 : }
135 :
136 11320 : case WAIT_LSN_TYPE_PRIMARY_FLUSH:
137 11320 : return GetFlushRecPtr(NULL);
138 : }
139 :
140 0 : elog(ERROR, "invalid LSN wait type: %d", lsnType);
141 : pg_unreachable();
142 : }
143 :
144 : /* Register the shared memory space needed for WaitLSNState. */
145 : static void
146 1248 : WaitLSNShmemRequest(void *arg)
147 : {
148 : Size size;
149 :
150 1248 : size = offsetof(WaitLSNState, procInfos);
151 1248 : size = add_size(size, mul_size(MaxBackends + NUM_AUXILIARY_PROCS, sizeof(WaitLSNProcInfo)));
152 1248 : ShmemRequestStruct(.name = "WaitLSNState",
153 : .size = size,
154 : .ptr = (void **) &waitLSNState,
155 : );
156 1248 : }
157 :
158 : /* Initialize the WaitLSNState in the shared memory. */
159 : static void
160 1245 : WaitLSNShmemInit(void *arg)
161 : {
162 : /* Initialize heaps and tracking */
163 6225 : for (int i = 0; i < WAIT_LSN_TYPE_COUNT; i++)
164 : {
165 4980 : pg_atomic_init_u64(&waitLSNState->minWaitedLSN[i], PG_UINT64_MAX);
166 4980 : pairingheap_initialize(&waitLSNState->waitersHeap[i], waitlsn_cmp, NULL);
167 : }
168 :
169 : /* Initialize process info array */
170 1245 : memset(&waitLSNState->procInfos, 0,
171 1245 : (MaxBackends + NUM_AUXILIARY_PROCS) * sizeof(WaitLSNProcInfo));
172 1245 : }
173 :
174 : /*
175 : * Comparison function for LSN waiters heaps. Waiting processes are ordered by
176 : * LSN, so that the waiter with smallest LSN is at the top.
177 : */
178 : static int
179 28 : waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
180 : {
181 28 : const WaitLSNProcInfo *aproc = pairingheap_const_container(WaitLSNProcInfo, heapNode, a);
182 28 : const WaitLSNProcInfo *bproc = pairingheap_const_container(WaitLSNProcInfo, heapNode, b);
183 :
184 28 : if (aproc->waitLSN < bproc->waitLSN)
185 15 : return 1;
186 13 : else if (aproc->waitLSN > bproc->waitLSN)
187 10 : return -1;
188 : else
189 3 : return 0;
190 : }
191 :
192 : /*
193 : * Update minimum waited LSN for the specified LSN type
194 : */
195 : static void
196 26091 : updateMinWaitedLSN(WaitLSNType lsnType)
197 : {
198 26091 : XLogRecPtr minWaitedLSN = PG_UINT64_MAX;
199 26091 : int i = (int) lsnType;
200 :
201 : Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
202 :
203 26091 : if (!pairingheap_is_empty(&waitLSNState->waitersHeap[i]))
204 : {
205 11531 : pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap[i]);
206 11531 : WaitLSNProcInfo *procInfo = pairingheap_container(WaitLSNProcInfo, heapNode, node);
207 :
208 11531 : minWaitedLSN = procInfo->waitLSN;
209 : }
210 : /* Pairs with pg_atomic_read_membarrier_u64() in WaitLSNWakeup(). */
211 26091 : pg_atomic_write_membarrier_u64(&waitLSNState->minWaitedLSN[i], minWaitedLSN);
212 26091 : }
213 :
214 : /*
215 : * Add current process to appropriate waiters heap based on LSN type
216 : */
217 : static void
218 11526 : addLSNWaiter(XLogRecPtr lsn, WaitLSNType lsnType)
219 : {
220 11526 : WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber];
221 11526 : int i = (int) lsnType;
222 :
223 : Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
224 :
225 11526 : LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
226 :
227 11526 : procInfo->procno = MyProcNumber;
228 11526 : procInfo->waitLSN = lsn;
229 11526 : procInfo->lsnType = lsnType;
230 :
231 : Assert(!procInfo->inHeap);
232 11526 : pairingheap_add(&waitLSNState->waitersHeap[i], &procInfo->heapNode);
233 11526 : procInfo->inHeap = true;
234 11526 : updateMinWaitedLSN(lsnType);
235 :
236 11526 : LWLockRelease(WaitLSNLock);
237 11526 : }
238 :
239 : /*
240 : * Remove current process from appropriate waiters heap based on LSN type
241 : */
242 : static void
243 11526 : deleteLSNWaiter(WaitLSNType lsnType)
244 : {
245 11526 : WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber];
246 11526 : int i = (int) lsnType;
247 :
248 : Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
249 :
250 11526 : LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
251 :
252 : Assert(procInfo->lsnType == lsnType);
253 :
254 11526 : if (procInfo->inHeap)
255 : {
256 11479 : pairingheap_remove(&waitLSNState->waitersHeap[i], &procInfo->heapNode);
257 11479 : procInfo->inHeap = false;
258 11479 : updateMinWaitedLSN(lsnType);
259 : }
260 :
261 11526 : LWLockRelease(WaitLSNLock);
262 11526 : }
263 :
/*
 * Number of entries in the on-stack array that wakeupWaiters() uses to
 * collect processes to wake up.  It should be large enough for a single
 * pass to suffice in most cases.
 */
#define WAKEUP_PROC_STATIC_ARRAY_SIZE 16
269 :
270 : /*
271 : * Remove waiters whose LSN has been reached from the heap and set their
272 : * latches. If InvalidXLogRecPtr is given, remove all waiters from the heap
273 : * and set latches for all waiters.
274 : *
275 : * This function first accumulates waiters to wake up into an array, then
276 : * wakes them up without holding a WaitLSNLock. The array size is static and
277 : * equal to WAKEUP_PROC_STATIC_ARRAY_SIZE. That should be more than enough
278 : * to wake up all the waiters at once in the vast majority of cases. However,
279 : * if there are more waiters, this function will loop to process them in
280 : * multiple chunks.
281 : */
282 : static void
283 3086 : wakeupWaiters(WaitLSNType lsnType, XLogRecPtr currentLSN)
284 : {
285 : ProcNumber wakeUpProcs[WAKEUP_PROC_STATIC_ARRAY_SIZE];
286 : int numWakeUpProcs;
287 3086 : int i = (int) lsnType;
288 :
289 : Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
290 :
291 : do
292 : {
293 : int j;
294 :
295 3086 : numWakeUpProcs = 0;
296 3086 : LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
297 :
298 : /*
299 : * Iterate the waiters heap until we find LSN not yet reached. Record
300 : * process numbers to wake up, but send wakeups after releasing lock.
301 : */
302 3132 : while (!pairingheap_is_empty(&waitLSNState->waitersHeap[i]))
303 : {
304 50 : pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap[i]);
305 : WaitLSNProcInfo *procInfo;
306 :
307 : /* Get procInfo using appropriate heap node */
308 50 : procInfo = pairingheap_container(WaitLSNProcInfo, heapNode, node);
309 :
310 50 : if (XLogRecPtrIsValid(currentLSN) && procInfo->waitLSN > currentLSN)
311 4 : break;
312 :
313 : Assert(numWakeUpProcs < WAKEUP_PROC_STATIC_ARRAY_SIZE);
314 46 : wakeUpProcs[numWakeUpProcs++] = procInfo->procno;
315 46 : (void) pairingheap_remove_first(&waitLSNState->waitersHeap[i]);
316 :
317 : /* Update appropriate flag */
318 46 : procInfo->inHeap = false;
319 :
320 46 : if (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE)
321 0 : break;
322 : }
323 :
324 3086 : updateMinWaitedLSN(lsnType);
325 3086 : LWLockRelease(WaitLSNLock);
326 :
327 : /*
328 : * Set latches for processes whose waited LSNs have been reached.
329 : * Since SetLatch() is a time-consuming operation, we do this outside
330 : * of WaitLSNLock. This is safe because procLatch is never freed, so
331 : * at worst we may set a latch for the wrong process or for no process
332 : * at all, which is harmless.
333 : */
334 3132 : for (j = 0; j < numWakeUpProcs; j++)
335 46 : SetLatch(&GetPGProcByNumber(wakeUpProcs[j])->procLatch);
336 :
337 3086 : } while (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE);
338 3086 : }
339 :
340 : /*
341 : * Wake up processes waiting for LSN to reach currentLSN
342 : */
343 : void
344 9182423 : WaitLSNWakeup(WaitLSNType lsnType, XLogRecPtr currentLSN)
345 : {
346 9182423 : int i = (int) lsnType;
347 :
348 : Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
349 :
350 : /*
351 : * Fast path check. Skip if currentLSN is InvalidXLogRecPtr, which means
352 : * "wake all waiters" (e.g., during promotion when recovery ends). Pairs
353 : * with pg_atomic_write_membarrier_u64() in updateMinWaitedLSN().
354 : */
355 18361792 : if (XLogRecPtrIsValid(currentLSN) &&
356 9179369 : pg_atomic_read_membarrier_u64(&waitLSNState->minWaitedLSN[i]) > currentLSN)
357 9179337 : return;
358 :
359 3086 : wakeupWaiters(lsnType, currentLSN);
360 : }
361 :
362 : /*
363 : * Clean up any LSN wait state for the current process.
364 : */
365 : void
366 60742 : WaitLSNCleanup(void)
367 : {
368 60742 : if (waitLSNState)
369 : {
370 : /*
371 : * We do a fast-path check of the inHeap flag without the lock. This
372 : * flag is set to true only by the process itself. So, it's only
373 : * possible to get a false positive. But that will be eliminated by a
374 : * recheck inside deleteLSNWaiter().
375 : */
376 60742 : if (waitLSNState->procInfos[MyProcNumber].inHeap)
377 1 : deleteLSNWaiter(waitLSNState->procInfos[MyProcNumber].lsnType);
378 : }
379 60742 : }
380 :
381 : /*
382 : * Check if the given LSN type requires recovery to be in progress.
383 : * Standby wait types (replay, write, flush) require recovery;
384 : * primary wait types (flush) do not.
385 : */
386 : static inline bool
387 11610 : WaitLSNTypeRequiresRecovery(WaitLSNType t)
388 : {
389 11400 : return t == WAIT_LSN_TYPE_STANDBY_REPLAY ||
390 23010 : t == WAIT_LSN_TYPE_STANDBY_WRITE ||
391 : t == WAIT_LSN_TYPE_STANDBY_FLUSH;
392 : }
393 :
394 : /*
395 : * Wait using MyLatch till the given LSN is reached, the replica gets
396 : * promoted, or the postmaster dies.
397 : *
398 : * Returns WAIT_LSN_RESULT_SUCCESS if target LSN was reached.
399 : * Returns WAIT_LSN_RESULT_NOT_IN_RECOVERY if run not in recovery,
400 : * or replica got promoted before the target LSN reached.
401 : */
402 : WaitLSNResult
403 11526 : WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN, int64 timeout)
404 : {
405 : XLogRecPtr currentLSN;
406 11526 : TimestampTz endtime = 0;
407 11526 : int wake_events = WL_LATCH_SET | WL_POSTMASTER_DEATH;
408 :
409 : /* Shouldn't be called when shmem isn't initialized */
410 : Assert(waitLSNState);
411 :
412 : /* Should have a valid proc number */
413 : Assert(MyProcNumber >= 0 && MyProcNumber < MaxBackends + NUM_AUXILIARY_PROCS);
414 :
415 11526 : if (timeout > 0)
416 : {
417 11514 : endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), timeout);
418 11514 : wake_events |= WL_TIMEOUT;
419 : }
420 :
421 : /*
422 : * Add our process to the waiters heap. It might happen that target LSN
423 : * gets reached before we do. The check at the beginning of the loop
424 : * below prevents the race condition.
425 : */
426 11526 : addLSNWaiter(targetLSN, lsnType);
427 :
428 : for (;;)
429 84 : {
430 : int rc;
431 11610 : long delay_ms = -1;
432 :
433 : /* Get current LSN for the wait type */
434 11610 : currentLSN = GetCurrentLSNForWaitType(lsnType);
435 :
436 : /* Check that recovery is still in-progress */
437 11610 : if (WaitLSNTypeRequiresRecovery(lsnType) && !RecoveryInProgress())
438 : {
439 : /*
440 : * Recovery was ended, but check if target LSN was already
441 : * reached.
442 : */
443 6 : deleteLSNWaiter(lsnType);
444 :
445 6 : if (PromoteIsTriggered() && targetLSN <= currentLSN)
446 1 : return WAIT_LSN_RESULT_SUCCESS;
447 5 : return WAIT_LSN_RESULT_NOT_IN_RECOVERY;
448 : }
449 : else
450 : {
451 : /* Check if the waited LSN has been reached */
452 11604 : if (targetLSN <= currentLSN)
453 11508 : break;
454 : }
455 :
456 96 : if (timeout > 0)
457 : {
458 82 : delay_ms = TimestampDifferenceMilliseconds(GetCurrentTimestamp(), endtime);
459 82 : if (delay_ms <= 0)
460 11 : break;
461 : }
462 :
463 85 : CHECK_FOR_INTERRUPTS();
464 :
465 84 : rc = WaitLatch(MyLatch, wake_events, delay_ms,
466 84 : WaitLSNWaitEvents[lsnType]);
467 :
468 : /*
469 : * Emergency bailout if postmaster has died. This is to avoid the
470 : * necessity for manual cleanup of all postmaster children.
471 : */
472 84 : if (rc & WL_POSTMASTER_DEATH)
473 0 : ereport(FATAL,
474 : errcode(ERRCODE_ADMIN_SHUTDOWN),
475 : errmsg("terminating connection due to unexpected postmaster exit"),
476 : errcontext("while waiting for LSN"));
477 :
478 84 : ResetLatch(MyLatch);
479 : }
480 :
481 : /*
482 : * Delete our process from the shared memory heap. We might already be
483 : * deleted by the startup process. The 'inHeap' flags prevents us from
484 : * the double deletion.
485 : */
486 11519 : deleteLSNWaiter(lsnType);
487 :
488 : /*
489 : * If we didn't reach the target LSN, we must be exited by timeout.
490 : */
491 11519 : if (targetLSN > currentLSN)
492 11 : return WAIT_LSN_RESULT_TIMEOUT;
493 :
494 11508 : return WAIT_LSN_RESULT_SUCCESS;
495 : }
|