LCOV - code coverage report
Current view: top level - src/backend/access/transam - xlogwait.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 96.1 % 129 124
Test Date: 2026-03-01 17:14:43 Functions: 100.0 % 12 12
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * xlogwait.c
       4              :  *    Implements waiting for WAL operations to reach specific LSNs.
       5              :  *
       6              :  * Copyright (c) 2025-2026, PostgreSQL Global Development Group
       7              :  *
       8              :  * IDENTIFICATION
       9              :  *    src/backend/access/transam/xlogwait.c
      10              :  *
      11              :  * NOTES
      12              :  *      This file implements waiting for WAL operations to reach specific LSNs
      13              :  *      on both physical standby and primary servers. The core idea is simple:
      14              :  *      every process that wants to wait publishes the LSN it needs to the
      15              :  *      shared memory, and the appropriate process (startup on standby,
      16              :  *      walreceiver on standby, or WAL writer/backend on primary) wakes it
      17              :  *      once that LSN has been reached.
      18              :  *
      19              :  *      The shared memory used by this module comprises a procInfos
      20              :  *      per-backend array with the information of the awaited LSN for each
      21              :  *      of the backend processes.  The elements of that array are organized
      22              :  *      into pairing heaps (waitersHeap), one for each WaitLSNType, which
      23              :  *      allows for very fast finding of the least awaited LSN for each type.
      24              :  *
      25              :  *      In addition, the least-awaited LSN for each type is cached in the
      26              :  *      minWaitedLSN array.  The waiter process publishes information about
      27              :  *      itself to the shared memory and waits on the latch until it is woken
      28              :  *      up by the appropriate process, the standby is promoted, the timeout expires,
      29              :  *      or the postmaster dies.  Then, it removes itself from the waiters heap.
      30              :  *
      31              :  *      On standby servers:
      32              :  *      - After replaying a WAL record, the startup process performs a fast
      33              :  *        path check minWaitedLSN[REPLAY] > replayLSN.  If this check is
      34              :  *        negative, it checks waitersHeap[REPLAY] and wakes up the backends
      35              :  *        whose awaited LSNs are reached.
      36              :  *      - After receiving WAL, the walreceiver process performs similar checks
      37              :  *        against the flush and write LSNs, waking up waiters in the FLUSH
      38              :  *        and WRITE heaps, respectively.
      39              :  *
      40              :  *      On primary servers: After flushing WAL, the WAL writer or backend
      41              :  *      process performs a similar check against the flush LSN and wakes up
      42              :  *      waiters whose target flush LSNs have been reached.
      43              :  *
      44              :  *-------------------------------------------------------------------------
      45              :  */
      46              : 
      47              : #include "postgres.h"
      48              : 
      49              : #include <float.h>
      50              : 
      51              : #include "access/xlog.h"
      52              : #include "access/xlogrecovery.h"
      53              : #include "access/xlogwait.h"
      54              : #include "miscadmin.h"
      55              : #include "pgstat.h"
      56              : #include "replication/walreceiver.h"
      57              : #include "storage/latch.h"
      58              : #include "storage/proc.h"
      59              : #include "storage/shmem.h"
      60              : #include "utils/fmgrprotos.h"
      61              : #include "utils/pg_lsn.h"
      62              : #include "utils/snapmgr.h"
      63              : 
      64              : 
      65              : static int  waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b,
      66              :                         void *arg);
      67              : 
      68              : struct WaitLSNState *waitLSNState = NULL;
      69              : 
      70              : /*
      71              :  * Wait event for each WaitLSNType, used with WaitLatch() to report
      72              :  * the wait in pg_stat_activity.
      73              :  */
      74              : static const uint32 WaitLSNWaitEvents[] = {
      75              :     [WAIT_LSN_TYPE_STANDBY_REPLAY] = WAIT_EVENT_WAIT_FOR_WAL_REPLAY,
      76              :     [WAIT_LSN_TYPE_STANDBY_WRITE] = WAIT_EVENT_WAIT_FOR_WAL_WRITE,
      77              :     [WAIT_LSN_TYPE_STANDBY_FLUSH] = WAIT_EVENT_WAIT_FOR_WAL_FLUSH,
      78              :     [WAIT_LSN_TYPE_PRIMARY_FLUSH] = WAIT_EVENT_WAIT_FOR_WAL_FLUSH,
      79              : };
      80              : 
      81              : StaticAssertDecl(lengthof(WaitLSNWaitEvents) == WAIT_LSN_TYPE_COUNT,
      82              :                  "WaitLSNWaitEvents must match WaitLSNType enum");
      83              : 
      84              : /*
      85              :  * Get the current LSN for the specified wait type.
      86              :  */
      87              : XLogRecPtr
      88           96 : GetCurrentLSNForWaitType(WaitLSNType lsnType)
      89              : {
      90              :     Assert(lsnType >= 0 && lsnType < WAIT_LSN_TYPE_COUNT);
      91              : 
      92           96 :     switch (lsnType)
      93              :     {
      94           37 :         case WAIT_LSN_TYPE_STANDBY_REPLAY:
      95           37 :             return GetXLogReplayRecPtr(NULL);
      96              : 
      97           26 :         case WAIT_LSN_TYPE_STANDBY_WRITE:
      98           26 :             return GetWalRcvWriteRecPtr();
      99              : 
     100           27 :         case WAIT_LSN_TYPE_STANDBY_FLUSH:
     101           27 :             return GetWalRcvFlushRecPtr(NULL, NULL);
     102              : 
     103            6 :         case WAIT_LSN_TYPE_PRIMARY_FLUSH:
     104            6 :             return GetFlushRecPtr(NULL);
     105              :     }
     106              : 
     107            0 :     elog(ERROR, "invalid LSN wait type: %d", lsnType);
     108              :     pg_unreachable();
     109              : }
     110              : 
     111              : /* Report the amount of shared memory space needed for WaitLSNState. */
     112              : Size
     113         3297 : WaitLSNShmemSize(void)
     114              : {
     115              :     Size        size;
     116              : 
     117         3297 :     size = offsetof(WaitLSNState, procInfos);
     118         3297 :     size = add_size(size, mul_size(MaxBackends + NUM_AUXILIARY_PROCS, sizeof(WaitLSNProcInfo)));
     119         3297 :     return size;
     120              : }
     121              : 
     122              : /* Initialize the WaitLSNState in the shared memory. */
     123              : void
     124         1150 : WaitLSNShmemInit(void)
     125              : {
     126              :     bool        found;
     127              : 
     128         1150 :     waitLSNState = (WaitLSNState *) ShmemInitStruct("WaitLSNState",
     129              :                                                     WaitLSNShmemSize(),
     130              :                                                     &found);
     131         1150 :     if (!found)
     132              :     {
     133              :         int         i;
     134              : 
     135              :         /* Initialize heaps and tracking */
     136         5750 :         for (i = 0; i < WAIT_LSN_TYPE_COUNT; i++)
     137              :         {
     138         4600 :             pg_atomic_init_u64(&waitLSNState->minWaitedLSN[i], PG_UINT64_MAX);
     139         4600 :             pairingheap_initialize(&waitLSNState->waitersHeap[i], waitlsn_cmp, NULL);
     140              :         }
     141              : 
     142              :         /* Initialize process info array */
     143         1150 :         memset(&waitLSNState->procInfos, 0,
     144         1150 :                (MaxBackends + NUM_AUXILIARY_PROCS) * sizeof(WaitLSNProcInfo));
     145              :     }
     146         1150 : }
     147              : 
     148              : /*
     149              :  * Comparison function for LSN waiters heaps. Waiting processes are ordered by
     150              :  * LSN, so that the waiter with smallest LSN is at the top.
     151              :  */
     152              : static int
     153           27 : waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
     154              : {
     155           27 :     const WaitLSNProcInfo *aproc = pairingheap_const_container(WaitLSNProcInfo, heapNode, a);
     156           27 :     const WaitLSNProcInfo *bproc = pairingheap_const_container(WaitLSNProcInfo, heapNode, b);
     157              : 
     158           27 :     if (aproc->waitLSN < bproc->waitLSN)
     159           15 :         return 1;
     160           12 :     else if (aproc->waitLSN > bproc->waitLSN)
     161            9 :         return -1;
     162              :     else
     163            3 :         return 0;
     164              : }
     165              : 
     166              : /*
     167              :  * Update minimum waited LSN for the specified LSN type
     168              :  */
     169              : static void
     170         2892 : updateMinWaitedLSN(WaitLSNType lsnType)
     171              : {
     172         2892 :     XLogRecPtr  minWaitedLSN = PG_UINT64_MAX;
     173         2892 :     int         i = (int) lsnType;
     174              : 
     175              :     Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
     176              : 
     177         2892 :     if (!pairingheap_is_empty(&waitLSNState->waitersHeap[i]))
     178              :     {
     179           47 :         pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap[i]);
     180           47 :         WaitLSNProcInfo *procInfo = pairingheap_container(WaitLSNProcInfo, heapNode, node);
     181              : 
     182           47 :         minWaitedLSN = procInfo->waitLSN;
     183              :     }
     184         2892 :     pg_atomic_write_u64(&waitLSNState->minWaitedLSN[i], minWaitedLSN);
     185         2892 : }
     186              : 
     187              : /*
     188              :  * Add current process to appropriate waiters heap based on LSN type
     189              :  */
     190              : static void
     191           43 : addLSNWaiter(XLogRecPtr lsn, WaitLSNType lsnType)
     192              : {
     193           43 :     WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber];
     194           43 :     int         i = (int) lsnType;
     195              : 
     196              :     Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
     197              : 
     198           43 :     LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
     199              : 
     200           43 :     procInfo->procno = MyProcNumber;
     201           43 :     procInfo->waitLSN = lsn;
     202           43 :     procInfo->lsnType = lsnType;
     203              : 
     204              :     Assert(!procInfo->inHeap);
     205           43 :     pairingheap_add(&waitLSNState->waitersHeap[i], &procInfo->heapNode);
     206           43 :     procInfo->inHeap = true;
     207           43 :     updateMinWaitedLSN(lsnType);
     208              : 
     209           43 :     LWLockRelease(WaitLSNLock);
     210           43 : }
     211              : 
     212              : /*
     213              :  * Remove current process from appropriate waiters heap based on LSN type
     214              :  */
     215              : static void
     216           43 : deleteLSNWaiter(WaitLSNType lsnType)
     217              : {
     218           43 :     WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber];
     219           43 :     int         i = (int) lsnType;
     220              : 
     221              :     Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
     222              : 
     223           43 :     LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
     224              : 
     225              :     Assert(procInfo->lsnType == lsnType);
     226              : 
     227           43 :     if (procInfo->inHeap)
     228              :     {
     229           17 :         pairingheap_remove(&waitLSNState->waitersHeap[i], &procInfo->heapNode);
     230           17 :         procInfo->inHeap = false;
     231           17 :         updateMinWaitedLSN(lsnType);
     232              :     }
     233              : 
     234           43 :     LWLockRelease(WaitLSNLock);
     235           43 : }
     236              : 
     237              : /*
     238              :  * Size of a static array of procs to wakeup by WaitLSNWakeup() allocated
     239              :  * on the stack.  It should be enough to take single iteration for most cases.
     240              :  */
     241              : #define WAKEUP_PROC_STATIC_ARRAY_SIZE (16)
     242              : 
     243              : /*
     244              :  * Remove waiters whose LSN has been reached from the heap and set their
     245              :  * latches.  If InvalidXLogRecPtr is given, remove all waiters from the heap
     246              :  * and set latches for all waiters.
     247              :  *
     248              :  * This function first accumulates waiters to wake up into an array, then
     249              :  * wakes them up without holding a WaitLSNLock.  The array size is static and
     250              :  * equal to WAKEUP_PROC_STATIC_ARRAY_SIZE.  That should be more than enough
     251              :  * to wake up all the waiters at once in the vast majority of cases.  However,
     252              :  * if there are more waiters, this function will loop to process them in
     253              :  * multiple chunks.
     254              :  */
     255              : static void
     256         2832 : wakeupWaiters(WaitLSNType lsnType, XLogRecPtr currentLSN)
     257              : {
     258              :     ProcNumber  wakeUpProcs[WAKEUP_PROC_STATIC_ARRAY_SIZE];
     259              :     int         numWakeUpProcs;
     260         2832 :     int         i = (int) lsnType;
     261              : 
     262              :     Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
     263              : 
     264              :     do
     265              :     {
     266              :         int         j;
     267              : 
     268         2832 :         numWakeUpProcs = 0;
     269         2832 :         LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
     270              : 
     271              :         /*
     272              :          * Iterate the waiters heap until we find LSN not yet reached. Record
     273              :          * process numbers to wake up, but send wakeups after releasing lock.
     274              :          */
     275         2858 :         while (!pairingheap_is_empty(&waitLSNState->waitersHeap[i]))
     276              :         {
     277           30 :             pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap[i]);
     278              :             WaitLSNProcInfo *procInfo;
     279              : 
     280              :             /* Get procInfo using appropriate heap node */
     281           30 :             procInfo = pairingheap_container(WaitLSNProcInfo, heapNode, node);
     282              : 
     283           30 :             if (XLogRecPtrIsValid(currentLSN) && procInfo->waitLSN > currentLSN)
     284            4 :                 break;
     285              : 
     286              :             Assert(numWakeUpProcs < WAKEUP_PROC_STATIC_ARRAY_SIZE);
     287           26 :             wakeUpProcs[numWakeUpProcs++] = procInfo->procno;
     288           26 :             (void) pairingheap_remove_first(&waitLSNState->waitersHeap[i]);
     289              : 
     290              :             /* Update appropriate flag */
     291           26 :             procInfo->inHeap = false;
     292              : 
     293           26 :             if (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE)
     294            0 :                 break;
     295              :         }
     296              : 
     297         2832 :         updateMinWaitedLSN(lsnType);
     298         2832 :         LWLockRelease(WaitLSNLock);
     299              : 
     300              :         /*
     301              :          * Set latches for processes whose waited LSNs have been reached.
     302              :          * Since SetLatch() is a time-consuming operation, we do this outside
     303              :          * of WaitLSNLock. This is safe because procLatch is never freed, so
     304              :          * at worst we may set a latch for the wrong process or for no process
     305              :          * at all, which is harmless.
     306              :          */
     307         2858 :         for (j = 0; j < numWakeUpProcs; j++)
     308           26 :             SetLatch(&GetPGProcByNumber(wakeUpProcs[j])->procLatch);
     309              : 
     310         2832 :     } while (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE);
     311         2832 : }
     312              : 
     313              : /*
     314              :  * Wake up processes waiting for LSN to reach currentLSN
     315              :  */
     316              : void
     317         2832 : WaitLSNWakeup(WaitLSNType lsnType, XLogRecPtr currentLSN)
     318              : {
     319         2832 :     int         i = (int) lsnType;
     320              : 
     321              :     Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
     322              : 
     323              :     /*
     324              :      * Fast path check.  Skip if currentLSN is InvalidXLogRecPtr, which means
     325              :      * "wake all waiters" (e.g., during promotion when recovery ends).
     326              :      */
     327         2844 :     if (XLogRecPtrIsValid(currentLSN) &&
     328           12 :         pg_atomic_read_u64(&waitLSNState->minWaitedLSN[i]) > currentLSN)
     329            0 :         return;
     330              : 
     331         2832 :     wakeupWaiters(lsnType, currentLSN);
     332              : }
     333              : 
     334              : /*
     335              :  * Clean up LSN waiters for exiting process
     336              :  */
     337              : void
     338        44862 : WaitLSNCleanup(void)
     339              : {
     340        44862 :     if (waitLSNState)
     341              :     {
     342              :         /*
     343              :          * We do a fast-path check of the inHeap flag without the lock.  This
     344              :          * flag is set to true only by the process itself.  So, it's only
     345              :          * possible to get a false positive.  But that will be eliminated by a
     346              :          * recheck inside deleteLSNWaiter().
     347              :          */
     348        44862 :         if (waitLSNState->procInfos[MyProcNumber].inHeap)
     349            0 :             deleteLSNWaiter(waitLSNState->procInfos[MyProcNumber].lsnType);
     350              :     }
     351        44862 : }
     352              : 
     353              : /*
     354              :  * Check if the given LSN type requires recovery to be in progress.
     355              :  * Standby wait types (replay, write, flush) require recovery;
     356              :  * primary wait types (flush) do not.
     357              :  */
     358              : static inline bool
     359           92 : WaitLSNTypeRequiresRecovery(WaitLSNType t)
     360              : {
     361           57 :     return t == WAIT_LSN_TYPE_STANDBY_REPLAY ||
     362          149 :         t == WAIT_LSN_TYPE_STANDBY_WRITE ||
     363              :         t == WAIT_LSN_TYPE_STANDBY_FLUSH;
     364              : }
     365              : 
     366              : /*
     367              :  * Wait using MyLatch till the given LSN is reached, the replica gets
     368              :  * promoted, or the postmaster dies.
     369              :  *
     370              :  * Returns WAIT_LSN_RESULT_SUCCESS if target LSN was reached.
     371              :  * Returns WAIT_LSN_RESULT_NOT_IN_RECOVERY if run not in recovery,
     372              :  * or replica got promoted before the target LSN reached.
     373              :  */
     374              : WaitLSNResult
     375           43 : WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN, int64 timeout)
     376              : {
     377              :     XLogRecPtr  currentLSN;
     378           43 :     TimestampTz endtime = 0;
     379           43 :     int         wake_events = WL_LATCH_SET | WL_POSTMASTER_DEATH;
     380              : 
     381              :     /* Shouldn't be called when shmem isn't initialized */
     382              :     Assert(waitLSNState);
     383              : 
     384              :     /* Should have a valid proc number */
     385              :     Assert(MyProcNumber >= 0 && MyProcNumber < MaxBackends + NUM_AUXILIARY_PROCS);
     386              : 
     387           43 :     if (timeout > 0)
     388              :     {
     389           32 :         endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), timeout);
     390           32 :         wake_events |= WL_TIMEOUT;
     391              :     }
     392              : 
     393              :     /*
     394              :      * Add our process to the waiters heap.  It might happen that target LSN
     395              :      * gets reached before we do.  The check at the beginning of the loop
     396              :      * below prevents the race condition.
     397              :      */
     398           43 :     addLSNWaiter(targetLSN, lsnType);
     399              : 
     400              :     for (;;)
     401           49 :     {
     402              :         int         rc;
     403           92 :         long        delay_ms = -1;
     404              : 
     405              :         /* Get current LSN for the wait type */
     406           92 :         currentLSN = GetCurrentLSNForWaitType(lsnType);
     407              : 
     408              :         /* Check that recovery is still in-progress */
     409           92 :         if (WaitLSNTypeRequiresRecovery(lsnType) && !RecoveryInProgress())
     410              :         {
     411              :             /*
     412              :              * Recovery was ended, but check if target LSN was already
     413              :              * reached.
     414              :              */
     415            6 :             deleteLSNWaiter(lsnType);
     416              : 
     417            6 :             if (PromoteIsTriggered() && targetLSN <= currentLSN)
     418            1 :                 return WAIT_LSN_RESULT_SUCCESS;
     419            5 :             return WAIT_LSN_RESULT_NOT_IN_RECOVERY;
     420              :         }
     421              :         else
     422              :         {
     423              :             /* Check if the waited LSN has been reached */
     424           86 :             if (targetLSN <= currentLSN)
     425           34 :                 break;
     426              :         }
     427              : 
     428           52 :         if (timeout > 0)
     429              :         {
     430           40 :             delay_ms = TimestampDifferenceMilliseconds(GetCurrentTimestamp(), endtime);
     431           40 :             if (delay_ms <= 0)
     432            3 :                 break;
     433              :         }
     434              : 
     435           49 :         CHECK_FOR_INTERRUPTS();
     436              : 
     437           49 :         rc = WaitLatch(MyLatch, wake_events, delay_ms,
     438           49 :                        WaitLSNWaitEvents[lsnType]);
     439              : 
     440              :         /*
     441              :          * Emergency bailout if postmaster has died.  This is to avoid the
     442              :          * necessity for manual cleanup of all postmaster children.
     443              :          */
     444           49 :         if (rc & WL_POSTMASTER_DEATH)
     445            0 :             ereport(FATAL,
     446              :                     errcode(ERRCODE_ADMIN_SHUTDOWN),
     447              :                     errmsg("terminating connection due to unexpected postmaster exit"),
     448              :                     errcontext("while waiting for LSN"));
     449              : 
     450           49 :         if (rc & WL_LATCH_SET)
     451           46 :             ResetLatch(MyLatch);
     452              :     }
     453              : 
     454              :     /*
     455              :      * Delete our process from the shared memory heap.  We might already be
     456              :      * deleted by the startup process.  The 'inHeap' flags prevents us from
     457              :      * the double deletion.
     458              :      */
     459           37 :     deleteLSNWaiter(lsnType);
     460              : 
     461              :     /*
     462              :      * If we didn't reach the target LSN, we must be exited by timeout.
     463              :      */
     464           37 :     if (targetLSN > currentLSN)
     465            3 :         return WAIT_LSN_RESULT_TIMEOUT;
     466              : 
     467           34 :     return WAIT_LSN_RESULT_SUCCESS;
     468              : }
        

Generated by: LCOV version 2.0-1