LCOV - code coverage report
Current view: top level - src/backend/replication - walreceiverfuncs.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 122 147 83.0 %
Date: 2024-09-16 12:11:48 Functions: 9 10 90.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * walreceiverfuncs.c
       4             :  *
       5             :  * This file contains functions used by the startup process to communicate
       6             :  * with the walreceiver process. Functions implementing walreceiver itself
       7             :  * are in walreceiver.c.
       8             :  *
       9             :  * Portions Copyright (c) 2010-2024, PostgreSQL Global Development Group
      10             :  *
      11             :  *
      12             :  * IDENTIFICATION
      13             :  *    src/backend/replication/walreceiverfuncs.c
      14             :  *
      15             :  *-------------------------------------------------------------------------
      16             :  */
      17             : #include "postgres.h"
      18             : 
      19             : #include <sys/stat.h>
      20             : #include <sys/time.h>
      21             : #include <time.h>
      22             : #include <unistd.h>
      23             : #include <signal.h>
      24             : 
      25             : #include "access/xlog_internal.h"
      26             : #include "access/xlogrecovery.h"
      27             : #include "pgstat.h"
      28             : #include "replication/walreceiver.h"
      29             : #include "storage/pmsignal.h"
      30             : #include "storage/shmem.h"
      31             : #include "utils/timestamp.h"
      32             : 
/*
 * Pointer to the walreceiver control struct in shared memory.  NULL until
 * WalRcvShmemInit() points it at the "Wal Receiver Ctl" shared struct.
 */
WalRcvData *WalRcv = NULL;

/*
 * How long to wait for walreceiver to start up after requesting
 * postmaster to launch it. In seconds (compared against time(NULL)
 * deltas).  A walreceiver still in WALRCV_STARTING after this long is
 * reset to WALRCV_STOPPED by WalRcvRunning()/WalRcvStreaming().
 */
#define WALRCV_STARTUP_TIMEOUT 10
      40             : 
      41             : /* Report shared memory space needed by WalRcvShmemInit */
      42             : Size
      43        7002 : WalRcvShmemSize(void)
      44             : {
      45        7002 :     Size        size = 0;
      46             : 
      47        7002 :     size = add_size(size, sizeof(WalRcvData));
      48             : 
      49        7002 :     return size;
      50             : }
      51             : 
      52             : /* Allocate and initialize walreceiver-related shared memory */
      53             : void
      54        1810 : WalRcvShmemInit(void)
      55             : {
      56             :     bool        found;
      57             : 
      58        1810 :     WalRcv = (WalRcvData *)
      59        1810 :         ShmemInitStruct("Wal Receiver Ctl", WalRcvShmemSize(), &found);
      60             : 
      61        1810 :     if (!found)
      62             :     {
      63             :         /* First time through, so initialize */
      64        1810 :         MemSet(WalRcv, 0, WalRcvShmemSize());
      65        1810 :         WalRcv->walRcvState = WALRCV_STOPPED;
      66        1810 :         ConditionVariableInit(&WalRcv->walRcvStoppedCV);
      67        1810 :         SpinLockInit(&WalRcv->mutex);
      68        1810 :         pg_atomic_init_u64(&WalRcv->writtenUpto, 0);
      69        1810 :         WalRcv->latch = NULL;
      70             :     }
      71        1810 : }
      72             : 
      73             : /* Is walreceiver running (or starting up)? */
      74             : bool
      75        1808 : WalRcvRunning(void)
      76             : {
      77        1808 :     WalRcvData *walrcv = WalRcv;
      78             :     WalRcvState state;
      79             :     pg_time_t   startTime;
      80             : 
      81        1808 :     SpinLockAcquire(&walrcv->mutex);
      82             : 
      83        1808 :     state = walrcv->walRcvState;
      84        1808 :     startTime = walrcv->startTime;
      85             : 
      86        1808 :     SpinLockRelease(&walrcv->mutex);
      87             : 
      88             :     /*
      89             :      * If it has taken too long for walreceiver to start up, give up. Setting
      90             :      * the state to STOPPED ensures that if walreceiver later does start up
      91             :      * after all, it will see that it's not supposed to be running and die
      92             :      * without doing anything.
      93             :      */
      94        1808 :     if (state == WALRCV_STARTING)
      95             :     {
      96           0 :         pg_time_t   now = (pg_time_t) time(NULL);
      97             : 
      98           0 :         if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
      99             :         {
     100           0 :             bool        stopped = false;
     101             : 
     102           0 :             SpinLockAcquire(&walrcv->mutex);
     103           0 :             if (walrcv->walRcvState == WALRCV_STARTING)
     104             :             {
     105           0 :                 state = walrcv->walRcvState = WALRCV_STOPPED;
     106           0 :                 stopped = true;
     107             :             }
     108           0 :             SpinLockRelease(&walrcv->mutex);
     109             : 
     110           0 :             if (stopped)
     111           0 :                 ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
     112             :         }
     113             :     }
     114             : 
     115        1808 :     if (state != WALRCV_STOPPED)
     116          74 :         return true;
     117             :     else
     118        1734 :         return false;
     119             : }
     120             : 
     121             : /*
     122             :  * Is walreceiver running and streaming (or at least attempting to connect,
     123             :  * or starting up)?
     124             :  */
     125             : bool
     126       60666 : WalRcvStreaming(void)
     127             : {
     128       60666 :     WalRcvData *walrcv = WalRcv;
     129             :     WalRcvState state;
     130             :     pg_time_t   startTime;
     131             : 
     132       60666 :     SpinLockAcquire(&walrcv->mutex);
     133             : 
     134       60666 :     state = walrcv->walRcvState;
     135       60666 :     startTime = walrcv->startTime;
     136             : 
     137       60666 :     SpinLockRelease(&walrcv->mutex);
     138             : 
     139             :     /*
     140             :      * If it has taken too long for walreceiver to start up, give up. Setting
     141             :      * the state to STOPPED ensures that if walreceiver later does start up
     142             :      * after all, it will see that it's not supposed to be running and die
     143             :      * without doing anything.
     144             :      */
     145       60666 :     if (state == WALRCV_STARTING)
     146             :     {
     147         438 :         pg_time_t   now = (pg_time_t) time(NULL);
     148             : 
     149         438 :         if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
     150             :         {
     151           0 :             bool        stopped = false;
     152             : 
     153           0 :             SpinLockAcquire(&walrcv->mutex);
     154           0 :             if (walrcv->walRcvState == WALRCV_STARTING)
     155             :             {
     156           0 :                 state = walrcv->walRcvState = WALRCV_STOPPED;
     157           0 :                 stopped = true;
     158             :             }
     159           0 :             SpinLockRelease(&walrcv->mutex);
     160             : 
     161           0 :             if (stopped)
     162           0 :                 ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
     163             :         }
     164             :     }
     165             : 
     166       60666 :     if (state == WALRCV_STREAMING || state == WALRCV_STARTING ||
     167             :         state == WALRCV_RESTARTING)
     168       60488 :         return true;
     169             :     else
     170         178 :         return false;
     171             : }
     172             : 
     173             : /*
     174             :  * Stop walreceiver (if running) and wait for it to die.
     175             :  * Executed by the Startup process.
     176             :  */
     177             : void
     178        1734 : ShutdownWalRcv(void)
     179             : {
     180        1734 :     WalRcvData *walrcv = WalRcv;
     181        1734 :     pid_t       walrcvpid = 0;
     182        1734 :     bool        stopped = false;
     183             : 
     184             :     /*
     185             :      * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
     186             :      * mode once it's finished, and will also request postmaster to not
     187             :      * restart itself.
     188             :      */
     189        1734 :     SpinLockAcquire(&walrcv->mutex);
     190        1734 :     switch (walrcv->walRcvState)
     191             :     {
     192        1656 :         case WALRCV_STOPPED:
     193        1656 :             break;
     194          10 :         case WALRCV_STARTING:
     195          10 :             walrcv->walRcvState = WALRCV_STOPPED;
     196          10 :             stopped = true;
     197          10 :             break;
     198             : 
     199          68 :         case WALRCV_STREAMING:
     200             :         case WALRCV_WAITING:
     201             :         case WALRCV_RESTARTING:
     202          68 :             walrcv->walRcvState = WALRCV_STOPPING;
     203             :             /* fall through */
     204          68 :         case WALRCV_STOPPING:
     205          68 :             walrcvpid = walrcv->pid;
     206          68 :             break;
     207             :     }
     208        1734 :     SpinLockRelease(&walrcv->mutex);
     209             : 
     210             :     /* Unnecessary but consistent. */
     211        1734 :     if (stopped)
     212          10 :         ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
     213             : 
     214             :     /*
     215             :      * Signal walreceiver process if it was still running.
     216             :      */
     217        1734 :     if (walrcvpid != 0)
     218          68 :         kill(walrcvpid, SIGTERM);
     219             : 
     220             :     /*
     221             :      * Wait for walreceiver to acknowledge its death by setting state to
     222             :      * WALRCV_STOPPED.
     223             :      */
     224        1734 :     ConditionVariablePrepareToSleep(&walrcv->walRcvStoppedCV);
     225        1802 :     while (WalRcvRunning())
     226          68 :         ConditionVariableSleep(&walrcv->walRcvStoppedCV,
     227             :                                WAIT_EVENT_WAL_RECEIVER_EXIT);
     228        1734 :     ConditionVariableCancelSleep();
     229        1734 : }
     230             : 
     231             : /*
     232             :  * Request postmaster to start walreceiver.
     233             :  *
     234             :  * "recptr" indicates the position where streaming should begin.  "conninfo"
     235             :  * is a libpq connection string to use.  "slotname" is, optionally, the name
     236             :  * of a replication slot to acquire.  "create_temp_slot" indicates to create
     237             :  * a temporary slot when no "slotname" is given.
     238             :  *
     239             :  * WAL receivers do not directly load GUC parameters used for the connection
     240             :  * to the primary, and rely on the values passed down by the caller of this
     241             :  * routine instead.  Hence, the addition of any new parameters should happen
     242             :  * through this code path.
     243             :  */
     244             : void
     245         292 : RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
     246             :                      const char *slotname, bool create_temp_slot)
     247             : {
     248         292 :     WalRcvData *walrcv = WalRcv;
     249         292 :     bool        launch = false;
     250         292 :     pg_time_t   now = (pg_time_t) time(NULL);
     251             :     Latch      *latch;
     252             : 
     253             :     /*
     254             :      * We always start at the beginning of the segment. That prevents a broken
     255             :      * segment (i.e., with no records in the first half of a segment) from
     256             :      * being created by XLOG streaming, which might cause trouble later on if
     257             :      * the segment is e.g archived.
     258             :      */
     259         292 :     if (XLogSegmentOffset(recptr, wal_segment_size) != 0)
     260         292 :         recptr -= XLogSegmentOffset(recptr, wal_segment_size);
     261             : 
     262         292 :     SpinLockAcquire(&walrcv->mutex);
     263             : 
     264             :     /* It better be stopped if we try to restart it */
     265             :     Assert(walrcv->walRcvState == WALRCV_STOPPED ||
     266             :            walrcv->walRcvState == WALRCV_WAITING);
     267             : 
     268         292 :     if (conninfo != NULL)
     269         292 :         strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO);
     270             :     else
     271           0 :         walrcv->conninfo[0] = '\0';
     272             : 
     273             :     /*
     274             :      * Use configured replication slot if present, and ignore the value of
     275             :      * create_temp_slot as the slot name should be persistent.  Otherwise, use
     276             :      * create_temp_slot to determine whether this WAL receiver should create a
     277             :      * temporary slot by itself and use it, or not.
     278             :      */
     279         292 :     if (slotname != NULL && slotname[0] != '\0')
     280             :     {
     281          90 :         strlcpy((char *) walrcv->slotname, slotname, NAMEDATALEN);
     282          90 :         walrcv->is_temp_slot = false;
     283             :     }
     284             :     else
     285             :     {
     286         202 :         walrcv->slotname[0] = '\0';
     287         202 :         walrcv->is_temp_slot = create_temp_slot;
     288             :     }
     289             : 
     290         292 :     if (walrcv->walRcvState == WALRCV_STOPPED)
     291             :     {
     292         292 :         launch = true;
     293         292 :         walrcv->walRcvState = WALRCV_STARTING;
     294             :     }
     295             :     else
     296           0 :         walrcv->walRcvState = WALRCV_RESTARTING;
     297         292 :     walrcv->startTime = now;
     298             : 
     299             :     /*
     300             :      * If this is the first startup of walreceiver (on this timeline),
     301             :      * initialize flushedUpto and latestChunkStart to the starting point.
     302             :      */
     303         292 :     if (walrcv->receiveStart == 0 || walrcv->receivedTLI != tli)
     304             :     {
     305         184 :         walrcv->flushedUpto = recptr;
     306         184 :         walrcv->receivedTLI = tli;
     307         184 :         walrcv->latestChunkStart = recptr;
     308             :     }
     309         292 :     walrcv->receiveStart = recptr;
     310         292 :     walrcv->receiveStartTLI = tli;
     311             : 
     312         292 :     latch = walrcv->latch;
     313             : 
     314         292 :     SpinLockRelease(&walrcv->mutex);
     315             : 
     316         292 :     if (launch)
     317         292 :         SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
     318           0 :     else if (latch)
     319           0 :         SetLatch(latch);
     320         292 : }
     321             : 
     322             : /*
     323             :  * Returns the last+1 byte position that walreceiver has flushed.
     324             :  *
     325             :  * Optionally, returns the previous chunk start, that is the first byte
     326             :  * written in the most recent walreceiver flush cycle.  Callers not
     327             :  * interested in that value may pass NULL for latestChunkStart. Same for
     328             :  * receiveTLI.
     329             :  */
     330             : XLogRecPtr
     331       62128 : GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
     332             : {
     333       62128 :     WalRcvData *walrcv = WalRcv;
     334             :     XLogRecPtr  recptr;
     335             : 
     336       62128 :     SpinLockAcquire(&walrcv->mutex);
     337       62128 :     recptr = walrcv->flushedUpto;
     338       62128 :     if (latestChunkStart)
     339       60174 :         *latestChunkStart = walrcv->latestChunkStart;
     340       62128 :     if (receiveTLI)
     341       62042 :         *receiveTLI = walrcv->receivedTLI;
     342       62128 :     SpinLockRelease(&walrcv->mutex);
     343             : 
     344       62128 :     return recptr;
     345             : }
     346             : 
     347             : /*
     348             :  * Returns the last+1 byte position that walreceiver has written.
     349             :  * This returns a recently written value without taking a lock.
     350             :  */
     351             : XLogRecPtr
     352           0 : GetWalRcvWriteRecPtr(void)
     353             : {
     354           0 :     WalRcvData *walrcv = WalRcv;
     355             : 
     356           0 :     return pg_atomic_read_u64(&walrcv->writtenUpto);
     357             : }
     358             : 
     359             : /*
     360             :  * Returns the replication apply delay in ms or -1
     361             :  * if the apply delay info is not available
     362             :  */
     363             : int
     364         780 : GetReplicationApplyDelay(void)
     365             : {
     366         780 :     WalRcvData *walrcv = WalRcv;
     367             :     XLogRecPtr  receivePtr;
     368             :     XLogRecPtr  replayPtr;
     369             :     TimestampTz chunkReplayStartTime;
     370             : 
     371         780 :     SpinLockAcquire(&walrcv->mutex);
     372         780 :     receivePtr = walrcv->flushedUpto;
     373         780 :     SpinLockRelease(&walrcv->mutex);
     374             : 
     375         780 :     replayPtr = GetXLogReplayRecPtr(NULL);
     376             : 
     377         780 :     if (receivePtr == replayPtr)
     378         300 :         return 0;
     379             : 
     380         480 :     chunkReplayStartTime = GetCurrentChunkReplayStartTime();
     381             : 
     382         480 :     if (chunkReplayStartTime == 0)
     383           2 :         return -1;
     384             : 
     385         478 :     return TimestampDifferenceMilliseconds(chunkReplayStartTime,
     386             :                                            GetCurrentTimestamp());
     387             : }
     388             : 
     389             : /*
     390             :  * Returns the network latency in ms, note that this includes any
     391             :  * difference in clock settings between the servers, as well as timezone.
     392             :  */
     393             : int
     394         780 : GetReplicationTransferLatency(void)
     395             : {
     396         780 :     WalRcvData *walrcv = WalRcv;
     397             :     TimestampTz lastMsgSendTime;
     398             :     TimestampTz lastMsgReceiptTime;
     399             : 
     400         780 :     SpinLockAcquire(&walrcv->mutex);
     401         780 :     lastMsgSendTime = walrcv->lastMsgSendTime;
     402         780 :     lastMsgReceiptTime = walrcv->lastMsgReceiptTime;
     403         780 :     SpinLockRelease(&walrcv->mutex);
     404             : 
     405         780 :     return TimestampDifferenceMilliseconds(lastMsgSendTime,
     406             :                                            lastMsgReceiptTime);
     407             : }

Generated by: LCOV version 1.14