LCOV - code coverage report
Current view: top level - src/backend/access/transam - xlogwait.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 96.1 % 128 123
Test Date: 2026-04-16 15:16:24 Functions: 100.0 % 12 12
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * xlogwait.c
       4              :  *    Implements waiting for WAL operations to reach specific LSNs.
       5              :  *
       6              :  * Copyright (c) 2025-2026, PostgreSQL Global Development Group
       7              :  *
       8              :  * IDENTIFICATION
       9              :  *    src/backend/access/transam/xlogwait.c
      10              :  *
      11              :  * NOTES
      12              :  *      This file implements waiting for WAL operations to reach specific LSNs
      13              :  *      on both physical standby and primary servers. The core idea is simple:
      14              :  *      every process that wants to wait publishes the LSN it needs to the
      15              :  *      shared memory, and the appropriate process (startup on standby,
      16              :  *      walreceiver on standby, or WAL writer/backend on primary) wakes it
      17              :  *      once that LSN has been reached.
      18              :  *
      19              :  *      The shared memory used by this module comprises a procInfos
      20              :  *      per-backend array with the information of the awaited LSN for each
      21              :  *      of the backend processes.  The elements of that array are organized
      22              :  *      into pairing heaps (waitersHeap), one for each WaitLSNType, so that
      23              :  *      the least awaited LSN for each type can be found very quickly.
      24              :  *
      25              :  *      In addition, the least-awaited LSN for each type is cached in the
      26              :  *      minWaitedLSN array.  The waiter process publishes information about
      27              :  *      itself to the shared memory and waits on the latch until it is woken
      28              :  *      up by the appropriate process, the standby is promoted, or the postmaster
      29              :  *      dies.  Then, it cleans up the information about itself in shared memory.
      30              :  *
      31              :  *      On standby servers:
      32              :  *      - After replaying a WAL record, the startup process performs a fast
      33              :  *        path check minWaitedLSN[REPLAY] > replayLSN.  If this check is
      34              :  *        false, it checks waitersHeap[REPLAY] and wakes up the backends
      35              :  *        whose awaited LSNs are reached.
      36              :  *      - After receiving WAL, the walreceiver process performs similar checks
      37              :  *        against the flush and write LSNs, waking up waiters in the FLUSH
      38              :  *        and WRITE heaps, respectively.
      39              :  *
      40              :  *      On primary servers: After flushing WAL, the WAL writer or backend
      41              :  *      process performs a similar check against the flush LSN and wakes up
      42              :  *      waiters whose target flush LSNs have been reached.
      43              :  *
      44              :  *-------------------------------------------------------------------------
      45              :  */
      46              : 
      47              : #include "postgres.h"
      48              : 
      49              : #include <float.h>
      50              : 
      51              : #include "access/xlog.h"
      52              : #include "access/xlogrecovery.h"
      53              : #include "access/xlogwait.h"
      54              : #include "miscadmin.h"
      55              : #include "pgstat.h"
      56              : #include "replication/walreceiver.h"
      57              : #include "storage/latch.h"
      58              : #include "storage/proc.h"
      59              : #include "storage/shmem.h"
      60              : #include "storage/subsystems.h"
      61              : #include "utils/fmgrprotos.h"
      62              : #include "utils/pg_lsn.h"
      63              : #include "utils/snapmgr.h"
      64              : #include "utils/wait_event.h"
      65              : 
      66              : 
      67              : static int  waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b,
      68              :                         void *arg);
      69              : 
      70              : struct WaitLSNState *waitLSNState = NULL;
      71              : 
      72              : static void WaitLSNShmemRequest(void *arg);
      73              : static void WaitLSNShmemInit(void *arg);
      74              : 
      75              : const ShmemCallbacks WaitLSNShmemCallbacks = {
      76              :     .request_fn = WaitLSNShmemRequest,
      77              :     .init_fn = WaitLSNShmemInit,
      78              : };
      79              : 
      80              : /*
      81              :  * Wait event for each WaitLSNType, used with WaitLatch() to report
      82              :  * the wait in pg_stat_activity.
      83              :  */
      84              : static const uint32 WaitLSNWaitEvents[] = {
      85              :     [WAIT_LSN_TYPE_STANDBY_REPLAY] = WAIT_EVENT_WAIT_FOR_WAL_REPLAY,
      86              :     [WAIT_LSN_TYPE_STANDBY_WRITE] = WAIT_EVENT_WAIT_FOR_WAL_WRITE,
      87              :     [WAIT_LSN_TYPE_STANDBY_FLUSH] = WAIT_EVENT_WAIT_FOR_WAL_FLUSH,
      88              :     [WAIT_LSN_TYPE_PRIMARY_FLUSH] = WAIT_EVENT_WAIT_FOR_WAL_FLUSH,
      89              : };
      90              : 
      91              : StaticAssertDecl(lengthof(WaitLSNWaitEvents) == WAIT_LSN_TYPE_COUNT,
      92              :                  "WaitLSNWaitEvents must match WaitLSNType enum");
      93              : 
      94              : /*
      95              :  * Get the current replay, write, or flush LSN matching the specified wait type.
      96              :  */
      97              : XLogRecPtr
      98          304 : GetCurrentLSNForWaitType(WaitLSNType lsnType)
      99              : {
     100              :     Assert(lsnType >= 0 && lsnType < WAIT_LSN_TYPE_COUNT);
     101              : 
     102          304 :     switch (lsnType)
     103              :     {
     104          203 :         case WAIT_LSN_TYPE_STANDBY_REPLAY:
     105          203 :             return GetXLogReplayRecPtr(NULL);
     106              : 
     107           43 :         case WAIT_LSN_TYPE_STANDBY_WRITE:
     108           43 :             return GetWalRcvWriteRecPtr();
     109              : 
     110           27 :         case WAIT_LSN_TYPE_STANDBY_FLUSH:
     111           27 :             return GetWalRcvFlushRecPtr(NULL, NULL);
     112              : 
     113           31 :         case WAIT_LSN_TYPE_PRIMARY_FLUSH:
     114           31 :             return GetFlushRecPtr(NULL);
     115              :     }
     116              : 
     117            0 :     elog(ERROR, "invalid LSN wait type: %d", lsnType);  /* no case matched */
     118              :     pg_unreachable();
     119              : }
     120              : 
     121              : /* Request shared memory for WaitLSNState: header plus MaxBackends + NUM_AUXILIARY_PROCS procInfos entries. */
     122              : static void
     123         1238 : WaitLSNShmemRequest(void *arg)
     124              : {
     125              :     Size        size;
     126              : 
     127         1238 :     size = offsetof(WaitLSNState, procInfos);
     128         1238 :     size = add_size(size, mul_size(MaxBackends + NUM_AUXILIARY_PROCS, sizeof(WaitLSNProcInfo)));
     129         1238 :     ShmemRequestStruct(.name = "WaitLSNState",
     130              :                        .size = size,
     131              :                        .ptr = (void **) &waitLSNState,
     132              :         );
     133         1238 : }
     134              : 
     135              : /* Initialize WaitLSNState in shared memory: empty heaps, no awaited LSNs, zeroed procInfos. */
     136              : static void
     137         1235 : WaitLSNShmemInit(void *arg)
     138              : {
     139              :     /* Per wait type: no awaited LSN yet (PG_UINT64_MAX) and an empty waiters heap */
     140         6175 :     for (int i = 0; i < WAIT_LSN_TYPE_COUNT; i++)
     141              :     {
     142         4940 :         pg_atomic_init_u64(&waitLSNState->minWaitedLSN[i], PG_UINT64_MAX);
     143         4940 :         pairingheap_initialize(&waitLSNState->waitersHeap[i], waitlsn_cmp, NULL);
     144              :     }
     145              : 
     146              :     /* Zero the whole per-process info array, sized as requested in WaitLSNShmemRequest() */
     147         1235 :     memset(&waitLSNState->procInfos, 0,
     148         1235 :            (MaxBackends + NUM_AUXILIARY_PROCS) * sizeof(WaitLSNProcInfo));
     149         1235 : }
     150              : 
     151              : /*
     152              :  * Comparison function for LSN waiters heaps.  Returns 1 when a's waitLSN is
     153              :  * lower than b's (-1 when higher), so the waiter with smallest LSN is at the top.
     154              :  */
     155              : static int
     156           28 : waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
     157              : {
     158           28 :     const WaitLSNProcInfo *aproc = pairingheap_const_container(WaitLSNProcInfo, heapNode, a);
     159           28 :     const WaitLSNProcInfo *bproc = pairingheap_const_container(WaitLSNProcInfo, heapNode, b);
     160              : 
     161           28 :     if (aproc->waitLSN < bproc->waitLSN)
     162           15 :         return 1;
     163           13 :     else if (aproc->waitLSN > bproc->waitLSN)
     164           10 :         return -1;
     165              :     else
     166            3 :         return 0;
     167              : }
     168              : 
     169              : /*
     170              :  * Refresh the cached minWaitedLSN for lsnType from the heap top (PG_UINT64_MAX if no waiters).
     171              :  */
     172              : static void
     173         3473 : updateMinWaitedLSN(WaitLSNType lsnType)
     174              : {
     175         3473 :     XLogRecPtr  minWaitedLSN = PG_UINT64_MAX;
     176         3473 :     int         i = (int) lsnType;
     177              : 
     178              :     Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
     179              : 
     180         3473 :     if (!pairingheap_is_empty(&waitLSNState->waitersHeap[i]))
     181              :     {
     182          232 :         pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap[i]);
     183          232 :         WaitLSNProcInfo *procInfo = pairingheap_container(WaitLSNProcInfo, heapNode, node);
     184              : 
     185          232 :         minWaitedLSN = procInfo->waitLSN;
     186              :     }
     187         3473 :     pg_atomic_write_u64(&waitLSNState->minWaitedLSN[i], minWaitedLSN);
     188         3473 : }
     189              : 
     190              : /*
     191              :  * Register the current process as a waiter for lsn in the heap of lsnType, under WaitLSNLock.
     192              :  */
     193              : static void
     194          227 : addLSNWaiter(XLogRecPtr lsn, WaitLSNType lsnType)
     195              : {
     196          227 :     WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber];
     197          227 :     int         i = (int) lsnType;
     198              : 
     199              :     Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
     200              : 
     201          227 :     LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
     202              : 
     203          227 :     procInfo->procno = MyProcNumber;
     204          227 :     procInfo->waitLSN = lsn;
     205          227 :     procInfo->lsnType = lsnType;
     206              : 
     207              :     Assert(!procInfo->inHeap);
     208          227 :     pairingheap_add(&waitLSNState->waitersHeap[i], &procInfo->heapNode);
     209          227 :     procInfo->inHeap = true;
     210          227 :     updateMinWaitedLSN(lsnType);
     211              : 
     212          227 :     LWLockRelease(WaitLSNLock);
     213          227 : }
     214              : 
     215              : /*
     216              :  * Remove the current process from the lsnType waiters heap, if it is still in it (else no-op).
     217              :  */
     218              : static void
     219          227 : deleteLSNWaiter(WaitLSNType lsnType)
     220              : {
     221          227 :     WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber];
     222          227 :     int         i = (int) lsnType;
     223              : 
     224              :     Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
     225              : 
     226          227 :     LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
     227              : 
     228              :     Assert(procInfo->lsnType == lsnType);
     229              : 
     230          227 :     if (procInfo->inHeap)
     231              :     {
     232          182 :         pairingheap_remove(&waitLSNState->waitersHeap[i], &procInfo->heapNode);
     233          182 :         procInfo->inHeap = false;
     234          182 :         updateMinWaitedLSN(lsnType);
     235              :     }
     236              : 
     237          227 :     LWLockRelease(WaitLSNLock);
     238          227 : }
     239              : 
     240              : /*
     241              :  * Size of a static array of procs to wake up, allocated on the stack by
     242              :  * wakeupWaiters().  It should be enough to take a single iteration in most cases.
     243              :  */
     244              : #define WAKEUP_PROC_STATIC_ARRAY_SIZE (16)
     245              : 
     246              : /*
     247              :  * Remove waiters whose LSN has been reached from the heap and set their
     248              :  * latches.  If InvalidXLogRecPtr is given, remove all waiters from the heap
     249              :  * and set latches for all waiters.
     250              :  *
     251              :  * This function first accumulates waiters to wake up into an array, then
     252              :  * wakes them up without holding WaitLSNLock.  The array size is static and
     253              :  * equal to WAKEUP_PROC_STATIC_ARRAY_SIZE.  That should be more than enough
     254              :  * to wake up all the waiters at once in the vast majority of cases.  However,
     255              :  * if there are more waiters, this function will loop to process them in
     256              :  * multiple chunks.
     257              :  */
     258              : static void
     259         3064 : wakeupWaiters(WaitLSNType lsnType, XLogRecPtr currentLSN)
     260              : {
     261              :     ProcNumber  wakeUpProcs[WAKEUP_PROC_STATIC_ARRAY_SIZE];
     262              :     int         numWakeUpProcs;
     263         3064 :     int         i = (int) lsnType;
     264              : 
     265              :     Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
     266              : 
     267              :     do
     268              :     {
     269              :         int         j;
     270              : 
     271         3064 :         numWakeUpProcs = 0;
     272         3064 :         LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
     273              : 
     274              :         /*
     275              :          * Iterate the waiters heap until we find an LSN not yet reached. Record
     276              :          * process numbers to wake up, but send wakeups after releasing lock.
     277              :          */
     278         3109 :         while (!pairingheap_is_empty(&waitLSNState->waitersHeap[i]))
     279              :         {
     280           49 :             pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap[i]);
     281              :             WaitLSNProcInfo *procInfo;
     282              : 
     283              :             /* Map the heap node back to its containing WaitLSNProcInfo */
     284           49 :             procInfo = pairingheap_container(WaitLSNProcInfo, heapNode, node);
     285              : 
     286           49 :             if (XLogRecPtrIsValid(currentLSN) && procInfo->waitLSN > currentLSN)
     287            4 :                 break;
     288              : 
     289              :             Assert(numWakeUpProcs < WAKEUP_PROC_STATIC_ARRAY_SIZE);
     290           45 :             wakeUpProcs[numWakeUpProcs++] = procInfo->procno;
     291           45 :             (void) pairingheap_remove_first(&waitLSNState->waitersHeap[i]);
     292              : 
     293              :             /* Mark it removed, so the waiter's own deleteLSNWaiter() skips the heap removal */
     294           45 :             procInfo->inHeap = false;
     295              : 
     296           45 :             if (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE)
     297            0 :                 break;
     298              :         }
     299              : 
     300         3064 :         updateMinWaitedLSN(lsnType);
     301         3064 :         LWLockRelease(WaitLSNLock);
     302              : 
     303              :         /*
     304              :          * Set latches for processes whose waited LSNs have been reached.
     305              :          * Since SetLatch() is a time-consuming operation, we do this outside
     306              :          * of WaitLSNLock. This is safe because procLatch is never freed, so
     307              :          * at worst we may set a latch for the wrong process or for no process
     308              :          * at all, which is harmless.
     309              :          */
     310         3109 :         for (j = 0; j < numWakeUpProcs; j++)
     311           45 :             SetLatch(&GetPGProcByNumber(wakeUpProcs[j])->procLatch);
     312              : 
     313         3064 :     } while (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE);
     314         3064 : }
     315              : 
     316              : /*
     317              :  * Wake up processes of the given wait type whose awaited LSN is reached by currentLSN
     318              :  */
     319              : void
     320         3064 : WaitLSNWakeup(WaitLSNType lsnType, XLogRecPtr currentLSN)
     321              : {
     322         3064 :     int         i = (int) lsnType;
     323              : 
     324              :     Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
     325              : 
     326              :     /*
     327              :      * Fast path check.  Skip if currentLSN is InvalidXLogRecPtr, which means
     328              :      * "wake all waiters" (e.g., during promotion when recovery ends).
     329              :      */
     330         3095 :     if (XLogRecPtrIsValid(currentLSN) &&
     331           31 :         pg_atomic_read_u64(&waitLSNState->minWaitedLSN[i]) > currentLSN)
     332            0 :         return;                 /* even the least awaited LSN is not reached yet */
     333              : 
     334         3064 :     wakeupWaiters(lsnType, currentLSN);
     335              : }
     336              : 
     337              : /*
     338              :  * Remove the exiting process from its waiters heap, if it is still in one
     339              :  */
     340              : void
     341        55327 : WaitLSNCleanup(void)
     342              : {
     343        55327 :     if (waitLSNState)
     344              :     {
     345              :         /*
     346              :          * We do a fast-path check of the inHeap flag without the lock.  This
     347              :          * flag is set to true only by the process itself.  So, it's only
     348              :          * possible to get a false positive.  But that will be eliminated by a
     349              :          * recheck inside deleteLSNWaiter().
     350              :          */
     351        55327 :         if (waitLSNState->procInfos[MyProcNumber].inHeap)
     352            0 :             deleteLSNWaiter(waitLSNState->procInfos[MyProcNumber].lsnType);
     353              :     }
     354        55327 : }
     355              : 
     356              : /*
     357              :  * Return true if the given LSN type requires recovery to be in progress.
     358              :  * Standby wait types (replay, write, flush) require recovery;
     359              :  * the primary wait type (flush) does not.
     360              :  */
     361              : static inline bool
     362          300 : WaitLSNTypeRequiresRecovery(WaitLSNType t)
     363              : {
     364           99 :     return t == WAIT_LSN_TYPE_STANDBY_REPLAY ||
     365          399 :         t == WAIT_LSN_TYPE_STANDBY_WRITE ||
     366              :         t == WAIT_LSN_TYPE_STANDBY_FLUSH;
     367              : }
     368              : 
     369              : /*
     370              :  * Wait using MyLatch till the given LSN is reached, the timeout (in ms;
     371              :  * <= 0 means none) expires, the replica gets promoted, or the postmaster dies.
     372              :  *
     373              :  * Returns WAIT_LSN_RESULT_SUCCESS if target LSN was reached,
     374              :  * WAIT_LSN_RESULT_TIMEOUT if the timeout expired first, or
     375              :  * WAIT_LSN_RESULT_NOT_IN_RECOVERY if run not in recovery or promoted before that.
     376              :  */
     377              : WaitLSNResult
     378          227 : WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN, int64 timeout)
     379              : {
     380              :     XLogRecPtr  currentLSN;
     381          227 :     TimestampTz endtime = 0;
     382          227 :     int         wake_events = WL_LATCH_SET | WL_POSTMASTER_DEATH;
     383              : 
     384              :     /* Shouldn't be called when shmem isn't initialized */
     385              :     Assert(waitLSNState);
     386              : 
     387              :     /* Should have a valid proc number */
     388              :     Assert(MyProcNumber >= 0 && MyProcNumber < MaxBackends + NUM_AUXILIARY_PROCS);
     389              : 
     390          227 :     if (timeout > 0)
     391              :     {
     392          216 :         endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), timeout);
     393          216 :         wake_events |= WL_TIMEOUT;
     394              :     }
     395              : 
     396              :     /*
     397              :      * Add our process to the waiters heap.  It might happen that target LSN
     398              :      * gets reached before we do.  The check at the beginning of the loop
     399              :      * below prevents the race condition.
     400              :      */
     401          227 :     addLSNWaiter(targetLSN, lsnType);
     402              : 
     403              :     for (;;)
     404           73 :     {
     405              :         int         rc;
     406          300 :         long        delay_ms = -1;
     407              : 
     408              :         /* Get current LSN for the wait type */
     409          300 :         currentLSN = GetCurrentLSNForWaitType(lsnType);
     410              : 
     411              :         /* Check that recovery is still in-progress */
     412          300 :         if (WaitLSNTypeRequiresRecovery(lsnType) && !RecoveryInProgress())
     413              :         {
     414              :             /*
     415              :              * Recovery has ended, but check if target LSN was already
     416              :              * reached.
     417              :              */
     418            6 :             deleteLSNWaiter(lsnType);
     419              : 
     420            6 :             if (PromoteIsTriggered() && targetLSN <= currentLSN)
     421            1 :                 return WAIT_LSN_RESULT_SUCCESS;
     422            5 :             return WAIT_LSN_RESULT_NOT_IN_RECOVERY;
     423              :         }
     424              :         else
     425              :         {
     426              :             /* Check if the waited LSN has been reached */
     427          294 :             if (targetLSN <= currentLSN)
     428          215 :                 break;
     429              :         }
     430              : 
     431           79 :         if (timeout > 0)
     432              :         {
     433           67 :             delay_ms = TimestampDifferenceMilliseconds(GetCurrentTimestamp(), endtime);
     434           67 :             if (delay_ms <= 0)
     435            6 :                 break;
     436              :         }
     437              : 
     438           73 :         CHECK_FOR_INTERRUPTS();
     439              : 
     440           73 :         rc = WaitLatch(MyLatch, wake_events, delay_ms,
     441           73 :                        WaitLSNWaitEvents[lsnType]);
     442              : 
     443              :         /*
     444              :          * Emergency bailout if postmaster has died.  This is to avoid the
     445              :          * necessity for manual cleanup of all postmaster children.
     446              :          */
     447           73 :         if (rc & WL_POSTMASTER_DEATH)
     448            0 :             ereport(FATAL,
     449              :                     errcode(ERRCODE_ADMIN_SHUTDOWN),
     450              :                     errmsg("terminating connection due to unexpected postmaster exit"),
     451              :                     errcontext("while waiting for LSN"));
     452              : 
     453           73 :         if (rc & WL_LATCH_SET)
     454           67 :             ResetLatch(MyLatch);
     455              :     }
     456              : 
     457              :     /*
     458              :      * Delete our process from the shared memory heap.  We might already have
     459              :      * been removed by the process that woke us up.  The 'inHeap' flag prevents
     460              :      * a double deletion.
     461              :      */
     462          221 :     deleteLSNWaiter(lsnType);
     463              : 
     464              :     /*
     465              :      * If we didn't reach the target LSN, we must have exited the loop by timeout.
     466              :      */
     467          221 :     if (targetLSN > currentLSN)
     468            6 :         return WAIT_LSN_RESULT_TIMEOUT;
     469              : 
     470          215 :     return WAIT_LSN_RESULT_SUCCESS;
     471              : }
        

Generated by: LCOV version 2.0-1