LCOV - code coverage report
Current view: top level - src/backend/access/transam - xlogwait.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 96.1 % 129 124
Test Date: 2026-03-24 01:16:09 Functions: 100.0 % 12 12
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * xlogwait.c
       4              :  *    Implements waiting for WAL operations to reach specific LSNs.
       5              :  *
       6              :  * Copyright (c) 2025-2026, PostgreSQL Global Development Group
       7              :  *
       8              :  * IDENTIFICATION
       9              :  *    src/backend/access/transam/xlogwait.c
      10              :  *
      11              :  * NOTES
      12              :  *      This file implements waiting for WAL operations to reach specific LSNs
      13              :  *      on both physical standby and primary servers. The core idea is simple:
      14              :  *      every process that wants to wait publishes the LSN it needs to the
      15              :  *      shared memory, and the appropriate process (startup on standby,
      16              :  *      walreceiver on standby, or WAL writer/backend on primary) wakes it
      17              :  *      once that LSN has been reached.
      18              :  *
      19              :  *      The shared memory used by this module comprises a procInfos
      20              :  *      per-backend array with the information of the awaited LSN for each
      21              :  *      of the backend processes.  The elements of that array are organized
      22              :  *      into pairing heaps (waitersHeap), one for each WaitLSNType, which
      23              :  *      allows for very fast finding of the least awaited LSN for each type.
      24              :  *
      25              :  *      In addition, the least-awaited LSN for each type is cached in the
      26              :  *      minWaitedLSN array.  The waiter process publishes information about
      27              :  *      itself to the shared memory and waits on the latch until it is woken
      28              :  *      up by the appropriate process, the timeout expires, the standby is
      29              :  *      promoted, or the postmaster dies.  Then it cleans up its shared memory entry.
      30              :  *
      31              :  *      On standby servers:
      32              :  *      - After replaying a WAL record, the startup process performs a fast
      33              :  *        path check minWaitedLSN[REPLAY] > replayLSN.  If this check is
      34              :  *        negative, it checks waitersHeap[REPLAY] and wakes up the backends
      35              :  *        whose awaited LSNs are reached.
      36              :  *      - After receiving WAL, the walreceiver process performs similar checks
      37              :  *        against the flush and write LSNs, waking up waiters in the FLUSH
      38              :  *        and WRITE heaps, respectively.
      39              :  *
      40              :  *      On primary servers: After flushing WAL, the WAL writer or backend
      41              :  *      process performs a similar check against the flush LSN and wakes up
      42              :  *      waiters whose target flush LSNs have been reached.
      43              :  *
      44              :  *-------------------------------------------------------------------------
      45              :  */
      46              : 
      47              : #include "postgres.h"
      48              : 
      49              : #include <float.h>
      50              : 
      51              : #include "access/xlog.h"
      52              : #include "access/xlogrecovery.h"
      53              : #include "access/xlogwait.h"
      54              : #include "miscadmin.h"
      55              : #include "pgstat.h"
      56              : #include "replication/walreceiver.h"
      57              : #include "storage/latch.h"
      58              : #include "storage/proc.h"
      59              : #include "storage/shmem.h"
      60              : #include "utils/fmgrprotos.h"
      61              : #include "utils/pg_lsn.h"
      62              : #include "utils/snapmgr.h"
      63              : #include "utils/wait_event.h"
      64              : 
      65              : 
      66              : static int  waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b,
      67              :                         void *arg);
      68              : 
      69              : struct WaitLSNState *waitLSNState = NULL;
      70              : 
      71              : /*
      72              :  * Wait event for each WaitLSNType, indexed by type and passed to WaitLatch()
      73              :  * to report the wait in pg_stat_activity.  Both flush types share one event.
      74              :  */
      75              : static const uint32 WaitLSNWaitEvents[] = {
      76              :     [WAIT_LSN_TYPE_STANDBY_REPLAY] = WAIT_EVENT_WAIT_FOR_WAL_REPLAY,
      77              :     [WAIT_LSN_TYPE_STANDBY_WRITE] = WAIT_EVENT_WAIT_FOR_WAL_WRITE,
      78              :     [WAIT_LSN_TYPE_STANDBY_FLUSH] = WAIT_EVENT_WAIT_FOR_WAL_FLUSH,
      79              :     [WAIT_LSN_TYPE_PRIMARY_FLUSH] = WAIT_EVENT_WAIT_FOR_WAL_FLUSH,
      80              : };
      81              : 
      82              : StaticAssertDecl(lengthof(WaitLSNWaitEvents) == WAIT_LSN_TYPE_COUNT,
      83              :                  "WaitLSNWaitEvents must match WaitLSNType enum");
      84              : 
      85              : /*
      86              :  * Return the current LSN for lsnType; raises ERROR for an unknown wait type.
      87              :  */
      88              : XLogRecPtr
      89           95 : GetCurrentLSNForWaitType(WaitLSNType lsnType)
      90              : {
      91              :     Assert(lsnType >= 0 && lsnType < WAIT_LSN_TYPE_COUNT);
      92              : 
      93           95 :     switch (lsnType)
      94              :     {
      95           36 :         case WAIT_LSN_TYPE_STANDBY_REPLAY:
      96           36 :             return GetXLogReplayRecPtr(NULL);
      97              : 
      98           26 :         case WAIT_LSN_TYPE_STANDBY_WRITE:
      99           26 :             return GetWalRcvWriteRecPtr();
     100              : 
     101           27 :         case WAIT_LSN_TYPE_STANDBY_FLUSH:
     102           27 :             return GetWalRcvFlushRecPtr(NULL, NULL);
     103              : 
     104            6 :         case WAIT_LSN_TYPE_PRIMARY_FLUSH:
     105            6 :             return GetFlushRecPtr(NULL);
     106              :     }
     107              : 
     108            0 :     elog(ERROR, "invalid LSN wait type: %d", lsnType);
     109              :     pg_unreachable();
     110              : }
     111              : 
     112              : /* Report shmem size for WaitLSNState: header plus all procInfos entries. */
     113              : Size
     114         3387 : WaitLSNShmemSize(void)
     115              : {
     116              :     Size        size;
     117              : 
     118         3387 :     size = offsetof(WaitLSNState, procInfos);
     119         3387 :     size = add_size(size, mul_size(MaxBackends + NUM_AUXILIARY_PROCS, sizeof(WaitLSNProcInfo)));
     120         3387 :     return size;
     121              : }
     122              : 
     123              : /* Initialize the WaitLSNState in the shared memory unless it already exists. */
     124              : void
     125         1180 : WaitLSNShmemInit(void)
     126              : {
     127              :     bool        found;
     128              : 
     129         1180 :     waitLSNState = (WaitLSNState *) ShmemInitStruct("WaitLSNState",
     130              :                                                     WaitLSNShmemSize(),
     131              :                                                     &found);
     132         1180 :     if (!found)
     133              :     {
     134              :         int         i;
     135              : 
     136              :         /* Start with empty heaps; PG_UINT64_MAX means "no waiter" */
     137         5900 :         for (i = 0; i < WAIT_LSN_TYPE_COUNT; i++)
     138              :         {
     139         4720 :             pg_atomic_init_u64(&waitLSNState->minWaitedLSN[i], PG_UINT64_MAX);
     140         4720 :             pairingheap_initialize(&waitLSNState->waitersHeap[i], waitlsn_cmp, NULL);
     141              :         }
     142              : 
     143              :         /* Zero the per-process info array */
     144         1180 :         memset(&waitLSNState->procInfos, 0,
     145         1180 :                (MaxBackends + NUM_AUXILIARY_PROCS) * sizeof(WaitLSNProcInfo));
     146              :     }
     147         1180 : }
     148              : 
     149              : /*
     150              :  * Comparison function for LSN waiters heaps.  Returns 1 when a's waitLSN is
     151              :  * smaller, -1 when larger, 0 when equal, so the smallest LSN is at the top.
     152              :  */
     153              : static int
     154           27 : waitlsn_cmp(const pairingheap_node *a, const pairingheap_node *b, void *arg)
     155              : {
     156           27 :     const WaitLSNProcInfo *aproc = pairingheap_const_container(WaitLSNProcInfo, heapNode, a);
     157           27 :     const WaitLSNProcInfo *bproc = pairingheap_const_container(WaitLSNProcInfo, heapNode, b);
     158              : 
     159           27 :     if (aproc->waitLSN < bproc->waitLSN)
     160           15 :         return 1;
     161           12 :     else if (aproc->waitLSN > bproc->waitLSN)
     162            9 :         return -1;
     163              :     else
     164            3 :         return 0;
     165              : }
     166              : 
     167              : /*
     168              :  * Refresh cached minWaitedLSN from the heap top (PG_UINT64_MAX if empty).
     169              :  */
     170              : static void
     171         2982 : updateMinWaitedLSN(WaitLSNType lsnType)
     172              : {
     173         2982 :     XLogRecPtr  minWaitedLSN = PG_UINT64_MAX;
     174         2982 :     int         i = (int) lsnType;
     175              : 
     176              :     Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
     177              : 
     178         2982 :     if (!pairingheap_is_empty(&waitLSNState->waitersHeap[i]))
     179              :     {
     180           47 :         pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap[i]);
     181           47 :         WaitLSNProcInfo *procInfo = pairingheap_container(WaitLSNProcInfo, heapNode, node);
     182              : 
     183           47 :         minWaitedLSN = procInfo->waitLSN;
     184              :     }
     185         2982 :     pg_atomic_write_u64(&waitLSNState->minWaitedLSN[i], minWaitedLSN);
     186         2982 : }
     187              : 
     188              : /*
     189              :  * Publish lsn for the current process and add it to the lsnType waiters heap.
     190              :  */
     191              : static void
     192           43 : addLSNWaiter(XLogRecPtr lsn, WaitLSNType lsnType)
     193              : {
     194           43 :     WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber];
     195           43 :     int         i = (int) lsnType;
     196              : 
     197              :     Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
     198              : 
     199           43 :     LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
     200              : 
     201           43 :     procInfo->procno = MyProcNumber;
     202           43 :     procInfo->waitLSN = lsn;
     203           43 :     procInfo->lsnType = lsnType;
     204              : 
     205              :     Assert(!procInfo->inHeap);
     206           43 :     pairingheap_add(&waitLSNState->waitersHeap[i], &procInfo->heapNode);
     207           43 :     procInfo->inHeap = true;
     208           43 :     updateMinWaitedLSN(lsnType);
     209              : 
     210           43 :     LWLockRelease(WaitLSNLock);
     211           43 : }
     212              : 
     213              : /*
     214              :  * Remove current process from the lsnType waiters heap if it is still there.
     215              :  */
     216              : static void
     217           43 : deleteLSNWaiter(WaitLSNType lsnType)
     218              : {
     219           43 :     WaitLSNProcInfo *procInfo = &waitLSNState->procInfos[MyProcNumber];
     220           43 :     int         i = (int) lsnType;
     221              : 
     222              :     Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
     223              : 
     224           43 :     LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
     225              : 
     226              :     Assert(procInfo->lsnType == lsnType);
     227              : 
     228           43 :     if (procInfo->inHeap)
     229              :     {
     230           17 :         pairingheap_remove(&waitLSNState->waitersHeap[i], &procInfo->heapNode);
     231           17 :         procInfo->inHeap = false;
     232           17 :         updateMinWaitedLSN(lsnType);
     233              :     }
     234              : 
     235           43 :     LWLockRelease(WaitLSNLock);
     236           43 : }
     237              : 
     238              : /*
     239              :  * Size of the on-stack array of procs to wake up in wakeupWaiters().  It
     240              :  * should be enough to finish in a single iteration in most cases.
     241              :  */
     242              : #define WAKEUP_PROC_STATIC_ARRAY_SIZE (16)
     243              : 
     244              : /*
     245              :  * Remove waiters whose LSN has been reached from the heap and set their
     246              :  * latches.  If InvalidXLogRecPtr is given, remove all waiters from the heap
     247              :  * and set latches for all of them.
     248              :  *
     249              :  * This function first accumulates waiters to wake up into an array, then
     250              :  * wakes them up without holding the WaitLSNLock.  The array size is static
     251              :  * and equal to WAKEUP_PROC_STATIC_ARRAY_SIZE.  That should be more than enough
     252              :  * to wake up all the waiters at once in the vast majority of cases.  However,
     253              :  * if there are more waiters, this function will loop to process them in
     254              :  * multiple chunks.
     255              :  */
     256              : static void
     257         2922 : wakeupWaiters(WaitLSNType lsnType, XLogRecPtr currentLSN)
     258              : {
     259              :     ProcNumber  wakeUpProcs[WAKEUP_PROC_STATIC_ARRAY_SIZE];
     260              :     int         numWakeUpProcs;
     261         2922 :     int         i = (int) lsnType;
     262              : 
     263              :     Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
     264              : 
     265              :     do
     266              :     {
     267              :         int         j;
     268              : 
     269         2922 :         numWakeUpProcs = 0;
     270         2922 :         LWLockAcquire(WaitLSNLock, LW_EXCLUSIVE);
     271              : 
     272              :         /*
     273              :          * Iterate the waiters heap until we find an LSN not yet reached or the
     274              :          * array is full.  Record process numbers now; wake up after unlocking.
     275              :          */
     276         2948 :         while (!pairingheap_is_empty(&waitLSNState->waitersHeap[i]))
     277              :         {
     278           30 :             pairingheap_node *node = pairingheap_first(&waitLSNState->waitersHeap[i]);
     279              :             WaitLSNProcInfo *procInfo;
     280              : 
     281              :             /* Map the heap node back to its WaitLSNProcInfo */
     282           30 :             procInfo = pairingheap_container(WaitLSNProcInfo, heapNode, node);
     283              : 
     284           30 :             if (XLogRecPtrIsValid(currentLSN) && procInfo->waitLSN > currentLSN)
     285            4 :                 break;
     286              : 
     287              :             Assert(numWakeUpProcs < WAKEUP_PROC_STATIC_ARRAY_SIZE);
     288           26 :             wakeUpProcs[numWakeUpProcs++] = procInfo->procno;
     289           26 :             (void) pairingheap_remove_first(&waitLSNState->waitersHeap[i]);
     290              : 
     291              :             /* Mark the waiter as no longer being in the heap */
     292           26 :             procInfo->inHeap = false;
     293              : 
     294           26 :             if (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE)
     295            0 :                 break;
     296              :         }
     297              : 
     298         2922 :         updateMinWaitedLSN(lsnType);
     299         2922 :         LWLockRelease(WaitLSNLock);
     300              : 
     301              :         /*
     302              :          * Set latches for processes whose waited LSNs have been reached.
     303              :          * Since SetLatch() is a time-consuming operation, we do this outside
     304              :          * of WaitLSNLock. This is safe because procLatch is never freed, so
     305              :          * at worst we may set a latch for the wrong process or for no process
     306              :          * at all, which is harmless.
     307              :          */
     308         2948 :         for (j = 0; j < numWakeUpProcs; j++)
     309           26 :             SetLatch(&GetPGProcByNumber(wakeUpProcs[j])->procLatch);
     310              : 
     311         2922 :     } while (numWakeUpProcs == WAKEUP_PROC_STATIC_ARRAY_SIZE);
     312         2922 : }
     313              : 
     314              : /*
     315              :  * Wake up lsnType waiters whose LSN is <= currentLSN (all if it's invalid)
     316              :  */
     317              : void
     318         2922 : WaitLSNWakeup(WaitLSNType lsnType, XLogRecPtr currentLSN)
     319              : {
     320         2922 :     int         i = (int) lsnType;
     321              : 
     322              :     Assert(i >= 0 && i < WAIT_LSN_TYPE_COUNT);
     323              : 
     324              :     /*
     325              :      * Fast path: return if the least awaited LSN is still ahead.  Not taken for
     326              :      * InvalidXLogRecPtr, meaning "wake all waiters" (e.g., when recovery ends).
     327              :      */
     328         2934 :     if (XLogRecPtrIsValid(currentLSN) &&
     329           12 :         pg_atomic_read_u64(&waitLSNState->minWaitedLSN[i]) > currentLSN)
     330            0 :         return;
     331              : 
     332         2922 :     wakeupWaiters(lsnType, currentLSN);
     333              : }
     334              : 
     335              : /*
     336              :  * Remove the exiting process from its waiters heap, if it is still in one
     337              :  */
     338              : void
     339        53780 : WaitLSNCleanup(void)
     340              : {
     341        53780 :     if (waitLSNState)
     342              :     {
     343              :         /*
     344              :          * Fast-path check of the inHeap flag without the lock.  Only this
     345              :          * process ever sets the flag to true, so an unlocked read can at worst
     346              :          * be a false positive (another process has just cleared it).  That is
     347              :          * resolved by the recheck under the lock inside deleteLSNWaiter().
     348              :          */
     349        53780 :         if (waitLSNState->procInfos[MyProcNumber].inHeap)
     350            0 :             deleteLSNWaiter(waitLSNState->procInfos[MyProcNumber].lsnType);
     351              :     }
     352        53780 : }
     353              : 
     354              : /*
     355              :  * Return true if the given LSN wait type requires recovery to be in progress.
     356              :  * Standby wait types (replay, write, flush) require recovery;
     357              :  * the primary wait type (flush) does not.
     358              :  */
     359              : static inline bool
     360           91 : WaitLSNTypeRequiresRecovery(WaitLSNType t)
     361              : {
     362           57 :     return t == WAIT_LSN_TYPE_STANDBY_REPLAY ||
     363          148 :         t == WAIT_LSN_TYPE_STANDBY_WRITE ||
     364              :         t == WAIT_LSN_TYPE_STANDBY_FLUSH;
     365              : }
     366              : 
     367              : /*
     368              :  * Wait using MyLatch till the given LSN is reached, a positive timeout (in
     369              :  * milliseconds) expires, the replica gets promoted, or the postmaster dies.
     370              :  *
     371              :  * Returns WAIT_LSN_RESULT_SUCCESS if the target LSN was reached,
     372              :  * WAIT_LSN_RESULT_TIMEOUT if the timeout expired first, or
     373              :  * WAIT_LSN_RESULT_NOT_IN_RECOVERY if recovery is over for a standby wait type.
     374              :  */
     375              : WaitLSNResult
     376           43 : WaitForLSN(WaitLSNType lsnType, XLogRecPtr targetLSN, int64 timeout)
     377              : {
     378              :     XLogRecPtr  currentLSN;
     379           43 :     TimestampTz endtime = 0;
     380           43 :     int         wake_events = WL_LATCH_SET | WL_POSTMASTER_DEATH;
     381              : 
     382              :     /* Shouldn't be called when shmem isn't initialized */
     383              :     Assert(waitLSNState);
     384              : 
     385              :     /* Should have a valid proc number */
     386              :     Assert(MyProcNumber >= 0 && MyProcNumber < MaxBackends + NUM_AUXILIARY_PROCS);
     387              : 
     388           43 :     if (timeout > 0)
     389              :     {
     390           32 :         endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(), timeout);
     391           32 :         wake_events |= WL_TIMEOUT;
     392              :     }
     393              : 
     394              :     /*
     395              :      * Add our process to the waiters heap.  The target LSN might get reached
     396              :      * before we do so; the check at the beginning of the loop below handles
     397              :      * that race.
     398              :      */
     399           43 :     addLSNWaiter(targetLSN, lsnType);
     400              : 
     401              :     for (;;)
     402           48 :     {
     403              :         int         rc;
     404           91 :         long        delay_ms = -1;
     405              : 
     406              :         /* Get current LSN for the wait type */
     407           91 :         currentLSN = GetCurrentLSNForWaitType(lsnType);
     408              : 
     409              :         /* Standby wait types need recovery to be still in progress */
     410           91 :         if (WaitLSNTypeRequiresRecovery(lsnType) && !RecoveryInProgress())
     411              :         {
     412              :             /*
     413              :              * Recovery has ended.  Still report success if promotion was
     414              :              * triggered and the target LSN had already been reached.
     415              :              */
     416            6 :             deleteLSNWaiter(lsnType);
     417              : 
     418            6 :             if (PromoteIsTriggered() && targetLSN <= currentLSN)
     419            1 :                 return WAIT_LSN_RESULT_SUCCESS;
     420            5 :             return WAIT_LSN_RESULT_NOT_IN_RECOVERY;
     421              :         }
     422              :         else
     423              :         {
     424              :             /* Check if the waited LSN has been reached */
     425           85 :             if (targetLSN <= currentLSN)
     426           34 :                 break;
     427              :         }
     428              : 
     429           51 :         if (timeout > 0)
     430              :         {
     431           39 :             delay_ms = TimestampDifferenceMilliseconds(GetCurrentTimestamp(), endtime);
     432           39 :             if (delay_ms <= 0)
     433            3 :                 break;
     434              :         }
     435              : 
     436           48 :         CHECK_FOR_INTERRUPTS();
     437              : 
     438           48 :         rc = WaitLatch(MyLatch, wake_events, delay_ms,
     439           48 :                        WaitLSNWaitEvents[lsnType]);
     440              : 
     441              :         /*
     442              :          * Emergency bailout if postmaster has died.  This is to avoid the
     443              :          * necessity for manual cleanup of all postmaster children.
     444              :          */
     445           48 :         if (rc & WL_POSTMASTER_DEATH)
     446            0 :             ereport(FATAL,
     447              :                     errcode(ERRCODE_ADMIN_SHUTDOWN),
     448              :                     errmsg("terminating connection due to unexpected postmaster exit"),
     449              :                     errcontext("while waiting for LSN"));
     450              : 
     451           48 :         if (rc & WL_LATCH_SET)
     452           45 :             ResetLatch(MyLatch);
     453              :     }
     454              : 
     455              :     /*
     456              :      * Delete our process from the shared memory heap.  We might already have
     457              :      * been removed by the process that woke us up.  The 'inHeap' flag prevents
     458              :      * a double deletion.
     459              :      */
     460           37 :     deleteLSNWaiter(lsnType);
     461              : 
     462              :     /*
     463              :      * If we didn't reach the target LSN, we must have exited by timeout.
     464              :      */
     465           37 :     if (targetLSN > currentLSN)
     466            3 :         return WAIT_LSN_RESULT_TIMEOUT;
     467              : 
     468           34 :     return WAIT_LSN_RESULT_SUCCESS;
     469              : }
        

Generated by: LCOV version 2.0-1