Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * xlogwait.c
4 : * Implements waiting for WAL operations to reach specific LSNs.
5 : *
6 : * Copyright (c) 2025, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/backend/access/transam/xlogwait.c
10 : *
11 : * NOTES
12 : * This file implements waiting for WAL operations to reach specific LSNs
13 : * on both physical standby and primary servers. The core idea is simple:
14 : * every process that wants to wait publishes the LSN it needs to the
15 : * shared memory, and the appropriate process (startup on standby, or
16 : * WAL writer/backend on primary) wakes it once that LSN has been reached.
17 : *
18 : * The shared memory used by this module comprises a procInfos
19 : * per-backend array with the information of the awaited LSN for each
20 : * of the backend processes. The elements of that array are organized
21 : * into a pairing heap waitersHeap, which allows for very fast finding
22 : * of the least awaited LSN.
23 : *
24 : * In addition, the least-awaited LSN is cached as minWaitedLSN. The
25 : * waiter process publishes information about itself to the shared
26 : * memory and waits on the latch until it is woken up by the appropriate
27 : * process, the standby is promoted, or the postmaster dies. Then, it
28 : * removes the information about itself from the shared memory.
29 : *
30 : * On standby servers: After replaying a WAL record, the startup process
31 : * first performs a fast path check minWaitedLSN > replayLSN. If this
32 : * check is negative, it checks waitersHeap and wakes up the backends
33 : * whose awaited LSNs have been reached.
34 : *
35 : * On primary servers: After flushing WAL, the WAL writer or backend
36 : * process performs a similar check against the flush LSN and wakes up
37 : * waiters whose target flush LSNs have been reached.
38 : *
39 : *-------------------------------------------------------------------------
40 : */
41 :
42 : #include "postgres.h"
43 :
44 : #include <float.h>
45 : #include <math.h>
46 :
47 : #include "access/xlog.h"
48 : #include "access/xlogrecovery.h"
49 : #include "access/xlogwait.h"
50 : #include "miscadmin.h"
51 : #include "pgstat.h"
52 : #include "storage/latch.h"
53 : #include "storage/proc.h"
54 : #include "storage/shmem.h"
55 : #include "utils/fmgrprotos.h"
56 : #include "utils/pg_lsn.h"
57 : #include "utils/snapmgr.h"
58 :
59 :
60 : static int waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b,
61 : void *arg);
62 :
63 : struct WaitLSNState *waitLSNState = NULL;
64 :
65 : /* Report the amount of shared memory space needed for WaitLSNState. */
66 : Size
67 6324 : WaitLSNShmemSize(void)
68 : {
69 : Size size;
70 :
71 6324 : size = offsetof(WaitLSNState, procInfos);
72 6324 : size = add_size(size, mul_size(MaxBackends + NUM_AUXILIARY_PROCS, sizeof(WaitLSNProcInfo)));
73 6324 : return size;
74 : }
75 :
76 : /* Initialize the WaitLSNState in the shared memory. */
77 : void
78 2208 : WaitLSNShmemInit(void)
79 : {
80 : bool found;
81 :
82 2208 : waitLSNState = (WaitLSNState *) ShmemInitStruct("WaitLSNState",
83 : WaitLSNShmemSize(),
84 : &found);
85 2208 : if (!found)
86 : {
87 : int i;
88 :
89 : /* Initialize heaps and tracking */
90 6624 : for (i = 0; i < WAIT_LSN_TYPE_COUNT; i++)
91 : {
92 4416 : pg_atomic_init_u64(&waitLSNState->minWaitedLSN[i], PG_UINT64_MAX);
93 4416 : pairingheap_initialize(&waitLSNState->waitersHeap[i], waitlsn_cmp, NULL);
94 : }
95 :
96 : /* Initialize process info array */
97 2208 : memset(&waitLSNState->procInfos, 0,
98 2208 : (MaxBackends + NUM_AUXILIARY_PROCS) * sizeof(WaitLSNProcInfo));
99 : }
100 2208 : }
101 :
102 : /*
103 : * Comparison function for LSN waiters heaps. Waiting processes are ordered by
104 : * LSN, so that the waiter with smallest LSN is at the top.
105 : */
106 : static int
107 16 : waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
108 : {
109 16 : const WaitLSNProcInfo *aproc = pairingheap_const_container(WaitLSNProcInfo, heapNode, a);
110 16 : const WaitLSNProcInfo *bproc = pairingheap_const_container(WaitLSNProcInfo, heapNode, b);
111 :
112 16 : if (aproc->waitLSN < bproc->waitLSN)
113 10 : return 1;
114 6 : else if (aproc->waitLSN > bproc->waitLSN)
115 6 : return -1;
116 : else
117 0 : return 0;
118 : }
119 :
120 : /*
121 : * Update minimum waited LSN for the specified LSN type
122 : */
123 : static void
124 1872 : updateMinWaitedLSN(WaitLSNType lsnType)
125 : {
126 1872 : XLogRecPtr minWaitedLSN = PG_UINT64_MAX;
127 1872 : int i = (int) lsnType;
128 :
129 : Assert(i >= 0 && i < (int) WAIT_LSN_TYPE_COUNT);
130 :
131 1872 : if (!pairingheap_is_empty(&waitLSNState->waitersHeap[i]))
132 : {
133 42 : pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap[i]);
134 42 : WaitLSNProcInfo *procInfo = pairingheap_container(WaitLSNProcInfo, heapNode, node);
135 :
136 42 : minWaitedLSN = procInfo->waitLSN;
137 : }
138 1872 : pg_atomic_write_u64(&waitLSNState->minWaitedLSN[i], minWaitedLSN);
139 1872 : }
140 :
141 : /*
142 : * Add current process to appropriate waiters heap based on LSN type
143 : */
144 : static void
145 34 : addLSNWaiter(XLogRecPtr lsn, WaitLSNType lsnType)
146 : {
147 34 : WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber];
148 34 : int i = (int) lsnType;
149 :
150 : Assert(i >= 0 && i < (int) WAIT_LSN_TYPE_COUNT);
151 :
152 34 : LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
153 :
154 34 : procInfo->procno = MyProcNumber;
155 34 : procInfo->waitLSN = lsn;
156 34 : procInfo->lsnType = lsnType;
157 :
158 : Assert(!procInfo->inHeap);
159 34 : pairingheap_add(&waitLSNState->waitersHeap[i], &procInfo->heapNode);
160 34 : procInfo->inHeap = true;
161 34 : updateMinWaitedLSN(lsnType);
162 :
163 34 : LWLockRelease(WaitLSNLock);
164 34 : }
165 :
166 : /*
167 : * Remove current process from appropriate waiters heap based on LSN type
168 : */
169 : static void
170 34 : deleteLSNWaiter(WaitLSNType lsnType)
171 : {
172 34 : WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber];
173 34 : int i = (int) lsnType;
174 :
175 : Assert(i >= 0 && i < (int) WAIT_LSN_TYPE_COUNT);
176 :
177 34 : LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
178 :
179 : Assert(procInfo->lsnType == lsnType);
180 :
181 34 : if (procInfo->inHeap)
182 : {
183 18 : pairingheap_remove(&waitLSNState->waitersHeap[i], &procInfo->heapNode);
184 18 : procInfo->inHeap = false;
185 18 : updateMinWaitedLSN(lsnType);
186 : }
187 :
188 34 : LWLockRelease(WaitLSNLock);
189 34 : }
190 :
191 : /*
192 : * Size of a static array of procs to wakeup by WaitLSNWakeup() allocated
193 : * on the stack. It should be enough to take single iteration for most cases.
194 : */
195 : #define WAKEUP_PROC_STATIC_ARRAY_SIZE (16)
196 :
197 : /*
198 : * Remove waiters whose LSN has been reached from the heap and set their
199 : * latches. If InvalidXLogRecPtr is given, remove all waiters from the heap
200 : * and set latches for all waiters.
201 : *
202 : * This function first accumulates waiters to wake up into an array, then
203 : * wakes them up without holding a WaitLSNLock. The array size is static and
204 : * equal to WAKEUP_PROC_STATIC_ARRAY_SIZE. That should be more than enough
205 : * to wake up all the waiters at once in the vast majority of cases. However,
206 : * if there are more waiters, this function will loop to process them in
207 : * multiple chunks.
208 : */
209 : static void
210 1820 : wakeupWaiters(WaitLSNType lsnType, XLogRecPtr currentLSN)
211 : {
212 : ProcNumber wakeUpProcs[WAKEUP_PROC_STATIC_ARRAY_SIZE];
213 : int numWakeUpProcs;
214 1820 : int i = (int) lsnType;
215 :
216 : Assert(i >= 0 && i < (int) WAIT_LSN_TYPE_COUNT);
217 :
218 : do
219 : {
220 1820 : numWakeUpProcs = 0;
221 1820 : LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
222 :
223 : /*
224 : * Iterate the waiters heap until we find LSN not yet reached. Record
225 : * process numbers to wake up, but send wakeups after releasing lock.
226 : */
227 1836 : while (!pairingheap_is_empty(&waitLSNState->waitersHeap[i]))
228 : {
229 24 : pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap[i]);
230 : WaitLSNProcInfo *procInfo;
231 :
232 : /* Get procInfo using appropriate heap node */
233 24 : procInfo = pairingheap_container(WaitLSNProcInfo, heapNode, node);
234 :
235 24 : if (XLogRecPtrIsValid(currentLSN) && procInfo->waitLSN > currentLSN)
236 8 : break;
237 :
238 : Assert(numWakeUpProcs < WAKEUP_PROC_STATIC_ARRAY_SIZE);
239 16 : wakeUpProcs[numWakeUpProcs++] = procInfo->procno;
240 16 : (void) pairingheap_remove_first(&waitLSNState->waitersHeap[i]);
241 :
242 : /* Update appropriate flag */
243 16 : procInfo->inHeap = false;
244 :
245 16 : if (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE)
246 0 : break;
247 : }
248 :
249 1820 : updateMinWaitedLSN(lsnType);
250 1820 : LWLockRelease(WaitLSNLock);
251 :
252 : /*
253 : * Set latches for processes whose waited LSNs have been reached.
254 : * Since SetLatch() is a time-consuming operation, we do this outside
255 : * of WaitLSNLock. This is safe because procLatch is never freed, so
256 : * at worst we may set a latch for the wrong process or for no process
257 : * at all, which is harmless.
258 : */
259 1836 : for (i = 0; i < numWakeUpProcs; i++)
260 16 : SetLatch(&GetPGProcByNumber(wakeUpProcs[i])->procLatch);
261 :
262 1820 : } while (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE);
263 1820 : }
264 :
265 : /*
266 : * Wake up processes waiting for LSN to reach currentLSN
267 : */
268 : void
269 1820 : WaitLSNWakeup(WaitLSNType lsnType, XLogRecPtr currentLSN)
270 : {
271 1820 : int i = (int) lsnType;
272 :
273 : Assert(i >= 0 && i < (int) WAIT_LSN_TYPE_COUNT);
274 :
275 : /*
276 : * Fast path check. Skip if currentLSN is InvalidXLogRecPtr, which means
277 : * "wake all waiters" (e.g., during promotion when recovery ends).
278 : */
279 1834 : if (XLogRecPtrIsValid(currentLSN) &&
280 14 : pg_atomic_read_u64(&waitLSNState->minWaitedLSN[i]) > currentLSN)
281 0 : return;
282 :
283 1820 : wakeupWaiters(lsnType, currentLSN);
284 : }
285 :
286 : /*
287 : * Clean up LSN waiters for exiting process
288 : */
289 : void
290 86606 : WaitLSNCleanup(void)
291 : {
292 86606 : if (waitLSNState)
293 : {
294 : /*
295 : * We do a fast-path check of the inHeap flag without the lock. This
296 : * flag is set to true only by the process itself. So, it's only
297 : * possible to get a false positive. But that will be eliminated by a
298 : * recheck inside deleteLSNWaiter().
299 : */
300 86606 : if (waitLSNState->procInfos[MyProcNumber].inHeap)
301 0 : deleteLSNWaiter(waitLSNState->procInfos[MyProcNumber].lsnType);
302 : }
303 86606 : }
304 :
305 : /*
306 : * Wait using MyLatch till the given LSN is reached, the replica gets
307 : * promoted, or the postmaster dies.
308 : *
309 : * Returns WAIT_LSN_RESULT_SUCCESS if target LSN was reached.
310 : * Returns WAIT_LSN_RESULT_NOT_IN_RECOVERY if run not in recovery,
311 : * or replica got promoted before the target LSN reached.
312 : */
313 : WaitLSNResult
314 34 : WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN, int64 timeout)
315 : {
316 : XLogRecPtr currentLSN;
317 34 : TimestampTz endtime = 0;
318 34 : int wake_events = WL_LATCH_SET | WL_POSTMASTER_DEATH;
319 :
320 : /* Shouldn't be called when shmem isn't initialized */
321 : Assert(waitLSNState);
322 :
323 : /* Should have a valid proc number */
324 : Assert(MyProcNumber >= 0 && MyProcNumber < MaxBackends + NUM_AUXILIARY_PROCS);
325 :
326 34 : if (timeout > 0)
327 : {
328 16 : endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), timeout);
329 16 : wake_events |= WL_TIMEOUT;
330 : }
331 :
332 : /*
333 : * Add our process to the waiters heap. It might happen that target LSN
334 : * gets reached before we do. The check at the beginning of the loop
335 : * below prevents the race condition.
336 : */
337 34 : addLSNWaiter(targetLSN, lsnType);
338 :
339 : for (;;)
340 24 : {
341 : int rc;
342 58 : long delay_ms = -1;
343 :
344 58 : if (lsnType == WAIT_LSN_TYPE_REPLAY)
345 58 : currentLSN = GetXLogReplayRecPtr(NULL);
346 : else
347 0 : currentLSN = GetFlushRecPtr(NULL);
348 :
349 : /* Check that recovery is still in-progress */
350 58 : if (lsnType == WAIT_LSN_TYPE_REPLAY && !RecoveryInProgress())
351 : {
352 : /*
353 : * Recovery was ended, but check if target LSN was already
354 : * reached.
355 : */
356 8 : deleteLSNWaiter(lsnType);
357 :
358 8 : if (PromoteIsTriggered() && targetLSN <= currentLSN)
359 2 : return WAIT_LSN_RESULT_SUCCESS;
360 6 : return WAIT_LSN_RESULT_NOT_IN_RECOVERY;
361 : }
362 : else
363 : {
364 : /* Check if the waited LSN has been reached */
365 50 : if (targetLSN <= currentLSN)
366 20 : break;
367 : }
368 :
369 30 : if (timeout > 0)
370 : {
371 14 : delay_ms = TimestampDifferenceMilliseconds(GetCurrentTimestamp(), endtime);
372 14 : if (delay_ms <= 0)
373 6 : break;
374 : }
375 :
376 24 : CHECK_FOR_INTERRUPTS();
377 :
378 24 : rc = WaitLatch(MyLatch, wake_events, delay_ms,
379 : (lsnType == WAIT_LSN_TYPE_REPLAY) ? WAIT_EVENT_WAIT_FOR_WAL_REPLAY : WAIT_EVENT_WAIT_FOR_WAL_FLUSH);
380 :
381 : /*
382 : * Emergency bailout if postmaster has died. This is to avoid the
383 : * necessity for manual cleanup of all postmaster children.
384 : */
385 24 : if (rc & WL_POSTMASTER_DEATH)
386 0 : ereport(FATAL,
387 : errcode(ERRCODE_ADMIN_SHUTDOWN),
388 : errmsg("terminating connection due to unexpected postmaster exit"),
389 : errcontext("while waiting for LSN"));
390 :
391 24 : if (rc & WL_LATCH_SET)
392 18 : ResetLatch(MyLatch);
393 : }
394 :
395 : /*
396 : * Delete our process from the shared memory heap. We might already be
397 : * deleted by the startup process. The 'inHeap' flags prevents us from
398 : * the double deletion.
399 : */
400 26 : deleteLSNWaiter(lsnType);
401 :
402 : /*
403 : * If we didn't reach the target LSN, we must be exited by timeout.
404 : */
405 26 : if (targetLSN > currentLSN)
406 6 : return WAIT_LSN_RESULT_TIMEOUT;
407 :
408 20 : return WAIT_LSN_RESULT_SUCCESS;
409 : }
|