Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * xlogwait.c
4 : * Implements waiting for WAL operations to reach specific LSNs.
5 : *
6 : * Copyright (c) 2025-2026, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/backend/access/transam/xlogwait.c
10 : *
11 : * NOTES
12 : * This file implements waiting for WAL operations to reach specific LSNs
13 : * on both physical standby and primary servers. The core idea is simple:
14 : * every process that wants to wait publishes the LSN it needs in
15 : * shared memory, and the appropriate process (startup on standby,
16 : * walreceiver on standby, or WAL writer/backend on primary) wakes it
17 : * once that LSN has been reached.
18 : *
19 : * The shared memory used by this module comprises a procInfos
20 : * per-backend array with the information of the awaited LSN for each
21 : * of the backend processes. The elements of that array are organized
22 : * into pairing heaps (waitersHeap), one for each WaitLSNType, which
23 : * allows for very fast finding of the least awaited LSN for each type.
24 : *
25 : * In addition, the least-awaited LSN for each type is cached in the
26 : * minWaitedLSN array. A waiting process publishes information about
27 : * itself in shared memory and sleeps on its latch until it is woken
28 : * up by the appropriate process, the standby is promoted, the timeout
29 : * expires, or the postmaster dies. It then removes its own entry.
30 : *
31 : * On standby servers:
32 : * - After replaying a WAL record, the startup process performs a fast
33 : * path check: if minWaitedLSN[REPLAY] > replayLSN, no waiter can be
34 : * satisfied yet. Otherwise, it scans waitersHeap[REPLAY] and wakes up
35 : * the backends whose awaited LSNs have been reached.
36 : * - After receiving WAL, the walreceiver process performs similar checks
37 : * against the flush and write LSNs, waking up waiters in the FLUSH
38 : * and WRITE heaps, respectively.
39 : *
40 : * On primary servers: After flushing WAL, the WAL writer or backend
41 : * process performs a similar check against the flush LSN and wakes up
42 : * waiters whose target flush LSNs have been reached.
43 : *
44 : *-------------------------------------------------------------------------
45 : */
46 :
47 : #include "postgres.h"
48 :
49 : #include <float.h>
50 :
51 : #include "access/xlog.h"
52 : #include "access/xlogrecovery.h"
53 : #include "access/xlogwait.h"
54 : #include "miscadmin.h"
55 : #include "pgstat.h"
56 : #include "replication/walreceiver.h"
57 : #include "storage/latch.h"
58 : #include "storage/proc.h"
59 : #include "storage/shmem.h"
60 : #include "utils/fmgrprotos.h"
61 : #include "utils/pg_lsn.h"
62 : #include "utils/snapmgr.h"
63 : #include "utils/wait_event.h"
64 :
65 :
66 : static int waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b,
67 : void *arg);
68 :
69 : struct WaitLSNState *waitLSNState = NULL;
70 :
71 : /*
72 : * Wait event for each WaitLSNType, used with WaitLatch() to report
73 : * the wait in pg_stat_activity.
74 : */
75 : static const uint32 WaitLSNWaitEvents[] = {
76 : [WAIT_LSN_TYPE_STANDBY_REPLAY] = WAIT_EVENT_WAIT_FOR_WAL_REPLAY,
77 : [WAIT_LSN_TYPE_STANDBY_WRITE] = WAIT_EVENT_WAIT_FOR_WAL_WRITE,
78 : [WAIT_LSN_TYPE_STANDBY_FLUSH] = WAIT_EVENT_WAIT_FOR_WAL_FLUSH,
79 : [WAIT_LSN_TYPE_PRIMARY_FLUSH] = WAIT_EVENT_WAIT_FOR_WAL_FLUSH,
80 : };
81 :
82 : StaticAssertDecl(lengthof(WaitLSNWaitEvents) == WAIT_LSN_TYPE_COUNT,
83 : "WaitLSNWaitEvents must match WaitLSNType enum");
84 :
85 : /*
86 : * Get the current LSN for the specified wait type.
87 : */
88 : XLogRecPtr
89 95 : GetCurrentLSNForWaitType(WaitLSNType lsnType)
90 : {
91 : Assert(lsnType >= 0 && lsnType < WAIT_LSN_TYPE_COUNT);
92 :
93 95 : switch (lsnType)
94 : {
95 36 : case WAIT_LSN_TYPE_STANDBY_REPLAY:
96 36 : return GetXLogReplayRecPtr(NULL);
97 :
98 26 : case WAIT_LSN_TYPE_STANDBY_WRITE:
99 26 : return GetWalRcvWriteRecPtr();
100 :
101 27 : case WAIT_LSN_TYPE_STANDBY_FLUSH:
102 27 : return GetWalRcvFlushRecPtr(NULL, NULL);
103 :
104 6 : case WAIT_LSN_TYPE_PRIMARY_FLUSH:
105 6 : return GetFlushRecPtr(NULL);
106 : }
107 :
108 0 : elog(ERROR, "invalid LSN wait type: %d", lsnType);
109 : pg_unreachable();
110 : }
111 :
112 : /* Report the amount of shared memory space needed for WaitLSNState. */
113 : Size
114 3387 : WaitLSNShmemSize(void)
115 : {
116 : Size size;
117 :
118 3387 : size = offsetof(WaitLSNState, procInfos);
119 3387 : size = add_size(size, mul_size(MaxBackends + NUM_AUXILIARY_PROCS, sizeof(WaitLSNProcInfo)));
120 3387 : return size;
121 : }
122 :
123 : /* Initialize the WaitLSNState in the shared memory. */
124 : void
125 1180 : WaitLSNShmemInit(void)
126 : {
127 : bool found;
128 :
129 1180 : waitLSNState = (WaitLSNState *) ShmemInitStruct("WaitLSNState",
130 : WaitLSNShmemSize(),
131 : &found);
132 1180 : if (!found)
133 : {
134 : int i;
135 :
136 : /* Initialize heaps and tracking */
137 5900 : for (i = 0; i < WAIT_LSN_TYPE_COUNT; i++)
138 : {
139 4720 : pg_atomic_init_u64(&waitLSNState->minWaitedLSN[i], PG_UINT64_MAX);
140 4720 : pairingheap_initialize(&waitLSNState->waitersHeap[i], waitlsn_cmp, NULL);
141 : }
142 :
143 : /* Initialize process info array */
144 1180 : memset(&waitLSNState->procInfos, 0,
145 1180 : (MaxBackends + NUM_AUXILIARY_PROCS) * sizeof(WaitLSNProcInfo));
146 : }
147 1180 : }
148 :
149 : /*
150 : * Comparison function for LSN waiters heaps. Waiting processes are ordered by
151 : * LSN, so that the waiter with smallest LSN is at the top.
152 : */
153 : static int
154 27 : waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
155 : {
156 27 : const WaitLSNProcInfo *aproc = pairingheap_const_container(WaitLSNProcInfo, heapNode, a);
157 27 : const WaitLSNProcInfo *bproc = pairingheap_const_container(WaitLSNProcInfo, heapNode, b);
158 :
159 27 : if (aproc->waitLSN < bproc->waitLSN)
160 15 : return 1;
161 12 : else if (aproc->waitLSN > bproc->waitLSN)
162 9 : return -1;
163 : else
164 3 : return 0;
165 : }
166 :
167 : /*
168 : * Update minimum waited LSN for the specified LSN type
169 : */
170 : static void
171 2982 : updateMinWaitedLSN(WaitLSNType lsnType)
172 : {
173 2982 : XLogRecPtr minWaitedLSN = PG_UINT64_MAX;
174 2982 : int i = (int) lsnType;
175 :
176 : Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
177 :
178 2982 : if (!pairingheap_is_empty(&waitLSNState->waitersHeap[i]))
179 : {
180 47 : pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap[i]);
181 47 : WaitLSNProcInfo *procInfo = pairingheap_container(WaitLSNProcInfo, heapNode, node);
182 :
183 47 : minWaitedLSN = procInfo->waitLSN;
184 : }
185 2982 : pg_atomic_write_u64(&waitLSNState->minWaitedLSN[i], minWaitedLSN);
186 2982 : }
187 :
188 : /*
189 : * Add current process to appropriate waiters heap based on LSN type
190 : */
191 : static void
192 43 : addLSNWaiter(XLogRecPtr lsn, WaitLSNType lsnType)
193 : {
194 43 : WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber];
195 43 : int i = (int) lsnType;
196 :
197 : Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
198 :
199 43 : LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
200 :
201 43 : procInfo->procno = MyProcNumber;
202 43 : procInfo->waitLSN = lsn;
203 43 : procInfo->lsnType = lsnType;
204 :
205 : Assert(!procInfo->inHeap);
206 43 : pairingheap_add(&waitLSNState->waitersHeap[i], &procInfo->heapNode);
207 43 : procInfo->inHeap = true;
208 43 : updateMinWaitedLSN(lsnType);
209 :
210 43 : LWLockRelease(WaitLSNLock);
211 43 : }
212 :
213 : /*
214 : * Remove current process from appropriate waiters heap based on LSN type
215 : */
216 : static void
217 43 : deleteLSNWaiter(WaitLSNType lsnType)
218 : {
219 43 : WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber];
220 43 : int i = (int) lsnType;
221 :
222 : Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
223 :
224 43 : LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
225 :
226 : Assert(procInfo->lsnType == lsnType);
227 :
228 43 : if (procInfo->inHeap)
229 : {
230 17 : pairingheap_remove(&waitLSNState->waitersHeap[i], &procInfo->heapNode);
231 17 : procInfo->inHeap = false;
232 17 : updateMinWaitedLSN(lsnType);
233 : }
234 :
235 43 : LWLockRelease(WaitLSNLock);
236 43 : }
237 :
/*
 * How many waiters wakeupWaiters() collects per pass into its on-stack
 * buffer.  One pass is usually enough; when more waiters are due, it simply
 * takes further passes.
 */
#define WAKEUP_PROC_STATIC_ARRAY_SIZE 16
243 :
244 : /*
245 : * Remove waiters whose LSN has been reached from the heap and set their
246 : * latches. If InvalidXLogRecPtr is given, remove all waiters from the heap
247 : * and set latches for all waiters.
248 : *
249 : * This function first accumulates waiters to wake up into an array, then
250 : * wakes them up without holding a WaitLSNLock. The array size is static and
251 : * equal to WAKEUP_PROC_STATIC_ARRAY_SIZE. That should be more than enough
252 : * to wake up all the waiters at once in the vast majority of cases. However,
253 : * if there are more waiters, this function will loop to process them in
254 : * multiple chunks.
255 : */
256 : static void
257 2922 : wakeupWaiters(WaitLSNType lsnType, XLogRecPtr currentLSN)
258 : {
259 : ProcNumber wakeUpProcs[WAKEUP_PROC_STATIC_ARRAY_SIZE];
260 : int numWakeUpProcs;
261 2922 : int i = (int) lsnType;
262 :
263 : Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
264 :
265 : do
266 : {
267 : int j;
268 :
269 2922 : numWakeUpProcs = 0;
270 2922 : LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
271 :
272 : /*
273 : * Iterate the waiters heap until we find LSN not yet reached. Record
274 : * process numbers to wake up, but send wakeups after releasing lock.
275 : */
276 2948 : while (!pairingheap_is_empty(&waitLSNState->waitersHeap[i]))
277 : {
278 30 : pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap[i]);
279 : WaitLSNProcInfo *procInfo;
280 :
281 : /* Get procInfo using appropriate heap node */
282 30 : procInfo = pairingheap_container(WaitLSNProcInfo, heapNode, node);
283 :
284 30 : if (XLogRecPtrIsValid(currentLSN) && procInfo->waitLSN > currentLSN)
285 4 : break;
286 :
287 : Assert(numWakeUpProcs < WAKEUP_PROC_STATIC_ARRAY_SIZE);
288 26 : wakeUpProcs[numWakeUpProcs++] = procInfo->procno;
289 26 : (void) pairingheap_remove_first(&waitLSNState->waitersHeap[i]);
290 :
291 : /* Update appropriate flag */
292 26 : procInfo->inHeap = false;
293 :
294 26 : if (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE)
295 0 : break;
296 : }
297 :
298 2922 : updateMinWaitedLSN(lsnType);
299 2922 : LWLockRelease(WaitLSNLock);
300 :
301 : /*
302 : * Set latches for processes whose waited LSNs have been reached.
303 : * Since SetLatch() is a time-consuming operation, we do this outside
304 : * of WaitLSNLock. This is safe because procLatch is never freed, so
305 : * at worst we may set a latch for the wrong process or for no process
306 : * at all, which is harmless.
307 : */
308 2948 : for (j = 0; j < numWakeUpProcs; j++)
309 26 : SetLatch(&GetPGProcByNumber(wakeUpProcs[j])->procLatch);
310 :
311 2922 : } while (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE);
312 2922 : }
313 :
314 : /*
315 : * Wake up processes waiting for LSN to reach currentLSN
316 : */
317 : void
318 2922 : WaitLSNWakeup(WaitLSNType lsnType, XLogRecPtr currentLSN)
319 : {
320 2922 : int i = (int) lsnType;
321 :
322 : Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
323 :
324 : /*
325 : * Fast path check. Skip if currentLSN is InvalidXLogRecPtr, which means
326 : * "wake all waiters" (e.g., during promotion when recovery ends).
327 : */
328 2934 : if (XLogRecPtrIsValid(currentLSN) &&
329 12 : pg_atomic_read_u64(&waitLSNState->minWaitedLSN[i]) > currentLSN)
330 0 : return;
331 :
332 2922 : wakeupWaiters(lsnType, currentLSN);
333 : }
334 :
335 : /*
336 : * Clean up LSN waiters for exiting process
337 : */
338 : void
339 53780 : WaitLSNCleanup(void)
340 : {
341 53780 : if (waitLSNState)
342 : {
343 : /*
344 : * We do a fast-path check of the inHeap flag without the lock. This
345 : * flag is set to true only by the process itself. So, it's only
346 : * possible to get a false positive. But that will be eliminated by a
347 : * recheck inside deleteLSNWaiter().
348 : */
349 53780 : if (waitLSNState->procInfos[MyProcNumber].inHeap)
350 0 : deleteLSNWaiter(waitLSNState->procInfos[MyProcNumber].lsnType);
351 : }
352 53780 : }
353 :
354 : /*
355 : * Check if the given LSN type requires recovery to be in progress.
356 : * Standby wait types (replay, write, flush) require recovery;
357 : * primary wait types (flush) do not.
358 : */
359 : static inline bool
360 91 : WaitLSNTypeRequiresRecovery(WaitLSNType t)
361 : {
362 57 : return t == WAIT_LSN_TYPE_STANDBY_REPLAY ||
363 148 : t == WAIT_LSN_TYPE_STANDBY_WRITE ||
364 : t == WAIT_LSN_TYPE_STANDBY_FLUSH;
365 : }
366 :
367 : /*
368 : * Wait using MyLatch till the given LSN is reached, the replica gets
369 : * promoted, or the postmaster dies.
370 : *
371 : * Returns WAIT_LSN_RESULT_SUCCESS if target LSN was reached.
372 : * Returns WAIT_LSN_RESULT_NOT_IN_RECOVERY if run not in recovery,
373 : * or replica got promoted before the target LSN reached.
374 : */
375 : WaitLSNResult
376 43 : WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN, int64 timeout)
377 : {
378 : XLogRecPtr currentLSN;
379 43 : TimestampTz endtime = 0;
380 43 : int wake_events = WL_LATCH_SET | WL_POSTMASTER_DEATH;
381 :
382 : /* Shouldn't be called when shmem isn't initialized */
383 : Assert(waitLSNState);
384 :
385 : /* Should have a valid proc number */
386 : Assert(MyProcNumber >= 0 && MyProcNumber < MaxBackends + NUM_AUXILIARY_PROCS);
387 :
388 43 : if (timeout > 0)
389 : {
390 32 : endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), timeout);
391 32 : wake_events |= WL_TIMEOUT;
392 : }
393 :
394 : /*
395 : * Add our process to the waiters heap. It might happen that target LSN
396 : * gets reached before we do. The check at the beginning of the loop
397 : * below prevents the race condition.
398 : */
399 43 : addLSNWaiter(targetLSN, lsnType);
400 :
401 : for (;;)
402 48 : {
403 : int rc;
404 91 : long delay_ms = -1;
405 :
406 : /* Get current LSN for the wait type */
407 91 : currentLSN = GetCurrentLSNForWaitType(lsnType);
408 :
409 : /* Check that recovery is still in-progress */
410 91 : if (WaitLSNTypeRequiresRecovery(lsnType) && !RecoveryInProgress())
411 : {
412 : /*
413 : * Recovery was ended, but check if target LSN was already
414 : * reached.
415 : */
416 6 : deleteLSNWaiter(lsnType);
417 :
418 6 : if (PromoteIsTriggered() && targetLSN <= currentLSN)
419 1 : return WAIT_LSN_RESULT_SUCCESS;
420 5 : return WAIT_LSN_RESULT_NOT_IN_RECOVERY;
421 : }
422 : else
423 : {
424 : /* Check if the waited LSN has been reached */
425 85 : if (targetLSN <= currentLSN)
426 34 : break;
427 : }
428 :
429 51 : if (timeout > 0)
430 : {
431 39 : delay_ms = TimestampDifferenceMilliseconds(GetCurrentTimestamp(), endtime);
432 39 : if (delay_ms <= 0)
433 3 : break;
434 : }
435 :
436 48 : CHECK_FOR_INTERRUPTS();
437 :
438 48 : rc = WaitLatch(MyLatch, wake_events, delay_ms,
439 48 : WaitLSNWaitEvents[lsnType]);
440 :
441 : /*
442 : * Emergency bailout if postmaster has died. This is to avoid the
443 : * necessity for manual cleanup of all postmaster children.
444 : */
445 48 : if (rc & WL_POSTMASTER_DEATH)
446 0 : ereport(FATAL,
447 : errcode(ERRCODE_ADMIN_SHUTDOWN),
448 : errmsg("terminating connection due to unexpected postmaster exit"),
449 : errcontext("while waiting for LSN"));
450 :
451 48 : if (rc & WL_LATCH_SET)
452 45 : ResetLatch(MyLatch);
453 : }
454 :
455 : /*
456 : * Delete our process from the shared memory heap. We might already be
457 : * deleted by the startup process. The 'inHeap' flags prevents us from
458 : * the double deletion.
459 : */
460 37 : deleteLSNWaiter(lsnType);
461 :
462 : /*
463 : * If we didn't reach the target LSN, we must be exited by timeout.
464 : */
465 37 : if (targetLSN > currentLSN)
466 3 : return WAIT_LSN_RESULT_TIMEOUT;
467 :
468 34 : return WAIT_LSN_RESULT_SUCCESS;
469 : }
|