LCOV - code coverage report
Current view: top level - src/backend/access/transam - xlogwait.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 97.7 % 131 128
Test Date: 2026-05-07 11:16:29 Functions: 100.0 % 12 12
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * xlogwait.c
       4              :  *    Implements waiting for WAL operations to reach specific LSNs.
       5              :  *
       6              :  * Copyright (c) 2025-2026, PostgreSQL Global Development Group
       7              :  *
       8              :  * IDENTIFICATION
       9              :  *    src/backend/access/transam/xlogwait.c
      10              :  *
      11              :  * NOTES
      12              :  *      This file implements waiting for WAL operations to reach specific LSNs
      13              :  *      on both physical standby and primary servers. The core idea is simple:
      14              :  *      every process that wants to wait publishes the LSN it needs to the
      15              :  *      shared memory, and the appropriate process (startup on standby,
      16              :  *      walreceiver on standby, or WAL writer/backend on primary) wakes it
      17              :  *      once that LSN has been reached.
      18              :  *
      19              :  *      The shared memory used by this module comprises a procInfos
      20              :  *      per-backend array with the information of the awaited LSN for each
      21              :  *      of the backend processes.  The elements of that array are organized
      22              :  *      into pairing heaps (waitersHeap), one for each WaitLSNType, which
      23              :  *      allows for very fast finding of the least awaited LSN for each type.
      24              :  *
      25              :  *      In addition, the least-awaited LSN for each type is cached in the
      26              :  *      minWaitedLSN array.  The waiter process publishes information about
      27              :  *      itself to the shared memory and waits on the latch until it is woken up
      28              :  *      by the appropriate process, the standby is promoted, the timeout expires, or
      29              :  *      the postmaster dies.  Then, it cleans up its information in the shared memory.
      30              :  *
      31              :  *      On standby servers:
      32              :  *      - After replaying a WAL record, the startup process performs a fast
      33              :  *        path check minWaitedLSN[REPLAY] > replayLSN.  If this check is
      34              :  *        negative, it checks waitersHeap[REPLAY] and wakes up the backends
      35              :  *        whose awaited LSNs are reached.
      36              :  *      - After receiving WAL, the walreceiver process performs similar checks
      37              :  *        against the flush and write LSNs, waking up waiters in the FLUSH
      38              :  *        and WRITE heaps, respectively.
      39              :  *
      40              :  *      On primary servers: After flushing WAL, the WAL writer or backend
      41              :  *      process performs a similar check against the flush LSN and wakes up
      42              :  *      waiters whose target flush LSNs have been reached.
      43              :  *
      44              :  *-------------------------------------------------------------------------
      45              :  */
      46              : 
      47              : #include "postgres.h"
      48              : 
      49              : #include <float.h>
      50              : 
      51              : #include "access/xlog.h"
      52              : #include "access/xlogrecovery.h"
      53              : #include "access/xlogwait.h"
      54              : #include "miscadmin.h"
      55              : #include "pgstat.h"
      56              : #include "replication/walreceiver.h"
      57              : #include "storage/latch.h"
      58              : #include "storage/proc.h"
      59              : #include "storage/shmem.h"
      60              : #include "storage/subsystems.h"
      61              : #include "utils/fmgrprotos.h"
      62              : #include "utils/pg_lsn.h"
      63              : #include "utils/snapmgr.h"
      64              : #include "utils/wait_event.h"
      65              : 
      66              : 
      67              : static int  waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b,
      68              :                         void *arg);
      69              : 
      70              : struct WaitLSNState *waitLSNState = NULL;   /* pointer is registered in WaitLSNShmemRequest() */
      71              : 
      72              : static void WaitLSNShmemRequest(void *arg);
      73              : static void WaitLSNShmemInit(void *arg);
      74              : 
      75              : const ShmemCallbacks WaitLSNShmemCallbacks = {  /* shmem request/init hooks of this module */
      76              :     .request_fn = WaitLSNShmemRequest,
      77              :     .init_fn = WaitLSNShmemInit,
      78              : };
      79              : 
      80              : /*
      81              :  * Wait event for each WaitLSNType, indexed by the enum value.  WaitForLSN()
      82              :  * passes the entry to WaitLatch() so the wait is reported in pg_stat_activity.
      83              :  */
      84              : static const uint32 WaitLSNWaitEvents[] = {
      85              :     [WAIT_LSN_TYPE_STANDBY_REPLAY] = WAIT_EVENT_WAIT_FOR_WAL_REPLAY,
      86              :     [WAIT_LSN_TYPE_STANDBY_WRITE] = WAIT_EVENT_WAIT_FOR_WAL_WRITE,
      87              :     [WAIT_LSN_TYPE_STANDBY_FLUSH] = WAIT_EVENT_WAIT_FOR_WAL_FLUSH,
      88              :     [WAIT_LSN_TYPE_PRIMARY_FLUSH] = WAIT_EVENT_WAIT_FOR_WAL_FLUSH,  /* same event as standby flush */
      89              : };
      90              : 
      91              : StaticAssertDecl(lengthof(WaitLSNWaitEvents) == WAIT_LSN_TYPE_COUNT,
      92              :                  "WaitLSNWaitEvents must match WaitLSNType enum");
      93              : 
      94              : /*
      95              :  * Get the current LSN for the specified wait type, with memory barrier semantics.
      96              :  * Standby write/flush positions are floored by the replay LSN (see below).
      97              :  */
      98              : XLogRecPtr
      99        11614 : GetCurrentLSNForWaitType(WaitLSNType lsnType)
     100              : {
     101              :     Assert(lsnType >= 0 && lsnType < WAIT_LSN_TYPE_COUNT);
     102              : 
     103              :     /*
     104              :      * All of the cases below provide memory barrier semantics:
     105              :      * GetWalRcvWriteRecPtr() and GetFlushRecPtr() have explicit barriers,
     106              :      * while GetXLogReplayRecPtr() and GetWalRcvFlushRecPtr() use spinlocks.
     107              :      */
     108        11614 :     switch (lsnType)
     109              :     {
     110          212 :         case WAIT_LSN_TYPE_STANDBY_REPLAY:
     111          212 :             return GetXLogReplayRecPtr(NULL);
     112              : 
     113           47 :         case WAIT_LSN_TYPE_STANDBY_WRITE:
     114              :             {
     115           47 :                 XLogRecPtr  recptr = GetWalRcvWriteRecPtr();
     116           47 :                 XLogRecPtr  replay = GetXLogReplayRecPtr(NULL);
     117              : 
     118              :                 /*
     119              :                  * Use the replay position as a floor.  WAL up to the replay
     120              :                  * point is already on disk from a base backup, archive
     121              :                  * restore, or prior streaming, so there is no reason to wait
     122              :                  * for the walreceiver to re-receive it.
     123              :                  */
     124           47 :                 return Max(recptr, replay);
     125              :             }
     126              : 
     127           35 :         case WAIT_LSN_TYPE_STANDBY_FLUSH:
     128              :             {
     129           35 :                 XLogRecPtr  recptr = GetWalRcvFlushRecPtr(NULL, NULL);
     130           35 :                 XLogRecPtr  replay = GetXLogReplayRecPtr(NULL);
     131              : 
     132              :                 /* Same replay-position floor as the standby write case above. */
     133           35 :                 return Max(recptr, replay);
     134              :             }
     135              : 
     136        11320 :         case WAIT_LSN_TYPE_PRIMARY_FLUSH:
     137        11320 :             return GetFlushRecPtr(NULL);
     138              :     }
     139              : 
     140            0 :     elog(ERROR, "invalid LSN wait type: %d", lsnType);  /* reached only if no case above matched */
     141              :     pg_unreachable();
     142              : }
     143              : 
     144              : /* Request shared memory for WaitLSNState, sized for MaxBackends + NUM_AUXILIARY_PROCS procInfos entries. */
     145              : static void
     146         1248 : WaitLSNShmemRequest(void *arg)
     147              : {
     148              :     Size        size;
     149              : 
     150         1248 :     size = offsetof(WaitLSNState, procInfos);   /* fixed part that precedes the procInfos array */
     151         1248 :     size = add_size(size, mul_size(MaxBackends + NUM_AUXILIARY_PROCS, sizeof(WaitLSNProcInfo)));
     152         1248 :     ShmemRequestStruct(.name = "WaitLSNState",
     153              :                        .size = size,
     154              :                        .ptr = (void **) &waitLSNState,
     155              :         );
     156         1248 : }
     157              : 
     158              : /* Initialize WaitLSNState in shared memory: empty heaps, no awaited LSN, zeroed procInfos. */
     159              : static void
     160         1245 : WaitLSNShmemInit(void *arg)
     161              : {
     162              :     /* Each type starts with an empty heap; PG_UINT64_MAX is the "no waiter" minimum */
     163         6225 :     for (int i = 0; i < WAIT_LSN_TYPE_COUNT; i++)
     164              :     {
     165         4980 :         pg_atomic_init_u64(&waitLSNState->minWaitedLSN[i], PG_UINT64_MAX);
     166         4980 :         pairingheap_initialize(&waitLSNState->waitersHeap[i], waitlsn_cmp, NULL);
     167              :     }
     168              : 
     169              :     /* Zero the process info array; the length matches WaitLSNShmemRequest() */
     170         1245 :     memset(&waitLSNState->procInfos, 0,
     171         1245 :            (MaxBackends + NUM_AUXILIARY_PROCS) * sizeof(WaitLSNProcInfo));
     172         1245 : }
     173              : 
     174              : /*
     175              :  * Comparison function for LSN waiters heaps.  Returns 1 if a waits for a smaller
     176              :  * LSN than b, -1 if larger, 0 if equal, so that the smallest LSN is at the top.
     177              :  */
     178              : static int
     179           28 : waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
     180              : {
     181           28 :     const WaitLSNProcInfo *aproc = pairingheap_const_container(WaitLSNProcInfo, heapNode, a);
     182           28 :     const WaitLSNProcInfo *bproc = pairingheap_const_container(WaitLSNProcInfo, heapNode, b);
     183              : 
     184           28 :     if (aproc->waitLSN < bproc->waitLSN)
     185           15 :         return 1;
     186           13 :     else if (aproc->waitLSN > bproc->waitLSN)
     187           10 :         return -1;
     188              :     else
     189            3 :         return 0;
     190              : }
     191              : 
     192              : /*
     193              :  * Set minWaitedLSN[lsnType] to the waitLSN at the heap top, or PG_UINT64_MAX if the heap is empty.
     194              :  */
     195              : static void
     196        26091 : updateMinWaitedLSN(WaitLSNType lsnType)
     197              : {
     198        26091 :     XLogRecPtr  minWaitedLSN = PG_UINT64_MAX;
     199        26091 :     int         i = (int) lsnType;
     200              : 
     201              :     Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
     202              : 
     203        26091 :     if (!pairingheap_is_empty(&waitLSNState->waitersHeap[i]))
     204              :     {
     205        11531 :         pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap[i]);
     206        11531 :         WaitLSNProcInfo *procInfo = pairingheap_container(WaitLSNProcInfo, heapNode, node);
     207              : 
     208        11531 :         minWaitedLSN = procInfo->waitLSN;
     209              :     }
     210              :     /* Pairs with pg_atomic_read_membarrier_u64() in WaitLSNWakeup(). */
     211        26091 :     pg_atomic_write_membarrier_u64(&waitLSNState->minWaitedLSN[i], minWaitedLSN);
     212        26091 : }
     213              : 
     214              : /*
     215              :  * Publish lsn as the current process's awaited LSN and add it to the heap for lsnType.
     216              :  */
     217              : static void
     218        11526 : addLSNWaiter(XLogRecPtr lsn, WaitLSNType lsnType)
     219              : {
     220        11526 :     WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber];
     221        11526 :     int         i = (int) lsnType;
     222              : 
     223              :     Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
     224              : 
     225        11526 :     LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
     226              : 
     227        11526 :     procInfo->procno = MyProcNumber;
     228        11526 :     procInfo->waitLSN = lsn;
     229        11526 :     procInfo->lsnType = lsnType;
     230              : 
     231              :     Assert(!procInfo->inHeap);
     232        11526 :     pairingheap_add(&waitLSNState->waitersHeap[i], &procInfo->heapNode);
     233        11526 :     procInfo->inHeap = true;
     234        11526 :     updateMinWaitedLSN(lsnType);    /* the new waiter may be the new minimum */
     235              : 
     236        11526 :     LWLockRelease(WaitLSNLock);
     237        11526 : }
     238              : 
     239              : /*
     240              :  * Remove current process from the heap for lsnType, unless inHeap is already false.
     241              :  */
     242              : static void
     243        11526 : deleteLSNWaiter(WaitLSNType lsnType)
     244              : {
     245        11526 :     WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber];
     246        11526 :     int         i = (int) lsnType;
     247              : 
     248              :     Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
     249              : 
     250        11526 :     LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
     251              : 
     252              :     Assert(procInfo->lsnType == lsnType);
     253              : 
     254        11526 :     if (procInfo->inHeap)   /* false if wakeupWaiters() already removed us */
     255              :     {
     256        11479 :         pairingheap_remove(&waitLSNState->waitersHeap[i], &procInfo->heapNode);
     257        11479 :         procInfo->inHeap = false;
     258        11479 :         updateMinWaitedLSN(lsnType);
     259              :     }
     260              : 
     261        11526 :     LWLockRelease(WaitLSNLock);
     262        11526 : }
     263              : 
     264              : /*
     265              :  * Size of the on-stack array of procs to wake up, used by wakeupWaiters().
     266              :  * It should be enough for a single iteration to suffice in most cases.
     267              :  */
     268              : #define WAKEUP_PROC_STATIC_ARRAY_SIZE (16)
     269              : 
     270              : /*
     271              :  * Remove waiters whose LSN has been reached from the heap and set their
     272              :  * latches.  If InvalidXLogRecPtr is given, remove all waiters from the heap
     273              :  * and set latches for all waiters.
     274              :  *
     275              :  * This function first accumulates waiters to wake up into an array, then
     276              :  * wakes them up without holding WaitLSNLock.  The array size is fixed and
     277              :  * equal to WAKEUP_PROC_STATIC_ARRAY_SIZE.  That should be more than enough
     278              :  * to wake up all the waiters at once in the vast majority of cases.  However,
     279              :  * if there are more waiters, this function will loop to process them in
     280              :  * multiple chunks.
     281              :  */
     282              : static void
     283         3086 : wakeupWaiters(WaitLSNType lsnType, XLogRecPtr currentLSN)
     284              : {
     285              :     ProcNumber  wakeUpProcs[WAKEUP_PROC_STATIC_ARRAY_SIZE];
     286              :     int         numWakeUpProcs;
     287         3086 :     int         i = (int) lsnType;
     288              : 
     289              :     Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
     290              : 
     291              :     do
     292              :     {
     293              :         int         j;
     294              : 
     295         3086 :         numWakeUpProcs = 0;
     296         3086 :         LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
     297              : 
     298              :         /*
     299              :          * Iterate the waiters heap until we find an LSN not yet reached.  Record
     300              :          * process numbers to wake up, but send wakeups after releasing the lock.
     301              :          */
     302         3132 :         while (!pairingheap_is_empty(&waitLSNState->waitersHeap[i]))
     303              :         {
     304           50 :             pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap[i]);
     305              :             WaitLSNProcInfo *procInfo;
     306              : 
     307              :             /* Get procInfo using appropriate heap node */
     308           50 :             procInfo = pairingheap_container(WaitLSNProcInfo, heapNode, node);
     309              : 
     310           50 :             if (XLogRecPtrIsValid(currentLSN) && procInfo->waitLSN > currentLSN)
     311            4 :                 break;
     312              : 
     313              :             Assert(numWakeUpProcs < WAKEUP_PROC_STATIC_ARRAY_SIZE);
     314           46 :             wakeUpProcs[numWakeUpProcs++] = procInfo->procno;
     315           46 :             (void) pairingheap_remove_first(&waitLSNState->waitersHeap[i]);
     316              : 
     317              :             /* Mark the waiter as removed, so deleteLSNWaiter() skips the removal */
     318           46 :             procInfo->inHeap = false;
     319              : 
     320           46 :             if (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE)
     321            0 :                 break;      /* array is full; the outer loop handles the rest */
     322              :         }
     323              : 
     324         3086 :         updateMinWaitedLSN(lsnType);
     325         3086 :         LWLockRelease(WaitLSNLock);
     326              : 
     327              :         /*
     328              :          * Set latches for processes whose waited LSNs have been reached.
     329              :          * Since SetLatch() is a time-consuming operation, we do this outside
     330              :          * of WaitLSNLock. This is safe because procLatch is never freed, so
     331              :          * at worst we may set a latch for the wrong process or for no process
     332              :          * at all, which is harmless.
     333              :          */
     334         3132 :         for (j = 0; j < numWakeUpProcs; j++)
     335           46 :             SetLatch(&GetPGProcByNumber(wakeUpProcs[j])->procLatch);
     336              : 
     337         3086 :     } while (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE);
     338         3086 : }
     339              : 
     340              : /*
     341              :  * Wake up processes of the given type whose awaited LSN is <= currentLSN (all, if it is invalid)
     342              :  */
     343              : void
     344      9182423 : WaitLSNWakeup(WaitLSNType lsnType, XLogRecPtr currentLSN)
     345              : {
     346      9182423 :     int         i = (int) lsnType;
     347              : 
     348              :     Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
     349              : 
     350              :     /*
     351              :      * Fast path check.  Skip if currentLSN is InvalidXLogRecPtr, which means
     352              :      * "wake all waiters" (e.g., during promotion when recovery ends). Pairs
     353              :      * with pg_atomic_write_membarrier_u64() in updateMinWaitedLSN().
     354              :      */
     355     18361792 :     if (XLogRecPtrIsValid(currentLSN) &&
     356      9179369 :         pg_atomic_read_membarrier_u64(&waitLSNState->minWaitedLSN[i]) > currentLSN)
     357      9179337 :         return;
     358              : 
     359         3086 :     wakeupWaiters(lsnType, currentLSN);
     360              : }
     361              : 
     362              : /*
     363              :  * Clean up any LSN wait state for the current process (no-op if waitLSNState is unset).
     364              :  */
     365              : void
     366        60742 : WaitLSNCleanup(void)
     367              : {
     368        60742 :     if (waitLSNState)
     369              :     {
     370              :         /*
     371              :          * We do a fast-path check of the inHeap flag without the lock.  This
     372              :          * flag is set to true only by the process itself.  So, the unlocked read
     373              :          * can only yield a false positive, which is eliminated by the recheck
     374              :          * under the lock inside deleteLSNWaiter().
     375              :          */
     376        60742 :         if (waitLSNState->procInfos[MyProcNumber].inHeap)
     377            1 :             deleteLSNWaiter(waitLSNState->procInfos[MyProcNumber].lsnType);
     378              :     }
     379        60742 : }
     380              : 
     381              : /*
     382              :  * Return true if the given LSN type requires recovery to be in progress.
     383              :  * Standby wait types (replay, write, flush) require recovery;
     384              :  * the primary wait type (flush) does not.
     385              :  */
     386              : static inline bool
     387        11610 : WaitLSNTypeRequiresRecovery(WaitLSNType t)
     388              : {
     389        11400 :     return t == WAIT_LSN_TYPE_STANDBY_REPLAY ||
     390        23010 :         t == WAIT_LSN_TYPE_STANDBY_WRITE ||
     391              :         t == WAIT_LSN_TYPE_STANDBY_FLUSH;
     392              : }
     393              : 
     394              : /*
     395              :  * Wait using MyLatch till the given LSN is reached, the replica gets promoted,
     396              :  * the timeout (milliseconds; <= 0 means no timeout) expires, or the postmaster dies.
     397              :  *
     398              :  * Returns WAIT_LSN_RESULT_SUCCESS if the target LSN was reached,
     399              :  * WAIT_LSN_RESULT_TIMEOUT if the timeout expired first, or WAIT_LSN_RESULT_NOT_IN_RECOVERY
     400              :  * if run not in recovery or the replica got promoted before the target LSN was reached.
     401              :  */
     402              : WaitLSNResult
     403        11526 : WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN, int64 timeout)
     404              : {
     405              :     XLogRecPtr  currentLSN;
     406        11526 :     TimestampTz endtime = 0;
     407        11526 :     int         wake_events = WL_LATCH_SET | WL_POSTMASTER_DEATH;
     408              : 
     409              :     /* Shouldn't be called when shmem isn't initialized */
     410              :     Assert(waitLSNState);
     411              : 
     412              :     /* Should have a valid proc number */
     413              :     Assert(MyProcNumber >= 0 && MyProcNumber < MaxBackends + NUM_AUXILIARY_PROCS);
     414              : 
     415        11526 :     if (timeout > 0)
     416              :     {
     417        11514 :         endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), timeout);
     418        11514 :         wake_events |= WL_TIMEOUT;
     419              :     }
     420              : 
     421              :     /*
     422              :      * Add our process to the waiters heap.  It might happen that the target
     423              :      * LSN gets reached before we do.  The check at the beginning of the loop
     424              :      * below prevents the race condition.
     425              :      */
     426        11526 :     addLSNWaiter(targetLSN, lsnType);
     427              : 
     428              :     for (;;)
     429           84 :     {
     430              :         int         rc;
     431        11610 :         long        delay_ms = -1;  /* stays -1 when no timeout was requested */
     432              : 
     433              :         /* Get current LSN for the wait type */
     434        11610 :         currentLSN = GetCurrentLSNForWaitType(lsnType);
     435              : 
     436              :         /* For standby wait types, check that recovery is still in progress */
     437        11610 :         if (WaitLSNTypeRequiresRecovery(lsnType) && !RecoveryInProgress())
     438              :         {
     439              :             /*
     440              :              * Recovery has ended, but still report success if promotion was
     441              :              * triggered and the target LSN was already reached.
     442              :              */
     443            6 :             deleteLSNWaiter(lsnType);
     444              : 
     445            6 :             if (PromoteIsTriggered() && targetLSN <= currentLSN)
     446            1 :                 return WAIT_LSN_RESULT_SUCCESS;
     447            5 :             return WAIT_LSN_RESULT_NOT_IN_RECOVERY;
     448              :         }
     449              :         else
     450              :         {
     451              :             /* Check if the waited LSN has been reached */
     452        11604 :             if (targetLSN <= currentLSN)
     453        11508 :                 break;
     454              :         }
     455              : 
     456           96 :         if (timeout > 0)
     457              :         {
     458           82 :             delay_ms = TimestampDifferenceMilliseconds(GetCurrentTimestamp(), endtime);
     459           82 :             if (delay_ms <= 0)
     460           11 :                 break;      /* timed out; reported as such after the loop */
     461              :         }
     462              : 
     463           85 :         CHECK_FOR_INTERRUPTS();
     464              : 
     465           84 :         rc = WaitLatch(MyLatch, wake_events, delay_ms,
     466           84 :                        WaitLSNWaitEvents[lsnType]);
     467              : 
     468              :         /*
     469              :          * Emergency bailout if postmaster has died.  This is to avoid the
     470              :          * necessity for manual cleanup of all postmaster children.
     471              :          */
     472           84 :         if (rc & WL_POSTMASTER_DEATH)
     473            0 :             ereport(FATAL,
     474              :                     errcode(ERRCODE_ADMIN_SHUTDOWN),
     475              :                     errmsg("terminating connection due to unexpected postmaster exit"),
     476              :                     errcontext("while waiting for LSN"));
     477              : 
     478           84 :         ResetLatch(MyLatch);
     479              :     }
     480              : 
     481              :     /*
     482              :      * Delete our process from the shared memory heap.  We might already have
     483              :      * been deleted by the process that woke us up.  The 'inHeap' flag
     484              :      * prevents double deletion.
     485              :      */
     486        11519 :     deleteLSNWaiter(lsnType);
     487              : 
     488              :     /*
     489              :      * If we didn't reach the target LSN, we must have exited the loop by timeout.
     490              :      */
     491        11519 :     if (targetLSN > currentLSN)
     492           11 :         return WAIT_LSN_RESULT_TIMEOUT;
     493              : 
     494        11508 :     return WAIT_LSN_RESULT_SUCCESS;
     495              : }
        

Generated by: LCOV version 2.0-1