LCOV - code coverage report
Current view: top level - src/backend/replication - walreceiverfuncs.c (source / functions) Hit Total Coverage
Test: PostgreSQL 12beta2 Lines: 115 128 89.8 %
Date: 2019-06-19 14:06:47 Functions: 9 9 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * walreceiverfuncs.c
       4             :  *
       5             :  * This file contains functions used by the startup process to communicate
       6             :  * with the walreceiver process. Functions implementing walreceiver itself
       7             :  * are in walreceiver.c.
       8             :  *
       9             :  * Portions Copyright (c) 2010-2019, PostgreSQL Global Development Group
      10             :  *
      11             :  *
      12             :  * IDENTIFICATION
      13             :  *    src/backend/replication/walreceiverfuncs.c
      14             :  *
      15             :  *-------------------------------------------------------------------------
      16             :  */
      17             : #include "postgres.h"
      18             : 
      19             : #include <sys/stat.h>
      20             : #include <sys/time.h>
      21             : #include <time.h>
      22             : #include <unistd.h>
      23             : #include <signal.h>
      24             : 
      25             : #include "access/xlog_internal.h"
      26             : #include "postmaster/startup.h"
      27             : #include "replication/walreceiver.h"
      28             : #include "storage/pmsignal.h"
      29             : #include "storage/shmem.h"
      30             : #include "utils/timestamp.h"
      31             : 
      32             : WalRcvData *WalRcv = NULL;
      33             : 
      34             : /*
      35             :  * How long to wait for walreceiver to start up after requesting
      36             :  * postmaster to launch it. In seconds.
      37             :  */
      38             : #define WALRCV_STARTUP_TIMEOUT 10
      39             : 
      40             : /* Report shared memory space needed by WalRcvShmemInit */
      41             : Size
      42        5548 : WalRcvShmemSize(void)
      43             : {
      44        5548 :     Size        size = 0;
      45             : 
      46        5548 :     size = add_size(size, sizeof(WalRcvData));
      47             : 
      48        5548 :     return size;
      49             : }
      50             : 
      51             : /* Allocate and initialize walreceiver-related shared memory */
      52             : void
      53        1848 : WalRcvShmemInit(void)
      54             : {
      55             :     bool        found;
      56             : 
      57        1848 :     WalRcv = (WalRcvData *)
      58        1848 :         ShmemInitStruct("Wal Receiver Ctl", WalRcvShmemSize(), &found);
      59             : 
      60        1848 :     if (!found)
      61             :     {
      62             :         /* First time through, so initialize */
      63        1848 :         MemSet(WalRcv, 0, WalRcvShmemSize());
      64        1848 :         WalRcv->walRcvState = WALRCV_STOPPED;
      65        1848 :         SpinLockInit(&WalRcv->mutex);
      66        1848 :         WalRcv->latch = NULL;
      67             :     }
      68        1848 : }
      69             : 
      70             : /* Is walreceiver running (or starting up)? */
      71             : bool
      72        1194 : WalRcvRunning(void)
      73             : {
      74        1194 :     WalRcvData *walrcv = WalRcv;
      75             :     WalRcvState state;
      76             :     pg_time_t   startTime;
      77             : 
      78        1194 :     SpinLockAcquire(&walrcv->mutex);
      79             : 
      80        1194 :     state = walrcv->walRcvState;
      81        1194 :     startTime = walrcv->startTime;
      82             : 
      83        1194 :     SpinLockRelease(&walrcv->mutex);
      84             : 
      85             :     /*
      86             :      * If it has taken too long for walreceiver to start up, give up. Setting
      87             :      * the state to STOPPED ensures that if walreceiver later does start up
      88             :      * after all, it will see that it's not supposed to be running and die
      89             :      * without doing anything.
      90             :      */
      91        1194 :     if (state == WALRCV_STARTING)
      92             :     {
      93           0 :         pg_time_t   now = (pg_time_t) time(NULL);
      94             : 
      95           0 :         if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
      96             :         {
      97           0 :             SpinLockAcquire(&walrcv->mutex);
      98             : 
      99           0 :             if (walrcv->walRcvState == WALRCV_STARTING)
     100           0 :                 state = walrcv->walRcvState = WALRCV_STOPPED;
     101             : 
     102           0 :             SpinLockRelease(&walrcv->mutex);
     103             :         }
     104             :     }
     105             : 
     106        1194 :     if (state != WALRCV_STOPPED)
     107          24 :         return true;
     108             :     else
     109        1170 :         return false;
     110             : }
     111             : 
     112             : /*
     113             :  * Is walreceiver running and streaming (or at least attempting to connect,
     114             :  * or starting up)?
     115             :  */
     116             : bool
     117        1290 : WalRcvStreaming(void)
     118             : {
     119        1290 :     WalRcvData *walrcv = WalRcv;
     120             :     WalRcvState state;
     121             :     pg_time_t   startTime;
     122             : 
     123        1290 :     SpinLockAcquire(&walrcv->mutex);
     124             : 
     125        1290 :     state = walrcv->walRcvState;
     126        1290 :     startTime = walrcv->startTime;
     127             : 
     128        1290 :     SpinLockRelease(&walrcv->mutex);
     129             : 
     130             :     /*
     131             :      * If it has taken too long for walreceiver to start up, give up. Setting
     132             :      * the state to STOPPED ensures that if walreceiver later does start up
     133             :      * after all, it will see that it's not supposed to be running and die
     134             :      * without doing anything.
     135             :      */
     136        1290 :     if (state == WALRCV_STARTING)
     137             :     {
     138         246 :         pg_time_t   now = (pg_time_t) time(NULL);
     139             : 
     140         246 :         if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
     141             :         {
     142           0 :             SpinLockAcquire(&walrcv->mutex);
     143             : 
     144           0 :             if (walrcv->walRcvState == WALRCV_STARTING)
     145           0 :                 state = walrcv->walRcvState = WALRCV_STOPPED;
     146             : 
     147           0 :             SpinLockRelease(&walrcv->mutex);
     148             :         }
     149             :     }
     150             : 
     151        1290 :     if (state == WALRCV_STREAMING || state == WALRCV_STARTING ||
     152             :         state == WALRCV_RESTARTING)
     153        1046 :         return true;
     154             :     else
     155         244 :         return false;
     156             : }
     157             : 
     158             : /*
     159             :  * Stop walreceiver (if running) and wait for it to die.
     160             :  * Executed by the Startup process.
     161             :  */
     162             : void
     163        1170 : ShutdownWalRcv(void)
     164             : {
     165        1170 :     WalRcvData *walrcv = WalRcv;
     166        1170 :     pid_t       walrcvpid = 0;
     167             : 
     168             :     /*
     169             :      * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
     170             :      * mode once it's finished, and will also request postmaster to not
     171             :      * restart itself.
     172             :      */
     173        1170 :     SpinLockAcquire(&walrcv->mutex);
     174        1170 :     switch (walrcv->walRcvState)
     175             :     {
     176             :         case WALRCV_STOPPED:
     177        1142 :             break;
     178             :         case WALRCV_STARTING:
     179           4 :             walrcv->walRcvState = WALRCV_STOPPED;
     180           4 :             break;
     181             : 
     182             :         case WALRCV_STREAMING:
     183             :         case WALRCV_WAITING:
     184             :         case WALRCV_RESTARTING:
     185          24 :             walrcv->walRcvState = WALRCV_STOPPING;
     186             :             /* fall through */
     187             :         case WALRCV_STOPPING:
     188          24 :             walrcvpid = walrcv->pid;
     189          24 :             break;
     190             :     }
     191        1170 :     SpinLockRelease(&walrcv->mutex);
     192             : 
     193             :     /*
     194             :      * Signal walreceiver process if it was still running.
     195             :      */
     196        1170 :     if (walrcvpid != 0)
     197          24 :         kill(walrcvpid, SIGTERM);
     198             : 
     199             :     /*
     200             :      * Wait for walreceiver to acknowledge its death by setting state to
     201             :      * WALRCV_STOPPED.
     202             :      */
     203        2364 :     while (WalRcvRunning())
     204             :     {
     205             :         /*
     206             :          * This possibly-long loop needs to handle interrupts of startup
     207             :          * process.
     208             :          */
     209          24 :         HandleStartupProcInterrupts();
     210             : 
     211          24 :         pg_usleep(100000);      /* 100ms */
     212             :     }
     213        1170 : }
     214             : 
     215             : /*
     216             :  * Request postmaster to start walreceiver.
     217             :  *
     218             :  * recptr indicates the position where streaming should begin, conninfo
     219             :  * is a libpq connection string to use, and slotname is, optionally, the name
     220             :  * of a replication slot to acquire.
     221             :  */
     222             : void
     223         144 : RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
     224             :                      const char *slotname)
     225             : {
     226         144 :     WalRcvData *walrcv = WalRcv;
     227         144 :     bool        launch = false;
     228         144 :     pg_time_t   now = (pg_time_t) time(NULL);
     229             :     Latch      *latch;
     230             : 
     231             :     /*
     232             :      * We always start at the beginning of the segment. That prevents a broken
     233             :      * segment (i.e., with no records in the first half of a segment) from
     234             :      * being created by XLOG streaming, which might cause trouble later on if
     235             :      * the segment is e.g archived.
     236             :      */
     237         144 :     if (XLogSegmentOffset(recptr, wal_segment_size) != 0)
     238         144 :         recptr -= XLogSegmentOffset(recptr, wal_segment_size);
     239             : 
     240         144 :     SpinLockAcquire(&walrcv->mutex);
     241             : 
     242             :     /* It better be stopped if we try to restart it */
     243             :     Assert(walrcv->walRcvState == WALRCV_STOPPED ||
     244             :            walrcv->walRcvState == WALRCV_WAITING);
     245             : 
     246         144 :     if (conninfo != NULL)
     247         144 :         strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO);
     248             :     else
     249           0 :         walrcv->conninfo[0] = '\0';
     250             : 
     251         144 :     if (slotname != NULL)
     252         144 :         strlcpy((char *) walrcv->slotname, slotname, NAMEDATALEN);
     253             :     else
     254           0 :         walrcv->slotname[0] = '\0';
     255             : 
     256         144 :     if (walrcv->walRcvState == WALRCV_STOPPED)
     257             :     {
     258         138 :         launch = true;
     259         138 :         walrcv->walRcvState = WALRCV_STARTING;
     260             :     }
     261             :     else
     262           6 :         walrcv->walRcvState = WALRCV_RESTARTING;
     263         144 :     walrcv->startTime = now;
     264             : 
     265             :     /*
     266             :      * If this is the first startup of walreceiver (on this timeline),
     267             :      * initialize receivedUpto and latestChunkStart to the starting point.
     268             :      */
     269         144 :     if (walrcv->receiveStart == 0 || walrcv->receivedTLI != tli)
     270             :     {
     271          70 :         walrcv->receivedUpto = recptr;
     272          70 :         walrcv->receivedTLI = tli;
     273          70 :         walrcv->latestChunkStart = recptr;
     274             :     }
     275         144 :     walrcv->receiveStart = recptr;
     276         144 :     walrcv->receiveStartTLI = tli;
     277             : 
     278         144 :     latch = walrcv->latch;
     279             : 
     280         144 :     SpinLockRelease(&walrcv->mutex);
     281             : 
     282         144 :     if (launch)
     283         138 :         SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
     284           6 :     else if (latch)
     285           6 :         SetLatch(latch);
     286         144 : }
     287             : 
     288             : /*
     289             :  * Returns the last+1 byte position that walreceiver has written.
     290             :  *
     291             :  * Optionally, returns the previous chunk start, that is the first byte
     292             :  * written in the most recent walreceiver flush cycle.  Callers not
     293             :  * interested in that value may pass NULL for latestChunkStart. Same for
     294             :  * receiveTLI.
     295             :  */
     296             : XLogRecPtr
     297        1262 : GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
     298             : {
     299        1262 :     WalRcvData *walrcv = WalRcv;
     300             :     XLogRecPtr  recptr;
     301             : 
     302        1262 :     SpinLockAcquire(&walrcv->mutex);
     303        1262 :     recptr = walrcv->receivedUpto;
     304        1262 :     if (latestChunkStart)
     305         920 :         *latestChunkStart = walrcv->latestChunkStart;
     306        1262 :     if (receiveTLI)
     307        1244 :         *receiveTLI = walrcv->receivedTLI;
     308        1262 :     SpinLockRelease(&walrcv->mutex);
     309             : 
     310        1262 :     return recptr;
     311             : }
     312             : 
     313             : /*
     314             :  * Returns the replication apply delay in ms or -1
     315             :  * if the apply delay info is not available
     316             :  */
     317             : int
     318          16 : GetReplicationApplyDelay(void)
     319             : {
     320          16 :     WalRcvData *walrcv = WalRcv;
     321             :     XLogRecPtr  receivePtr;
     322             :     XLogRecPtr  replayPtr;
     323             : 
     324             :     long        secs;
     325             :     int         usecs;
     326             : 
     327             :     TimestampTz chunkReplayStartTime;
     328             : 
     329          16 :     SpinLockAcquire(&walrcv->mutex);
     330          16 :     receivePtr = walrcv->receivedUpto;
     331          16 :     SpinLockRelease(&walrcv->mutex);
     332             : 
     333          16 :     replayPtr = GetXLogReplayRecPtr(NULL);
     334             : 
     335          16 :     if (receivePtr == replayPtr)
     336          14 :         return 0;
     337             : 
     338           2 :     chunkReplayStartTime = GetCurrentChunkReplayStartTime();
     339             : 
     340           2 :     if (chunkReplayStartTime == 0)
     341           0 :         return -1;
     342             : 
     343           2 :     TimestampDifference(chunkReplayStartTime,
     344             :                         GetCurrentTimestamp(),
     345             :                         &secs, &usecs);
     346             : 
     347           2 :     return (((int) secs * 1000) + (usecs / 1000));
     348             : }
     349             : 
     350             : /*
     351             :  * Returns the network latency in ms, note that this includes any
     352             :  * difference in clock settings between the servers, as well as timezone.
     353             :  */
     354             : int
     355          16 : GetReplicationTransferLatency(void)
     356             : {
     357          16 :     WalRcvData *walrcv = WalRcv;
     358             : 
     359             :     TimestampTz lastMsgSendTime;
     360             :     TimestampTz lastMsgReceiptTime;
     361             : 
     362          16 :     long        secs = 0;
     363          16 :     int         usecs = 0;
     364             :     int         ms;
     365             : 
     366          16 :     SpinLockAcquire(&walrcv->mutex);
     367          16 :     lastMsgSendTime = walrcv->lastMsgSendTime;
     368          16 :     lastMsgReceiptTime = walrcv->lastMsgReceiptTime;
     369          16 :     SpinLockRelease(&walrcv->mutex);
     370             : 
     371          16 :     TimestampDifference(lastMsgSendTime,
     372             :                         lastMsgReceiptTime,
     373             :                         &secs, &usecs);
     374             : 
     375          16 :     ms = ((int) secs * 1000) + (usecs / 1000);
     376             : 
     377          16 :     return ms;
     378             : }

Generated by: LCOV version 1.13