LCOV - code coverage report
Current view: top level - src/backend/replication - walreceiverfuncs.c (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 127 153 83.0 %
Date: 2025-11-13 01:18:06 Functions: 9 11 81.8 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * walreceiverfuncs.c
       4             :  *
       5             :  * This file contains functions used by the startup process to communicate
       6             :  * with the walreceiver process. Functions implementing walreceiver itself
       7             :  * are in walreceiver.c.
       8             :  *
       9             :  * Portions Copyright (c) 2010-2025, PostgreSQL Global Development Group
      10             :  *
      11             :  *
      12             :  * IDENTIFICATION
      13             :  *    src/backend/replication/walreceiverfuncs.c
      14             :  *
      15             :  *-------------------------------------------------------------------------
      16             :  */
      17             : #include "postgres.h"
      18             : 
      19             : #include <sys/stat.h>
      20             : #include <sys/time.h>
      21             : #include <time.h>
      22             : #include <unistd.h>
      23             : #include <signal.h>
      24             : 
      25             : #include "access/xlog_internal.h"
      26             : #include "access/xlogrecovery.h"
      27             : #include "pgstat.h"
      28             : #include "replication/walreceiver.h"
      29             : #include "storage/pmsignal.h"
      30             : #include "storage/proc.h"
      31             : #include "storage/shmem.h"
      32             : #include "utils/timestamp.h"
      33             : 
      /*
 * Pointer to the walreceiver control struct in shared memory.  Set by
 * WalRcvShmemInit(); NULL until then.
 */
WalRcvData *WalRcv = NULL;

/*
 * How long to wait for walreceiver to start up after requesting
 * postmaster to launch it. In seconds.
 *
 * Compared against (time(NULL) - startTime) while the walreceiver is in
 * WALRCV_STARTING; once exceeded, the state is forced to WALRCV_STOPPED.
 */
#define WALRCV_STARTUP_TIMEOUT 10
      41             : 
      42             : /* Report shared memory space needed by WalRcvShmemInit */
      43             : Size
      44        8500 : WalRcvShmemSize(void)
      45             : {
      46        8500 :     Size        size = 0;
      47             : 
      48        8500 :     size = add_size(size, sizeof(WalRcvData));
      49             : 
      50        8500 :     return size;
      51             : }
      52             : 
      53             : /* Allocate and initialize walreceiver-related shared memory */
      54             : void
      55        2200 : WalRcvShmemInit(void)
      56             : {
      57             :     bool        found;
      58             : 
      59        2200 :     WalRcv = (WalRcvData *)
      60        2200 :         ShmemInitStruct("Wal Receiver Ctl", WalRcvShmemSize(), &found);
      61             : 
      62        2200 :     if (!found)
      63             :     {
      64             :         /* First time through, so initialize */
      65        2200 :         MemSet(WalRcv, 0, WalRcvShmemSize());
      66        2200 :         WalRcv->walRcvState = WALRCV_STOPPED;
      67        2200 :         ConditionVariableInit(&WalRcv->walRcvStoppedCV);
      68        2200 :         SpinLockInit(&WalRcv->mutex);
      69        2200 :         pg_atomic_init_u64(&WalRcv->writtenUpto, 0);
      70        2200 :         WalRcv->procno = INVALID_PROC_NUMBER;
      71             :     }
      72        2200 : }
      73             : 
      74             : /* Is walreceiver running (or starting up)? */
      75             : bool
      76        1976 : WalRcvRunning(void)
      77             : {
      78        1976 :     WalRcvData *walrcv = WalRcv;
      79             :     WalRcvState state;
      80             :     pg_time_t   startTime;
      81             : 
      82        1976 :     SpinLockAcquire(&walrcv->mutex);
      83             : 
      84        1976 :     state = walrcv->walRcvState;
      85        1976 :     startTime = walrcv->startTime;
      86             : 
      87        1976 :     SpinLockRelease(&walrcv->mutex);
      88             : 
      89             :     /*
      90             :      * If it has taken too long for walreceiver to start up, give up. Setting
      91             :      * the state to STOPPED ensures that if walreceiver later does start up
      92             :      * after all, it will see that it's not supposed to be running and die
      93             :      * without doing anything.
      94             :      */
      95        1976 :     if (state == WALRCV_STARTING)
      96             :     {
      97           2 :         pg_time_t   now = (pg_time_t) time(NULL);
      98             : 
      99           2 :         if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
     100             :         {
     101           0 :             bool        stopped = false;
     102             : 
     103           0 :             SpinLockAcquire(&walrcv->mutex);
     104           0 :             if (walrcv->walRcvState == WALRCV_STARTING)
     105             :             {
     106           0 :                 state = walrcv->walRcvState = WALRCV_STOPPED;
     107           0 :                 stopped = true;
     108             :             }
     109           0 :             SpinLockRelease(&walrcv->mutex);
     110             : 
     111           0 :             if (stopped)
     112           0 :                 ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
     113             :         }
     114             :     }
     115             : 
     116        1976 :     if (state != WALRCV_STOPPED)
     117          72 :         return true;
     118             :     else
     119        1904 :         return false;
     120             : }
     121             : 
     122             : /* Return the state of the walreceiver. */
     123             : WalRcvState
     124           0 : WalRcvGetState(void)
     125             : {
     126           0 :     WalRcvData *walrcv = WalRcv;
     127             :     WalRcvState state;
     128             : 
     129           0 :     SpinLockAcquire(&walrcv->mutex);
     130           0 :     state = walrcv->walRcvState;
     131           0 :     SpinLockRelease(&walrcv->mutex);
     132             : 
     133           0 :     return state;
     134             : }
     135             : 
     136             : /*
     137             :  * Is walreceiver running and streaming (or at least attempting to connect,
     138             :  * or starting up)?
     139             :  */
     140             : bool
     141       20220 : WalRcvStreaming(void)
     142             : {
     143       20220 :     WalRcvData *walrcv = WalRcv;
     144             :     WalRcvState state;
     145             :     pg_time_t   startTime;
     146             : 
     147       20220 :     SpinLockAcquire(&walrcv->mutex);
     148             : 
     149       20220 :     state = walrcv->walRcvState;
     150       20220 :     startTime = walrcv->startTime;
     151             : 
     152       20220 :     SpinLockRelease(&walrcv->mutex);
     153             : 
     154             :     /*
     155             :      * If it has taken too long for walreceiver to start up, give up. Setting
     156             :      * the state to STOPPED ensures that if walreceiver later does start up
     157             :      * after all, it will see that it's not supposed to be running and die
     158             :      * without doing anything.
     159             :      */
     160       20220 :     if (state == WALRCV_STARTING)
     161             :     {
     162         584 :         pg_time_t   now = (pg_time_t) time(NULL);
     163             : 
     164         584 :         if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
     165             :         {
     166           0 :             bool        stopped = false;
     167             : 
     168           0 :             SpinLockAcquire(&walrcv->mutex);
     169           0 :             if (walrcv->walRcvState == WALRCV_STARTING)
     170             :             {
     171           0 :                 state = walrcv->walRcvState = WALRCV_STOPPED;
     172           0 :                 stopped = true;
     173             :             }
     174           0 :             SpinLockRelease(&walrcv->mutex);
     175             : 
     176           0 :             if (stopped)
     177           0 :                 ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
     178             :         }
     179             :     }
     180             : 
     181       20220 :     if (state == WALRCV_STREAMING || state == WALRCV_STARTING ||
     182             :         state == WALRCV_RESTARTING)
     183       19644 :         return true;
     184             :     else
     185         576 :         return false;
     186             : }
     187             : 
     188             : /*
     189             :  * Stop walreceiver (if running) and wait for it to die.
     190             :  * Executed by the Startup process.
     191             :  */
     192             : void
     193        1902 : ShutdownWalRcv(void)
     194             : {
     195        1902 :     WalRcvData *walrcv = WalRcv;
     196        1902 :     pid_t       walrcvpid = 0;
     197        1902 :     bool        stopped = false;
     198             : 
     199             :     /*
     200             :      * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
     201             :      * mode once it's finished, and will also request postmaster to not
     202             :      * restart itself.
     203             :      */
     204        1902 :     SpinLockAcquire(&walrcv->mutex);
     205        1902 :     switch (walrcv->walRcvState)
     206             :     {
     207        1826 :         case WALRCV_STOPPED:
     208        1826 :             break;
     209          12 :         case WALRCV_STARTING:
     210          12 :             walrcv->walRcvState = WALRCV_STOPPED;
     211          12 :             stopped = true;
     212          12 :             break;
     213             : 
     214          64 :         case WALRCV_STREAMING:
     215             :         case WALRCV_WAITING:
     216             :         case WALRCV_RESTARTING:
     217          64 :             walrcv->walRcvState = WALRCV_STOPPING;
     218             :             /* fall through */
     219          64 :         case WALRCV_STOPPING:
     220          64 :             walrcvpid = walrcv->pid;
     221          64 :             break;
     222             :     }
     223        1902 :     SpinLockRelease(&walrcv->mutex);
     224             : 
     225             :     /* Unnecessary but consistent. */
     226        1902 :     if (stopped)
     227          12 :         ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
     228             : 
     229             :     /*
     230             :      * Signal walreceiver process if it was still running.
     231             :      */
     232        1902 :     if (walrcvpid != 0)
     233          64 :         kill(walrcvpid, SIGTERM);
     234             : 
     235             :     /*
     236             :      * Wait for walreceiver to acknowledge its death by setting state to
     237             :      * WALRCV_STOPPED.
     238             :      */
     239        1902 :     ConditionVariablePrepareToSleep(&walrcv->walRcvStoppedCV);
     240        1966 :     while (WalRcvRunning())
     241          64 :         ConditionVariableSleep(&walrcv->walRcvStoppedCV,
     242             :                                WAIT_EVENT_WAL_RECEIVER_EXIT);
     243        1902 :     ConditionVariableCancelSleep();
     244        1902 : }
     245             : 
     246             : /*
     247             :  * Request postmaster to start walreceiver.
     248             :  *
     249             :  * "recptr" indicates the position where streaming should begin.  "conninfo"
     250             :  * is a libpq connection string to use.  "slotname" is, optionally, the name
     251             :  * of a replication slot to acquire.  "create_temp_slot" indicates to create
     252             :  * a temporary slot when no "slotname" is given.
     253             :  *
     254             :  * WAL receivers do not directly load GUC parameters used for the connection
     255             :  * to the primary, and rely on the values passed down by the caller of this
     256             :  * routine instead.  Hence, the addition of any new parameters should happen
     257             :  * through this code path.
     258             :  */
     259             : void
     260         370 : RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
     261             :                      const char *slotname, bool create_temp_slot)
     262             : {
     263         370 :     WalRcvData *walrcv = WalRcv;
     264         370 :     bool        launch = false;
     265         370 :     pg_time_t   now = (pg_time_t) time(NULL);
     266             :     ProcNumber  walrcv_proc;
     267             : 
     268             :     /*
     269             :      * We always start at the beginning of the segment. That prevents a broken
     270             :      * segment (i.e., with no records in the first half of a segment) from
     271             :      * being created by XLOG streaming, which might cause trouble later on if
     272             :      * the segment is e.g archived.
     273             :      */
     274         370 :     if (XLogSegmentOffset(recptr, wal_segment_size) != 0)
     275         370 :         recptr -= XLogSegmentOffset(recptr, wal_segment_size);
     276             : 
     277         370 :     SpinLockAcquire(&walrcv->mutex);
     278             : 
     279             :     /* It better be stopped if we try to restart it */
     280             :     Assert(walrcv->walRcvState == WALRCV_STOPPED ||
     281             :            walrcv->walRcvState == WALRCV_WAITING);
     282             : 
     283         370 :     if (conninfo != NULL)
     284         370 :         strlcpy(walrcv->conninfo, conninfo, MAXCONNINFO);
     285             :     else
     286           0 :         walrcv->conninfo[0] = '\0';
     287             : 
     288             :     /*
     289             :      * Use configured replication slot if present, and ignore the value of
     290             :      * create_temp_slot as the slot name should be persistent.  Otherwise, use
     291             :      * create_temp_slot to determine whether this WAL receiver should create a
     292             :      * temporary slot by itself and use it, or not.
     293             :      */
     294         370 :     if (slotname != NULL && slotname[0] != '\0')
     295             :     {
     296         106 :         strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
     297         106 :         walrcv->is_temp_slot = false;
     298             :     }
     299             :     else
     300             :     {
     301         264 :         walrcv->slotname[0] = '\0';
     302         264 :         walrcv->is_temp_slot = create_temp_slot;
     303             :     }
     304             : 
     305         370 :     if (walrcv->walRcvState == WALRCV_STOPPED)
     306             :     {
     307         358 :         launch = true;
     308         358 :         walrcv->walRcvState = WALRCV_STARTING;
     309             :     }
     310             :     else
     311          12 :         walrcv->walRcvState = WALRCV_RESTARTING;
     312         370 :     walrcv->startTime = now;
     313             : 
     314             :     /*
     315             :      * If this is the first startup of walreceiver (on this timeline),
     316             :      * initialize flushedUpto and latestChunkStart to the starting point.
     317             :      */
     318         370 :     if (!XLogRecPtrIsValid(walrcv->receiveStart) || walrcv->receivedTLI != tli)
     319             :     {
     320         198 :         walrcv->flushedUpto = recptr;
     321         198 :         walrcv->receivedTLI = tli;
     322         198 :         walrcv->latestChunkStart = recptr;
     323             :     }
     324         370 :     walrcv->receiveStart = recptr;
     325         370 :     walrcv->receiveStartTLI = tli;
     326             : 
     327         370 :     walrcv_proc = walrcv->procno;
     328             : 
     329         370 :     SpinLockRelease(&walrcv->mutex);
     330             : 
     331         370 :     if (launch)
     332         358 :         SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
     333          12 :     else if (walrcv_proc != INVALID_PROC_NUMBER)
     334          12 :         SetLatch(&GetPGProcByNumber(walrcv_proc)->procLatch);
     335         370 : }
     336             : 
     337             : /*
     338             :  * Returns the last+1 byte position that walreceiver has flushed.
     339             :  *
     340             :  * Optionally, returns the previous chunk start, that is the first byte
     341             :  * written in the most recent walreceiver flush cycle.  Callers not
     342             :  * interested in that value may pass NULL for latestChunkStart. Same for
     343             :  * receiveTLI.
     344             :  */
     345             : XLogRecPtr
     346       17730 : GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
     347             : {
     348       17730 :     WalRcvData *walrcv = WalRcv;
     349             :     XLogRecPtr  recptr;
     350             : 
     351       17730 :     SpinLockAcquire(&walrcv->mutex);
     352       17730 :     recptr = walrcv->flushedUpto;
     353       17730 :     if (latestChunkStart)
     354       15696 :         *latestChunkStart = walrcv->latestChunkStart;
     355       17730 :     if (receiveTLI)
     356       17354 :         *receiveTLI = walrcv->receivedTLI;
     357       17730 :     SpinLockRelease(&walrcv->mutex);
     358             : 
     359       17730 :     return recptr;
     360             : }
     361             : 
     362             : /*
     363             :  * Returns the last+1 byte position that walreceiver has written.
     364             :  * This returns a recently written value without taking a lock.
     365             :  */
     366             : XLogRecPtr
     367           0 : GetWalRcvWriteRecPtr(void)
     368             : {
     369           0 :     WalRcvData *walrcv = WalRcv;
     370             : 
     371           0 :     return pg_atomic_read_u64(&walrcv->writtenUpto);
     372             : }
     373             : 
     374             : /*
     375             :  * Returns the replication apply delay in ms or -1
     376             :  * if the apply delay info is not available
     377             :  */
     378             : int
     379         746 : GetReplicationApplyDelay(void)
     380             : {
     381         746 :     WalRcvData *walrcv = WalRcv;
     382             :     XLogRecPtr  receivePtr;
     383             :     XLogRecPtr  replayPtr;
     384             :     TimestampTz chunkReplayStartTime;
     385             : 
     386         746 :     SpinLockAcquire(&walrcv->mutex);
     387         746 :     receivePtr = walrcv->flushedUpto;
     388         746 :     SpinLockRelease(&walrcv->mutex);
     389             : 
     390         746 :     replayPtr = GetXLogReplayRecPtr(NULL);
     391             : 
     392         746 :     if (receivePtr == replayPtr)
     393         124 :         return 0;
     394             : 
     395         622 :     chunkReplayStartTime = GetCurrentChunkReplayStartTime();
     396             : 
     397         622 :     if (chunkReplayStartTime == 0)
     398           2 :         return -1;
     399             : 
     400         620 :     return TimestampDifferenceMilliseconds(chunkReplayStartTime,
     401             :                                            GetCurrentTimestamp());
     402             : }
     403             : 
     404             : /*
     405             :  * Returns the network latency in ms, note that this includes any
     406             :  * difference in clock settings between the servers, as well as timezone.
     407             :  */
     408             : int
     409         746 : GetReplicationTransferLatency(void)
     410             : {
     411         746 :     WalRcvData *walrcv = WalRcv;
     412             :     TimestampTz lastMsgSendTime;
     413             :     TimestampTz lastMsgReceiptTime;
     414             : 
     415         746 :     SpinLockAcquire(&walrcv->mutex);
     416         746 :     lastMsgSendTime = walrcv->lastMsgSendTime;
     417         746 :     lastMsgReceiptTime = walrcv->lastMsgReceiptTime;
     418         746 :     SpinLockRelease(&walrcv->mutex);
     419             : 
     420         746 :     return TimestampDifferenceMilliseconds(lastMsgSendTime,
     421             :                                            lastMsgReceiptTime);
     422             : }

Generated by: LCOV version 1.16