Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * xlogwait.c
4 : * Implements waiting for WAL operations to reach specific LSNs.
5 : *
6 : * Copyright (c) 2025-2026, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/backend/access/transam/xlogwait.c
10 : *
11 : * NOTES
12 : * This file implements waiting for WAL operations to reach specific LSNs
13 : * on both physical standby and primary servers. The core idea is simple:
14 : * every process that wants to wait publishes the LSN it needs to the
15 : * shared memory, and the appropriate process (startup on standby,
16 : * walreceiver on standby, or WAL writer/backend on primary) wakes it
17 : * once that LSN has been reached.
18 : *
19 : * The shared memory used by this module comprises a procInfos
20 : * per-backend array with the information of the awaited LSN for each
21 : * of the backend processes. The elements of that array are organized
22 : * into pairing heaps (waitersHeap), one for each WaitLSNType, which
23 : * allows for very fast finding of the least awaited LSN for each type.
24 : *
25 : * In addition, the least-awaited LSN for each type is cached in the
26 : * minWaitedLSN array. The waiter process publishes information about
27 : * itself to the shared memory and waits on the latch until it is woken
28 : * up by the appropriate process, standby is promoted, or the postmaster
29 : * dies. Then, it cleans information about itself in the shared memory.
30 : *
31 : * On standby servers:
32 : * - After replaying a WAL record, the startup process performs a fast
33 : * path check minWaitedLSN[REPLAY] > replayLSN. If this check is
34 : * negative, it checks waitersHeap[REPLAY] and wakes up the backends
35 : * whose awaited LSNs are reached.
36 : * - After receiving WAL, the walreceiver process performs similar checks
37 : * against the flush and write LSNs, waking up waiters in the FLUSH
38 : * and WRITE heaps, respectively.
39 : *
40 : * On primary servers: After flushing WAL, the WAL writer or backend
41 : * process performs a similar check against the flush LSN and wakes up
42 : * waiters whose target flush LSNs have been reached.
43 : *
44 : *-------------------------------------------------------------------------
45 : */
46 :
47 : #include "postgres.h"
48 :
49 : #include <float.h>
50 :
51 : #include "access/xlog.h"
52 : #include "access/xlogrecovery.h"
53 : #include "access/xlogwait.h"
54 : #include "miscadmin.h"
55 : #include "pgstat.h"
56 : #include "replication/walreceiver.h"
57 : #include "storage/latch.h"
58 : #include "storage/proc.h"
59 : #include "storage/shmem.h"
60 : #include "storage/subsystems.h"
61 : #include "utils/fmgrprotos.h"
62 : #include "utils/pg_lsn.h"
63 : #include "utils/snapmgr.h"
64 : #include "utils/wait_event.h"
65 :
66 :
67 : static int waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b,
68 : void *arg);
69 :
70 : struct WaitLSNState *waitLSNState = NULL;
71 :
72 : static void WaitLSNShmemRequest(void *arg);
73 : static void WaitLSNShmemInit(void *arg);
74 :
75 : const ShmemCallbacks WaitLSNShmemCallbacks = {
76 : .request_fn = WaitLSNShmemRequest,
77 : .init_fn = WaitLSNShmemInit,
78 : };
79 :
80 : /*
81 : * Wait event for each WaitLSNType, used with WaitLatch() to report
82 : * the wait in pg_stat_activity.
83 : */
84 : static const uint32 WaitLSNWaitEvents[] = {
85 : [WAIT_LSN_TYPE_STANDBY_REPLAY] = WAIT_EVENT_WAIT_FOR_WAL_REPLAY,
86 : [WAIT_LSN_TYPE_STANDBY_WRITE] = WAIT_EVENT_WAIT_FOR_WAL_WRITE,
87 : [WAIT_LSN_TYPE_STANDBY_FLUSH] = WAIT_EVENT_WAIT_FOR_WAL_FLUSH,
88 : [WAIT_LSN_TYPE_PRIMARY_FLUSH] = WAIT_EVENT_WAIT_FOR_WAL_FLUSH,
89 : };
90 :
91 : StaticAssertDecl(lengthof(WaitLSNWaitEvents) == WAIT_LSN_TYPE_COUNT,
92 : "WaitLSNWaitEvents must match WaitLSNType enum");
93 :
94 : /*
95 : * Get the current LSN for the specified wait type.
96 : */
97 : XLogRecPtr
98 304 : GetCurrentLSNForWaitType(WaitLSNType lsnType)
99 : {
100 : Assert(lsnType >= 0 && lsnType < WAIT_LSN_TYPE_COUNT);
101 :
102 304 : switch (lsnType)
103 : {
104 203 : case WAIT_LSN_TYPE_STANDBY_REPLAY:
105 203 : return GetXLogReplayRecPtr(NULL);
106 :
107 43 : case WAIT_LSN_TYPE_STANDBY_WRITE:
108 43 : return GetWalRcvWriteRecPtr();
109 :
110 27 : case WAIT_LSN_TYPE_STANDBY_FLUSH:
111 27 : return GetWalRcvFlushRecPtr(NULL, NULL);
112 :
113 31 : case WAIT_LSN_TYPE_PRIMARY_FLUSH:
114 31 : return GetFlushRecPtr(NULL);
115 : }
116 :
117 0 : elog(ERROR, "invalid LSN wait type: %d", lsnType);
118 : pg_unreachable();
119 : }
120 :
121 : /* Register the shared memory space needed for WaitLSNState. */
122 : static void
123 1238 : WaitLSNShmemRequest(void *arg)
124 : {
125 : Size size;
126 :
127 1238 : size = offsetof(WaitLSNState, procInfos);
128 1238 : size = add_size(size, mul_size(MaxBackends + NUM_AUXILIARY_PROCS, sizeof(WaitLSNProcInfo)));
129 1238 : ShmemRequestStruct(.name = "WaitLSNState",
130 : .size = size,
131 : .ptr = (void **) &waitLSNState,
132 : );
133 1238 : }
134 :
135 : /* Initialize the WaitLSNState in the shared memory. */
136 : static void
137 1235 : WaitLSNShmemInit(void *arg)
138 : {
139 : /* Initialize heaps and tracking */
140 6175 : for (int i = 0; i < WAIT_LSN_TYPE_COUNT; i++)
141 : {
142 4940 : pg_atomic_init_u64(&waitLSNState->minWaitedLSN[i], PG_UINT64_MAX);
143 4940 : pairingheap_initialize(&waitLSNState->waitersHeap[i], waitlsn_cmp, NULL);
144 : }
145 :
146 : /* Initialize process info array */
147 1235 : memset(&waitLSNState->procInfos, 0,
148 1235 : (MaxBackends + NUM_AUXILIARY_PROCS) * sizeof(WaitLSNProcInfo));
149 1235 : }
150 :
151 : /*
152 : * Comparison function for LSN waiters heaps. Waiting processes are ordered by
153 : * LSN, so that the waiter with smallest LSN is at the top.
154 : */
155 : static int
156 28 : waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
157 : {
158 28 : const WaitLSNProcInfo *aproc = pairingheap_const_container(WaitLSNProcInfo, heapNode, a);
159 28 : const WaitLSNProcInfo *bproc = pairingheap_const_container(WaitLSNProcInfo, heapNode, b);
160 :
161 28 : if (aproc->waitLSN < bproc->waitLSN)
162 15 : return 1;
163 13 : else if (aproc->waitLSN > bproc->waitLSN)
164 10 : return -1;
165 : else
166 3 : return 0;
167 : }
168 :
169 : /*
170 : * Update minimum waited LSN for the specified LSN type
171 : */
172 : static void
173 3473 : updateMinWaitedLSN(WaitLSNType lsnType)
174 : {
175 3473 : XLogRecPtr minWaitedLSN = PG_UINT64_MAX;
176 3473 : int i = (int) lsnType;
177 :
178 : Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
179 :
180 3473 : if (!pairingheap_is_empty(&waitLSNState->waitersHeap[i]))
181 : {
182 232 : pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap[i]);
183 232 : WaitLSNProcInfo *procInfo = pairingheap_container(WaitLSNProcInfo, heapNode, node);
184 :
185 232 : minWaitedLSN = procInfo->waitLSN;
186 : }
187 3473 : pg_atomic_write_u64(&waitLSNState->minWaitedLSN[i], minWaitedLSN);
188 3473 : }
189 :
190 : /*
191 : * Add current process to appropriate waiters heap based on LSN type
192 : */
193 : static void
194 227 : addLSNWaiter(XLogRecPtr lsn, WaitLSNType lsnType)
195 : {
196 227 : WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber];
197 227 : int i = (int) lsnType;
198 :
199 : Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
200 :
201 227 : LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
202 :
203 227 : procInfo->procno = MyProcNumber;
204 227 : procInfo->waitLSN = lsn;
205 227 : procInfo->lsnType = lsnType;
206 :
207 : Assert(!procInfo->inHeap);
208 227 : pairingheap_add(&waitLSNState->waitersHeap[i], &procInfo->heapNode);
209 227 : procInfo->inHeap = true;
210 227 : updateMinWaitedLSN(lsnType);
211 :
212 227 : LWLockRelease(WaitLSNLock);
213 227 : }
214 :
215 : /*
216 : * Remove current process from appropriate waiters heap based on LSN type
217 : */
218 : static void
219 227 : deleteLSNWaiter(WaitLSNType lsnType)
220 : {
221 227 : WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber];
222 227 : int i = (int) lsnType;
223 :
224 : Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
225 :
226 227 : LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
227 :
228 : Assert(procInfo->lsnType == lsnType);
229 :
230 227 : if (procInfo->inHeap)
231 : {
232 182 : pairingheap_remove(&waitLSNState->waitersHeap[i], &procInfo->heapNode);
233 182 : procInfo->inHeap = false;
234 182 : updateMinWaitedLSN(lsnType);
235 : }
236 :
237 227 : LWLockRelease(WaitLSNLock);
238 227 : }
239 :
/*
 * Number of elements in the on-stack array that wakeupWaiters() uses to
 * collect the processes to wake up.  It should be large enough for a single
 * iteration to suffice in most cases.
 */
#define WAKEUP_PROC_STATIC_ARRAY_SIZE 16
245 :
246 : /*
247 : * Remove waiters whose LSN has been reached from the heap and set their
248 : * latches. If InvalidXLogRecPtr is given, remove all waiters from the heap
249 : * and set latches for all waiters.
250 : *
251 : * This function first accumulates waiters to wake up into an array, then
252 : * wakes them up without holding a WaitLSNLock. The array size is static and
253 : * equal to WAKEUP_PROC_STATIC_ARRAY_SIZE. That should be more than enough
254 : * to wake up all the waiters at once in the vast majority of cases. However,
255 : * if there are more waiters, this function will loop to process them in
256 : * multiple chunks.
257 : */
258 : static void
259 3064 : wakeupWaiters(WaitLSNType lsnType, XLogRecPtr currentLSN)
260 : {
261 : ProcNumber wakeUpProcs[WAKEUP_PROC_STATIC_ARRAY_SIZE];
262 : int numWakeUpProcs;
263 3064 : int i = (int) lsnType;
264 :
265 : Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
266 :
267 : do
268 : {
269 : int j;
270 :
271 3064 : numWakeUpProcs = 0;
272 3064 : LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
273 :
274 : /*
275 : * Iterate the waiters heap until we find LSN not yet reached. Record
276 : * process numbers to wake up, but send wakeups after releasing lock.
277 : */
278 3109 : while (!pairingheap_is_empty(&waitLSNState->waitersHeap[i]))
279 : {
280 49 : pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap[i]);
281 : WaitLSNProcInfo *procInfo;
282 :
283 : /* Get procInfo using appropriate heap node */
284 49 : procInfo = pairingheap_container(WaitLSNProcInfo, heapNode, node);
285 :
286 49 : if (XLogRecPtrIsValid(currentLSN) && procInfo->waitLSN > currentLSN)
287 4 : break;
288 :
289 : Assert(numWakeUpProcs < WAKEUP_PROC_STATIC_ARRAY_SIZE);
290 45 : wakeUpProcs[numWakeUpProcs++] = procInfo->procno;
291 45 : (void) pairingheap_remove_first(&waitLSNState->waitersHeap[i]);
292 :
293 : /* Update appropriate flag */
294 45 : procInfo->inHeap = false;
295 :
296 45 : if (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE)
297 0 : break;
298 : }
299 :
300 3064 : updateMinWaitedLSN(lsnType);
301 3064 : LWLockRelease(WaitLSNLock);
302 :
303 : /*
304 : * Set latches for processes whose waited LSNs have been reached.
305 : * Since SetLatch() is a time-consuming operation, we do this outside
306 : * of WaitLSNLock. This is safe because procLatch is never freed, so
307 : * at worst we may set a latch for the wrong process or for no process
308 : * at all, which is harmless.
309 : */
310 3109 : for (j = 0; j < numWakeUpProcs; j++)
311 45 : SetLatch(&GetPGProcByNumber(wakeUpProcs[j])->procLatch);
312 :
313 3064 : } while (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE);
314 3064 : }
315 :
316 : /*
317 : * Wake up processes waiting for LSN to reach currentLSN
318 : */
319 : void
320 3064 : WaitLSNWakeup(WaitLSNType lsnType, XLogRecPtr currentLSN)
321 : {
322 3064 : int i = (int) lsnType;
323 :
324 : Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
325 :
326 : /*
327 : * Fast path check. Skip if currentLSN is InvalidXLogRecPtr, which means
328 : * "wake all waiters" (e.g., during promotion when recovery ends).
329 : */
330 3095 : if (XLogRecPtrIsValid(currentLSN) &&
331 31 : pg_atomic_read_u64(&waitLSNState->minWaitedLSN[i]) > currentLSN)
332 0 : return;
333 :
334 3064 : wakeupWaiters(lsnType, currentLSN);
335 : }
336 :
337 : /*
338 : * Clean up LSN waiters for exiting process
339 : */
340 : void
341 55327 : WaitLSNCleanup(void)
342 : {
343 55327 : if (waitLSNState)
344 : {
345 : /*
346 : * We do a fast-path check of the inHeap flag without the lock. This
347 : * flag is set to true only by the process itself. So, it's only
348 : * possible to get a false positive. But that will be eliminated by a
349 : * recheck inside deleteLSNWaiter().
350 : */
351 55327 : if (waitLSNState->procInfos[MyProcNumber].inHeap)
352 0 : deleteLSNWaiter(waitLSNState->procInfos[MyProcNumber].lsnType);
353 : }
354 55327 : }
355 :
356 : /*
357 : * Check if the given LSN type requires recovery to be in progress.
358 : * Standby wait types (replay, write, flush) require recovery;
359 : * primary wait types (flush) do not.
360 : */
361 : static inline bool
362 300 : WaitLSNTypeRequiresRecovery(WaitLSNType t)
363 : {
364 99 : return t == WAIT_LSN_TYPE_STANDBY_REPLAY ||
365 399 : t == WAIT_LSN_TYPE_STANDBY_WRITE ||
366 : t == WAIT_LSN_TYPE_STANDBY_FLUSH;
367 : }
368 :
369 : /*
370 : * Wait using MyLatch till the given LSN is reached, the replica gets
371 : * promoted, or the postmaster dies.
372 : *
373 : * Returns WAIT_LSN_RESULT_SUCCESS if target LSN was reached.
374 : * Returns WAIT_LSN_RESULT_NOT_IN_RECOVERY if run not in recovery,
375 : * or replica got promoted before the target LSN reached.
376 : */
377 : WaitLSNResult
378 227 : WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN, int64 timeout)
379 : {
380 : XLogRecPtr currentLSN;
381 227 : TimestampTz endtime = 0;
382 227 : int wake_events = WL_LATCH_SET | WL_POSTMASTER_DEATH;
383 :
384 : /* Shouldn't be called when shmem isn't initialized */
385 : Assert(waitLSNState);
386 :
387 : /* Should have a valid proc number */
388 : Assert(MyProcNumber >= 0 && MyProcNumber < MaxBackends + NUM_AUXILIARY_PROCS);
389 :
390 227 : if (timeout > 0)
391 : {
392 216 : endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), timeout);
393 216 : wake_events |= WL_TIMEOUT;
394 : }
395 :
396 : /*
397 : * Add our process to the waiters heap. It might happen that target LSN
398 : * gets reached before we do. The check at the beginning of the loop
399 : * below prevents the race condition.
400 : */
401 227 : addLSNWaiter(targetLSN, lsnType);
402 :
403 : for (;;)
404 73 : {
405 : int rc;
406 300 : long delay_ms = -1;
407 :
408 : /* Get current LSN for the wait type */
409 300 : currentLSN = GetCurrentLSNForWaitType(lsnType);
410 :
411 : /* Check that recovery is still in-progress */
412 300 : if (WaitLSNTypeRequiresRecovery(lsnType) && !RecoveryInProgress())
413 : {
414 : /*
415 : * Recovery was ended, but check if target LSN was already
416 : * reached.
417 : */
418 6 : deleteLSNWaiter(lsnType);
419 :
420 6 : if (PromoteIsTriggered() && targetLSN <= currentLSN)
421 1 : return WAIT_LSN_RESULT_SUCCESS;
422 5 : return WAIT_LSN_RESULT_NOT_IN_RECOVERY;
423 : }
424 : else
425 : {
426 : /* Check if the waited LSN has been reached */
427 294 : if (targetLSN <= currentLSN)
428 215 : break;
429 : }
430 :
431 79 : if (timeout > 0)
432 : {
433 67 : delay_ms = TimestampDifferenceMilliseconds(GetCurrentTimestamp(), endtime);
434 67 : if (delay_ms <= 0)
435 6 : break;
436 : }
437 :
438 73 : CHECK_FOR_INTERRUPTS();
439 :
440 73 : rc = WaitLatch(MyLatch, wake_events, delay_ms,
441 73 : WaitLSNWaitEvents[lsnType]);
442 :
443 : /*
444 : * Emergency bailout if postmaster has died. This is to avoid the
445 : * necessity for manual cleanup of all postmaster children.
446 : */
447 73 : if (rc & WL_POSTMASTER_DEATH)
448 0 : ereport(FATAL,
449 : errcode(ERRCODE_ADMIN_SHUTDOWN),
450 : errmsg("terminating connection due to unexpected postmaster exit"),
451 : errcontext("while waiting for LSN"));
452 :
453 73 : if (rc & WL_LATCH_SET)
454 67 : ResetLatch(MyLatch);
455 : }
456 :
457 : /*
458 : * Delete our process from the shared memory heap. We might already be
459 : * deleted by the startup process. The 'inHeap' flags prevents us from
460 : * the double deletion.
461 : */
462 221 : deleteLSNWaiter(lsnType);
463 :
464 : /*
465 : * If we didn't reach the target LSN, we must be exited by timeout.
466 : */
467 221 : if (targetLSN > currentLSN)
468 6 : return WAIT_LSN_RESULT_TIMEOUT;
469 :
470 215 : return WAIT_LSN_RESULT_SUCCESS;
471 : }
|