LCOV - code coverage report
Current view: top level - src/backend/access/transam - xlogwait.c (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 110 116 94.8 %
Date: 2025-12-07 04:17:18 Functions: 10 10 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * xlogwait.c
       4             :  *    Implements waiting for WAL operations to reach specific LSNs.
       5             :  *
       6             :  * Copyright (c) 2025, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *    src/backend/access/transam/xlogwait.c
      10             :  *
      11             :  * NOTES
      12             :  *      This file implements waiting for WAL operations to reach specific LSNs
      13             :  *      on both physical standby and primary servers. The core idea is simple:
      14             :  *      every process that wants to wait publishes the LSN it needs to the
      15             :  *      shared memory, and the appropriate process (startup on standby, or
      16             :  *      WAL writer/backend on primary) wakes it once that LSN has been reached.
      17             :  *
      18             :  *      The shared memory used by this module comprises a procInfos
      19             :  *      per-backend array with the information of the awaited LSN for each
      20             :  *      of the backend processes.  The elements of that array are organized
      21             :  *      into a pairing heap waitersHeap, which allows for very fast finding
      22             :  *      of the least awaited LSN.
      23             :  *
      24             :  *      In addition, the least-awaited LSN is cached as minWaitedLSN.  The
      25             :  *      waiter process publishes information about itself to the shared
      26             :  *      memory and waits on the latch until it is woken up by the appropriate
      27             :  *      process, the standby is promoted, the timeout expires, or the postmaster
      28             :  *      dies.  Then, it removes the information about itself from shared memory.
      29             :  *
      30             :  *      On standby servers: After replaying a WAL record, the startup process
      31             :  *      first performs a fast-path check minWaitedLSN > replayLSN.  If this
      32             :  *      check fails, it scans waitersHeap and wakes up the backends
      33             :  *      whose awaited LSNs have been reached.
      34             :  *
      35             :  *      On primary servers: After flushing WAL, the WAL writer or backend
      36             :  *      process performs a similar check against the flush LSN and wakes up
      37             :  *      waiters whose target flush LSNs have been reached.
      38             :  *
      39             :  *-------------------------------------------------------------------------
      40             :  */
      41             : 
      42             : #include "postgres.h"
      43             : 
      44             : #include <float.h>
      45             : #include <math.h>
      46             : 
      47             : #include "access/xlog.h"
      48             : #include "access/xlogrecovery.h"
      49             : #include "access/xlogwait.h"
      50             : #include "miscadmin.h"
      51             : #include "pgstat.h"
      52             : #include "storage/latch.h"
      53             : #include "storage/proc.h"
      54             : #include "storage/shmem.h"
      55             : #include "utils/fmgrprotos.h"
      56             : #include "utils/pg_lsn.h"
      57             : #include "utils/snapmgr.h"
      58             : 
      59             : 
/*
 * Pairing-heap comparator for the waiters heaps.  It is declared here because
 * WaitLSNShmemInit() passes it to pairingheap_initialize() before the
 * definition appears below.
 */
static int  waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b,
                        void *arg);

/*
 * This process's pointer to the shared WaitLSNState.  It is NULL until
 * WaitLSNShmemInit() attaches it; WaitLSNCleanup() relies on that.
 */
struct WaitLSNState *waitLSNState = NULL;
      64             : 
      65             : /* Report the amount of shared memory space needed for WaitLSNState. */
      66             : Size
      67        6324 : WaitLSNShmemSize(void)
      68             : {
      69             :     Size        size;
      70             : 
      71        6324 :     size = offsetof(WaitLSNState, procInfos);
      72        6324 :     size = add_size(size, mul_size(MaxBackends + NUM_AUXILIARY_PROCS, sizeof(WaitLSNProcInfo)));
      73        6324 :     return size;
      74             : }
      75             : 
      76             : /* Initialize the WaitLSNState in the shared memory. */
      77             : void
      78        2208 : WaitLSNShmemInit(void)
      79             : {
      80             :     bool        found;
      81             : 
      82        2208 :     waitLSNState = (WaitLSNState *) ShmemInitStruct("WaitLSNState",
      83             :                                                     WaitLSNShmemSize(),
      84             :                                                     &found);
      85        2208 :     if (!found)
      86             :     {
      87             :         int         i;
      88             : 
      89             :         /* Initialize heaps and tracking */
      90        6624 :         for (i = 0; i < WAIT_LSN_TYPE_COUNT; i++)
      91             :         {
      92        4416 :             pg_atomic_init_u64(&waitLSNState->minWaitedLSN[i], PG_UINT64_MAX);
      93        4416 :             pairingheap_initialize(&waitLSNState->waitersHeap[i], waitlsn_cmp, NULL);
      94             :         }
      95             : 
      96             :         /* Initialize process info array */
      97        2208 :         memset(&waitLSNState->procInfos, 0,
      98        2208 :                (MaxBackends + NUM_AUXILIARY_PROCS) * sizeof(WaitLSNProcInfo));
      99             :     }
     100        2208 : }
     101             : 
     102             : /*
     103             :  * Comparison function for LSN waiters heaps. Waiting processes are ordered by
     104             :  * LSN, so that the waiter with smallest LSN is at the top.
     105             :  */
     106             : static int
     107          16 : waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
     108             : {
     109          16 :     const WaitLSNProcInfo *aproc = pairingheap_const_container(WaitLSNProcInfo, heapNode, a);
     110          16 :     const WaitLSNProcInfo *bproc = pairingheap_const_container(WaitLSNProcInfo, heapNode, b);
     111             : 
     112          16 :     if (aproc->waitLSN < bproc->waitLSN)
     113          10 :         return 1;
     114           6 :     else if (aproc->waitLSN > bproc->waitLSN)
     115           6 :         return -1;
     116             :     else
     117           0 :         return 0;
     118             : }
     119             : 
     120             : /*
     121             :  * Update minimum waited LSN for the specified LSN type
     122             :  */
     123             : static void
     124        1872 : updateMinWaitedLSN(WaitLSNType lsnType)
     125             : {
     126        1872 :     XLogRecPtr  minWaitedLSN = PG_UINT64_MAX;
     127        1872 :     int         i = (int) lsnType;
     128             : 
     129             :     Assert(i >= 0 && i < (int) WAIT_LSN_TYPE_COUNT);
     130             : 
     131        1872 :     if (!pairingheap_is_empty(&waitLSNState->waitersHeap[i]))
     132             :     {
     133          42 :         pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap[i]);
     134          42 :         WaitLSNProcInfo *procInfo = pairingheap_container(WaitLSNProcInfo, heapNode, node);
     135             : 
     136          42 :         minWaitedLSN = procInfo->waitLSN;
     137             :     }
     138        1872 :     pg_atomic_write_u64(&waitLSNState->minWaitedLSN[i], minWaitedLSN);
     139        1872 : }
     140             : 
     141             : /*
     142             :  * Add current process to appropriate waiters heap based on LSN type
     143             :  */
     144             : static void
     145          34 : addLSNWaiter(XLogRecPtr lsn, WaitLSNType lsnType)
     146             : {
     147          34 :     WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber];
     148          34 :     int         i = (int) lsnType;
     149             : 
     150             :     Assert(i >= 0 && i < (int) WAIT_LSN_TYPE_COUNT);
     151             : 
     152          34 :     LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
     153             : 
     154          34 :     procInfo->procno = MyProcNumber;
     155          34 :     procInfo->waitLSN = lsn;
     156          34 :     procInfo->lsnType = lsnType;
     157             : 
     158             :     Assert(!procInfo->inHeap);
     159          34 :     pairingheap_add(&waitLSNState->waitersHeap[i], &procInfo->heapNode);
     160          34 :     procInfo->inHeap = true;
     161          34 :     updateMinWaitedLSN(lsnType);
     162             : 
     163          34 :     LWLockRelease(WaitLSNLock);
     164          34 : }
     165             : 
     166             : /*
     167             :  * Remove current process from appropriate waiters heap based on LSN type
     168             :  */
     169             : static void
     170          34 : deleteLSNWaiter(WaitLSNType lsnType)
     171             : {
     172          34 :     WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber];
     173          34 :     int         i = (int) lsnType;
     174             : 
     175             :     Assert(i >= 0 && i < (int) WAIT_LSN_TYPE_COUNT);
     176             : 
     177          34 :     LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
     178             : 
     179             :     Assert(procInfo->lsnType == lsnType);
     180             : 
     181          34 :     if (procInfo->inHeap)
     182             :     {
     183          18 :         pairingheap_remove(&waitLSNState->waitersHeap[i], &procInfo->heapNode);
     184          18 :         procInfo->inHeap = false;
     185          18 :         updateMinWaitedLSN(lsnType);
     186             :     }
     187             : 
     188          34 :     LWLockRelease(WaitLSNLock);
     189          34 : }
     190             : 
     191             : /*
     192             :  * Size of a static array of procs to wakeup by WaitLSNWakeup() allocated
     193             :  * on the stack.  It should be enough to take single iteration for most cases.
     194             :  */
     195             : #define WAKEUP_PROC_STATIC_ARRAY_SIZE (16)
     196             : 
     197             : /*
     198             :  * Remove waiters whose LSN has been reached from the heap and set their
     199             :  * latches.  If InvalidXLogRecPtr is given, remove all waiters from the heap
     200             :  * and set latches for all waiters.
     201             :  *
     202             :  * This function first accumulates waiters to wake up into an array, then
     203             :  * wakes them up without holding a WaitLSNLock.  The array size is static and
     204             :  * equal to WAKEUP_PROC_STATIC_ARRAY_SIZE.  That should be more than enough
     205             :  * to wake up all the waiters at once in the vast majority of cases.  However,
     206             :  * if there are more waiters, this function will loop to process them in
     207             :  * multiple chunks.
     208             :  */
     209             : static void
     210        1820 : wakeupWaiters(WaitLSNType lsnType, XLogRecPtr currentLSN)
     211             : {
     212             :     ProcNumber  wakeUpProcs[WAKEUP_PROC_STATIC_ARRAY_SIZE];
     213             :     int         numWakeUpProcs;
     214        1820 :     int         i = (int) lsnType;
     215             : 
     216             :     Assert(i >= 0 && i < (int) WAIT_LSN_TYPE_COUNT);
     217             : 
     218             :     do
     219             :     {
     220        1820 :         numWakeUpProcs = 0;
     221        1820 :         LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
     222             : 
     223             :         /*
     224             :          * Iterate the waiters heap until we find LSN not yet reached. Record
     225             :          * process numbers to wake up, but send wakeups after releasing lock.
     226             :          */
     227        1836 :         while (!pairingheap_is_empty(&waitLSNState->waitersHeap[i]))
     228             :         {
     229          24 :             pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap[i]);
     230             :             WaitLSNProcInfo *procInfo;
     231             : 
     232             :             /* Get procInfo using appropriate heap node */
     233          24 :             procInfo = pairingheap_container(WaitLSNProcInfo, heapNode, node);
     234             : 
     235          24 :             if (XLogRecPtrIsValid(currentLSN) && procInfo->waitLSN > currentLSN)
     236           8 :                 break;
     237             : 
     238             :             Assert(numWakeUpProcs < WAKEUP_PROC_STATIC_ARRAY_SIZE);
     239          16 :             wakeUpProcs[numWakeUpProcs++] = procInfo->procno;
     240          16 :             (void) pairingheap_remove_first(&waitLSNState->waitersHeap[i]);
     241             : 
     242             :             /* Update appropriate flag */
     243          16 :             procInfo->inHeap = false;
     244             : 
     245          16 :             if (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE)
     246           0 :                 break;
     247             :         }
     248             : 
     249        1820 :         updateMinWaitedLSN(lsnType);
     250        1820 :         LWLockRelease(WaitLSNLock);
     251             : 
     252             :         /*
     253             :          * Set latches for processes whose waited LSNs have been reached.
     254             :          * Since SetLatch() is a time-consuming operation, we do this outside
     255             :          * of WaitLSNLock. This is safe because procLatch is never freed, so
     256             :          * at worst we may set a latch for the wrong process or for no process
     257             :          * at all, which is harmless.
     258             :          */
     259        1836 :         for (i = 0; i < numWakeUpProcs; i++)
     260          16 :             SetLatch(&GetPGProcByNumber(wakeUpProcs[i])->procLatch);
     261             : 
     262        1820 :     } while (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE);
     263        1820 : }
     264             : 
     265             : /*
     266             :  * Wake up processes waiting for LSN to reach currentLSN
     267             :  */
     268             : void
     269        1820 : WaitLSNWakeup(WaitLSNType lsnType, XLogRecPtr currentLSN)
     270             : {
     271        1820 :     int         i = (int) lsnType;
     272             : 
     273             :     Assert(i >= 0 && i < (int) WAIT_LSN_TYPE_COUNT);
     274             : 
     275             :     /*
     276             :      * Fast path check.  Skip if currentLSN is InvalidXLogRecPtr, which means
     277             :      * "wake all waiters" (e.g., during promotion when recovery ends).
     278             :      */
     279        1834 :     if (XLogRecPtrIsValid(currentLSN) &&
     280          14 :         pg_atomic_read_u64(&waitLSNState->minWaitedLSN[i]) > currentLSN)
     281           0 :         return;
     282             : 
     283        1820 :     wakeupWaiters(lsnType, currentLSN);
     284             : }
     285             : 
     286             : /*
     287             :  * Clean up LSN waiters for exiting process
     288             :  */
     289             : void
     290       86606 : WaitLSNCleanup(void)
     291             : {
     292       86606 :     if (waitLSNState)
     293             :     {
     294             :         /*
     295             :          * We do a fast-path check of the inHeap flag without the lock.  This
     296             :          * flag is set to true only by the process itself.  So, it's only
     297             :          * possible to get a false positive.  But that will be eliminated by a
     298             :          * recheck inside deleteLSNWaiter().
     299             :          */
     300       86606 :         if (waitLSNState->procInfos[MyProcNumber].inHeap)
     301           0 :             deleteLSNWaiter(waitLSNState->procInfos[MyProcNumber].lsnType);
     302             :     }
     303       86606 : }
     304             : 
     305             : /*
     306             :  * Wait using MyLatch till the given LSN is reached, the replica gets
     307             :  * promoted, or the postmaster dies.
     308             :  *
     309             :  * Returns WAIT_LSN_RESULT_SUCCESS if target LSN was reached.
     310             :  * Returns WAIT_LSN_RESULT_NOT_IN_RECOVERY if run not in recovery,
     311             :  * or replica got promoted before the target LSN reached.
     312             :  */
     313             : WaitLSNResult
     314          34 : WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN, int64 timeout)
     315             : {
     316             :     XLogRecPtr  currentLSN;
     317          34 :     TimestampTz endtime = 0;
     318          34 :     int         wake_events = WL_LATCH_SET | WL_POSTMASTER_DEATH;
     319             : 
     320             :     /* Shouldn't be called when shmem isn't initialized */
     321             :     Assert(waitLSNState);
     322             : 
     323             :     /* Should have a valid proc number */
     324             :     Assert(MyProcNumber >= 0 && MyProcNumber < MaxBackends + NUM_AUXILIARY_PROCS);
     325             : 
     326          34 :     if (timeout > 0)
     327             :     {
     328          16 :         endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), timeout);
     329          16 :         wake_events |= WL_TIMEOUT;
     330             :     }
     331             : 
     332             :     /*
     333             :      * Add our process to the waiters heap.  It might happen that target LSN
     334             :      * gets reached before we do.  The check at the beginning of the loop
     335             :      * below prevents the race condition.
     336             :      */
     337          34 :     addLSNWaiter(targetLSN, lsnType);
     338             : 
     339             :     for (;;)
     340          24 :     {
     341             :         int         rc;
     342          58 :         long        delay_ms = -1;
     343             : 
     344          58 :         if (lsnType == WAIT_LSN_TYPE_REPLAY)
     345          58 :             currentLSN = GetXLogReplayRecPtr(NULL);
     346             :         else
     347           0 :             currentLSN = GetFlushRecPtr(NULL);
     348             : 
     349             :         /* Check that recovery is still in-progress */
     350          58 :         if (lsnType == WAIT_LSN_TYPE_REPLAY && !RecoveryInProgress())
     351             :         {
     352             :             /*
     353             :              * Recovery was ended, but check if target LSN was already
     354             :              * reached.
     355             :              */
     356           8 :             deleteLSNWaiter(lsnType);
     357             : 
     358           8 :             if (PromoteIsTriggered() && targetLSN <= currentLSN)
     359           2 :                 return WAIT_LSN_RESULT_SUCCESS;
     360           6 :             return WAIT_LSN_RESULT_NOT_IN_RECOVERY;
     361             :         }
     362             :         else
     363             :         {
     364             :             /* Check if the waited LSN has been reached */
     365          50 :             if (targetLSN <= currentLSN)
     366          20 :                 break;
     367             :         }
     368             : 
     369          30 :         if (timeout > 0)
     370             :         {
     371          14 :             delay_ms = TimestampDifferenceMilliseconds(GetCurrentTimestamp(), endtime);
     372          14 :             if (delay_ms <= 0)
     373           6 :                 break;
     374             :         }
     375             : 
     376          24 :         CHECK_FOR_INTERRUPTS();
     377             : 
     378          24 :         rc = WaitLatch(MyLatch, wake_events, delay_ms,
     379             :                        (lsnType == WAIT_LSN_TYPE_REPLAY) ? WAIT_EVENT_WAIT_FOR_WAL_REPLAY : WAIT_EVENT_WAIT_FOR_WAL_FLUSH);
     380             : 
     381             :         /*
     382             :          * Emergency bailout if postmaster has died.  This is to avoid the
     383             :          * necessity for manual cleanup of all postmaster children.
     384             :          */
     385          24 :         if (rc & WL_POSTMASTER_DEATH)
     386           0 :             ereport(FATAL,
     387             :                     errcode(ERRCODE_ADMIN_SHUTDOWN),
     388             :                     errmsg("terminating connection due to unexpected postmaster exit"),
     389             :                     errcontext("while waiting for LSN"));
     390             : 
     391          24 :         if (rc & WL_LATCH_SET)
     392          18 :             ResetLatch(MyLatch);
     393             :     }
     394             : 
     395             :     /*
     396             :      * Delete our process from the shared memory heap.  We might already be
     397             :      * deleted by the startup process.  The 'inHeap' flags prevents us from
     398             :      * the double deletion.
     399             :      */
     400          26 :     deleteLSNWaiter(lsnType);
     401             : 
     402             :     /*
     403             :      * If we didn't reach the target LSN, we must be exited by timeout.
     404             :      */
     405          26 :     if (targetLSN > currentLSN)
     406           6 :         return WAIT_LSN_RESULT_TIMEOUT;
     407             : 
     408          20 :     return WAIT_LSN_RESULT_SUCCESS;
     409             : }

Generated by: LCOV version 1.16