LCOV - code coverage report
Current view: top level - src/backend/access/transam - xlogwait.c (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 124 129 96.1 %
Date: 2026-02-05 08:18:03 Functions: 12 12 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * xlogwait.c
       4             :  *    Implements waiting for WAL operations to reach specific LSNs.
       5             :  *
       6             :  * Copyright (c) 2025-2026, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *    src/backend/access/transam/xlogwait.c
      10             :  *
      11             :  * NOTES
      12             :  *      This file implements waiting for WAL operations to reach specific LSNs
      13             :  *      on both physical standby and primary servers. The core idea is simple:
      14             :  *      every process that wants to wait publishes the LSN it needs to the
      15             :  *      shared memory, and the appropriate process (startup on standby,
      16             :  *      walreceiver on standby, or WAL writer/backend on primary) wakes it
      17             :  *      once that LSN has been reached.
      18             :  *
      19             :  *      The shared memory used by this module comprises a procInfos
      20             :  *      per-backend array with the information of the awaited LSN for each
      21             :  *      of the backend processes.  The elements of that array are organized
      22             :  *      into pairing heaps (waitersHeap), one for each WaitLSNType, which
      23             :  *      allows for very fast finding of the least awaited LSN for each type.
      24             :  *
      25             :  *      In addition, the least-awaited LSN for each type is cached in the
      26             :  *      minWaitedLSN array.  The waiter process publishes information about
      27             :  *      itself to the shared memory and waits on the latch until it is woken
      28             :  *      up by the appropriate process, standby is promoted, or the postmaster
      29             :  *      dies.  Then, it cleans information about itself in the shared memory.
      30             :  *
      31             :  *      On standby servers:
      32             :  *      - After replaying a WAL record, the startup process performs a fast
      33             :  *        path check minWaitedLSN[REPLAY] > replayLSN.  If this check is
      34             :  *        negative, it checks waitersHeap[REPLAY] and wakes up the backends
      35             :  *        whose awaited LSNs are reached.
      36             :  *      - After receiving WAL, the walreceiver process performs similar checks
      37             :  *        against the flush and write LSNs, waking up waiters in the FLUSH
      38             :  *        and WRITE heaps, respectively.
      39             :  *
      40             :  *      On primary servers: After flushing WAL, the WAL writer or backend
      41             :  *      process performs a similar check against the flush LSN and wakes up
      42             :  *      waiters whose target flush LSNs have been reached.
      43             :  *
      44             :  *-------------------------------------------------------------------------
      45             :  */
      46             : 
      47             : #include "postgres.h"
      48             : 
      49             : #include <float.h>
      50             : 
      51             : #include "access/xlog.h"
      52             : #include "access/xlogrecovery.h"
      53             : #include "access/xlogwait.h"
      54             : #include "miscadmin.h"
      55             : #include "pgstat.h"
      56             : #include "replication/walreceiver.h"
      57             : #include "storage/latch.h"
      58             : #include "storage/proc.h"
      59             : #include "storage/shmem.h"
      60             : #include "utils/fmgrprotos.h"
      61             : #include "utils/pg_lsn.h"
      62             : #include "utils/snapmgr.h"
      63             : 
      64             : 
      65             : static int  waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b,
      66             :                         void *arg);
      67             : 
      68             : struct WaitLSNState *waitLSNState = NULL;
      69             : 
      70             : /*
      71             :  * Wait event for each WaitLSNType, used with WaitLatch() to report
      72             :  * the wait in pg_stat_activity.
      73             :  */
      74             : static const uint32 WaitLSNWaitEvents[] = {
      75             :     [WAIT_LSN_TYPE_STANDBY_REPLAY] = WAIT_EVENT_WAIT_FOR_WAL_REPLAY,
      76             :     [WAIT_LSN_TYPE_STANDBY_WRITE] = WAIT_EVENT_WAIT_FOR_WAL_WRITE,
      77             :     [WAIT_LSN_TYPE_STANDBY_FLUSH] = WAIT_EVENT_WAIT_FOR_WAL_FLUSH,
      78             :     [WAIT_LSN_TYPE_PRIMARY_FLUSH] = WAIT_EVENT_WAIT_FOR_WAL_FLUSH,
      79             : };
      80             : 
      81             : StaticAssertDecl(lengthof(WaitLSNWaitEvents) == WAIT_LSN_TYPE_COUNT,
      82             :                  "WaitLSNWaitEvents must match WaitLSNType enum");
      83             : 
      84             : /*
      85             :  * Get the current LSN for the specified wait type.
      86             :  */
      87             : XLogRecPtr
      88         194 : GetCurrentLSNForWaitType(WaitLSNType lsnType)
      89             : {
      90             :     Assert(lsnType >= 0 && lsnType < WAIT_LSN_TYPE_COUNT);
      91             : 
      92         194 :     switch (lsnType)
      93             :     {
      94          74 :         case WAIT_LSN_TYPE_STANDBY_REPLAY:
      95          74 :             return GetXLogReplayRecPtr(NULL);
      96             : 
      97          54 :         case WAIT_LSN_TYPE_STANDBY_WRITE:
      98          54 :             return GetWalRcvWriteRecPtr();
      99             : 
     100          54 :         case WAIT_LSN_TYPE_STANDBY_FLUSH:
     101          54 :             return GetWalRcvFlushRecPtr(NULL, NULL);
     102             : 
     103          12 :         case WAIT_LSN_TYPE_PRIMARY_FLUSH:
     104          12 :             return GetFlushRecPtr(NULL);
     105             :     }
     106             : 
     107           0 :     elog(ERROR, "invalid LSN wait type: %d", lsnType);
     108             :     pg_unreachable();
     109             : }
     110             : 
     111             : /* Report the amount of shared memory space needed for WaitLSNState. */
     112             : Size
     113        6534 : WaitLSNShmemSize(void)
     114             : {
     115             :     Size        size;
     116             : 
     117        6534 :     size = offsetof(WaitLSNState, procInfos);
     118        6534 :     size = add_size(size, mul_size(MaxBackends + NUM_AUXILIARY_PROCS, sizeof(WaitLSNProcInfo)));
     119        6534 :     return size;
     120             : }
     121             : 
     122             : /* Initialize the WaitLSNState in the shared memory. */
     123             : void
     124        2280 : WaitLSNShmemInit(void)
     125             : {
     126             :     bool        found;
     127             : 
     128        2280 :     waitLSNState = (WaitLSNState *) ShmemInitStruct("WaitLSNState",
     129             :                                                     WaitLSNShmemSize(),
     130             :                                                     &found);
     131        2280 :     if (!found)
     132             :     {
     133             :         int         i;
     134             : 
     135             :         /* Initialize heaps and tracking */
     136       11400 :         for (i = 0; i < WAIT_LSN_TYPE_COUNT; i++)
     137             :         {
     138        9120 :             pg_atomic_init_u64(&waitLSNState->minWaitedLSN[i], PG_UINT64_MAX);
     139        9120 :             pairingheap_initialize(&waitLSNState->waitersHeap[i], waitlsn_cmp, NULL);
     140             :         }
     141             : 
     142             :         /* Initialize process info array */
     143        2280 :         memset(&waitLSNState->procInfos, 0,
     144        2280 :                (MaxBackends + NUM_AUXILIARY_PROCS) * sizeof(WaitLSNProcInfo));
     145             :     }
     146        2280 : }
     147             : 
     148             : /*
     149             :  * Comparison function for LSN waiters heaps. Waiting processes are ordered by
     150             :  * LSN, so that the waiter with smallest LSN is at the top.
     151             :  */
     152             : static int
     153          54 : waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
     154             : {
     155          54 :     const WaitLSNProcInfo *aproc = pairingheap_const_container(WaitLSNProcInfo, heapNode, a);
     156          54 :     const WaitLSNProcInfo *bproc = pairingheap_const_container(WaitLSNProcInfo, heapNode, b);
     157             : 
     158          54 :     if (aproc->waitLSN < bproc->waitLSN)
     159          30 :         return 1;
     160          24 :     else if (aproc->waitLSN > bproc->waitLSN)
     161          18 :         return -1;
     162             :     else
     163           6 :         return 0;
     164             : }
     165             : 
     166             : /*
     167             :  * Update minimum waited LSN for the specified LSN type
     168             :  */
     169             : static void
     170        5724 : updateMinWaitedLSN(WaitLSNType lsnType)
     171             : {
     172        5724 :     XLogRecPtr  minWaitedLSN = PG_UINT64_MAX;
     173        5724 :     int         i = (int) lsnType;
     174             : 
     175             :     Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
     176             : 
     177        5724 :     if (!pairingheap_is_empty(&waitLSNState->waitersHeap[i]))
     178             :     {
     179          94 :         pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap[i]);
     180          94 :         WaitLSNProcInfo *procInfo = pairingheap_container(WaitLSNProcInfo, heapNode, node);
     181             : 
     182          94 :         minWaitedLSN = procInfo->waitLSN;
     183             :     }
     184        5724 :     pg_atomic_write_u64(&waitLSNState->minWaitedLSN[i], minWaitedLSN);
     185        5724 : }
     186             : 
     187             : /*
     188             :  * Add current process to appropriate waiters heap based on LSN type
     189             :  */
     190             : static void
     191          86 : addLSNWaiter(XLogRecPtr lsn, WaitLSNType lsnType)
     192             : {
     193          86 :     WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber];
     194          86 :     int         i = (int) lsnType;
     195             : 
     196             :     Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
     197             : 
     198          86 :     LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
     199             : 
     200          86 :     procInfo->procno = MyProcNumber;
     201          86 :     procInfo->waitLSN = lsn;
     202          86 :     procInfo->lsnType = lsnType;
     203             : 
     204             :     Assert(!procInfo->inHeap);
     205          86 :     pairingheap_add(&waitLSNState->waitersHeap[i], &procInfo->heapNode);
     206          86 :     procInfo->inHeap = true;
     207          86 :     updateMinWaitedLSN(lsnType);
     208             : 
     209          86 :     LWLockRelease(WaitLSNLock);
     210          86 : }
     211             : 
     212             : /*
     213             :  * Remove current process from appropriate waiters heap based on LSN type
     214             :  */
     215             : static void
     216          86 : deleteLSNWaiter(WaitLSNType lsnType)
     217             : {
     218          86 :     WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber];
     219          86 :     int         i = (int) lsnType;
     220             : 
     221             :     Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
     222             : 
     223          86 :     LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
     224             : 
     225             :     Assert(procInfo->lsnType == lsnType);
     226             : 
     227          86 :     if (procInfo->inHeap)
     228             :     {
     229          34 :         pairingheap_remove(&waitLSNState->waitersHeap[i], &procInfo->heapNode);
     230          34 :         procInfo->inHeap = false;
     231          34 :         updateMinWaitedLSN(lsnType);
     232             :     }
     233             : 
     234          86 :     LWLockRelease(WaitLSNLock);
     235          86 : }
     236             : 
/*
 * Maximum number of waiters that wakeupWaiters() collects per batch into an
 * on-stack array before releasing WaitLSNLock and setting their latches.
 * It is meant to be large enough that one batch suffices in most cases.
 */
#define WAKEUP_PROC_STATIC_ARRAY_SIZE (16)

/*
 * Remove from the lsnType waiters heap every waiter whose awaited LSN is
 * <= currentLSN, and set their latches. If currentLSN is InvalidXLogRecPtr,
 * remove all waiters of that type and set all their latches.
 *
 * While holding WaitLSNLock, the waiters are collected into an on-stack
 * array of WAKEUP_PROC_STATIC_ARRAY_SIZE entries. Their latches are set
 * only after the lock has been released. One batch should be enough in the
 * vast majority of cases. If the array fills up, the whole procedure
 * repeats for the next batch. That may cost one extra, empty pass when the
 * heap happened to hold exactly a multiple of the array size.
 *
 * minWaitedLSN for the type is refreshed before every lock release, so it
 * always matches what is left in the heap.
 */
static void
wakeupWaiters(WaitLSNType lsnType, XLogRecPtr currentLSN)
{
	ProcNumber	wakeUpProcs[WAKEUP_PROC_STATIC_ARRAY_SIZE];
	int			numWakeUpProcs;
	int			i = (int) lsnType;

	Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);

	do
	{
		int			j;

		numWakeUpProcs = 0;
		LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);

		/*
		 * Iterate the waiters heap until we find LSN not yet reached. Record
		 * process numbers to wake up, but send wakeups after releasing lock.
		 */
		while (!pairingheap_is_empty(&waitLSNState->waitersHeap[i]))
		{
			pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap[i]);
			WaitLSNProcInfo *procInfo;

			/* Get procInfo using appropriate heap node */
			procInfo = pairingheap_container(WaitLSNProcInfo, heapNode, node);

			/*
			 * The heap top holds the smallest awaited LSN, so once it is not
			 * yet reached, no remaining waiter is.
			 */
			if (XLogRecPtrIsValid(currentLSN) && procInfo->waitLSN > currentLSN)
				break;

			Assert(numWakeUpProcs < WAKEUP_PROC_STATIC_ARRAY_SIZE);
			wakeUpProcs[numWakeUpProcs++] = procInfo->procno;
			(void) pairingheap_remove_first(&waitLSNState->waitersHeap[i]);

			/*
			 * Mark the slot as unlinked so that the waiter's own
			 * deleteLSNWaiter() call does not remove it a second time.
			 */
			procInfo->inHeap = false;

			/* Array full: finish this batch now, then loop for the rest. */
			if (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE)
				break;
		}

		updateMinWaitedLSN(lsnType);
		LWLockRelease(WaitLSNLock);

		/*
		 * Set latches for processes whose waited LSNs have been reached.
		 * Since SetLatch() is a time-consuming operation, we do this outside
		 * of WaitLSNLock. This is safe because procLatch is never freed, so
		 * at worst we may set a latch for the wrong process or for no process
		 * at all, which is harmless.
		 */
		for (j = 0; j < numWakeUpProcs; j++)
			SetLatch(&GetPGProcByNumber(wakeUpProcs[j])->procLatch);

	} while (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE);
}
     312             : 
     313             : /*
     314             :  * Wake up processes waiting for LSN to reach currentLSN
     315             :  */
     316             : void
     317        5604 : WaitLSNWakeup(WaitLSNType lsnType, XLogRecPtr currentLSN)
     318             : {
     319        5604 :     int         i = (int) lsnType;
     320             : 
     321             :     Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
     322             : 
     323             :     /*
     324             :      * Fast path check.  Skip if currentLSN is InvalidXLogRecPtr, which means
     325             :      * "wake all waiters" (e.g., during promotion when recovery ends).
     326             :      */
     327        5628 :     if (XLogRecPtrIsValid(currentLSN) &&
     328          24 :         pg_atomic_read_u64(&waitLSNState->minWaitedLSN[i]) > currentLSN)
     329           0 :         return;
     330             : 
     331        5604 :     wakeupWaiters(lsnType, currentLSN);
     332             : }
     333             : 
     334             : /*
     335             :  * Clean up LSN waiters for exiting process
     336             :  */
     337             : void
     338       88018 : WaitLSNCleanup(void)
     339             : {
     340       88018 :     if (waitLSNState)
     341             :     {
     342             :         /*
     343             :          * We do a fast-path check of the inHeap flag without the lock.  This
     344             :          * flag is set to true only by the process itself.  So, it's only
     345             :          * possible to get a false positive.  But that will be eliminated by a
     346             :          * recheck inside deleteLSNWaiter().
     347             :          */
     348       88018 :         if (waitLSNState->procInfos[MyProcNumber].inHeap)
     349           0 :             deleteLSNWaiter(waitLSNState->procInfos[MyProcNumber].lsnType);
     350             :     }
     351       88018 : }
     352             : 
     353             : /*
     354             :  * Check if the given LSN type requires recovery to be in progress.
     355             :  * Standby wait types (replay, write, flush) require recovery;
     356             :  * primary wait types (flush) do not.
     357             :  */
     358             : static inline bool
     359         186 : WaitLSNTypeRequiresRecovery(WaitLSNType t)
     360             : {
     361         116 :     return t == WAIT_LSN_TYPE_STANDBY_REPLAY ||
     362         302 :         t == WAIT_LSN_TYPE_STANDBY_WRITE ||
     363             :         t == WAIT_LSN_TYPE_STANDBY_FLUSH;
     364             : }
     365             : 
/*
 * Wait using MyLatch until the position of type lsnType reaches targetLSN.
 *
 * lsnType selects which position is compared against targetLSN (see
 * GetCurrentLSNForWaitType()). timeout is in milliseconds. A value <= 0
 * means wait with no time limit.
 *
 * Returns:
 *	WAIT_LSN_RESULT_SUCCESS if the target LSN was reached. This includes the
 *		case where a standby-type wait sees recovery end after promotion was
 *		triggered and the target had already been reached.
 *	WAIT_LSN_RESULT_NOT_IN_RECOVERY if lsnType is a standby type and
 *		recovery is not (or no longer) in progress, e.g. the replica was
 *		promoted before the target LSN was reached.
 *	WAIT_LSN_RESULT_TIMEOUT if the timeout expired before the target LSN
 *		was reached.
 *
 * Raises FATAL if the postmaster dies while waiting.
 *
 * NOTE(review): if CHECK_FOR_INTERRUPTS() throws, this process is left in
 * the waiters heap. That relies on WaitLSNCleanup() running on the error
 * and exit paths. Confirm this against the callers.
 */
WaitLSNResult
WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN, int64 timeout)
{
	XLogRecPtr	currentLSN;
	TimestampTz endtime = 0;
	int			wake_events = WL_LATCH_SET | WL_POSTMASTER_DEATH;

	/* Shouldn't be called when shmem isn't initialized */
	Assert(waitLSNState);

	/* Should have a valid proc number */
	Assert(MyProcNumber >= 0 && MyProcNumber < MaxBackends + NUM_AUXILIARY_PROCS);

	/* Only a positive timeout arms WL_TIMEOUT; otherwise delay_ms stays -1. */
	if (timeout > 0)
	{
		endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), timeout);
		wake_events |= WL_TIMEOUT;
	}

	/*
	 * Publish ourselves as a waiter before the first look at the current
	 * LSN. If the target is reached before we are published, the check at
	 * the top of the loop sees it. If it is reached afterwards, the waking
	 * process finds our heap entry and sets our latch. Either way no wakeup
	 * is lost.
	 */
	addLSNWaiter(targetLSN, lsnType);

	for (;;)
	{
		int			rc;
		long		delay_ms = -1;

		/* Get current LSN for the wait type */
		currentLSN = GetCurrentLSNForWaitType(lsnType);

		/* For standby wait types, check that recovery is still in progress */
		if (WaitLSNTypeRequiresRecovery(lsnType) && !RecoveryInProgress())
		{
			/*
			 * Recovery has ended, but report success if promotion was
			 * triggered and the target LSN had already been reached.
			 */
			deleteLSNWaiter(lsnType);

			if (PromoteIsTriggered() && targetLSN <= currentLSN)
				return WAIT_LSN_RESULT_SUCCESS;
			return WAIT_LSN_RESULT_NOT_IN_RECOVERY;
		}
		else
		{
			/* Check if the waited LSN has been reached */
			if (targetLSN <= currentLSN)
				break;
		}

		/* Compute the remaining time; leave the loop once it has run out. */
		if (timeout > 0)
		{
			delay_ms = TimestampDifferenceMilliseconds(GetCurrentTimestamp(), endtime);
			if (delay_ms <= 0)
				break;
		}

		CHECK_FOR_INTERRUPTS();

		rc = WaitLatch(MyLatch, wake_events, delay_ms,
					   WaitLSNWaitEvents[lsnType]);

		/*
		 * Emergency bailout if postmaster has died.  This is to avoid the
		 * necessity for manual cleanup of all postmaster children.
		 */
		if (rc & WL_POSTMASTER_DEATH)
			ereport(FATAL,
					errcode(ERRCODE_ADMIN_SHUTDOWN),
					errmsg("terminating connection due to unexpected postmaster exit"),
					errcontext("while waiting for LSN"));

		if (rc & WL_LATCH_SET)
			ResetLatch(MyLatch);
	}

	/*
	 * Remove our entry from the shared waiters heap. The process that woke
	 * us (see wakeupWaiters()) may already have unlinked it and cleared
	 * inHeap. In that case deleteLSNWaiter() does nothing, which prevents a
	 * double deletion.
	 */
	deleteLSNWaiter(lsnType);

	/*
	 * The loop ends either because the target was reached or because the
	 * timeout expired. In the latter case currentLSN is still short of
	 * targetLSN.
	 */
	if (targetLSN > currentLSN)
		return WAIT_LSN_RESULT_TIMEOUT;

	return WAIT_LSN_RESULT_SUCCESS;
}

Generated by: LCOV version 1.16