LCOV - code coverage report
Current view: top level - src/backend/replication - walreceiverfuncs.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 85.1 % 154 131
Test Date: 2026-03-12 06:14:44 Functions: 90.9 % 11 10
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * walreceiverfuncs.c
       4              :  *
       5              :  * This file contains functions used by the startup process to communicate
       6              :  * with the walreceiver process. Functions implementing walreceiver itself
       7              :  * are in walreceiver.c.
       8              :  *
       9              :  * Portions Copyright (c) 2010-2026, PostgreSQL Global Development Group
      10              :  *
      11              :  *
      12              :  * IDENTIFICATION
      13              :  *    src/backend/replication/walreceiverfuncs.c
      14              :  *
      15              :  *-------------------------------------------------------------------------
      16              :  */
      17              : #include "postgres.h"
      18              : 
      19              : #include <sys/stat.h>
      20              : #include <sys/time.h>
      21              : #include <time.h>
      22              : #include <unistd.h>
      23              : #include <signal.h>
      24              : 
      25              : #include "access/xlog_internal.h"
      26              : #include "access/xlogrecovery.h"
      27              : #include "pgstat.h"
      28              : #include "replication/walreceiver.h"
      29              : #include "storage/pmsignal.h"
      30              : #include "storage/proc.h"
      31              : #include "storage/shmem.h"
      32              : #include "utils/timestamp.h"
      33              : #include "utils/wait_event.h"
      34              : 
      35              : WalRcvData *WalRcv = NULL;
      36              : 
      37              : /*
      38              :  * How long to wait for walreceiver to start up after requesting
      39              :  * postmaster to launch it. In seconds.
      40              :  */
      41              : #define WALRCV_STARTUP_TIMEOUT 10
      42              : 
      43              : /* Report shared memory space needed by WalRcvShmemInit */
      44              : Size
      45         4479 : WalRcvShmemSize(void)
      46              : {
      47         4479 :     Size        size = 0;
      48              : 
      49         4479 :     size = add_size(size, sizeof(WalRcvData));
      50              : 
      51         4479 :     return size;
      52              : }
      53              : 
      54              : /* Allocate and initialize walreceiver-related shared memory */
      55              : void
      56         1158 : WalRcvShmemInit(void)
      57              : {
      58              :     bool        found;
      59              : 
      60         1158 :     WalRcv = (WalRcvData *)
      61         1158 :         ShmemInitStruct("Wal Receiver Ctl", WalRcvShmemSize(), &found);
      62              : 
      63         1158 :     if (!found)
      64              :     {
      65              :         /* First time through, so initialize */
      66         1158 :         MemSet(WalRcv, 0, WalRcvShmemSize());
      67         1158 :         WalRcv->walRcvState = WALRCV_STOPPED;
      68         1158 :         ConditionVariableInit(&WalRcv->walRcvStoppedCV);
      69         1158 :         SpinLockInit(&WalRcv->mutex);
      70         1158 :         pg_atomic_init_u64(&WalRcv->writtenUpto, 0);
      71         1158 :         WalRcv->procno = INVALID_PROC_NUMBER;
      72              :     }
      73         1158 : }
      74              : 
      75              : /* Is walreceiver running (or starting up)? */
      76              : bool
      77         1056 : WalRcvRunning(void)
      78              : {
      79         1056 :     WalRcvData *walrcv = WalRcv;
      80              :     WalRcvState state;
      81              :     pg_time_t   startTime;
      82              : 
      83         1056 :     SpinLockAcquire(&walrcv->mutex);
      84              : 
      85         1056 :     state = walrcv->walRcvState;
      86         1056 :     startTime = walrcv->startTime;
      87              : 
      88         1056 :     SpinLockRelease(&walrcv->mutex);
      89              : 
      90              :     /*
      91              :      * If it has taken too long for walreceiver to start up, give up. Setting
      92              :      * the state to STOPPED ensures that if walreceiver later does start up
      93              :      * after all, it will see that it's not supposed to be running and die
      94              :      * without doing anything.
      95              :      */
      96         1056 :     if (state == WALRCV_STARTING)
      97              :     {
      98            1 :         pg_time_t   now = (pg_time_t) time(NULL);
      99              : 
     100            1 :         if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
     101              :         {
     102            0 :             bool        stopped = false;
     103              : 
     104            0 :             SpinLockAcquire(&walrcv->mutex);
     105            0 :             if (walrcv->walRcvState == WALRCV_STARTING)
     106              :             {
     107            0 :                 state = walrcv->walRcvState = WALRCV_STOPPED;
     108            0 :                 stopped = true;
     109              :             }
     110            0 :             SpinLockRelease(&walrcv->mutex);
     111              : 
     112            0 :             if (stopped)
     113            0 :                 ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
     114              :         }
     115              :     }
     116              : 
     117         1056 :     if (state != WALRCV_STOPPED)
     118           42 :         return true;
     119              :     else
     120         1014 :         return false;
     121              : }
     122              : 
     123              : /* Return the state of the walreceiver. */
     124              : WalRcvState
     125            0 : WalRcvGetState(void)
     126              : {
     127            0 :     WalRcvData *walrcv = WalRcv;
     128              :     WalRcvState state;
     129              : 
     130            0 :     SpinLockAcquire(&walrcv->mutex);
     131            0 :     state = walrcv->walRcvState;
     132            0 :     SpinLockRelease(&walrcv->mutex);
     133              : 
     134            0 :     return state;
     135              : }
     136              : 
     137              : /*
     138              :  * Is walreceiver running and streaming (or at least attempting to connect,
     139              :  * or starting up)?
     140              :  */
     141              : bool
     142        25718 : WalRcvStreaming(void)
     143              : {
     144        25718 :     WalRcvData *walrcv = WalRcv;
     145              :     WalRcvState state;
     146              :     pg_time_t   startTime;
     147              : 
     148        25718 :     SpinLockAcquire(&walrcv->mutex);
     149              : 
     150        25718 :     state = walrcv->walRcvState;
     151        25718 :     startTime = walrcv->startTime;
     152              : 
     153        25718 :     SpinLockRelease(&walrcv->mutex);
     154              : 
     155              :     /*
     156              :      * If it has taken too long for walreceiver to start up, give up. Setting
     157              :      * the state to STOPPED ensures that if walreceiver later does start up
     158              :      * after all, it will see that it's not supposed to be running and die
     159              :      * without doing anything.
     160              :      */
     161        25718 :     if (state == WALRCV_STARTING)
     162              :     {
     163          284 :         pg_time_t   now = (pg_time_t) time(NULL);
     164              : 
     165          284 :         if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
     166              :         {
     167            0 :             bool        stopped = false;
     168              : 
     169            0 :             SpinLockAcquire(&walrcv->mutex);
     170            0 :             if (walrcv->walRcvState == WALRCV_STARTING)
     171              :             {
     172            0 :                 state = walrcv->walRcvState = WALRCV_STOPPED;
     173            0 :                 stopped = true;
     174              :             }
     175            0 :             SpinLockRelease(&walrcv->mutex);
     176              : 
     177            0 :             if (stopped)
     178            0 :                 ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
     179              :         }
     180              :     }
     181              : 
     182        25718 :     if (state == WALRCV_STREAMING || state == WALRCV_STARTING ||
     183          293 :         state == WALRCV_CONNECTING || state == WALRCV_RESTARTING)
     184        25430 :         return true;
     185              :     else
     186          288 :         return false;
     187              : }
     188              : 
     189              : /*
     190              :  * Stop walreceiver (if running) and wait for it to die.
     191              :  * Executed by the Startup process.
     192              :  */
     193              : void
     194         1010 : ShutdownWalRcv(void)
     195              : {
     196         1010 :     WalRcvData *walrcv = WalRcv;
     197         1010 :     pid_t       walrcvpid = 0;
     198         1010 :     bool        stopped = false;
     199              : 
     200              :     /*
     201              :      * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
     202              :      * mode once it's finished, and will also request postmaster to not
     203              :      * restart itself.
     204              :      */
     205         1010 :     SpinLockAcquire(&walrcv->mutex);
     206         1010 :     switch (walrcv->walRcvState)
     207              :     {
     208          968 :         case WALRCV_STOPPED:
     209          968 :             break;
     210            4 :         case WALRCV_STARTING:
     211            4 :             walrcv->walRcvState = WALRCV_STOPPED;
     212            4 :             stopped = true;
     213            4 :             break;
     214              : 
     215           38 :         case WALRCV_CONNECTING:
     216              :         case WALRCV_STREAMING:
     217              :         case WALRCV_WAITING:
     218              :         case WALRCV_RESTARTING:
     219           38 :             walrcv->walRcvState = WALRCV_STOPPING;
     220              :             pg_fallthrough;
     221           38 :         case WALRCV_STOPPING:
     222           38 :             walrcvpid = walrcv->pid;
     223           38 :             break;
     224              :     }
     225         1010 :     SpinLockRelease(&walrcv->mutex);
     226              : 
     227              :     /* Unnecessary but consistent. */
     228         1010 :     if (stopped)
     229            4 :         ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
     230              : 
     231              :     /*
     232              :      * Signal walreceiver process if it was still running.
     233              :      */
     234         1010 :     if (walrcvpid != 0)
     235           38 :         kill(walrcvpid, SIGTERM);
     236              : 
     237              :     /*
     238              :      * Wait for walreceiver to acknowledge its death by setting state to
     239              :      * WALRCV_STOPPED.
     240              :      */
     241         1010 :     ConditionVariablePrepareToSleep(&walrcv->walRcvStoppedCV);
     242         1045 :     while (WalRcvRunning())
     243           35 :         ConditionVariableSleep(&walrcv->walRcvStoppedCV,
     244              :                                WAIT_EVENT_WAL_RECEIVER_EXIT);
     245         1010 :     ConditionVariableCancelSleep();
     246         1010 : }
     247              : 
     248              : /*
     249              :  * Request postmaster to start walreceiver.
     250              :  *
     251              :  * "recptr" indicates the position where streaming should begin.  "conninfo"
     252              :  * is a libpq connection string to use.  "slotname" is, optionally, the name
     253              :  * of a replication slot to acquire.  "create_temp_slot" indicates to create
     254              :  * a temporary slot when no "slotname" is given.
     255              :  *
     256              :  * WAL receivers do not directly load GUC parameters used for the connection
     257              :  * to the primary, and rely on the values passed down by the caller of this
     258              :  * routine instead.  Hence, the addition of any new parameters should happen
     259              :  * through this code path.
     260              :  */
     261              : void
     262          190 : RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
     263              :                      const char *slotname, bool create_temp_slot)
     264              : {
     265          190 :     WalRcvData *walrcv = WalRcv;
     266          190 :     bool        launch = false;
     267          190 :     pg_time_t   now = (pg_time_t) time(NULL);
     268              :     ProcNumber  walrcv_proc;
     269              : 
     270              :     /*
     271              :      * We always start at the beginning of the segment. That prevents a broken
     272              :      * segment (i.e., with no records in the first half of a segment) from
     273              :      * being created by XLOG streaming, which might cause trouble later on if
     274              :      * the segment is e.g archived.
     275              :      */
     276          190 :     if (XLogSegmentOffset(recptr, wal_segment_size) != 0)
     277          190 :         recptr -= XLogSegmentOffset(recptr, wal_segment_size);
     278              : 
     279          190 :     SpinLockAcquire(&walrcv->mutex);
     280              : 
     281              :     /* It better be stopped if we try to restart it */
     282              :     Assert(walrcv->walRcvState == WALRCV_STOPPED ||
     283              :            walrcv->walRcvState == WALRCV_WAITING);
     284              : 
     285          190 :     if (conninfo != NULL)
     286          190 :         strlcpy(walrcv->conninfo, conninfo, MAXCONNINFO);
     287              :     else
     288            0 :         walrcv->conninfo[0] = '\0';
     289              : 
     290              :     /*
     291              :      * Use configured replication slot if present, and ignore the value of
     292              :      * create_temp_slot as the slot name should be persistent.  Otherwise, use
     293              :      * create_temp_slot to determine whether this WAL receiver should create a
     294              :      * temporary slot by itself and use it, or not.
     295              :      */
     296          190 :     if (slotname != NULL && slotname[0] != '\0')
     297              :     {
     298           55 :         strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
     299           55 :         walrcv->is_temp_slot = false;
     300              :     }
     301              :     else
     302              :     {
     303          135 :         walrcv->slotname[0] = '\0';
     304          135 :         walrcv->is_temp_slot = create_temp_slot;
     305              :     }
     306              : 
     307          190 :     if (walrcv->walRcvState == WALRCV_STOPPED)
     308              :     {
     309          183 :         launch = true;
     310          183 :         walrcv->walRcvState = WALRCV_STARTING;
     311              :     }
     312              :     else
     313            7 :         walrcv->walRcvState = WALRCV_RESTARTING;
     314          190 :     walrcv->startTime = now;
     315              : 
     316              :     /*
     317              :      * If this is the first startup of walreceiver (on this timeline),
     318              :      * initialize flushedUpto and latestChunkStart to the starting point.
     319              :      */
     320          190 :     if (!XLogRecPtrIsValid(walrcv->receiveStart) || walrcv->receivedTLI != tli)
     321              :     {
     322          107 :         walrcv->flushedUpto = recptr;
     323          107 :         walrcv->receivedTLI = tli;
     324          107 :         walrcv->latestChunkStart = recptr;
     325              :     }
     326          190 :     walrcv->receiveStart = recptr;
     327          190 :     walrcv->receiveStartTLI = tli;
     328              : 
     329          190 :     walrcv_proc = walrcv->procno;
     330              : 
     331          190 :     SpinLockRelease(&walrcv->mutex);
     332              : 
     333          190 :     if (launch)
     334          183 :         SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
     335            7 :     else if (walrcv_proc != INVALID_PROC_NUMBER)
     336            7 :         SetLatch(&GetPGProcByNumber(walrcv_proc)->procLatch);
     337          190 : }
     338              : 
     339              : /*
     340              :  * Returns the last+1 byte position that walreceiver has flushed.
     341              :  *
     342              :  * Optionally, returns the previous chunk start, that is the first byte
     343              :  * written in the most recent walreceiver flush cycle.  Callers not
     344              :  * interested in that value may pass NULL for latestChunkStart. Same for
     345              :  * receiveTLI.
     346              :  */
     347              : XLogRecPtr
     348        24905 : GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
     349              : {
     350        24905 :     WalRcvData *walrcv = WalRcv;
     351              :     XLogRecPtr  recptr;
     352              : 
     353        24905 :     SpinLockAcquire(&walrcv->mutex);
     354        24905 :     recptr = walrcv->flushedUpto;
     355        24905 :     if (latestChunkStart)
     356        23684 :         *latestChunkStart = walrcv->latestChunkStart;
     357        24905 :     if (receiveTLI)
     358        24672 :         *receiveTLI = walrcv->receivedTLI;
     359        24905 :     SpinLockRelease(&walrcv->mutex);
     360              : 
     361        24905 :     return recptr;
     362              : }
     363              : 
     364              : /*
     365              :  * Returns the last+1 byte position that walreceiver has written.
     366              :  * This returns a recently written value without taking a lock.
     367              :  */
     368              : XLogRecPtr
     369           26 : GetWalRcvWriteRecPtr(void)
     370              : {
     371           26 :     WalRcvData *walrcv = WalRcv;
     372              : 
     373           26 :     return pg_atomic_read_u64(&walrcv->writtenUpto);
     374              : }
     375              : 
     376              : /*
     377              :  * Returns the replication apply delay in ms or -1
     378              :  * if the apply delay info is not available
     379              :  */
     380              : int
     381          402 : GetReplicationApplyDelay(void)
     382              : {
     383          402 :     WalRcvData *walrcv = WalRcv;
     384              :     XLogRecPtr  receivePtr;
     385              :     XLogRecPtr  replayPtr;
     386              :     TimestampTz chunkReplayStartTime;
     387              : 
     388          402 :     SpinLockAcquire(&walrcv->mutex);
     389          402 :     receivePtr = walrcv->flushedUpto;
     390          402 :     SpinLockRelease(&walrcv->mutex);
     391              : 
     392          402 :     replayPtr = GetXLogReplayRecPtr(NULL);
     393              : 
     394          402 :     if (receivePtr == replayPtr)
     395          127 :         return 0;
     396              : 
     397          275 :     chunkReplayStartTime = GetCurrentChunkReplayStartTime();
     398              : 
     399          275 :     if (chunkReplayStartTime == 0)
     400           17 :         return -1;
     401              : 
     402          258 :     return TimestampDifferenceMilliseconds(chunkReplayStartTime,
     403              :                                            GetCurrentTimestamp());
     404              : }
     405              : 
     406              : /*
     407              :  * Returns the network latency in ms, note that this includes any
     408              :  * difference in clock settings between the servers, as well as timezone.
     409              :  */
     410              : int
     411          402 : GetReplicationTransferLatency(void)
     412              : {
     413          402 :     WalRcvData *walrcv = WalRcv;
     414              :     TimestampTz lastMsgSendTime;
     415              :     TimestampTz lastMsgReceiptTime;
     416              : 
     417          402 :     SpinLockAcquire(&walrcv->mutex);
     418          402 :     lastMsgSendTime = walrcv->lastMsgSendTime;
     419          402 :     lastMsgReceiptTime = walrcv->lastMsgReceiptTime;
     420          402 :     SpinLockRelease(&walrcv->mutex);
     421              : 
     422          402 :     return TimestampDifferenceMilliseconds(lastMsgSendTime,
     423              :                                            lastMsgReceiptTime);
     424              : }
        

Generated by: LCOV version 2.0-1