LCOV - code coverage report
Current view: top level - src/backend/replication - walreceiverfuncs.c (source / functions) Hit Total Coverage
Test: PostgreSQL 17devel Lines: 122 147 83.0 %
Date: 2024-03-19 07:11:08 Functions: 9 10 90.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * walreceiverfuncs.c
       4             :  *
       5             :  * This file contains functions used by the startup process to communicate
       6             :  * with the walreceiver process. Functions implementing walreceiver itself
       7             :  * are in walreceiver.c.
       8             :  *
       9             :  * Portions Copyright (c) 2010-2024, PostgreSQL Global Development Group
      10             :  *
      11             :  *
      12             :  * IDENTIFICATION
      13             :  *    src/backend/replication/walreceiverfuncs.c
      14             :  *
      15             :  *-------------------------------------------------------------------------
      16             :  */
      17             : #include "postgres.h"
      18             : 
      19             : #include <sys/stat.h>
      20             : #include <sys/time.h>
      21             : #include <time.h>
      22             : #include <unistd.h>
      23             : #include <signal.h>
      24             : 
      25             : #include "access/xlog_internal.h"
      26             : #include "access/xlogrecovery.h"
      27             : #include "pgstat.h"
      28             : #include "replication/walreceiver.h"
      29             : #include "storage/pmsignal.h"
      30             : #include "storage/shmem.h"
      31             : #include "utils/timestamp.h"
      32             : 
      33             : WalRcvData *WalRcv = NULL;	/* shared control struct; set by WalRcvShmemInit() */
      34             : 
      35             : /*
      36             :  * How long to wait for walreceiver to start up after requesting
      37             :  * postmaster to launch it. In seconds.
                     :  *
                     :  * WalRcvRunning() and WalRcvStreaming() compare the time elapsed since
                     :  * the recorded startTime against this value and, once it is exceeded
                     :  * while the state is still WALRCV_STARTING, force the state back to
                     :  * WALRCV_STOPPED.
      38             :  */
      39             : #define WALRCV_STARTUP_TIMEOUT 10
      40             : 
      41             : /*
                     :  * Report shared memory space needed by WalRcvShmemInit.
                     :  *
                     :  * The result is sizeof(WalRcvData), accumulated through add_size().
                     :  */
      42             : Size
      43        6598 : WalRcvShmemSize(void)
      44             : {
      45        6598 :     Size        size = 0;
      46             : 
      47        6598 :     size = add_size(size, sizeof(WalRcvData));
      48             : 
      49        6598 :     return size;
      50             : }
      51             : 
      52             : /*
                     :  * Allocate and initialize walreceiver-related shared memory.
                     :  *
                     :  * Points WalRcv at the "Wal Receiver Ctl" struct obtained from
                     :  * ShmemInitStruct().  If that call reports that the struct was not found,
                     :  * the struct is zeroed and its state, condition variable, spinlock,
                     :  * writtenUpto counter and latch pointer are initialized.
                     :  */
      53             : void
      54        1706 : WalRcvShmemInit(void)
      55             : {
      56             :     bool        found;
      57             : 
      58        1706 :     WalRcv = (WalRcvData *)
      59        1706 :         ShmemInitStruct("Wal Receiver Ctl", WalRcvShmemSize(), &found);
      60             : 
      61        1706 :     if (!found)
      62             :     {
      63             :         /* First time through, so initialize */
      64        1706 :         MemSet(WalRcv, 0, WalRcvShmemSize());
      65        1706 :         WalRcv->walRcvState = WALRCV_STOPPED;
      66        1706 :         ConditionVariableInit(&WalRcv->walRcvStoppedCV);
      67        1706 :         SpinLockInit(&WalRcv->mutex);
      68        1706 :         pg_atomic_init_u64(&WalRcv->writtenUpto, 0);
      69        1706 :         WalRcv->latch = NULL;
      70             :     }
      71        1706 : }
      72             : 
      73             : /*
                     :  * Is walreceiver running (or starting up)?
                     :  *
                     :  * Returns true unless the walreceiver state is WALRCV_STOPPED.
                     :  *
                     :  * Side effect: if the state is WALRCV_STARTING and more than
                     :  * WALRCV_STARTUP_TIMEOUT seconds have passed since startTime, the state
                     :  * is changed to WALRCV_STOPPED and walRcvStoppedCV is broadcast, so the
                     :  * function then returns false.
                     :  */
      74             : bool
      75        1722 : WalRcvRunning(void)
      76             : {
      77        1722 :     WalRcvData *walrcv = WalRcv;
      78             :     WalRcvState state;
      79             :     pg_time_t   startTime;
      80             : 
      81        1722 :     SpinLockAcquire(&walrcv->mutex);
      82             : 
      83        1722 :     state = walrcv->walRcvState;
      84        1722 :     startTime = walrcv->startTime;
      85             : 
      86        1722 :     SpinLockRelease(&walrcv->mutex);
      87             : 
      88             :     /*
      89             :      * If it has taken too long for walreceiver to start up, give up. Setting
      90             :      * the state to STOPPED ensures that if walreceiver later does start up
      91             :      * after all, it will see that it's not supposed to be running and die
      92             :      * without doing anything.
                     :      *
                     :      * The state is checked again after re-taking the spinlock, because the
                     :      * lock was released after the snapshot above was taken.  The broadcast
                     :      * is issued only after the spinlock has been released.
      93             :      */
      94        1722 :     if (state == WALRCV_STARTING)
      95             :     {
      96           0 :         pg_time_t   now = (pg_time_t) time(NULL);
      97             : 
      98           0 :         if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
      99             :         {
     100           0 :             bool        stopped = false;
     101             : 
     102           0 :             SpinLockAcquire(&walrcv->mutex);
     103           0 :             if (walrcv->walRcvState == WALRCV_STARTING)
     104             :             {
     105           0 :                 state = walrcv->walRcvState = WALRCV_STOPPED;
     106           0 :                 stopped = true;
     107             :             }
     108           0 :             SpinLockRelease(&walrcv->mutex);
     109             : 
     110           0 :             if (stopped)
     111           0 :                 ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
     112             :         }
     113             :     }
     114             : 
     115        1722 :     if (state != WALRCV_STOPPED)
     116          68 :         return true;
     117             :     else
     118        1654 :         return false;
     119             : }
     120             : 
     121             : /*
     122             :  * Is walreceiver running and streaming (or at least attempting to connect,
     123             :  * or starting up)?
                     :  *
                     :  * Returns true when the state is WALRCV_STREAMING, WALRCV_STARTING or
                     :  * WALRCV_RESTARTING; false for any other state.
                     :  *
                     :  * Side effect: if the state is WALRCV_STARTING and more than
                     :  * WALRCV_STARTUP_TIMEOUT seconds have passed since startTime, the state
                     :  * is changed to WALRCV_STOPPED and walRcvStoppedCV is broadcast, so the
                     :  * function then returns false.
     124             :  */
     125             : bool
     126       59696 : WalRcvStreaming(void)
     127             : {
     128       59696 :     WalRcvData *walrcv = WalRcv;
     129             :     WalRcvState state;
     130             :     pg_time_t   startTime;
     131             : 
     132       59696 :     SpinLockAcquire(&walrcv->mutex);
     133             : 
     134       59696 :     state = walrcv->walRcvState;
     135       59696 :     startTime = walrcv->startTime;
     136             : 
     137       59696 :     SpinLockRelease(&walrcv->mutex);
     138             : 
     139             :     /*
     140             :      * If it has taken too long for walreceiver to start up, give up. Setting
     141             :      * the state to STOPPED ensures that if walreceiver later does start up
     142             :      * after all, it will see that it's not supposed to be running and die
     143             :      * without doing anything.
                     :      *
                     :      * The state is checked again after re-taking the spinlock, because the
                     :      * lock was released after the snapshot above was taken.  The broadcast
                     :      * is issued only after the spinlock has been released.
     144             :      */
     145       59696 :     if (state == WALRCV_STARTING)
     146             :     {
     147         398 :         pg_time_t   now = (pg_time_t) time(NULL);
     148             : 
     149         398 :         if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
     150             :         {
     151           0 :             bool        stopped = false;
     152             : 
     153           0 :             SpinLockAcquire(&walrcv->mutex);
     154           0 :             if (walrcv->walRcvState == WALRCV_STARTING)
     155             :             {
     156           0 :                 state = walrcv->walRcvState = WALRCV_STOPPED;
     157           0 :                 stopped = true;
     158             :             }
     159           0 :             SpinLockRelease(&walrcv->mutex);
     160             : 
     161           0 :             if (stopped)
     162           0 :                 ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
     163             :         }
     164             :     }
     165             : 
     166       59696 :     if (state == WALRCV_STREAMING || state == WALRCV_STARTING ||
     167             :         state == WALRCV_RESTARTING)
     168       59524 :         return true;
     169             :     else
     170         172 :         return false;
     171             : }
     172             : 
     173             : /*
     174             :  * Stop walreceiver (if running) and wait for it to die.
     175             :  * Executed by the Startup process.
                     :  *
                     :  * Does not return until WalRcvRunning() reports false; in between it
                     :  * sleeps on walRcvStoppedCV.
     176             :  */
     177             : void
     178        1654 : ShutdownWalRcv(void)
     179             : {
     180        1654 :     WalRcvData *walrcv = WalRcv;
     181        1654 :     pid_t       walrcvpid = 0;
     182        1654 :     bool        stopped = false;
     183             : 
     184             :     /*
     185             :      * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
     186             :      * mode once it's finished, and will also request postmaster to not
     187             :      * restart itself.
                     :      *
                     :      * A walreceiver that is still in WALRCV_STARTING is marked stopped
                     :      * right here and is not signalled.  For the streaming, waiting and
                     :      * restarting states the state becomes WALRCV_STOPPING; for those and
                     :      * for an already-stopping walreceiver the pid is captured so that
                     :      * SIGTERM can be sent once the spinlock has been released.
     188             :      */
     189        1654 :     SpinLockAcquire(&walrcv->mutex);
     190        1654 :     switch (walrcv->walRcvState)
     191             :     {
     192        1584 :         case WALRCV_STOPPED:
     193        1584 :             break;
     194           8 :         case WALRCV_STARTING:
     195           8 :             walrcv->walRcvState = WALRCV_STOPPED;
     196           8 :             stopped = true;
     197           8 :             break;
     198             : 
     199          62 :         case WALRCV_STREAMING:
     200             :         case WALRCV_WAITING:
     201             :         case WALRCV_RESTARTING:
     202          62 :             walrcv->walRcvState = WALRCV_STOPPING;
     203             :             /* fall through */
     204          62 :         case WALRCV_STOPPING:
     205          62 :             walrcvpid = walrcv->pid;
     206          62 :             break;
     207             :     }
     208        1654 :     SpinLockRelease(&walrcv->mutex);
     209             : 
     210             :     /* Unnecessary but consistent. */
     211        1654 :     if (stopped)
     212           8 :         ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
     213             : 
     214             :     /*
     215             :      * Signal walreceiver process if it was still running.
     216             :      */
     217        1654 :     if (walrcvpid != 0)
     218          62 :         kill(walrcvpid, SIGTERM);
     219             : 
     220             :     /*
     221             :      * Wait for walreceiver to acknowledge its death by setting state to
     222             :      * WALRCV_STOPPED.
     223             :      */
     224        1654 :     ConditionVariablePrepareToSleep(&walrcv->walRcvStoppedCV);
     225        1716 :     while (WalRcvRunning())
     226          62 :         ConditionVariableSleep(&walrcv->walRcvStoppedCV,
     227             :                                WAIT_EVENT_WAL_RECEIVER_EXIT);
     228        1654 :     ConditionVariableCancelSleep();
     229        1654 : }
     230             : 
     231             : /*
     232             :  * Request postmaster to start walreceiver.
     233             :  *
     234             :  * "recptr" indicates the position where streaming should begin.  "conninfo"
     235             :  * is a libpq connection string to use.  "slotname" is, optionally, the name
     236             :  * of a replication slot to acquire.  "create_temp_slot" indicates to create
     237             :  * a temporary slot when no "slotname" is given.
                     :  *
                     :  * "tli" is recorded as the timeline of the starting position
                     :  * (receiveStartTLI).  A NULL "conninfo" is stored as an empty string, and
                     :  * a NULL or empty "slotname" counts as no slot name.
                     :  *
                     :  * If walreceiver is currently stopped, its state becomes WALRCV_STARTING
                     :  * and postmaster is signalled to launch it.  Otherwise the state becomes
                     :  * WALRCV_RESTARTING and the walreceiver's latch, if one is registered,
                     :  * is set instead.
     238             :  *
     239             :  * WAL receivers do not directly load GUC parameters used for the connection
     240             :  * to the primary, and rely on the values passed down by the caller of this
     241             :  * routine instead.  Hence, the addition of any new parameters should happen
     242             :  * through this code path.
     243             :  */
     244             : void
     245         246 : RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
     246             :                      const char *slotname, bool create_temp_slot)
     247             : {
     248         246 :     WalRcvData *walrcv = WalRcv;
     249         246 :     bool        launch = false;
     250         246 :     pg_time_t   now = (pg_time_t) time(NULL);
     251             :     Latch      *latch;
     252             : 
     253             :     /*
     254             :      * We always start at the beginning of the segment. That prevents a broken
     255             :      * segment (i.e., with no records in the first half of a segment) from
     256             :      * being created by XLOG streaming, which might cause trouble later on if
     257             :      * the segment is e.g. archived.
     258             :      */
     259         246 :     if (XLogSegmentOffset(recptr, wal_segment_size) != 0)
     260         246 :         recptr -= XLogSegmentOffset(recptr, wal_segment_size);
     261             : 
     262         246 :     SpinLockAcquire(&walrcv->mutex);
     263             : 
     264             :     /* It better be stopped if we try to restart it */
     265             :     Assert(walrcv->walRcvState == WALRCV_STOPPED ||
     266             :            walrcv->walRcvState == WALRCV_WAITING);
     267             : 
     268         246 :     if (conninfo != NULL)
     269         246 :         strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO);
     270             :     else
     271           0 :         walrcv->conninfo[0] = '\0';
     272             : 
     273             :     /*
     274             :      * Use configured replication slot if present, and ignore the value of
     275             :      * create_temp_slot as the slot name should be persistent.  Otherwise, use
     276             :      * create_temp_slot to determine whether this WAL receiver should create a
     277             :      * temporary slot by itself and use it, or not.
     278             :      */
     279         246 :     if (slotname != NULL && slotname[0] != '\0')
     280             :     {
     281          58 :         strlcpy((char *) walrcv->slotname, slotname, NAMEDATALEN);
     282          58 :         walrcv->is_temp_slot = false;
     283             :     }
     284             :     else
     285             :     {
     286         188 :         walrcv->slotname[0] = '\0';
     287         188 :         walrcv->is_temp_slot = create_temp_slot;
     288             :     }
     289             : 
     290         246 :     if (walrcv->walRcvState == WALRCV_STOPPED)
     291             :     {
     292         246 :         launch = true;
     293         246 :         walrcv->walRcvState = WALRCV_STARTING;
     294             :     }
     295             :     else
     296           0 :         walrcv->walRcvState = WALRCV_RESTARTING;
     297         246 :     walrcv->startTime = now;
     298             : 
     299             :     /*
     300             :      * If this is the first startup of walreceiver (on this timeline),
     301             :      * initialize flushedUpto and latestChunkStart to the starting point.
     302             :      */
     303         246 :     if (walrcv->receiveStart == 0 || walrcv->receivedTLI != tli)
     304             :     {
     305         148 :         walrcv->flushedUpto = recptr;
     306         148 :         walrcv->receivedTLI = tli;
     307         148 :         walrcv->latestChunkStart = recptr;
     308             :     }
     309         246 :     walrcv->receiveStart = recptr;
     310         246 :     walrcv->receiveStartTLI = tli;
     311             : 
     312         246 :     latch = walrcv->latch;
     313             : 
     314         246 :     SpinLockRelease(&walrcv->mutex);
     315             : 
     316         246 :     if (launch)
     317         246 :         SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
     318           0 :     else if (latch)
     319           0 :         SetLatch(latch);
     320         246 : }
     321             : 
     322             : /*
     323             :  * Returns the last+1 byte position that walreceiver has flushed.
     324             :  *
     325             :  * Optionally, returns the previous chunk start, that is the first byte
     326             :  * written in the most recent walreceiver flush cycle.  Callers not
     327             :  * interested in that value may pass NULL for latestChunkStart. Same for
     328             :  * receiveTLI.
                     :  *
                     :  * All requested values are read within a single spinlock-protected
                     :  * section.
     329             :  */
     330             : XLogRecPtr
     331       60786 : GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
     332             : {
     333       60786 :     WalRcvData *walrcv = WalRcv;
     334             :     XLogRecPtr  recptr;
     335             : 
     336       60786 :     SpinLockAcquire(&walrcv->mutex);
     337       60786 :     recptr = walrcv->flushedUpto;
     338       60786 :     if (latestChunkStart)
     339       59222 :         *latestChunkStart = walrcv->latestChunkStart;
     340       60786 :     if (receiveTLI)
     341       60710 :         *receiveTLI = walrcv->receivedTLI;
     342       60786 :     SpinLockRelease(&walrcv->mutex);
     343             : 
     344       60786 :     return recptr;
     345             : }
     346             : 
     347             : /*
     348             :  * Returns the last+1 byte position that walreceiver has written.
     349             :  * This returns a recently written value without taking a lock.
                     :  *
                     :  * The value is fetched with an atomic 64-bit read of writtenUpto.
     350             :  */
     351             : XLogRecPtr
     352           0 : GetWalRcvWriteRecPtr(void)
     353             : {
     354           0 :     WalRcvData *walrcv = WalRcv;
     355             : 
     356           0 :     return pg_atomic_read_u64(&walrcv->writtenUpto);
     357             : }
     358             : 
     359             : /*
     360             :  * Returns the replication apply delay in ms, or -1
     361             :  * if the apply delay info is not available.
                     :  *
                     :  * Returns 0 when the flushed position equals the current replay
                     :  * position.  Otherwise the delay is measured from the current chunk's
                     :  * replay start time to now; -1 is returned when that start time is zero.
     362             :  */
     363             : int
     364         778 : GetReplicationApplyDelay(void)
     365             : {
     366         778 :     WalRcvData *walrcv = WalRcv;
     367             :     XLogRecPtr  receivePtr;
     368             :     XLogRecPtr  replayPtr;
     369             :     TimestampTz chunkReplayStartTime;
     370             : 
     371         778 :     SpinLockAcquire(&walrcv->mutex);
     372         778 :     receivePtr = walrcv->flushedUpto;
     373         778 :     SpinLockRelease(&walrcv->mutex);
     374             : 
     375         778 :     replayPtr = GetXLogReplayRecPtr(NULL);
     376             : 
     377         778 :     if (receivePtr == replayPtr)
     378         292 :         return 0;
     379             : 
     380         486 :     chunkReplayStartTime = GetCurrentChunkReplayStartTime();
     381             : 
     382         486 :     if (chunkReplayStartTime == 0)
     383           2 :         return -1;
     384             : 
     385         484 :     return TimestampDifferenceMilliseconds(chunkReplayStartTime,
     386             :                                            GetCurrentTimestamp());
     387             : }
     388             : 
     389             : /*
     390             :  * Returns the network latency in ms.  Note that this includes any
     391             :  * difference in clock settings between the servers, as well as timezone.
                     :  *
                     :  * The value is derived from lastMsgSendTime and lastMsgReceiptTime, both
                     :  * read under the walreceiver spinlock.
     392             :  */
     393             : int
     394         778 : GetReplicationTransferLatency(void)
     395             : {
     396         778 :     WalRcvData *walrcv = WalRcv;
     397             :     TimestampTz lastMsgSendTime;
     398             :     TimestampTz lastMsgReceiptTime;
     399             : 
     400         778 :     SpinLockAcquire(&walrcv->mutex);
     401         778 :     lastMsgSendTime = walrcv->lastMsgSendTime;
     402         778 :     lastMsgReceiptTime = walrcv->lastMsgReceiptTime;
     403         778 :     SpinLockRelease(&walrcv->mutex);
     404             : 
     405         778 :     return TimestampDifferenceMilliseconds(lastMsgSendTime,
     406             :                                            lastMsgReceiptTime);
     407             : }

Generated by: LCOV version 1.14