LCOV - code coverage report
Current view: top level - src/backend/access/transam - xlogwait.c (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 111 117 94.9 %
Date: 2025-11-17 03:18:13 Functions: 10 10 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * xlogwait.c
       4             :  *    Implements waiting for WAL operations to reach specific LSNs.
       5             :  *
       6             :  * Copyright (c) 2025, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *    src/backend/access/transam/xlogwait.c
      10             :  *
      11             :  * NOTES
      12             :  *      This file implements waiting for WAL operations to reach specific LSNs
      13             :  *      on both physical standby and primary servers. The core idea is simple:
      14             :  *      every process that wants to wait publishes the LSN it needs to the
      15             :  *      shared memory, and the appropriate process (startup on standby, or
      16             :  *      WAL writer/backend on primary) wakes it once that LSN has been reached.
      17             :  *
      18             :  *      The shared memory used by this module comprises a procInfos
      19             :  *      per-backend array with the information of the awaited LSN for each
      20             :  *      of the backend processes.  The elements of that array are organized
      21             :  *      into a pairing heap waitersHeap, which allows for very fast finding
      22             :  *      of the least awaited LSN.
      23             :  *
      24             :  *      In addition, the least-awaited LSN is cached as minWaitedLSN.  The
      25             :  *      waiter process publishes information about itself to the shared
      26             :  *      memory and waits on the latch until it is woken up by the appropriate
      27             :  *      process, standby is promoted, or the postmaster dies.  Then, it cleans up
      28             :  *      information about itself in the shared memory.
      29             :  *
      30             :  *      On standby servers: After replaying a WAL record, the startup process
      31             :  *      first performs a fast path check minWaitedLSN > replayLSN.  If this
      32             :  *      check is negative, it checks waitersHeap and wakes up the backends
      33             :  *      whose awaited LSNs have been reached.
      34             :  *
      35             :  *      On primary servers: After flushing WAL, the WAL writer or backend
      36             :  *      process performs a similar check against the flush LSN and wakes up
      37             :  *      waiters whose target flush LSNs have been reached.
      38             :  *
      39             :  *-------------------------------------------------------------------------
      40             :  */
      41             : 
      42             : #include "postgres.h"
      43             : 
      44             : #include <float.h>
      45             : #include <math.h>
      46             : 
      47             : #include "access/xlog.h"
      48             : #include "access/xlogrecovery.h"
      49             : #include "access/xlogwait.h"
      50             : #include "miscadmin.h"
      51             : #include "pgstat.h"
      52             : #include "storage/latch.h"
      53             : #include "storage/proc.h"
      54             : #include "storage/shmem.h"
      55             : #include "utils/fmgrprotos.h"
      56             : #include "utils/pg_lsn.h"
      57             : #include "utils/snapmgr.h"
      58             : 
      59             : 
      60             : static int  waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b,
      61             :                         void *arg);
      62             : 
      63             : struct WaitLSNState *waitLSNState = NULL;
      64             : 
      65             : /* Report the amount of shared memory space needed for WaitLSNState. */
      66             : Size
      67        6300 : WaitLSNShmemSize(void)
      68             : {
      69             :     Size        size;
      70             : 
      71        6300 :     size = offsetof(WaitLSNState, procInfos);
      72        6300 :     size = add_size(size, mul_size(MaxBackends + NUM_AUXILIARY_PROCS, sizeof(WaitLSNProcInfo)));
      73        6300 :     return size;
      74             : }
      75             : 
      76             : /* Initialize the WaitLSNState in the shared memory. */
      77             : void
      78        2200 : WaitLSNShmemInit(void)
      79             : {
      80             :     bool        found;
      81             : 
      82        2200 :     waitLSNState = (WaitLSNState *) ShmemInitStruct("WaitLSNState",
      83             :                                                     WaitLSNShmemSize(),
      84             :                                                     &found);
      85        2200 :     if (!found)
      86             :     {
      87             :         int         i;
      88             : 
      89             :         /* Initialize heaps and tracking */
      90        6600 :         for (i = 0; i < WAIT_LSN_TYPE_COUNT; i++)
      91             :         {
      92        4400 :             pg_atomic_init_u64(&waitLSNState->minWaitedLSN[i], PG_UINT64_MAX);
      93        4400 :             pairingheap_initialize(&waitLSNState->waitersHeap[i], waitlsn_cmp, (void *) (uintptr_t) i);
      94             :         }
      95             : 
      96             :         /* Initialize process info array */
      97        2200 :         memset(&waitLSNState->procInfos, 0,
      98        2200 :                (MaxBackends + NUM_AUXILIARY_PROCS) * sizeof(WaitLSNProcInfo));
      99             :     }
     100        2200 : }
     101             : 
     102             : /*
     103             :  * Comparison function for LSN waiters heaps. Waiting processes are ordered by
     104             :  * LSN, so that the waiter with smallest LSN is at the top.
     105             :  */
     106             : static int
     107          16 : waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
     108             : {
     109          16 :     int         i = (uintptr_t) arg;
     110          16 :     const WaitLSNProcInfo *aproc = pairingheap_const_container(WaitLSNProcInfo, heapNode[i], a);
     111          16 :     const WaitLSNProcInfo *bproc = pairingheap_const_container(WaitLSNProcInfo, heapNode[i], b);
     112             : 
     113          16 :     if (aproc->waitLSN < bproc->waitLSN)
     114          10 :         return 1;
     115           6 :     else if (aproc->waitLSN > bproc->waitLSN)
     116           6 :         return -1;
     117             :     else
     118           0 :         return 0;
     119             : }
     120             : 
     121             : /*
     122             :  * Update minimum waited LSN for the specified LSN type
     123             :  */
     124             : static void
     125        1864 : updateMinWaitedLSN(WaitLSNType lsnType)
     126             : {
     127        1864 :     XLogRecPtr  minWaitedLSN = PG_UINT64_MAX;
     128        1864 :     int         i = (int) lsnType;
     129             : 
     130             :     Assert(i >= 0 && i < (int) WAIT_LSN_TYPE_COUNT);
     131             : 
     132        1864 :     if (!pairingheap_is_empty(&waitLSNState->waitersHeap[i]))
     133             :     {
     134          42 :         pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap[i]);
     135          42 :         WaitLSNProcInfo *procInfo = pairingheap_container(WaitLSNProcInfo, heapNode[i], node);
     136             : 
     137          42 :         minWaitedLSN = procInfo->waitLSN;
     138             :     }
     139        1864 :     pg_atomic_write_u64(&waitLSNState->minWaitedLSN[i], minWaitedLSN);
     140        1864 : }
     141             : 
     142             : /*
     143             :  * Add current process to appropriate waiters heap based on LSN type
     144             :  */
     145             : static void
     146          34 : addLSNWaiter(XLogRecPtr lsn, WaitLSNType lsnType)
     147             : {
     148          34 :     WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber];
     149          34 :     int         i = (int) lsnType;
     150             : 
     151             :     Assert(i >= 0 && i < (int) WAIT_LSN_TYPE_COUNT);
     152             : 
     153          34 :     LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
     154             : 
     155          34 :     procInfo->procno = MyProcNumber;
     156          34 :     procInfo->waitLSN = lsn;
     157             : 
     158             :     Assert(!procInfo->inHeap[i]);
     159          34 :     pairingheap_add(&waitLSNState->waitersHeap[i], &procInfo->heapNode[i]);
     160          34 :     procInfo->inHeap[i] = true;
     161          34 :     updateMinWaitedLSN(lsnType);
     162             : 
     163          34 :     LWLockRelease(WaitLSNLock);
     164          34 : }
     165             : 
     166             : /*
     167             :  * Remove current process from appropriate waiters heap based on LSN type
     168             :  */
     169             : static void
     170          34 : deleteLSNWaiter(WaitLSNType lsnType)
     171             : {
     172          34 :     WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber];
     173          34 :     int         i = (int) lsnType;
     174             : 
     175             :     Assert(i >= 0 && i < (int) WAIT_LSN_TYPE_COUNT);
     176             : 
     177          34 :     LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
     178             : 
     179          34 :     if (procInfo->inHeap[i])
     180             :     {
     181          18 :         pairingheap_remove(&waitLSNState->waitersHeap[i], &procInfo->heapNode[i]);
     182          18 :         procInfo->inHeap[i] = false;
     183          18 :         updateMinWaitedLSN(lsnType);
     184             :     }
     185             : 
     186          34 :     LWLockRelease(WaitLSNLock);
     187          34 : }
     188             : 
     189             : /*
     190             :  * Size of a static array of procs to wakeup by WaitLSNWakeup() allocated
     191             :  * on the stack.  It should be enough to take single iteration for most cases.
     192             :  */
     193             : #define WAKEUP_PROC_STATIC_ARRAY_SIZE (16)
     194             : 
     195             : /*
     196             :  * Remove waiters whose LSN has been reached from the heap and set their
     197             :  * latches.  If InvalidXLogRecPtr is given, remove all waiters from the heap
     198             :  * and set latches for all waiters.
     199             :  *
     200             :  * This function first accumulates waiters to wake up into an array, then
     201             :  * wakes them up without holding a WaitLSNLock.  The array size is static and
     202             :  * equal to WAKEUP_PROC_STATIC_ARRAY_SIZE.  That should be more than enough
     203             :  * to wake up all the waiters at once in the vast majority of cases.  However,
     204             :  * if there are more waiters, this function will loop to process them in
     205             :  * multiple chunks.
     206             :  */
     207             : static void
     208        1812 : wakeupWaiters(WaitLSNType lsnType, XLogRecPtr currentLSN)
     209             : {
     210             :     ProcNumber  wakeUpProcs[WAKEUP_PROC_STATIC_ARRAY_SIZE];
     211             :     int         numWakeUpProcs;
     212        1812 :     int         i = (int) lsnType;
     213             : 
     214             :     Assert(i >= 0 && i < (int) WAIT_LSN_TYPE_COUNT);
     215             : 
     216             :     do
     217             :     {
     218        1812 :         numWakeUpProcs = 0;
     219        1812 :         LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
     220             : 
     221             :         /*
     222             :          * Iterate the waiters heap until we find LSN not yet reached. Record
     223             :          * process numbers to wake up, but send wakeups after releasing lock.
     224             :          */
     225        1828 :         while (!pairingheap_is_empty(&waitLSNState->waitersHeap[i]))
     226             :         {
     227          24 :             pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap[i]);
     228             :             WaitLSNProcInfo *procInfo;
     229             : 
     230             :             /* Get procInfo using appropriate heap node */
     231          24 :             procInfo = pairingheap_container(WaitLSNProcInfo, heapNode[i], node);
     232             : 
     233          24 :             if (XLogRecPtrIsValid(currentLSN) && procInfo->waitLSN > currentLSN)
     234           8 :                 break;
     235             : 
     236             :             Assert(numWakeUpProcs < WAKEUP_PROC_STATIC_ARRAY_SIZE);
     237          16 :             wakeUpProcs[numWakeUpProcs++] = procInfo->procno;
     238          16 :             (void) pairingheap_remove_first(&waitLSNState->waitersHeap[i]);
     239             : 
     240             :             /* Update appropriate flag */
     241          16 :             procInfo->inHeap[i] = false;
     242             : 
     243          16 :             if (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE)
     244           0 :                 break;
     245             :         }
     246             : 
     247        1812 :         updateMinWaitedLSN(lsnType);
     248        1812 :         LWLockRelease(WaitLSNLock);
     249             : 
     250             :         /*
     251             :          * Set latches for processes whose waited LSNs have been reached.
     252             :          * Since SetLatch() is a time-consuming operation, we do this outside
     253             :          * of WaitLSNLock. This is safe because procLatch is never freed, so
     254             :          * at worst we may set a latch for the wrong process or for no process
     255             :          * at all, which is harmless.
     256             :          */
     257        1828 :         for (i = 0; i < numWakeUpProcs; i++)
     258          16 :             SetLatch(&GetPGProcByNumber(wakeUpProcs[i])->procLatch);
     259             : 
     260        1812 :     } while (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE);
     261        1812 : }
     262             : 
     263             : /*
     264             :  * Wake up processes waiting for LSN to reach currentLSN
     265             :  */
     266             : void
     267        1812 : WaitLSNWakeup(WaitLSNType lsnType, XLogRecPtr currentLSN)
     268             : {
     269        1812 :     int         i = (int) lsnType;
     270             : 
     271             :     Assert(i >= 0 && i < (int) WAIT_LSN_TYPE_COUNT);
     272             : 
     273             :     /*
     274             :      * Fast path check.  Skip if currentLSN is InvalidXLogRecPtr, which means
     275             :      * "wake all waiters" (e.g., during promotion when recovery ends).
     276             :      */
     277        1826 :     if (XLogRecPtrIsValid(currentLSN) &&
     278          14 :         pg_atomic_read_u64(&waitLSNState->minWaitedLSN[i]) > currentLSN)
     279           0 :         return;
     280             : 
     281        1812 :     wakeupWaiters(lsnType, currentLSN);
     282             : }
     283             : 
     284             : /*
     285             :  * Clean up LSN waiters for exiting process
     286             :  */
     287             : void
     288       85938 : WaitLSNCleanup(void)
     289             : {
     290       85938 :     if (waitLSNState)
     291             :     {
     292             :         int         i;
     293             : 
     294             :         /*
     295             :          * We do a fast-path check of the heap flags without the lock.  These
     296             :          * flags are set to true only by the process itself.  So, it's only
     297             :          * possible to get a false positive.  But that will be eliminated by a
     298             :          * recheck inside deleteLSNWaiter().
     299             :          */
     300             : 
     301      257814 :         for (i = 0; i < (int) WAIT_LSN_TYPE_COUNT; i++)
     302             :         {
     303      171876 :             if (waitLSNState->procInfos[MyProcNumber].inHeap[i])
     304           0 :                 deleteLSNWaiter((WaitLSNType) i);
     305             :         }
     306             :     }
     307       85938 : }
     308             : 
     309             : /*
     310             :  * Wait using MyLatch till the given LSN is reached, the replica gets
     311             :  * promoted, or the postmaster dies.
     312             :  *
     313             :  * Returns WAIT_LSN_RESULT_SUCCESS if target LSN was reached.
     314             :  * Returns WAIT_LSN_RESULT_NOT_IN_RECOVERY if run not in recovery,
     315             :  * or replica got promoted before the target LSN reached.
     316             :  */
     317             : WaitLSNResult
     318          34 : WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN, int64 timeout)
     319             : {
     320             :     XLogRecPtr  currentLSN;
     321          34 :     TimestampTz endtime = 0;
     322          34 :     int         wake_events = WL_LATCH_SET | WL_POSTMASTER_DEATH;
     323             : 
     324             :     /* Shouldn't be called when shmem isn't initialized */
     325             :     Assert(waitLSNState);
     326             : 
     327             :     /* Should have a valid proc number */
     328             :     Assert(MyProcNumber >= 0 && MyProcNumber < MaxBackends);
     329             : 
     330          34 :     if (timeout > 0)
     331             :     {
     332          16 :         endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), timeout);
     333          16 :         wake_events |= WL_TIMEOUT;
     334             :     }
     335             : 
     336             :     /*
     337             :      * Add our process to the waiters heap.  It might happen that target LSN
     338             :      * gets reached before we do.  The check at the beginning of the loop
     339             :      * below prevents the race condition.
     340             :      */
     341          34 :     addLSNWaiter(targetLSN, lsnType);
     342             : 
     343             :     for (;;)
     344          22 :     {
     345             :         int         rc;
     346          56 :         long        delay_ms = -1;
     347             : 
     348          56 :         if (lsnType == WAIT_LSN_TYPE_REPLAY)
     349          56 :             currentLSN = GetXLogReplayRecPtr(NULL);
     350             :         else
     351           0 :             currentLSN = GetFlushRecPtr(NULL);
     352             : 
     353             :         /* Check that recovery is still in-progress */
     354          56 :         if (lsnType == WAIT_LSN_TYPE_REPLAY && !RecoveryInProgress())
     355             :         {
     356             :             /*
     357             :              * Recovery was ended, but check if target LSN was already
     358             :              * reached.
     359             :              */
     360           8 :             deleteLSNWaiter(lsnType);
     361             : 
     362           8 :             if (PromoteIsTriggered() && targetLSN <= currentLSN)
     363           2 :                 return WAIT_LSN_RESULT_SUCCESS;
     364           6 :             return WAIT_LSN_RESULT_NOT_IN_RECOVERY;
     365             :         }
     366             :         else
     367             :         {
     368             :             /* Check if the waited LSN has been reached */
     369          48 :             if (targetLSN <= currentLSN)
     370          20 :                 break;
     371             :         }
     372             : 
     373          28 :         if (timeout > 0)
     374             :         {
     375          14 :             delay_ms = TimestampDifferenceMilliseconds(GetCurrentTimestamp(), endtime);
     376          14 :             if (delay_ms <= 0)
     377           6 :                 break;
     378             :         }
     379             : 
     380          22 :         CHECK_FOR_INTERRUPTS();
     381             : 
     382          22 :         rc = WaitLatch(MyLatch, wake_events, delay_ms,
     383             :                        (lsnType == WAIT_LSN_TYPE_REPLAY) ? WAIT_EVENT_WAIT_FOR_WAL_REPLAY : WAIT_EVENT_WAIT_FOR_WAL_FLUSH);
     384             : 
     385             :         /*
     386             :          * Emergency bailout if postmaster has died.  This is to avoid the
     387             :          * necessity for manual cleanup of all postmaster children.
     388             :          */
     389          22 :         if (rc & WL_POSTMASTER_DEATH)
     390           0 :             ereport(FATAL,
     391             :                     errcode(ERRCODE_ADMIN_SHUTDOWN),
     392             :                     errmsg("terminating connection due to unexpected postmaster exit"),
     393             :                     errcontext("while waiting for LSN"));
     394             : 
     395          22 :         if (rc & WL_LATCH_SET)
     396          16 :             ResetLatch(MyLatch);
     397             :     }
     398             : 
     399             :     /*
     400             :      * Delete our process from the shared memory heap.  We might already be
     401             :      * deleted by the startup process.  The 'inHeap' flags prevents us from
     402             :      * the double deletion.
     403             :      */
     404          26 :     deleteLSNWaiter(lsnType);
     405             : 
     406             :     /*
     407             :      * If we didn't reach the target LSN, we must be exited by timeout.
     408             :      */
     409          26 :     if (targetLSN > currentLSN)
     410           6 :         return WAIT_LSN_RESULT_TIMEOUT;
     411             : 
     412          20 :     return WAIT_LSN_RESULT_SUCCESS;
     413             : }

Generated by: LCOV version 1.16