LCOV - code coverage report
Current view: top level - src/backend/replication - walreceiverfuncs.c (source / functions) Hit Total Coverage
Test: PostgreSQL 13beta1 Lines: 118 137 86.1 %
Date: 2020-06-03 10:06:28 Functions: 9 10 90.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * walreceiverfuncs.c
       4             :  *
       5             :  * This file contains functions used by the startup process to communicate
       6             :  * with the walreceiver process. Functions implementing walreceiver itself
       7             :  * are in walreceiver.c.
       8             :  *
       9             :  * Portions Copyright (c) 2010-2020, PostgreSQL Global Development Group
      10             :  *
      11             :  *
      12             :  * IDENTIFICATION
      13             :  *    src/backend/replication/walreceiverfuncs.c
      14             :  *
      15             :  *-------------------------------------------------------------------------
      16             :  */
      17             : #include "postgres.h"
      18             : 
      19             : #include <sys/stat.h>
      20             : #include <sys/time.h>
      21             : #include <time.h>
      22             : #include <unistd.h>
      23             : #include <signal.h>
      24             : 
      25             : #include "access/xlog_internal.h"
      26             : #include "postmaster/startup.h"
      27             : #include "replication/walreceiver.h"
      28             : #include "storage/pmsignal.h"
      29             : #include "storage/shmem.h"
      30             : #include "utils/timestamp.h"
      31             : 
      32             : WalRcvData *WalRcv = NULL;
      33             : 
      34             : /*
      35             :  * How long to wait for walreceiver to start up after requesting
      36             :  * postmaster to launch it. In seconds.
      37             :  */
      38             : #define WALRCV_STARTUP_TIMEOUT 10
      39             : 
      40             : /* Report shared memory space needed by WalRcvShmemInit */
      41             : Size
      42        6514 : WalRcvShmemSize(void)
      43             : {
      44        6514 :     Size        size = 0;
      45             : 
      46        6514 :     size = add_size(size, sizeof(WalRcvData));
      47             : 
      48        6514 :     return size;
      49             : }
      50             : 
      51             : /* Allocate and initialize walreceiver-related shared memory */
      52             : void
      53        2170 : WalRcvShmemInit(void)
      54             : {
      55             :     bool        found;
      56             : 
      57        2170 :     WalRcv = (WalRcvData *)
      58        2170 :         ShmemInitStruct("Wal Receiver Ctl", WalRcvShmemSize(), &found);
      59             : 
      60        2170 :     if (!found)
      61             :     {
      62             :         /* First time through, so initialize */
      63        2170 :         MemSet(WalRcv, 0, WalRcvShmemSize());
      64        2170 :         WalRcv->walRcvState = WALRCV_STOPPED;
      65        2170 :         SpinLockInit(&WalRcv->mutex);
      66        2170 :         WalRcv->latch = NULL;
      67             :     }
      68        2170 : }
      69             : 
      70             : /* Is walreceiver running (or starting up)? */
      71             : bool
      72        1434 : WalRcvRunning(void)
      73             : {
      74        1434 :     WalRcvData *walrcv = WalRcv;
      75             :     WalRcvState state;
      76             :     pg_time_t   startTime;
      77             : 
      78        1434 :     SpinLockAcquire(&walrcv->mutex);
      79             : 
      80        1434 :     state = walrcv->walRcvState;
      81        1434 :     startTime = walrcv->startTime;
      82             : 
      83        1434 :     SpinLockRelease(&walrcv->mutex);
      84             : 
      85             :     /*
      86             :      * If it has taken too long for walreceiver to start up, give up. Setting
      87             :      * the state to STOPPED ensures that if walreceiver later does start up
      88             :      * after all, it will see that it's not supposed to be running and die
      89             :      * without doing anything.
      90             :      */
      91        1434 :     if (state == WALRCV_STARTING)
      92             :     {
      93           0 :         pg_time_t   now = (pg_time_t) time(NULL);
      94             : 
      95           0 :         if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
      96             :         {
      97           0 :             SpinLockAcquire(&walrcv->mutex);
      98             : 
      99           0 :             if (walrcv->walRcvState == WALRCV_STARTING)
     100           0 :                 state = walrcv->walRcvState = WALRCV_STOPPED;
     101             : 
     102           0 :             SpinLockRelease(&walrcv->mutex);
     103             :         }
     104             :     }
     105             : 
     106        1434 :     if (state != WALRCV_STOPPED)
     107          30 :         return true;
     108             :     else
     109        1404 :         return false;
     110             : }
     111             : 
     112             : /*
     113             :  * Is walreceiver running and streaming (or at least attempting to connect,
     114             :  * or starting up)?
     115             :  */
     116             : bool
     117        1416 : WalRcvStreaming(void)
     118             : {
     119        1416 :     WalRcvData *walrcv = WalRcv;
     120             :     WalRcvState state;
     121             :     pg_time_t   startTime;
     122             : 
     123        1416 :     SpinLockAcquire(&walrcv->mutex);
     124             : 
     125        1416 :     state = walrcv->walRcvState;
     126        1416 :     startTime = walrcv->startTime;
     127             : 
     128        1416 :     SpinLockRelease(&walrcv->mutex);
     129             : 
     130             :     /*
     131             :      * If it has taken too long for walreceiver to start up, give up. Setting
     132             :      * the state to STOPPED ensures that if walreceiver later does start up
     133             :      * after all, it will see that it's not supposed to be running and die
     134             :      * without doing anything.
     135             :      */
     136        1416 :     if (state == WALRCV_STARTING)
     137             :     {
     138         260 :         pg_time_t   now = (pg_time_t) time(NULL);
     139             : 
     140         260 :         if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
     141             :         {
     142           0 :             SpinLockAcquire(&walrcv->mutex);
     143             : 
     144           0 :             if (walrcv->walRcvState == WALRCV_STARTING)
     145           0 :                 state = walrcv->walRcvState = WALRCV_STOPPED;
     146             : 
     147           0 :             SpinLockRelease(&walrcv->mutex);
     148             :         }
     149             :     }
     150             : 
     151        1416 :     if (state == WALRCV_STREAMING || state == WALRCV_STARTING ||
     152             :         state == WALRCV_RESTARTING)
     153        1212 :         return true;
     154             :     else
     155         204 :         return false;
     156             : }
     157             : 
     158             : /*
     159             :  * Stop walreceiver (if running) and wait for it to die.
     160             :  * Executed by the Startup process.
     161             :  */
     162             : void
     163        1404 : ShutdownWalRcv(void)
     164             : {
     165        1404 :     WalRcvData *walrcv = WalRcv;
     166        1404 :     pid_t       walrcvpid = 0;
     167             : 
     168             :     /*
     169             :      * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
     170             :      * mode once it's finished, and will also request postmaster to not
     171             :      * restart itself.
     172             :      */
     173        1404 :     SpinLockAcquire(&walrcv->mutex);
     174        1404 :     switch (walrcv->walRcvState)
     175             :     {
     176        1368 :         case WALRCV_STOPPED:
     177        1368 :             break;
     178           6 :         case WALRCV_STARTING:
     179           6 :             walrcv->walRcvState = WALRCV_STOPPED;
     180           6 :             break;
     181             : 
     182          30 :         case WALRCV_STREAMING:
     183             :         case WALRCV_WAITING:
     184             :         case WALRCV_RESTARTING:
     185          30 :             walrcv->walRcvState = WALRCV_STOPPING;
     186             :             /* fall through */
     187          30 :         case WALRCV_STOPPING:
     188          30 :             walrcvpid = walrcv->pid;
     189          30 :             break;
     190             :     }
     191        1404 :     SpinLockRelease(&walrcv->mutex);
     192             : 
     193             :     /*
     194             :      * Signal walreceiver process if it was still running.
     195             :      */
     196        1404 :     if (walrcvpid != 0)
     197          30 :         kill(walrcvpid, SIGTERM);
     198             : 
     199             :     /*
     200             :      * Wait for walreceiver to acknowledge its death by setting state to
     201             :      * WALRCV_STOPPED.
     202             :      */
     203        1434 :     while (WalRcvRunning())
     204             :     {
     205             :         /*
     206             :          * This possibly-long loop needs to handle interrupts of startup
     207             :          * process.
     208             :          */
     209          30 :         HandleStartupProcInterrupts();
     210             : 
     211          30 :         pg_usleep(100000);      /* 100ms */
     212             :     }
     213        1404 : }
     214             : 
     215             : /*
     216             :  * Request postmaster to start walreceiver.
     217             :  *
     218             :  * "recptr" indicates the position where streaming should begin.  "conninfo"
     219             :  * is a libpq connection string to use.  "slotname" is, optionally, the name
     220             :  * of a replication slot to acquire.  "create_temp_slot" indicates to create
     221             :  * a temporary slot when no "slotname" is given.
     222             :  *
     223             :  * WAL receivers do not directly load GUC parameters used for the connection
     224             :  * to the primary, and rely on the values passed down by the caller of this
     225             :  * routine instead.  Hence, the addition of any new parameters should happen
     226             :  * through this code path.
     227             :  */
     228             : void
     229         150 : RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
     230             :                      const char *slotname, bool create_temp_slot)
     231             : {
     232         150 :     WalRcvData *walrcv = WalRcv;
     233         150 :     bool        launch = false;
     234         150 :     pg_time_t   now = (pg_time_t) time(NULL);
     235             :     Latch      *latch;
     236             : 
     237             :     /*
     238             :      * We always start at the beginning of the segment. That prevents a broken
     239             :      * segment (i.e., with no records in the first half of a segment) from
     240             :      * being created by XLOG streaming, which might cause trouble later on if
     241             :      * the segment is e.g archived.
     242             :      */
     243         150 :     if (XLogSegmentOffset(recptr, wal_segment_size) != 0)
     244         150 :         recptr -= XLogSegmentOffset(recptr, wal_segment_size);
     245             : 
     246         150 :     SpinLockAcquire(&walrcv->mutex);
     247             : 
     248             :     /* It better be stopped if we try to restart it */
     249             :     Assert(walrcv->walRcvState == WALRCV_STOPPED ||
     250             :            walrcv->walRcvState == WALRCV_WAITING);
     251             : 
     252         150 :     if (conninfo != NULL)
     253         150 :         strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO);
     254             :     else
     255           0 :         walrcv->conninfo[0] = '\0';
     256             : 
     257             :     /*
     258             :      * Use configured replication slot if present, and ignore the value of
     259             :      * create_temp_slot as the slot name should be persistent.  Otherwise, use
     260             :      * create_temp_slot to determine whether this WAL receiver should create a
     261             :      * temporary slot by itself and use it, or not.
     262             :      */
     263         150 :     if (slotname != NULL && slotname[0] != '\0')
     264             :     {
     265          16 :         strlcpy((char *) walrcv->slotname, slotname, NAMEDATALEN);
     266          16 :         walrcv->is_temp_slot = false;
     267             :     }
     268             :     else
     269             :     {
     270         134 :         walrcv->slotname[0] = '\0';
     271         134 :         walrcv->is_temp_slot = create_temp_slot;
     272             :     }
     273             : 
     274         150 :     if (walrcv->walRcvState == WALRCV_STOPPED)
     275             :     {
     276         144 :         launch = true;
     277         144 :         walrcv->walRcvState = WALRCV_STARTING;
     278             :     }
     279             :     else
     280           6 :         walrcv->walRcvState = WALRCV_RESTARTING;
     281         150 :     walrcv->startTime = now;
     282             : 
     283             :     /*
     284             :      * If this is the first startup of walreceiver (on this timeline),
     285             :      * initialize flushedUpto and latestChunkStart to the starting point.
     286             :      */
     287         150 :     if (walrcv->receiveStart == 0 || walrcv->receivedTLI != tli)
     288             :     {
     289          82 :         walrcv->flushedUpto = recptr;
     290          82 :         walrcv->receivedTLI = tli;
     291          82 :         walrcv->latestChunkStart = recptr;
     292             :     }
     293         150 :     walrcv->receiveStart = recptr;
     294         150 :     walrcv->receiveStartTLI = tli;
     295             : 
     296         150 :     latch = walrcv->latch;
     297             : 
     298         150 :     SpinLockRelease(&walrcv->mutex);
     299             : 
     300         150 :     if (launch)
     301         144 :         SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
     302           6 :     else if (latch)
     303           6 :         SetLatch(latch);
     304         150 : }
     305             : 
     306             : /*
     307             :  * Returns the last+1 byte position that walreceiver has flushed.
     308             :  *
     309             :  * Optionally, returns the previous chunk start, that is the first byte
     310             :  * written in the most recent walreceiver flush cycle.  Callers not
     311             :  * interested in that value may pass NULL for latestChunkStart. Same for
     312             :  * receiveTLI.
     313             :  */
     314             : XLogRecPtr
     315        1478 : GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
     316             : {
     317        1478 :     WalRcvData *walrcv = WalRcv;
     318             :     XLogRecPtr  recptr;
     319             : 
     320        1478 :     SpinLockAcquire(&walrcv->mutex);
     321        1478 :     recptr = walrcv->flushedUpto;
     322        1478 :     if (latestChunkStart)
     323        1032 :         *latestChunkStart = walrcv->latestChunkStart;
     324        1478 :     if (receiveTLI)
     325        1448 :         *receiveTLI = walrcv->receivedTLI;
     326        1478 :     SpinLockRelease(&walrcv->mutex);
     327             : 
     328        1478 :     return recptr;
     329             : }
     330             : 
     331             : /*
     332             :  * Returns the last+1 byte position that walreceiver has written.
     333             :  * This returns a recently written value without taking a lock.
     334             :  */
     335             : XLogRecPtr
     336           0 : GetWalRcvWriteRecPtr(void)
     337             : {
     338           0 :     WalRcvData *walrcv = WalRcv;
     339             : 
     340           0 :     return pg_atomic_read_u64(&walrcv->writtenUpto);
     341             : }
     342             : 
     343             : /*
     344             :  * Returns the replication apply delay in ms or -1
     345             :  * if the apply delay info is not available
     346             :  */
     347             : int
     348          12 : GetReplicationApplyDelay(void)
     349             : {
     350          12 :     WalRcvData *walrcv = WalRcv;
     351             :     XLogRecPtr  receivePtr;
     352             :     XLogRecPtr  replayPtr;
     353             : 
     354             :     long        secs;
     355             :     int         usecs;
     356             : 
     357             :     TimestampTz chunkReplayStartTime;
     358             : 
     359          12 :     SpinLockAcquire(&walrcv->mutex);
     360          12 :     receivePtr = walrcv->flushedUpto;
     361          12 :     SpinLockRelease(&walrcv->mutex);
     362             : 
     363          12 :     replayPtr = GetXLogReplayRecPtr(NULL);
     364             : 
     365          12 :     if (receivePtr == replayPtr)
     366          12 :         return 0;
     367             : 
     368           0 :     chunkReplayStartTime = GetCurrentChunkReplayStartTime();
     369             : 
     370           0 :     if (chunkReplayStartTime == 0)
     371           0 :         return -1;
     372             : 
     373           0 :     TimestampDifference(chunkReplayStartTime,
     374             :                         GetCurrentTimestamp(),
     375             :                         &secs, &usecs);
     376             : 
     377           0 :     return (((int) secs * 1000) + (usecs / 1000));
     378             : }
     379             : 
     380             : /*
     381             :  * Returns the network latency in ms, note that this includes any
     382             :  * difference in clock settings between the servers, as well as timezone.
     383             :  */
     384             : int
     385          12 : GetReplicationTransferLatency(void)
     386             : {
     387          12 :     WalRcvData *walrcv = WalRcv;
     388             : 
     389             :     TimestampTz lastMsgSendTime;
     390             :     TimestampTz lastMsgReceiptTime;
     391             : 
     392          12 :     long        secs = 0;
     393          12 :     int         usecs = 0;
     394             :     int         ms;
     395             : 
     396          12 :     SpinLockAcquire(&walrcv->mutex);
     397          12 :     lastMsgSendTime = walrcv->lastMsgSendTime;
     398          12 :     lastMsgReceiptTime = walrcv->lastMsgReceiptTime;
     399          12 :     SpinLockRelease(&walrcv->mutex);
     400             : 
     401          12 :     TimestampDifference(lastMsgSendTime,
     402             :                         lastMsgReceiptTime,
     403             :                         &secs, &usecs);
     404             : 
     405          12 :     ms = ((int) secs * 1000) + (usecs / 1000);
     406             : 
     407          12 :     return ms;
     408             : }

Generated by: LCOV version 1.13