LCOV - code coverage report
Current view: top level - src/backend/replication - walreceiverfuncs.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 83.4 % 151 126
Test Date: 2026-05-03 21:16:32 Functions: 90.9 % 11 10
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * walreceiverfuncs.c
       4              :  *
       5              :  * This file contains functions used by the startup process to communicate
       6              :  * with the walreceiver process. Functions implementing walreceiver itself
       7              :  * are in walreceiver.c.
       8              :  *
       9              :  * Portions Copyright (c) 2010-2026, PostgreSQL Global Development Group
      10              :  *
      11              :  *
      12              :  * IDENTIFICATION
      13              :  *    src/backend/replication/walreceiverfuncs.c
      14              :  *
      15              :  *-------------------------------------------------------------------------
      16              :  */
      17              : #include "postgres.h"
      18              : 
      19              : #include <sys/stat.h>
      20              : #include <sys/time.h>
      21              : #include <time.h>
      22              : #include <unistd.h>
      23              : #include <signal.h>
      24              : 
      25              : #include "access/xlog_internal.h"
      26              : #include "access/xlogrecovery.h"
      27              : #include "pgstat.h"
      28              : #include "replication/walreceiver.h"
      29              : #include "storage/pmsignal.h"
      30              : #include "storage/proc.h"
      31              : #include "storage/shmem.h"
      32              : #include "storage/subsystems.h"
      33              : #include "utils/timestamp.h"
      34              : #include "utils/wait_event.h"
      35              : 
/*
 * Pointer to the walreceiver control struct.  It starts out NULL; its
 * address is passed as the ".ptr" target of the "Wal Receiver Ctl" request
 * made in WalRcvShmemRequest().
 */
WalRcvData *WalRcv = NULL;

/* Shared-memory hooks; both are defined further down in this file. */
static void WalRcvShmemRequest(void *arg);
static void WalRcvShmemInit(void *arg);

/*
 * Callback table pairing the request hook (reserves the space) with the
 * init hook (fills in the initial contents).
 *
 * NOTE(review): presumably consumed by the shared-memory subsystem code
 * declared in storage/subsystems.h -- confirm against the caller.
 */
const ShmemCallbacks WalRcvShmemCallbacks = {
    .request_fn = WalRcvShmemRequest,
    .init_fn = WalRcvShmemInit,
};

/*
 * How long to wait for walreceiver to start up after requesting
 * postmaster to launch it. In seconds.
 *
 * WalRcvRunning() and WalRcvStreaming() compare the time elapsed since
 * WalRcv->startTime against this value while the state is still
 * WALRCV_STARTING, and force the state to WALRCV_STOPPED once it is exceeded.
 */
#define WALRCV_STARTUP_TIMEOUT 10
      51              : 
      52              : /* Register shared memory space needed by walreceiver */
      53              : static void
      54         1248 : WalRcvShmemRequest(void *arg)
      55              : {
      56         1248 :     ShmemRequestStruct(.name = "Wal Receiver Ctl",
      57              :                        .size = sizeof(WalRcvData),
      58              :                        .ptr = (void **) &WalRcv,
      59              :         );
      60         1248 : }
      61              : 
      62              : /* Initialize walreceiver-related shared memory */
      63              : static void
      64         1245 : WalRcvShmemInit(void *arg)
      65              : {
      66         1245 :     MemSet(WalRcv, 0, sizeof(WalRcvData));
      67         1245 :     WalRcv->walRcvState = WALRCV_STOPPED;
      68         1245 :     ConditionVariableInit(&WalRcv->walRcvStoppedCV);
      69         1245 :     SpinLockInit(&WalRcv->mutex);
      70         1245 :     pg_atomic_init_u64(&WalRcv->writtenUpto, 0);
      71         1245 :     WalRcv->procno = INVALID_PROC_NUMBER;
      72         1245 : }
      73              : 
      74              : /* Is walreceiver running (or starting up)? */
      75              : bool
      76         1128 : WalRcvRunning(void)
      77              : {
      78         1128 :     WalRcvData *walrcv = WalRcv;
      79              :     WalRcvState state;
      80              :     pg_time_t   startTime;
      81              : 
      82         1128 :     SpinLockAcquire(&walrcv->mutex);
      83              : 
      84         1128 :     state = walrcv->walRcvState;
      85         1128 :     startTime = walrcv->startTime;
      86              : 
      87         1128 :     SpinLockRelease(&walrcv->mutex);
      88              : 
      89              :     /*
      90              :      * If it has taken too long for walreceiver to start up, give up. Setting
      91              :      * the state to STOPPED ensures that if walreceiver later does start up
      92              :      * after all, it will see that it's not supposed to be running and die
      93              :      * without doing anything.
      94              :      */
      95         1128 :     if (state == WALRCV_STARTING)
      96              :     {
      97            0 :         pg_time_t   now = (pg_time_t) time(NULL);
      98              : 
      99            0 :         if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
     100              :         {
     101            0 :             bool        stopped = false;
     102              : 
     103            0 :             SpinLockAcquire(&walrcv->mutex);
     104            0 :             if (walrcv->walRcvState == WALRCV_STARTING)
     105              :             {
     106            0 :                 state = walrcv->walRcvState = WALRCV_STOPPED;
     107            0 :                 stopped = true;
     108              :             }
     109            0 :             SpinLockRelease(&walrcv->mutex);
     110              : 
     111            0 :             if (stopped)
     112            0 :                 ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
     113              :         }
     114              :     }
     115              : 
     116         1128 :     if (state != WALRCV_STOPPED)
     117           43 :         return true;
     118              :     else
     119         1085 :         return false;
     120              : }
     121              : 
     122              : /* Return the state of the walreceiver. */
     123              : WalRcvState
     124            0 : WalRcvGetState(void)
     125              : {
     126            0 :     WalRcvData *walrcv = WalRcv;
     127              :     WalRcvState state;
     128              : 
     129            0 :     SpinLockAcquire(&walrcv->mutex);
     130            0 :     state = walrcv->walRcvState;
     131            0 :     SpinLockRelease(&walrcv->mutex);
     132              : 
     133            0 :     return state;
     134              : }
     135              : 
     136              : /*
     137              :  * Is walreceiver running and streaming (or at least attempting to connect,
     138              :  * or starting up)?
     139              :  */
     140              : bool
     141        35217 : WalRcvStreaming(void)
     142              : {
     143        35217 :     WalRcvData *walrcv = WalRcv;
     144              :     WalRcvState state;
     145              :     pg_time_t   startTime;
     146              : 
     147        35217 :     SpinLockAcquire(&walrcv->mutex);
     148              : 
     149        35217 :     state = walrcv->walRcvState;
     150        35217 :     startTime = walrcv->startTime;
     151              : 
     152        35217 :     SpinLockRelease(&walrcv->mutex);
     153              : 
     154              :     /*
     155              :      * If it has taken too long for walreceiver to start up, give up. Setting
     156              :      * the state to STOPPED ensures that if walreceiver later does start up
     157              :      * after all, it will see that it's not supposed to be running and die
     158              :      * without doing anything.
     159              :      */
     160        35217 :     if (state == WALRCV_STARTING)
     161              :     {
     162          316 :         pg_time_t   now = (pg_time_t) time(NULL);
     163              : 
     164          316 :         if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
     165              :         {
     166            0 :             bool        stopped = false;
     167              : 
     168            0 :             SpinLockAcquire(&walrcv->mutex);
     169            0 :             if (walrcv->walRcvState == WALRCV_STARTING)
     170              :             {
     171            0 :                 state = walrcv->walRcvState = WALRCV_STOPPED;
     172            0 :                 stopped = true;
     173              :             }
     174            0 :             SpinLockRelease(&walrcv->mutex);
     175              : 
     176            0 :             if (stopped)
     177            0 :                 ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
     178              :         }
     179              :     }
     180              : 
     181        35217 :     if (state == WALRCV_STREAMING || state == WALRCV_STARTING ||
     182          345 :         state == WALRCV_CONNECTING || state == WALRCV_RESTARTING)
     183        34881 :         return true;
     184              :     else
     185          336 :         return false;
     186              : }
     187              : 
     188              : /*
     189              :  * Stop walreceiver (if running) and wait for it to die.
     190              :  * Executed by the Startup process.
     191              :  */
     192              : void
     193         1080 : ShutdownWalRcv(void)
     194              : {
     195         1080 :     WalRcvData *walrcv = WalRcv;
     196         1080 :     pid_t       walrcvpid = 0;
     197         1080 :     bool        stopped = false;
     198              : 
     199              :     /*
     200              :      * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
     201              :      * mode once it's finished, and will also request postmaster to not
     202              :      * restart itself.
     203              :      */
     204         1080 :     SpinLockAcquire(&walrcv->mutex);
     205         1080 :     switch (walrcv->walRcvState)
     206              :     {
     207         1037 :         case WALRCV_STOPPED:
     208         1037 :             break;
     209            2 :         case WALRCV_STARTING:
     210            2 :             walrcv->walRcvState = WALRCV_STOPPED;
     211            2 :             stopped = true;
     212            2 :             break;
     213              : 
     214           41 :         case WALRCV_CONNECTING:
     215              :         case WALRCV_STREAMING:
     216              :         case WALRCV_WAITING:
     217              :         case WALRCV_RESTARTING:
     218           41 :             walrcv->walRcvState = WALRCV_STOPPING;
     219              :             pg_fallthrough;
     220           41 :         case WALRCV_STOPPING:
     221           41 :             walrcvpid = walrcv->pid;
     222           41 :             break;
     223              :     }
     224         1080 :     SpinLockRelease(&walrcv->mutex);
     225              : 
     226              :     /* Unnecessary but consistent. */
     227         1080 :     if (stopped)
     228            2 :         ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
     229              : 
     230              :     /*
     231              :      * Signal walreceiver process if it was still running.
     232              :      */
     233         1080 :     if (walrcvpid != 0)
     234           41 :         kill(walrcvpid, SIGTERM);
     235              : 
     236              :     /*
     237              :      * Wait for walreceiver to acknowledge its death by setting state to
     238              :      * WALRCV_STOPPED.
     239              :      */
     240         1080 :     ConditionVariablePrepareToSleep(&walrcv->walRcvStoppedCV);
     241         1115 :     while (WalRcvRunning())
     242           35 :         ConditionVariableSleep(&walrcv->walRcvStoppedCV,
     243              :                                WAIT_EVENT_WAL_RECEIVER_EXIT);
     244         1080 :     ConditionVariableCancelSleep();
     245         1080 : }
     246              : 
     247              : /*
     248              :  * Request postmaster to start walreceiver.
     249              :  *
     250              :  * "recptr" indicates the position where streaming should begin.  "conninfo"
     251              :  * is a libpq connection string to use.  "slotname" is, optionally, the name
     252              :  * of a replication slot to acquire.  "create_temp_slot" indicates to create
     253              :  * a temporary slot when no "slotname" is given.
     254              :  *
     255              :  * WAL receivers do not directly load GUC parameters used for the connection
     256              :  * to the primary, and rely on the values passed down by the caller of this
     257              :  * routine instead.  Hence, the addition of any new parameters should happen
     258              :  * through this code path.
     259              :  */
     260              : void
     261          203 : RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
     262              :                      const char *slotname, bool create_temp_slot)
     263              : {
     264          203 :     WalRcvData *walrcv = WalRcv;
     265          203 :     bool        launch = false;
     266          203 :     pg_time_t   now = (pg_time_t) time(NULL);
     267              :     ProcNumber  walrcv_proc;
     268              : 
     269              :     /*
     270              :      * We always start at the beginning of the segment. That prevents a broken
     271              :      * segment (i.e., with no records in the first half of a segment) from
     272              :      * being created by XLOG streaming, which might cause trouble later on if
     273              :      * the segment is e.g archived.
     274              :      */
     275          203 :     if (XLogSegmentOffset(recptr, wal_segment_size) != 0)
     276          203 :         recptr -= XLogSegmentOffset(recptr, wal_segment_size);
     277              : 
     278          203 :     SpinLockAcquire(&walrcv->mutex);
     279              : 
     280              :     /* It better be stopped if we try to restart it */
     281              :     Assert(walrcv->walRcvState == WALRCV_STOPPED ||
     282              :            walrcv->walRcvState == WALRCV_WAITING);
     283              : 
     284          203 :     if (conninfo != NULL)
     285          203 :         strlcpy(walrcv->conninfo, conninfo, MAXCONNINFO);
     286              :     else
     287            0 :         walrcv->conninfo[0] = '\0';
     288              : 
     289              :     /*
     290              :      * Use configured replication slot if present, and ignore the value of
     291              :      * create_temp_slot as the slot name should be persistent.  Otherwise, use
     292              :      * create_temp_slot to determine whether this WAL receiver should create a
     293              :      * temporary slot by itself and use it, or not.
     294              :      */
     295          203 :     if (slotname != NULL && slotname[0] != '\0')
     296              :     {
     297           54 :         strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
     298           54 :         walrcv->is_temp_slot = false;
     299              :     }
     300              :     else
     301              :     {
     302          149 :         walrcv->slotname[0] = '\0';
     303          149 :         walrcv->is_temp_slot = create_temp_slot;
     304              :     }
     305              : 
     306          203 :     if (walrcv->walRcvState == WALRCV_STOPPED)
     307              :     {
     308          195 :         launch = true;
     309          195 :         walrcv->walRcvState = WALRCV_STARTING;
     310              :     }
     311              :     else
     312            8 :         walrcv->walRcvState = WALRCV_RESTARTING;
     313          203 :     walrcv->startTime = now;
     314              : 
     315              :     /*
     316              :      * If this is the first startup of walreceiver (on this timeline),
     317              :      * initialize flushedUpto and latestChunkStart to the starting point.
     318              :      */
     319          203 :     if (!XLogRecPtrIsValid(walrcv->receiveStart) || walrcv->receivedTLI != tli)
     320              :     {
     321          114 :         walrcv->flushedUpto = recptr;
     322          114 :         walrcv->receivedTLI = tli;
     323          114 :         walrcv->latestChunkStart = recptr;
     324              : 
     325              :         /*
     326              :          * Pairs with pg_atomic_read_membarrier_u64() in
     327              :          * GetWalRcvWriteRecPtr().
     328              :          */
     329          114 :         pg_atomic_write_membarrier_u64(&walrcv->writtenUpto, recptr);
     330              :     }
     331          203 :     walrcv->receiveStart = recptr;
     332          203 :     walrcv->receiveStartTLI = tli;
     333              : 
     334          203 :     walrcv_proc = walrcv->procno;
     335              : 
     336          203 :     SpinLockRelease(&walrcv->mutex);
     337              : 
     338          203 :     if (launch)
     339          195 :         SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
     340            8 :     else if (walrcv_proc != INVALID_PROC_NUMBER)
     341            8 :         SetLatch(&GetPGProcByNumber(walrcv_proc)->procLatch);
     342          203 : }
     343              : 
     344              : /*
     345              :  * Returns the last+1 byte position that walreceiver has flushed.
     346              :  *
     347              :  * Optionally, returns the previous chunk start, that is the first byte
     348              :  * written in the most recent walreceiver flush cycle.  Callers not
     349              :  * interested in that value may pass NULL for latestChunkStart. Same for
     350              :  * receiveTLI.
     351              :  */
     352              : XLogRecPtr
     353        34391 : GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
     354              : {
     355        34391 :     WalRcvData *walrcv = WalRcv;
     356              :     XLogRecPtr  recptr;
     357              : 
     358        34391 :     SpinLockAcquire(&walrcv->mutex);
     359        34391 :     recptr = walrcv->flushedUpto;
     360        34391 :     if (latestChunkStart)
     361        33017 :         *latestChunkStart = walrcv->latestChunkStart;
     362        34391 :     if (receiveTLI)
     363        34137 :         *receiveTLI = walrcv->receivedTLI;
     364        34391 :     SpinLockRelease(&walrcv->mutex);
     365              : 
     366        34391 :     return recptr;
     367              : }
     368              : 
     369              : /*
     370              :  * Returns the last+1 byte position that walreceiver has written.
     371              :  *
     372              :  * Use pg_atomic_read_membarrier_u64() to ensure that callers see up-to-date
     373              :  * shared memory state, matching the barrier semantics provided by the
     374              :  * spinlock in GetWalRcvFlushRecPtr() and other LSN-position functions.
     375              :  */
     376              : XLogRecPtr
     377           47 : GetWalRcvWriteRecPtr(void)
     378              : {
     379           47 :     WalRcvData *walrcv = WalRcv;
     380              : 
     381           47 :     return pg_atomic_read_membarrier_u64(&walrcv->writtenUpto);
     382              : }
     383              : 
     384              : /*
     385              :  * Returns the replication apply delay in ms or -1
     386              :  * if the apply delay info is not available
     387              :  */
     388              : int
     389          405 : GetReplicationApplyDelay(void)
     390              : {
     391          405 :     WalRcvData *walrcv = WalRcv;
     392              :     XLogRecPtr  receivePtr;
     393              :     XLogRecPtr  replayPtr;
     394              :     TimestampTz chunkReplayStartTime;
     395              : 
     396          405 :     SpinLockAcquire(&walrcv->mutex);
     397          405 :     receivePtr = walrcv->flushedUpto;
     398          405 :     SpinLockRelease(&walrcv->mutex);
     399              : 
     400          405 :     replayPtr = GetXLogReplayRecPtr(NULL);
     401              : 
     402          405 :     if (receivePtr == replayPtr)
     403          148 :         return 0;
     404              : 
     405          257 :     chunkReplayStartTime = GetCurrentChunkReplayStartTime();
     406              : 
     407          257 :     if (chunkReplayStartTime == 0)
     408            1 :         return -1;
     409              : 
     410          256 :     return TimestampDifferenceMilliseconds(chunkReplayStartTime,
     411              :                                            GetCurrentTimestamp());
     412              : }
     413              : 
     414              : /*
     415              :  * Returns the network latency in ms, note that this includes any
     416              :  * difference in clock settings between the servers, as well as timezone.
     417              :  */
     418              : int
     419          405 : GetReplicationTransferLatency(void)
     420              : {
     421          405 :     WalRcvData *walrcv = WalRcv;
     422              :     TimestampTz lastMsgSendTime;
     423              :     TimestampTz lastMsgReceiptTime;
     424              : 
     425          405 :     SpinLockAcquire(&walrcv->mutex);
     426          405 :     lastMsgSendTime = walrcv->lastMsgSendTime;
     427          405 :     lastMsgReceiptTime = walrcv->lastMsgReceiptTime;
     428          405 :     SpinLockRelease(&walrcv->mutex);
     429              : 
     430          405 :     return TimestampDifferenceMilliseconds(lastMsgSendTime,
     431              :                                            lastMsgReceiptTime);
     432              : }
        

Generated by: LCOV version 2.0-1