Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * xlogwait.c
4 : * Implements waiting for WAL operations to reach specific LSNs.
5 : *
6 : * Copyright (c) 2025-2026, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/backend/access/transam/xlogwait.c
10 : *
11 : * NOTES
12 : * This file implements waiting for WAL operations to reach specific LSNs
13 : * on both physical standby and primary servers. The core idea is simple:
14 : * every process that wants to wait publishes the LSN it needs to the
15 : * shared memory, and the appropriate process (startup on standby,
16 : * walreceiver on standby, or WAL writer/backend on primary) wakes it
17 : * once that LSN has been reached.
18 : *
19 : * The shared memory used by this module comprises a procInfos
20 : * per-backend array with the information of the awaited LSN for each
21 : * of the backend processes. The elements of that array are organized
22 : * into pairing heaps (waitersHeap), one for each WaitLSNType, which
23 : * allows for very fast finding of the least awaited LSN for each type.
24 : *
25 : * In addition, the least-awaited LSN for each type is cached in the
26 : * minWaitedLSN array. The waiter process publishes information about
27 : * itself to the shared memory and waits on the latch until it is woken
28 : * up by the appropriate process, the standby is promoted, the wait times
29 : * out, or the postmaster dies. Then, it removes its entry from the shared memory.
30 : *
31 : * On standby servers:
32 : * - After replaying a WAL record, the startup process performs a fast
33 : * path check minWaitedLSN[REPLAY] > replayLSN. If this check is
34 : * negative, it checks waitersHeap[REPLAY] and wakes up the backends
35 : * whose awaited LSNs are reached.
36 : * - After receiving WAL, the walreceiver process performs similar checks
37 : * against the flush and write LSNs, waking up waiters in the FLUSH
38 : * and WRITE heaps, respectively.
39 : *
40 : * On primary servers: After flushing WAL, the WAL writer or backend
41 : * process performs a similar check against the flush LSN and wakes up
42 : * waiters whose target flush LSNs have been reached.
43 : *
44 : *-------------------------------------------------------------------------
45 : */
46 :
47 : #include "postgres.h"
48 :
49 : #include <float.h>
50 :
51 : #include "access/xlog.h"
52 : #include "access/xlogrecovery.h"
53 : #include "access/xlogwait.h"
54 : #include "miscadmin.h"
55 : #include "pgstat.h"
56 : #include "replication/walreceiver.h"
57 : #include "storage/latch.h"
58 : #include "storage/proc.h"
59 : #include "storage/shmem.h"
60 : #include "utils/fmgrprotos.h"
61 : #include "utils/pg_lsn.h"
62 : #include "utils/snapmgr.h"
63 :
64 :
/* Ordering function for the per-type waiters heaps; defined below. */
static int	waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b,
						void *arg);

/*
 * Shared state of this module, attached by WaitLSNShmemInit().  It stays NULL
 * until then; WaitLSNCleanup() tests for NULL before touching it.
 */
struct WaitLSNState *waitLSNState = NULL;

/*
 * Wait event for each WaitLSNType, used with WaitLatch() to report
 * the wait in pg_stat_activity.  Indexed by WaitLSNType; note that the
 * standby and primary flush waits share the same wait event.
 */
static const uint32 WaitLSNWaitEvents[] = {
	[WAIT_LSN_TYPE_STANDBY_REPLAY] = WAIT_EVENT_WAIT_FOR_WAL_REPLAY,
	[WAIT_LSN_TYPE_STANDBY_WRITE] = WAIT_EVENT_WAIT_FOR_WAL_WRITE,
	[WAIT_LSN_TYPE_STANDBY_FLUSH] = WAIT_EVENT_WAIT_FOR_WAL_FLUSH,
	[WAIT_LSN_TYPE_PRIMARY_FLUSH] = WAIT_EVENT_WAIT_FOR_WAL_FLUSH,
};

/* Catch a new WaitLSNType being added without a matching wait event. */
StaticAssertDecl(lengthof(WaitLSNWaitEvents) == WAIT_LSN_TYPE_COUNT,
				 "WaitLSNWaitEvents must match WaitLSNType enum");
83 :
84 : /*
85 : * Get the current LSN for the specified wait type.
86 : */
87 : XLogRecPtr
88 194 : GetCurrentLSNForWaitType(WaitLSNType lsnType)
89 : {
90 : Assert(lsnType >= 0 && lsnType < WAIT_LSN_TYPE_COUNT);
91 :
92 194 : switch (lsnType)
93 : {
94 74 : case WAIT_LSN_TYPE_STANDBY_REPLAY:
95 74 : return GetXLogReplayRecPtr(NULL);
96 :
97 54 : case WAIT_LSN_TYPE_STANDBY_WRITE:
98 54 : return GetWalRcvWriteRecPtr();
99 :
100 54 : case WAIT_LSN_TYPE_STANDBY_FLUSH:
101 54 : return GetWalRcvFlushRecPtr(NULL, NULL);
102 :
103 12 : case WAIT_LSN_TYPE_PRIMARY_FLUSH:
104 12 : return GetFlushRecPtr(NULL);
105 : }
106 :
107 0 : elog(ERROR, "invalid LSN wait type: %d", lsnType);
108 : pg_unreachable();
109 : }
110 :
111 : /* Report the amount of shared memory space needed for WaitLSNState. */
112 : Size
113 6534 : WaitLSNShmemSize(void)
114 : {
115 : Size size;
116 :
117 6534 : size = offsetof(WaitLSNState, procInfos);
118 6534 : size = add_size(size, mul_size(MaxBackends + NUM_AUXILIARY_PROCS, sizeof(WaitLSNProcInfo)));
119 6534 : return size;
120 : }
121 :
122 : /* Initialize the WaitLSNState in the shared memory. */
123 : void
124 2280 : WaitLSNShmemInit(void)
125 : {
126 : bool found;
127 :
128 2280 : waitLSNState = (WaitLSNState *) ShmemInitStruct("WaitLSNState",
129 : WaitLSNShmemSize(),
130 : &found);
131 2280 : if (!found)
132 : {
133 : int i;
134 :
135 : /* Initialize heaps and tracking */
136 11400 : for (i = 0; i < WAIT_LSN_TYPE_COUNT; i++)
137 : {
138 9120 : pg_atomic_init_u64(&waitLSNState->minWaitedLSN[i], PG_UINT64_MAX);
139 9120 : pairingheap_initialize(&waitLSNState->waitersHeap[i], waitlsn_cmp, NULL);
140 : }
141 :
142 : /* Initialize process info array */
143 2280 : memset(&waitLSNState->procInfos, 0,
144 2280 : (MaxBackends + NUM_AUXILIARY_PROCS) * sizeof(WaitLSNProcInfo));
145 : }
146 2280 : }
147 :
148 : /*
149 : * Comparison function for LSN waiters heaps. Waiting processes are ordered by
150 : * LSN, so that the waiter with smallest LSN is at the top.
151 : */
152 : static int
153 54 : waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
154 : {
155 54 : const WaitLSNProcInfo *aproc = pairingheap_const_container(WaitLSNProcInfo, heapNode, a);
156 54 : const WaitLSNProcInfo *bproc = pairingheap_const_container(WaitLSNProcInfo, heapNode, b);
157 :
158 54 : if (aproc->waitLSN < bproc->waitLSN)
159 30 : return 1;
160 24 : else if (aproc->waitLSN > bproc->waitLSN)
161 18 : return -1;
162 : else
163 6 : return 0;
164 : }
165 :
166 : /*
167 : * Update minimum waited LSN for the specified LSN type
168 : */
169 : static void
170 5724 : updateMinWaitedLSN(WaitLSNType lsnType)
171 : {
172 5724 : XLogRecPtr minWaitedLSN = PG_UINT64_MAX;
173 5724 : int i = (int) lsnType;
174 :
175 : Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
176 :
177 5724 : if (!pairingheap_is_empty(&waitLSNState->waitersHeap[i]))
178 : {
179 94 : pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap[i]);
180 94 : WaitLSNProcInfo *procInfo = pairingheap_container(WaitLSNProcInfo, heapNode, node);
181 :
182 94 : minWaitedLSN = procInfo->waitLSN;
183 : }
184 5724 : pg_atomic_write_u64(&waitLSNState->minWaitedLSN[i], minWaitedLSN);
185 5724 : }
186 :
187 : /*
188 : * Add current process to appropriate waiters heap based on LSN type
189 : */
190 : static void
191 86 : addLSNWaiter(XLogRecPtr lsn, WaitLSNType lsnType)
192 : {
193 86 : WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber];
194 86 : int i = (int) lsnType;
195 :
196 : Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
197 :
198 86 : LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
199 :
200 86 : procInfo->procno = MyProcNumber;
201 86 : procInfo->waitLSN = lsn;
202 86 : procInfo->lsnType = lsnType;
203 :
204 : Assert(!procInfo->inHeap);
205 86 : pairingheap_add(&waitLSNState->waitersHeap[i], &procInfo->heapNode);
206 86 : procInfo->inHeap = true;
207 86 : updateMinWaitedLSN(lsnType);
208 :
209 86 : LWLockRelease(WaitLSNLock);
210 86 : }
211 :
212 : /*
213 : * Remove current process from appropriate waiters heap based on LSN type
214 : */
215 : static void
216 86 : deleteLSNWaiter(WaitLSNType lsnType)
217 : {
218 86 : WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber];
219 86 : int i = (int) lsnType;
220 :
221 : Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
222 :
223 86 : LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
224 :
225 : Assert(procInfo->lsnType == lsnType);
226 :
227 86 : if (procInfo->inHeap)
228 : {
229 34 : pairingheap_remove(&waitLSNState->waitersHeap[i], &procInfo->heapNode);
230 34 : procInfo->inHeap = false;
231 34 : updateMinWaitedLSN(lsnType);
232 : }
233 :
234 86 : LWLockRelease(WaitLSNLock);
235 86 : }
236 :
237 : /*
238 : * Size of a static array of procs to wakeup by WaitLSNWakeup() allocated
239 : * on the stack. It should be enough to take single iteration for most cases.
240 : */
241 : #define WAKEUP_PROC_STATIC_ARRAY_SIZE (16)
242 :
243 : /*
244 : * Remove waiters whose LSN has been reached from the heap and set their
245 : * latches. If InvalidXLogRecPtr is given, remove all waiters from the heap
246 : * and set latches for all waiters.
247 : *
248 : * This function first accumulates waiters to wake up into an array, then
249 : * wakes them up without holding a WaitLSNLock. The array size is static and
250 : * equal to WAKEUP_PROC_STATIC_ARRAY_SIZE. That should be more than enough
251 : * to wake up all the waiters at once in the vast majority of cases. However,
252 : * if there are more waiters, this function will loop to process them in
253 : * multiple chunks.
254 : */
255 : static void
256 5604 : wakeupWaiters(WaitLSNType lsnType, XLogRecPtr currentLSN)
257 : {
258 : ProcNumber wakeUpProcs[WAKEUP_PROC_STATIC_ARRAY_SIZE];
259 : int numWakeUpProcs;
260 5604 : int i = (int) lsnType;
261 :
262 : Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
263 :
264 : do
265 : {
266 : int j;
267 :
268 5604 : numWakeUpProcs = 0;
269 5604 : LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
270 :
271 : /*
272 : * Iterate the waiters heap until we find LSN not yet reached. Record
273 : * process numbers to wake up, but send wakeups after releasing lock.
274 : */
275 5656 : while (!pairingheap_is_empty(&waitLSNState->waitersHeap[i]))
276 : {
277 60 : pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap[i]);
278 : WaitLSNProcInfo *procInfo;
279 :
280 : /* Get procInfo using appropriate heap node */
281 60 : procInfo = pairingheap_container(WaitLSNProcInfo, heapNode, node);
282 :
283 60 : if (XLogRecPtrIsValid(currentLSN) && procInfo->waitLSN > currentLSN)
284 8 : break;
285 :
286 : Assert(numWakeUpProcs < WAKEUP_PROC_STATIC_ARRAY_SIZE);
287 52 : wakeUpProcs[numWakeUpProcs++] = procInfo->procno;
288 52 : (void) pairingheap_remove_first(&waitLSNState->waitersHeap[i]);
289 :
290 : /* Update appropriate flag */
291 52 : procInfo->inHeap = false;
292 :
293 52 : if (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE)
294 0 : break;
295 : }
296 :
297 5604 : updateMinWaitedLSN(lsnType);
298 5604 : LWLockRelease(WaitLSNLock);
299 :
300 : /*
301 : * Set latches for processes whose waited LSNs have been reached.
302 : * Since SetLatch() is a time-consuming operation, we do this outside
303 : * of WaitLSNLock. This is safe because procLatch is never freed, so
304 : * at worst we may set a latch for the wrong process or for no process
305 : * at all, which is harmless.
306 : */
307 5656 : for (j = 0; j < numWakeUpProcs; j++)
308 52 : SetLatch(&GetPGProcByNumber(wakeUpProcs[j])->procLatch);
309 :
310 5604 : } while (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE);
311 5604 : }
312 :
313 : /*
314 : * Wake up processes waiting for LSN to reach currentLSN
315 : */
316 : void
317 5604 : WaitLSNWakeup(WaitLSNType lsnType, XLogRecPtr currentLSN)
318 : {
319 5604 : int i = (int) lsnType;
320 :
321 : Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
322 :
323 : /*
324 : * Fast path check. Skip if currentLSN is InvalidXLogRecPtr, which means
325 : * "wake all waiters" (e.g., during promotion when recovery ends).
326 : */
327 5628 : if (XLogRecPtrIsValid(currentLSN) &&
328 24 : pg_atomic_read_u64(&waitLSNState->minWaitedLSN[i]) > currentLSN)
329 0 : return;
330 :
331 5604 : wakeupWaiters(lsnType, currentLSN);
332 : }
333 :
334 : /*
335 : * Clean up LSN waiters for exiting process
336 : */
337 : void
338 88018 : WaitLSNCleanup(void)
339 : {
340 88018 : if (waitLSNState)
341 : {
342 : /*
343 : * We do a fast-path check of the inHeap flag without the lock. This
344 : * flag is set to true only by the process itself. So, it's only
345 : * possible to get a false positive. But that will be eliminated by a
346 : * recheck inside deleteLSNWaiter().
347 : */
348 88018 : if (waitLSNState->procInfos[MyProcNumber].inHeap)
349 0 : deleteLSNWaiter(waitLSNState->procInfos[MyProcNumber].lsnType);
350 : }
351 88018 : }
352 :
353 : /*
354 : * Check if the given LSN type requires recovery to be in progress.
355 : * Standby wait types (replay, write, flush) require recovery;
356 : * primary wait types (flush) do not.
357 : */
358 : static inline bool
359 186 : WaitLSNTypeRequiresRecovery(WaitLSNType t)
360 : {
361 116 : return t == WAIT_LSN_TYPE_STANDBY_REPLAY ||
362 302 : t == WAIT_LSN_TYPE_STANDBY_WRITE ||
363 : t == WAIT_LSN_TYPE_STANDBY_FLUSH;
364 : }
365 :
/*
 * Wait using MyLatch until the LSN of the given type reaches targetLSN, the
 * timeout expires, recovery ends (standby wait types only), or the
 * postmaster dies.
 *
 * lsnType selects which LSN is compared against targetLSN; see
 * GetCurrentLSNForWaitType().  timeout is in milliseconds.  A value <= 0
 * means wait without a time limit.
 *
 * Returns WAIT_LSN_RESULT_SUCCESS if the target LSN was reached.  This
 * includes the case where recovery ended because promotion was triggered
 * and the target had already been reached.
 * Returns WAIT_LSN_RESULT_NOT_IN_RECOVERY if a standby wait type is used
 * while not in recovery, or recovery ended before the target LSN was
 * reached.
 * Returns WAIT_LSN_RESULT_TIMEOUT if the timeout expired first.
 * Does not return if the postmaster dies (FATAL).  It may also throw from
 * CHECK_FOR_INTERRUPTS().
 * NOTE(review): an error thrown there leaves this process in the waiters
 * heap.  Presumably WaitLSNCleanup() is invoked on abort/exit to remove it
 * — confirm.
 */
WaitLSNResult
WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN, int64 timeout)
{
	XLogRecPtr	currentLSN;
	TimestampTz endtime = 0;
	int			wake_events = WL_LATCH_SET | WL_POSTMASTER_DEATH;

	/* Shouldn't be called when shmem isn't initialized */
	Assert(waitLSNState);

	/* Should have a valid proc number */
	Assert(MyProcNumber >= 0 && MyProcNumber < MaxBackends + NUM_AUXILIARY_PROCS);

	/* Only arm WL_TIMEOUT when a positive timeout was requested. */
	if (timeout > 0)
	{
		endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), timeout);
		wake_events |= WL_TIMEOUT;
	}

	/*
	 * Add our process to the waiters heap before reading the current LSN.
	 * The target LSN may be reached before we are registered, and then
	 * nobody would wake us.  The LSN check at the top of the loop below
	 * catches that case, because it runs after registration.
	 */
	addLSNWaiter(targetLSN, lsnType);

	for (;;)
	{
		int			rc;
		long		delay_ms = -1;	/* -1: no timeout for WaitLatch() */

		/* Get current LSN for the wait type */
		currentLSN = GetCurrentLSNForWaitType(lsnType);

		/* Check that recovery is still in-progress */
		if (WaitLSNTypeRequiresRecovery(lsnType) && !RecoveryInProgress())
		{
			/*
			 * Recovery has ended.  Still report success if promotion was
			 * triggered and the LSN we just read had already reached the
			 * target.
			 */
			deleteLSNWaiter(lsnType);

			if (PromoteIsTriggered() && targetLSN <= currentLSN)
				return WAIT_LSN_RESULT_SUCCESS;
			return WAIT_LSN_RESULT_NOT_IN_RECOVERY;
		}
		else
		{
			/* Check if the waited LSN has been reached */
			if (targetLSN <= currentLSN)
				break;
		}

		/* Compute the remaining time; leave the loop once it is used up. */
		if (timeout > 0)
		{
			delay_ms = TimestampDifferenceMilliseconds(GetCurrentTimestamp(), endtime);
			if (delay_ms <= 0)
				break;
		}

		CHECK_FOR_INTERRUPTS();

		rc = WaitLatch(MyLatch, wake_events, delay_ms,
					   WaitLSNWaitEvents[lsnType]);

		/*
		 * Emergency bailout if postmaster has died. This is to avoid the
		 * necessity for manual cleanup of all postmaster children.
		 */
		if (rc & WL_POSTMASTER_DEATH)
			ereport(FATAL,
					errcode(ERRCODE_ADMIN_SHUTDOWN),
					errmsg("terminating connection due to unexpected postmaster exit"),
					errcontext("while waiting for LSN"));

		/*
		 * Reset the latch; the loop then re-reads the LSN, so a wakeup that
		 * arrives after this reset is not lost.
		 */
		if (rc & WL_LATCH_SET)
			ResetLatch(MyLatch);
	}

	/*
	 * Delete our process from the shared memory heap.  Whichever process
	 * woke us may already have removed us (wakeupWaiters() pops woken
	 * waiters).  The 'inHeap' flag prevents a double deletion.
	 */
	deleteLSNWaiter(lsnType);

	/*
	 * The loop exits only on reaching the target or on timeout, so a target
	 * that is still ahead of the last-read LSN means we timed out.
	 */
	if (targetLSN > currentLSN)
		return WAIT_LSN_RESULT_TIMEOUT;

	return WAIT_LSN_RESULT_SUCCESS;
}
|