LCOV - code coverage report
Current view: top level - src/backend/replication - walreceiverfuncs.c (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 129 154 83.8 %
Date: 2026-01-24 20:17:27 Functions: 10 11 90.9 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * walreceiverfuncs.c
       4             :  *
       5             :  * This file contains functions used by the startup process to communicate
       6             :  * with the walreceiver process. Functions implementing walreceiver itself
       7             :  * are in walreceiver.c.
       8             :  *
       9             :  * Portions Copyright (c) 2010-2026, PostgreSQL Global Development Group
      10             :  *
      11             :  *
      12             :  * IDENTIFICATION
      13             :  *    src/backend/replication/walreceiverfuncs.c
      14             :  *
      15             :  *-------------------------------------------------------------------------
      16             :  */
      17             : #include "postgres.h"
      18             : 
      19             : #include <sys/stat.h>
      20             : #include <sys/time.h>
      21             : #include <time.h>
      22             : #include <unistd.h>
      23             : #include <signal.h>
      24             : 
      25             : #include "access/xlog_internal.h"
      26             : #include "access/xlogrecovery.h"
      27             : #include "pgstat.h"
      28             : #include "replication/walreceiver.h"
      29             : #include "storage/pmsignal.h"
      30             : #include "storage/proc.h"
      31             : #include "storage/shmem.h"
      32             : #include "utils/timestamp.h"
      33             : 
      34             : WalRcvData *WalRcv = NULL;
      35             : 
      36             : /*
      37             :  * How long to wait for walreceiver to start up after requesting
      38             :  * postmaster to launch it. In seconds.
      39             :  */
      40             : #define WALRCV_STARTUP_TIMEOUT 10
      41             : 
      42             : /* Report shared memory space needed by WalRcvShmemInit */
      43             : Size
      44        8806 : WalRcvShmemSize(void)
      45             : {
      46        8806 :     Size        size = 0;
      47             : 
      48        8806 :     size = add_size(size, sizeof(WalRcvData));
      49             : 
      50        8806 :     return size;
      51             : }
      52             : 
      53             : /* Allocate and initialize walreceiver-related shared memory */
      54             : void
      55        2278 : WalRcvShmemInit(void)
      56             : {
      57             :     bool        found;
      58             : 
      59        2278 :     WalRcv = (WalRcvData *)
      60        2278 :         ShmemInitStruct("Wal Receiver Ctl", WalRcvShmemSize(), &found);
      61             : 
      62        2278 :     if (!found)
      63             :     {
      64             :         /* First time through, so initialize */
      65        2278 :         MemSet(WalRcv, 0, WalRcvShmemSize());
      66        2278 :         WalRcv->walRcvState = WALRCV_STOPPED;
      67        2278 :         ConditionVariableInit(&WalRcv->walRcvStoppedCV);
      68        2278 :         SpinLockInit(&WalRcv->mutex);
      69        2278 :         pg_atomic_init_u64(&WalRcv->writtenUpto, 0);
      70        2278 :         WalRcv->procno = INVALID_PROC_NUMBER;
      71             :     }
      72        2278 : }
      73             : 
      74             : /* Is walreceiver running (or starting up)? */
      75             : bool
      76        2090 : WalRcvRunning(void)
      77             : {
      78        2090 :     WalRcvData *walrcv = WalRcv;
      79             :     WalRcvState state;
      80             :     pg_time_t   startTime;
      81             : 
      82        2090 :     SpinLockAcquire(&walrcv->mutex);
      83             : 
      84        2090 :     state = walrcv->walRcvState;
      85        2090 :     startTime = walrcv->startTime;
      86             : 
      87        2090 :     SpinLockRelease(&walrcv->mutex);
      88             : 
      89             :     /*
      90             :      * If it has taken too long for walreceiver to start up, give up. Setting
      91             :      * the state to STOPPED ensures that if walreceiver later does start up
      92             :      * after all, it will see that it's not supposed to be running and die
      93             :      * without doing anything.
      94             :      */
      95        2090 :     if (state == WALRCV_STARTING)
      96             :     {
      97           0 :         pg_time_t   now = (pg_time_t) time(NULL);
      98             : 
      99           0 :         if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
     100             :         {
     101           0 :             bool        stopped = false;
     102             : 
     103           0 :             SpinLockAcquire(&walrcv->mutex);
     104           0 :             if (walrcv->walRcvState == WALRCV_STARTING)
     105             :             {
     106           0 :                 state = walrcv->walRcvState = WALRCV_STOPPED;
     107           0 :                 stopped = true;
     108             :             }
     109           0 :             SpinLockRelease(&walrcv->mutex);
     110             : 
     111           0 :             if (stopped)
     112           0 :                 ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
     113             :         }
     114             :     }
     115             : 
     116        2090 :     if (state != WALRCV_STOPPED)
     117          90 :         return true;
     118             :     else
     119        2000 :         return false;
     120             : }
     121             : 
     122             : /* Return the state of the walreceiver. */
     123             : WalRcvState
     124           0 : WalRcvGetState(void)
     125             : {
     126           0 :     WalRcvData *walrcv = WalRcv;
     127             :     WalRcvState state;
     128             : 
     129           0 :     SpinLockAcquire(&walrcv->mutex);
     130           0 :     state = walrcv->walRcvState;
     131           0 :     SpinLockRelease(&walrcv->mutex);
     132             : 
     133           0 :     return state;
     134             : }
     135             : 
     136             : /*
     137             :  * Is walreceiver running and streaming (or at least attempting to connect,
     138             :  * or starting up)?
     139             :  */
     140             : bool
     141       20730 : WalRcvStreaming(void)
     142             : {
     143       20730 :     WalRcvData *walrcv = WalRcv;
     144             :     WalRcvState state;
     145             :     pg_time_t   startTime;
     146             : 
     147       20730 :     SpinLockAcquire(&walrcv->mutex);
     148             : 
     149       20730 :     state = walrcv->walRcvState;
     150       20730 :     startTime = walrcv->startTime;
     151             : 
     152       20730 :     SpinLockRelease(&walrcv->mutex);
     153             : 
     154             :     /*
     155             :      * If it has taken too long for walreceiver to start up, give up. Setting
     156             :      * the state to STOPPED ensures that if walreceiver later does start up
     157             :      * after all, it will see that it's not supposed to be running and die
     158             :      * without doing anything.
     159             :      */
     160       20730 :     if (state == WALRCV_STARTING)
     161             :     {
     162         600 :         pg_time_t   now = (pg_time_t) time(NULL);
     163             : 
     164         600 :         if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
     165             :         {
     166           0 :             bool        stopped = false;
     167             : 
     168           0 :             SpinLockAcquire(&walrcv->mutex);
     169           0 :             if (walrcv->walRcvState == WALRCV_STARTING)
     170             :             {
     171           0 :                 state = walrcv->walRcvState = WALRCV_STOPPED;
     172           0 :                 stopped = true;
     173             :             }
     174           0 :             SpinLockRelease(&walrcv->mutex);
     175             : 
     176           0 :             if (stopped)
     177           0 :                 ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
     178             :         }
     179             :     }
     180             : 
     181       20730 :     if (state == WALRCV_STREAMING || state == WALRCV_STARTING ||
     182         658 :         state == WALRCV_CONNECTING || state == WALRCV_RESTARTING)
     183       20086 :         return true;
     184             :     else
     185         644 :         return false;
     186             : }
     187             : 
     188             : /*
     189             :  * Stop walreceiver (if running) and wait for it to die.
     190             :  * Executed by the Startup process.
     191             :  */
     192             : void
     193        1992 : ShutdownWalRcv(void)
     194             : {
     195        1992 :     WalRcvData *walrcv = WalRcv;
     196        1992 :     pid_t       walrcvpid = 0;
     197        1992 :     bool        stopped = false;
     198             : 
     199             :     /*
     200             :      * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
     201             :      * mode once it's finished, and will also request postmaster to not
     202             :      * restart itself.
     203             :      */
     204        1992 :     SpinLockAcquire(&walrcv->mutex);
     205        1992 :     switch (walrcv->walRcvState)
     206             :     {
     207        1902 :         case WALRCV_STOPPED:
     208        1902 :             break;
     209          12 :         case WALRCV_STARTING:
     210          12 :             walrcv->walRcvState = WALRCV_STOPPED;
     211          12 :             stopped = true;
     212          12 :             break;
     213             : 
     214          78 :         case WALRCV_CONNECTING:
     215             :         case WALRCV_STREAMING:
     216             :         case WALRCV_WAITING:
     217             :         case WALRCV_RESTARTING:
     218          78 :             walrcv->walRcvState = WALRCV_STOPPING;
     219             :             /* fall through */
     220          78 :         case WALRCV_STOPPING:
     221          78 :             walrcvpid = walrcv->pid;
     222          78 :             break;
     223             :     }
     224        1992 :     SpinLockRelease(&walrcv->mutex);
     225             : 
     226             :     /* Not strictly needed, but consistent with other paths that set WALRCV_STOPPED. */
     227        1992 :     if (stopped)
     228          12 :         ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
     229             : 
     230             :     /*
     231             :      * Signal walreceiver process if it was still running.
     232             :      */
     233        1992 :     if (walrcvpid != 0)
     234          78 :         kill(walrcvpid, SIGTERM);
     235             : 
     236             :     /*
     237             :      * Wait for walreceiver to acknowledge its death by setting state to
     238             :      * WALRCV_STOPPED.
     239             :      */
     240        1992 :     ConditionVariablePrepareToSleep(&walrcv->walRcvStoppedCV);
     241        2068 :     while (WalRcvRunning())
     242          76 :         ConditionVariableSleep(&walrcv->walRcvStoppedCV,
     243             :                                WAIT_EVENT_WAL_RECEIVER_EXIT);
     244        1992 :     ConditionVariableCancelSleep();
     245        1992 : }
     246             : 
     247             : /*
     248             :  * Request postmaster to start walreceiver.
     249             :  *
     250             :  * "recptr" indicates the position where streaming should begin.  "conninfo"
     251             :  * is a libpq connection string to use.  "slotname" is, optionally, the name
     252             :  * of a replication slot to acquire.  "create_temp_slot" indicates to create
     253             :  * a temporary slot when no "slotname" is given.
     254             :  *
     255             :  * WAL receivers do not directly load GUC parameters used for the connection
     256             :  * to the primary, and rely on the values passed down by the caller of this
     257             :  * routine instead.  Hence, the addition of any new parameters should happen
     258             :  * through this code path.
     259             :  */
     260             : void
     261         394 : RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
     262             :                      const char *slotname, bool create_temp_slot)
     263             : {
     264         394 :     WalRcvData *walrcv = WalRcv;
     265         394 :     bool        launch = false;
     266         394 :     pg_time_t   now = (pg_time_t) time(NULL);
     267             :     ProcNumber  walrcv_proc;
     268             : 
     269             :     /*
     270             :      * We always start at the beginning of the segment. That prevents a broken
     271             :      * segment (i.e., with no records in the first half of a segment) from
     272             :      * being created by XLOG streaming, which might cause trouble later on if
     273             :      * the segment is, e.g., archived.
     274             :      */
     275         394 :     if (XLogSegmentOffset(recptr, wal_segment_size) != 0)
     276         394 :         recptr -= XLogSegmentOffset(recptr, wal_segment_size);
     277             : 
     278         394 :     SpinLockAcquire(&walrcv->mutex);
     279             : 
     280             :     /* It better be stopped if we try to restart it */
     281             :     Assert(walrcv->walRcvState == WALRCV_STOPPED ||
     282             :            walrcv->walRcvState == WALRCV_WAITING);
     283             : 
     284         394 :     if (conninfo != NULL)
     285         394 :         strlcpy(walrcv->conninfo, conninfo, MAXCONNINFO);
     286             :     else
     287           0 :         walrcv->conninfo[0] = '\0';
     288             : 
     289             :     /*
     290             :      * Use configured replication slot if present, and ignore the value of
     291             :      * create_temp_slot as the slot name should be persistent.  Otherwise, use
     292             :      * create_temp_slot to determine whether this WAL receiver should create a
     293             :      * temporary slot by itself and use it, or not.
     294             :      */
     295         394 :     if (slotname != NULL && slotname[0] != '\0')
     296             :     {
     297         112 :         strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
     298         112 :         walrcv->is_temp_slot = false;
     299             :     }
     300             :     else
     301             :     {
     302         282 :         walrcv->slotname[0] = '\0';
     303         282 :         walrcv->is_temp_slot = create_temp_slot;
     304             :     }
     305             : 
     306         394 :     if (walrcv->walRcvState == WALRCV_STOPPED)
     307             :     {
     308         380 :         launch = true;
     309         380 :         walrcv->walRcvState = WALRCV_STARTING;
     310             :     }
     311             :     else
     312          14 :         walrcv->walRcvState = WALRCV_RESTARTING;
     313         394 :     walrcv->startTime = now;
     314             : 
     315             :     /*
     316             :      * If this is the first startup of walreceiver (on this timeline),
     317             :      * initialize flushedUpto and latestChunkStart to the starting point.
     318             :      */
     319         394 :     if (!XLogRecPtrIsValid(walrcv->receiveStart) || walrcv->receivedTLI != tli)
     320             :     {
     321         214 :         walrcv->flushedUpto = recptr;
     322         214 :         walrcv->receivedTLI = tli;
     323         214 :         walrcv->latestChunkStart = recptr;
     324             :     }
     325         394 :     walrcv->receiveStart = recptr;
     326         394 :     walrcv->receiveStartTLI = tli;
     327             : 
     328         394 :     walrcv_proc = walrcv->procno;
     329             : 
     330         394 :     SpinLockRelease(&walrcv->mutex);
     331             : 
     332         394 :     if (launch)
     333         380 :         SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
     334          14 :     else if (walrcv_proc != INVALID_PROC_NUMBER)
     335          14 :         SetLatch(&GetPGProcByNumber(walrcv_proc)->procLatch);
     336         394 : }
     337             : 
     338             : /*
     339             :  * Returns the last+1 byte position that walreceiver has flushed.
     340             :  *
     341             :  * Optionally, returns the previous chunk start, that is the first byte
     342             :  * written in the most recent walreceiver flush cycle.  Callers not
     343             :  * interested in that value may pass NULL for latestChunkStart. Same for
     344             :  * receiveTLI.
     345             :  */
     346             : XLogRecPtr
     347       18560 : GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
     348             : {
     349       18560 :     WalRcvData *walrcv = WalRcv;
     350             :     XLogRecPtr  recptr;
     351             : 
     352       18560 :     SpinLockAcquire(&walrcv->mutex);
     353       18560 :     recptr = walrcv->flushedUpto;
     354       18560 :     if (latestChunkStart)
     355       16194 :         *latestChunkStart = walrcv->latestChunkStart;
     356       18560 :     if (receiveTLI)
     357       18112 :         *receiveTLI = walrcv->receivedTLI;
     358       18560 :     SpinLockRelease(&walrcv->mutex);
     359             : 
     360       18560 :     return recptr;
     361             : }
     362             : 
     363             : /*
     364             :  * Returns the last+1 byte position that walreceiver has written.
     365             :  * This returns a recently written value without taking a lock.
     366             :  */
     367             : XLogRecPtr
     368          52 : GetWalRcvWriteRecPtr(void)
     369             : {
     370          52 :     WalRcvData *walrcv = WalRcv;
     371             : 
     372          52 :     return pg_atomic_read_u64(&walrcv->writtenUpto);
     373             : }
     374             : 
     375             : /*
     376             :  * Returns the replication apply delay in ms, or -1
     377             :  * if the apply delay info is not available.
     378             :  */
     379             : int
     380         766 : GetReplicationApplyDelay(void)
     381             : {
     382         766 :     WalRcvData *walrcv = WalRcv;
     383             :     XLogRecPtr  receivePtr;
     384             :     XLogRecPtr  replayPtr;
     385             :     TimestampTz chunkReplayStartTime;
     386             : 
     387         766 :     SpinLockAcquire(&walrcv->mutex);
     388         766 :     receivePtr = walrcv->flushedUpto;
     389         766 :     SpinLockRelease(&walrcv->mutex);
     390             : 
     391         766 :     replayPtr = GetXLogReplayRecPtr(NULL);
     392             : 
     393         766 :     if (receivePtr == replayPtr)
     394         156 :         return 0;
     395             : 
     396         610 :     chunkReplayStartTime = GetCurrentChunkReplayStartTime();
     397             : 
     398         610 :     if (chunkReplayStartTime == 0)
     399         198 :         return -1;
     400             : 
     401         412 :     return TimestampDifferenceMilliseconds(chunkReplayStartTime,
     402             :                                            GetCurrentTimestamp());
     403             : }
     404             : 
     405             : /*
     406             :  * Returns the network latency in ms.  Note that this includes any
     407             :  * difference in clock settings between the servers, as well as timezone.
     408             :  */
     409             : int
     410         766 : GetReplicationTransferLatency(void)
     411             : {
     412         766 :     WalRcvData *walrcv = WalRcv;
     413             :     TimestampTz lastMsgSendTime;
     414             :     TimestampTz lastMsgReceiptTime;
     415             : 
     416         766 :     SpinLockAcquire(&walrcv->mutex);
     417         766 :     lastMsgSendTime = walrcv->lastMsgSendTime;
     418         766 :     lastMsgReceiptTime = walrcv->lastMsgReceiptTime;
     419         766 :     SpinLockRelease(&walrcv->mutex);
     420             : 
     421         766 :     return TimestampDifferenceMilliseconds(lastMsgSendTime,
     422             :                                            lastMsgReceiptTime);
     423             : }

Generated by: LCOV version 1.16