LCOV - code coverage report
Current view: top level - src/backend/replication - walreceiverfuncs.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 122 147 83.0 %
Date: 2025-01-18 04:15:08 Functions: 9 10 90.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * walreceiverfuncs.c
       4             :  *
       5             :  * This file contains functions used by the startup process to communicate
       6             :  * with the walreceiver process. Functions implementing walreceiver itself
       7             :  * are in walreceiver.c.
       8             :  *
       9             :  * Portions Copyright (c) 2010-2025, PostgreSQL Global Development Group
      10             :  *
      11             :  *
      12             :  * IDENTIFICATION
      13             :  *    src/backend/replication/walreceiverfuncs.c
      14             :  *
      15             :  *-------------------------------------------------------------------------
      16             :  */
      17             : #include "postgres.h"
      18             : 
      19             : #include <sys/stat.h>
      20             : #include <sys/time.h>
      21             : #include <time.h>
      22             : #include <unistd.h>
      23             : #include <signal.h>
      24             : 
      25             : #include "access/xlog_internal.h"
      26             : #include "access/xlogrecovery.h"
      27             : #include "pgstat.h"
      28             : #include "replication/walreceiver.h"
      29             : #include "storage/pmsignal.h"
      30             : #include "storage/proc.h"
      31             : #include "storage/shmem.h"
      32             : #include "utils/timestamp.h"
      33             : 
      34             : WalRcvData *WalRcv = NULL;
      35             : 
      36             : /*
      37             :  * How long to wait for walreceiver to start up after requesting
      38             :  * postmaster to launch it. In seconds.
      39             :  */
      40             : #define WALRCV_STARTUP_TIMEOUT 10
      41             : 
      42             : /* Report shared memory space needed by WalRcvShmemInit */
      43             : Size
      44        7402 : WalRcvShmemSize(void)
      45             : {
      46        7402 :     Size        size = 0;
      47             : 
      48        7402 :     size = add_size(size, sizeof(WalRcvData));
      49             : 
      50        7402 :     return size;
      51             : }
      52             : 
      53             : /* Allocate and initialize walreceiver-related shared memory */
      54             : void
      55        1918 : WalRcvShmemInit(void)
      56             : {
      57             :     bool        found;
      58             : 
      59        1918 :     WalRcv = (WalRcvData *)
      60        1918 :         ShmemInitStruct("Wal Receiver Ctl", WalRcvShmemSize(), &found);
      61             : 
      62        1918 :     if (!found)
      63             :     {
      64             :         /* First time through, so initialize */
      65        1918 :         MemSet(WalRcv, 0, WalRcvShmemSize());
      66        1918 :         WalRcv->walRcvState = WALRCV_STOPPED;
      67        1918 :         ConditionVariableInit(&WalRcv->walRcvStoppedCV);
      68        1918 :         SpinLockInit(&WalRcv->mutex);
      69        1918 :         pg_atomic_init_u64(&WalRcv->writtenUpto, 0);
      70        1918 :         WalRcv->procno = INVALID_PROC_NUMBER;
      71             :     }
      72        1918 : }
      73             : 
      74             : /* Is walreceiver running (or starting up)? */
      75             : bool
      76        1922 : WalRcvRunning(void)
      77             : {
      78        1922 :     WalRcvData *walrcv = WalRcv;
      79             :     WalRcvState state;
      80             :     pg_time_t   startTime;
      81             : 
      82        1922 :     SpinLockAcquire(&walrcv->mutex);
      83             : 
      84        1922 :     state = walrcv->walRcvState;
      85        1922 :     startTime = walrcv->startTime;
      86             : 
      87        1922 :     SpinLockRelease(&walrcv->mutex);
      88             : 
      89             :     /*
      90             :      * If it has taken too long for walreceiver to start up, give up. Setting
      91             :      * the state to STOPPED ensures that if walreceiver later does start up
      92             :      * after all, it will see that it's not supposed to be running and die
      93             :      * without doing anything.
      94             :      */
      95        1922 :     if (state == WALRCV_STARTING)
      96             :     {
      97           0 :         pg_time_t   now = (pg_time_t) time(NULL);
      98             : 
      99           0 :         if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
     100             :         {
     101           0 :             bool        stopped = false;
     102             : 
     103           0 :             SpinLockAcquire(&walrcv->mutex);
     104           0 :             if (walrcv->walRcvState == WALRCV_STARTING)
     105             :             {
     106           0 :                 state = walrcv->walRcvState = WALRCV_STOPPED;
     107           0 :                 stopped = true;
     108             :             }
     109           0 :             SpinLockRelease(&walrcv->mutex);
     110             : 
     111           0 :             if (stopped)
     112           0 :                 ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
     113             :         }
     114             :     }
     115             : 
     116        1922 :     if (state != WALRCV_STOPPED)
     117          74 :         return true;
     118             :     else
     119        1848 :         return false;
     120             : }
     121             : 
     122             : /*
     123             :  * Is walreceiver running and streaming (or at least attempting to connect,
     124             :  * or starting up)?
     125             :  */
     126             : bool
     127       12818 : WalRcvStreaming(void)
     128             : {
     129       12818 :     WalRcvData *walrcv = WalRcv;
     130             :     WalRcvState state;
     131             :     pg_time_t   startTime;
     132             : 
     133       12818 :     SpinLockAcquire(&walrcv->mutex);
     134             : 
     135       12818 :     state = walrcv->walRcvState;
     136       12818 :     startTime = walrcv->startTime;
     137             : 
     138       12818 :     SpinLockRelease(&walrcv->mutex);
     139             : 
     140             :     /*
     141             :      * If it has taken too long for walreceiver to start up, give up. Setting
     142             :      * the state to STOPPED ensures that if walreceiver later does start up
     143             :      * after all, it will see that it's not supposed to be running and die
     144             :      * without doing anything.
     145             :      */
     146       12818 :     if (state == WALRCV_STARTING)
     147             :     {
     148         442 :         pg_time_t   now = (pg_time_t) time(NULL);
     149             : 
     150         442 :         if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
     151             :         {
     152           0 :             bool        stopped = false;
     153             : 
     154           0 :             SpinLockAcquire(&walrcv->mutex);
     155           0 :             if (walrcv->walRcvState == WALRCV_STARTING)
     156             :             {
     157           0 :                 state = walrcv->walRcvState = WALRCV_STOPPED;
     158           0 :                 stopped = true;
     159             :             }
     160           0 :             SpinLockRelease(&walrcv->mutex);
     161             : 
     162           0 :             if (stopped)
     163           0 :                 ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
     164             :         }
     165             :     }
     166             : 
     167       12818 :     if (state == WALRCV_STREAMING || state == WALRCV_STARTING ||
     168             :         state == WALRCV_RESTARTING)
     169       12626 :         return true;
     170             :     else
     171         192 :         return false;
     172             : }
     173             : 
     174             : /*
     175             :  * Stop walreceiver (if running) and wait for it to die.
     176             :  * Executed by the Startup process.
     177             :  */
     178             : void
     179        1848 : ShutdownWalRcv(void)
     180             : {
     181        1848 :     WalRcvData *walrcv = WalRcv;
     182        1848 :     pid_t       walrcvpid = 0;
     183        1848 :     bool        stopped = false;
     184             : 
     185             :     /*
     186             :      * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
     187             :      * mode once it's finished, and will also request postmaster to not
     188             :      * restart itself.
     189             :      */
     190        1848 :     SpinLockAcquire(&walrcv->mutex);
     191        1848 :     switch (walrcv->walRcvState)
     192             :     {
     193        1768 :         case WALRCV_STOPPED:
     194        1768 :             break;
     195          12 :         case WALRCV_STARTING:
     196          12 :             walrcv->walRcvState = WALRCV_STOPPED;
     197          12 :             stopped = true;
     198          12 :             break;
     199             : 
     200          68 :         case WALRCV_STREAMING:
     201             :         case WALRCV_WAITING:
     202             :         case WALRCV_RESTARTING:
     203          68 :             walrcv->walRcvState = WALRCV_STOPPING;
     204             :             /* fall through */
     205          68 :         case WALRCV_STOPPING:
     206          68 :             walrcvpid = walrcv->pid;
     207          68 :             break;
     208             :     }
     209        1848 :     SpinLockRelease(&walrcv->mutex);
     210             : 
     211             :     /* Unnecessary but consistent. */
     212        1848 :     if (stopped)
     213          12 :         ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
     214             : 
     215             :     /*
     216             :      * Signal walreceiver process if it was still running.
     217             :      */
     218        1848 :     if (walrcvpid != 0)
     219          68 :         kill(walrcvpid, SIGTERM);
     220             : 
     221             :     /*
     222             :      * Wait for walreceiver to acknowledge its death by setting state to
     223             :      * WALRCV_STOPPED.
     224             :      */
     225        1848 :     ConditionVariablePrepareToSleep(&walrcv->walRcvStoppedCV);
     226        1916 :     while (WalRcvRunning())
     227          68 :         ConditionVariableSleep(&walrcv->walRcvStoppedCV,
     228             :                                WAIT_EVENT_WAL_RECEIVER_EXIT);
     229        1848 :     ConditionVariableCancelSleep();
     230        1848 : }
     231             : 
     232             : /*
     233             :  * Request postmaster to start walreceiver.
     234             :  *
     235             :  * "recptr" indicates the position where streaming should begin.  "conninfo"
     236             :  * is a libpq connection string to use.  "slotname" is, optionally, the name
     237             :  * of a replication slot to acquire.  "create_temp_slot" indicates to create
     238             :  * a temporary slot when no "slotname" is given.
     239             :  *
     240             :  * WAL receivers do not directly load GUC parameters used for the connection
     241             :  * to the primary, and rely on the values passed down by the caller of this
     242             :  * routine instead.  Hence, the addition of any new parameters should happen
     243             :  * through this code path.
     244             :  */
     245             : void
     246         294 : RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
     247             :                      const char *slotname, bool create_temp_slot)
     248             : {
     249         294 :     WalRcvData *walrcv = WalRcv;
     250         294 :     bool        launch = false;
     251         294 :     pg_time_t   now = (pg_time_t) time(NULL);
     252             :     ProcNumber  walrcv_proc;
     253             : 
     254             :     /*
     255             :      * We always start at the beginning of the segment. That prevents a broken
     256             :      * segment (i.e., with no records in the first half of a segment) from
     257             :      * being created by XLOG streaming, which might cause trouble later on if
     258             :      * the segment is e.g. archived.
     259             :      */
     260         294 :     if (XLogSegmentOffset(recptr, wal_segment_size) != 0)
     261         294 :         recptr -= XLogSegmentOffset(recptr, wal_segment_size);
     262             : 
     263         294 :     SpinLockAcquire(&walrcv->mutex);
     264             : 
     265             :     /* It had better be stopped (or waiting) if we try to restart it */
     266             :     Assert(walrcv->walRcvState == WALRCV_STOPPED ||
     267             :            walrcv->walRcvState == WALRCV_WAITING);
     268             : 
     269         294 :     if (conninfo != NULL)
     270         294 :         strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO);
     271             :     else
     272           0 :         walrcv->conninfo[0] = '\0';
     273             : 
     274             :     /*
     275             :      * Use configured replication slot if present, and ignore the value of
     276             :      * create_temp_slot as the slot name should be persistent.  Otherwise, use
     277             :      * create_temp_slot to determine whether this WAL receiver should create a
     278             :      * temporary slot by itself and use it, or not.
     279             :      */
     280         294 :     if (slotname != NULL && slotname[0] != '\0')
     281             :     {
     282          90 :         strlcpy((char *) walrcv->slotname, slotname, NAMEDATALEN);
     283          90 :         walrcv->is_temp_slot = false;
     284             :     }
     285             :     else
     286             :     {
     287         204 :         walrcv->slotname[0] = '\0';
     288         204 :         walrcv->is_temp_slot = create_temp_slot;
     289             :     }
     290             : 
     291         294 :     if (walrcv->walRcvState == WALRCV_STOPPED)
     292             :     {
     293         294 :         launch = true;
     294         294 :         walrcv->walRcvState = WALRCV_STARTING;
     295             :     }
     296             :     else
     297           0 :         walrcv->walRcvState = WALRCV_RESTARTING;
     298         294 :     walrcv->startTime = now;
     299             : 
     300             :     /*
     301             :      * If this is the first startup of walreceiver (on this timeline),
     302             :      * initialize flushedUpto and latestChunkStart to the starting point.
     303             :      */
     304         294 :     if (walrcv->receiveStart == 0 || walrcv->receivedTLI != tli)
     305             :     {
     306         186 :         walrcv->flushedUpto = recptr;
     307         186 :         walrcv->receivedTLI = tli;
     308         186 :         walrcv->latestChunkStart = recptr;
     309             :     }
     310         294 :     walrcv->receiveStart = recptr;
     311         294 :     walrcv->receiveStartTLI = tli;
     312             : 
     313         294 :     walrcv_proc = walrcv->procno;
     314             : 
     315         294 :     SpinLockRelease(&walrcv->mutex);
     316             : 
     317         294 :     if (launch)
     318         294 :         SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
     319           0 :     else if (walrcv_proc != INVALID_PROC_NUMBER)
     320           0 :         SetLatch(&GetPGProcByNumber(walrcv_proc)->procLatch);
     321         294 : }
     322             : 
     323             : /*
     324             :  * Returns the last+1 byte position that walreceiver has flushed.
     325             :  *
     326             :  * Optionally, returns the previous chunk start, that is the first byte
     327             :  * written in the most recent walreceiver flush cycle.  Callers not
     328             :  * interested in that value may pass NULL for latestChunkStart. Same for
     329             :  * receiveTLI.
     330             :  */
     331             : XLogRecPtr
     332       11236 : GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
     333             : {
     334       11236 :     WalRcvData *walrcv = WalRcv;
     335             :     XLogRecPtr  recptr;
     336             : 
     337       11236 :     SpinLockAcquire(&walrcv->mutex);
     338       11236 :     recptr = walrcv->flushedUpto;
     339       11236 :     if (latestChunkStart)
     340        9068 :         *latestChunkStart = walrcv->latestChunkStart;
     341       11236 :     if (receiveTLI)
     342       10868 :         *receiveTLI = walrcv->receivedTLI;
     343       11236 :     SpinLockRelease(&walrcv->mutex);
     344             : 
     345       11236 :     return recptr;
     346             : }
     347             : 
     348             : /*
     349             :  * Returns the last+1 byte position that walreceiver has written.
     350             :  * This returns a recently written value without taking a lock.
     351             :  */
     352             : XLogRecPtr
     353           0 : GetWalRcvWriteRecPtr(void)
     354             : {
     355           0 :     WalRcvData *walrcv = WalRcv;
     356             : 
     357           0 :     return pg_atomic_read_u64(&walrcv->writtenUpto);
     358             : }
     359             : 
     360             : /*
     361             :  * Returns the replication apply delay in ms, or -1
     362             :  * if the apply delay info is not available.
     363             :  */
     364             : int
     365         768 : GetReplicationApplyDelay(void)
     366             : {
     367         768 :     WalRcvData *walrcv = WalRcv;
     368             :     XLogRecPtr  receivePtr;
     369             :     XLogRecPtr  replayPtr;
     370             :     TimestampTz chunkReplayStartTime;
     371             : 
     372         768 :     SpinLockAcquire(&walrcv->mutex);
     373         768 :     receivePtr = walrcv->flushedUpto;
     374         768 :     SpinLockRelease(&walrcv->mutex);
     375             : 
     376         768 :     replayPtr = GetXLogReplayRecPtr(NULL);
     377             : 
     378         768 :     if (receivePtr == replayPtr)
     379         562 :         return 0;
     380             : 
     381         206 :     chunkReplayStartTime = GetCurrentChunkReplayStartTime();
     382             : 
     383         206 :     if (chunkReplayStartTime == 0)
     384           2 :         return -1;
     385             : 
     386         204 :     return TimestampDifferenceMilliseconds(chunkReplayStartTime,
     387             :                                            GetCurrentTimestamp());
     388             : }
     389             : 
     390             : /*
     391             :  * Returns the network latency in ms.  Note that this includes any
     392             :  * difference in clock settings between the servers, as well as timezone.
     393             :  */
     394             : int
     395         768 : GetReplicationTransferLatency(void)
     396             : {
     397         768 :     WalRcvData *walrcv = WalRcv;
     398             :     TimestampTz lastMsgSendTime;
     399             :     TimestampTz lastMsgReceiptTime;
     400             : 
     401         768 :     SpinLockAcquire(&walrcv->mutex);
     402         768 :     lastMsgSendTime = walrcv->lastMsgSendTime;
     403         768 :     lastMsgReceiptTime = walrcv->lastMsgReceiptTime;
     404         768 :     SpinLockRelease(&walrcv->mutex);
     405             : 
     406         768 :     return TimestampDifferenceMilliseconds(lastMsgSendTime,
     407             :                                            lastMsgReceiptTime);
     408             : }

Generated by: LCOV version 1.14