LCOV - code coverage report
Current view: top level - src/backend/replication - walreceiverfuncs.c (source / functions) Coverage Total Hit
Test: PostgreSQL 20devel Lines: 84.8 % 151 128
Test Date: 2026-07-03 19:57:34 Functions: 90.9 % 11 10
Legend: Lines:     hit not hit
Branches: + taken - not taken # not executed
Branches: 68.5 % 73 50

             Branch data     Line data    Source code
       1                 :             : /*-------------------------------------------------------------------------
       2                 :             :  *
       3                 :             :  * walreceiverfuncs.c
       4                 :             :  *
       5                 :             :  * This file contains functions used by the startup process to communicate
       6                 :             :  * with the walreceiver process. Functions implementing walreceiver itself
       7                 :             :  * are in walreceiver.c.
       8                 :             :  *
       9                 :             :  * Portions Copyright (c) 2010-2026, PostgreSQL Global Development Group
      10                 :             :  *
      11                 :             :  *
      12                 :             :  * IDENTIFICATION
      13                 :             :  *    src/backend/replication/walreceiverfuncs.c
      14                 :             :  *
      15                 :             :  *-------------------------------------------------------------------------
      16                 :             :  */
      17                 :             : #include "postgres.h"
      18                 :             : 
      19                 :             : #include <sys/stat.h>
      20                 :             : #include <sys/time.h>
      21                 :             : #include <time.h>
      22                 :             : #include <unistd.h>
      23                 :             : #include <signal.h>
      24                 :             : 
      25                 :             : #include "access/xlog_internal.h"
      26                 :             : #include "access/xlogrecovery.h"
      27                 :             : #include "pgstat.h"
      28                 :             : #include "replication/walreceiver.h"
      29                 :             : #include "storage/pmsignal.h"
      30                 :             : #include "storage/proc.h"
      31                 :             : #include "storage/shmem.h"
      32                 :             : #include "storage/subsystems.h"
      33                 :             : #include "utils/timestamp.h"
      34                 :             : #include "utils/wait_event.h"
      35                 :             : 
      36                 :             : WalRcvData *WalRcv = NULL;
      37                 :             : 
      38                 :             : static void WalRcvShmemRequest(void *arg);
      39                 :             : static void WalRcvShmemInit(void *arg);
      40                 :             : 
      41                 :             : const ShmemCallbacks WalRcvShmemCallbacks = {
      42                 :             :     .request_fn = WalRcvShmemRequest,
      43                 :             :     .init_fn = WalRcvShmemInit,
      44                 :             : };
      45                 :             : 
      46                 :             : /*
      47                 :             :  * How long to wait for walreceiver to start up after requesting
      48                 :             :  * postmaster to launch it. In seconds.
      49                 :             :  */
      50                 :             : #define WALRCV_STARTUP_TIMEOUT 10
      51                 :             : 
      52                 :             : /* Register shared memory space needed by walreceiver */
      53                 :             : static void
      54                 :        1249 : WalRcvShmemRequest(void *arg)
      55                 :             : {
      56                 :        1249 :     ShmemRequestStruct(.name = "Wal Receiver Ctl",
      57                 :             :                        .size = sizeof(WalRcvData),
      58                 :             :                        .ptr = (void **) &WalRcv,
      59                 :             :         );
      60                 :        1249 : }
      61                 :             : 
      62                 :             : /* Initialize walreceiver-related shared memory */
      63                 :             : static void
      64                 :        1246 : WalRcvShmemInit(void *arg)
      65                 :             : {
      66   [ +  -  +  -  :        1246 :     MemSet(WalRcv, 0, sizeof(WalRcvData));
          +  -  -  +  -  
                      - ]
      67                 :        1246 :     WalRcv->walRcvState = WALRCV_STOPPED;
      68                 :        1246 :     ConditionVariableInit(&WalRcv->walRcvStoppedCV);
      69                 :        1246 :     SpinLockInit(&WalRcv->mutex);
      70                 :        1246 :     pg_atomic_init_u64(&WalRcv->writtenUpto, 0);
      71                 :        1246 :     WalRcv->procno = INVALID_PROC_NUMBER;
      72                 :        1246 : }
      73                 :             : 
      74                 :             : /* Is walreceiver running (or starting up)? */
      75                 :             : bool
      76                 :        1141 : WalRcvRunning(void)
      77                 :             : {
      78                 :        1141 :     WalRcvData *walrcv = WalRcv;
      79                 :             :     WalRcvState state;
      80                 :             :     pg_time_t   startTime;
      81                 :             : 
      82                 :        1141 :     SpinLockAcquire(&walrcv->mutex);
      83                 :             : 
      84                 :        1141 :     state = walrcv->walRcvState;
      85                 :        1141 :     startTime = walrcv->startTime;
      86                 :             : 
      87                 :        1141 :     SpinLockRelease(&walrcv->mutex);
      88                 :             : 
      89                 :             :     /*
      90                 :             :      * If it has taken too long for walreceiver to start up, give up. Setting
      91                 :             :      * the state to STOPPED ensures that if walreceiver later does start up
      92                 :             :      * after all, it will see that it's not supposed to be running and die
      93                 :             :      * without doing anything.
      94                 :             :      */
      95         [ +  + ]:        1141 :     if (state == WALRCV_STARTING)
      96                 :             :     {
      97                 :           1 :         pg_time_t   now = (pg_time_t) time(NULL);
      98                 :             : 
      99         [ -  + ]:           1 :         if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
     100                 :             :         {
     101                 :           0 :             bool        stopped = false;
     102                 :             : 
     103                 :           0 :             SpinLockAcquire(&walrcv->mutex);
     104         [ #  # ]:           0 :             if (walrcv->walRcvState == WALRCV_STARTING)
     105                 :             :             {
     106                 :           0 :                 state = walrcv->walRcvState = WALRCV_STOPPED;
     107                 :           0 :                 stopped = true;
     108                 :             :             }
     109                 :           0 :             SpinLockRelease(&walrcv->mutex);
     110                 :             : 
     111         [ #  # ]:           0 :             if (stopped)
     112                 :           0 :                 ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
     113                 :             :         }
     114                 :             :     }
     115                 :             : 
     116         [ +  + ]:        1141 :     if (state != WALRCV_STOPPED)
     117                 :          50 :         return true;
     118                 :             :     else
     119                 :        1091 :         return false;
     120                 :             : }
     121                 :             : 
     122                 :             : /* Return the state of the walreceiver. */
     123                 :             : WalRcvState
     124                 :           0 : WalRcvGetState(void)
     125                 :             : {
     126                 :           0 :     WalRcvData *walrcv = WalRcv;
     127                 :             :     WalRcvState state;
     128                 :             : 
     129                 :           0 :     SpinLockAcquire(&walrcv->mutex);
     130                 :           0 :     state = walrcv->walRcvState;
     131                 :           0 :     SpinLockRelease(&walrcv->mutex);
     132                 :             : 
     133                 :           0 :     return state;
     134                 :             : }
     135                 :             : 
     136                 :             : /*
     137                 :             :  * Is walreceiver running and streaming (or at least attempting to connect,
     138                 :             :  * or starting up)?
     139                 :             :  */
     140                 :             : bool
     141                 :       26796 : WalRcvStreaming(void)
     142                 :             : {
     143                 :       26796 :     WalRcvData *walrcv = WalRcv;
     144                 :             :     WalRcvState state;
     145                 :             :     pg_time_t   startTime;
     146                 :             : 
     147                 :       26796 :     SpinLockAcquire(&walrcv->mutex);
     148                 :             : 
     149                 :       26796 :     state = walrcv->walRcvState;
     150                 :       26796 :     startTime = walrcv->startTime;
     151                 :             : 
     152                 :       26796 :     SpinLockRelease(&walrcv->mutex);
     153                 :             : 
     154                 :             :     /*
     155                 :             :      * If it has taken too long for walreceiver to start up, give up. Setting
     156                 :             :      * the state to STOPPED ensures that if walreceiver later does start up
     157                 :             :      * after all, it will see that it's not supposed to be running and die
     158                 :             :      * without doing anything.
     159                 :             :      */
     160         [ +  + ]:       26796 :     if (state == WALRCV_STARTING)
     161                 :             :     {
     162                 :         368 :         pg_time_t   now = (pg_time_t) time(NULL);
     163                 :             : 
     164         [ -  + ]:         368 :         if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
     165                 :             :         {
     166                 :           0 :             bool        stopped = false;
     167                 :             : 
     168                 :           0 :             SpinLockAcquire(&walrcv->mutex);
     169         [ #  # ]:           0 :             if (walrcv->walRcvState == WALRCV_STARTING)
     170                 :             :             {
     171                 :           0 :                 state = walrcv->walRcvState = WALRCV_STOPPED;
     172                 :           0 :                 stopped = true;
     173                 :             :             }
     174                 :           0 :             SpinLockRelease(&walrcv->mutex);
     175                 :             : 
     176         [ #  # ]:           0 :             if (stopped)
     177                 :           0 :                 ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
     178                 :             :         }
     179                 :             :     }
     180                 :             : 
     181   [ +  +  +  +  :       26796 :     if (state == WALRCV_STREAMING || state == WALRCV_STARTING ||
                   +  + ]
     182         [ +  + ]:         373 :         state == WALRCV_CONNECTING || state == WALRCV_RESTARTING)
     183                 :       26430 :         return true;
     184                 :             :     else
     185                 :         366 :         return false;
     186                 :             : }
     187                 :             : 
     188                 :             : /*
     189                 :             :  * Stop walreceiver (if running) and wait for it to die.
     190                 :             :  * Executed by the Startup process.
     191                 :             :  */
     192                 :             : void
     193                 :        1086 : ShutdownWalRcv(void)
     194                 :             : {
     195                 :        1086 :     WalRcvData *walrcv = WalRcv;
     196                 :        1086 :     pid_t       walrcvpid = 0;
     197                 :        1086 :     bool        stopped = false;
     198                 :             : 
     199                 :             :     /*
     200                 :             :      * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
     201                 :             :      * mode once it's finished, and will also request postmaster to not
     202                 :             :      * restart itself.
     203                 :             :      */
     204                 :        1086 :     SpinLockAcquire(&walrcv->mutex);
     205   [ +  +  +  -  :        1086 :     switch (walrcv->walRcvState)
                      - ]
     206                 :             :     {
     207                 :        1039 :         case WALRCV_STOPPED:
     208                 :        1039 :             break;
     209                 :           5 :         case WALRCV_STARTING:
     210                 :           5 :             walrcv->walRcvState = WALRCV_STOPPED;
     211                 :           5 :             stopped = true;
     212                 :           5 :             break;
     213                 :             : 
     214                 :          42 :         case WALRCV_CONNECTING:
     215                 :             :         case WALRCV_STREAMING:
     216                 :             :         case WALRCV_WAITING:
     217                 :             :         case WALRCV_RESTARTING:
     218                 :          42 :             walrcv->walRcvState = WALRCV_STOPPING;
     219                 :             :             pg_fallthrough;
     220                 :          42 :         case WALRCV_STOPPING:
     221                 :          42 :             walrcvpid = walrcv->pid;
     222                 :          42 :             break;
     223                 :             :     }
     224                 :        1086 :     SpinLockRelease(&walrcv->mutex);
     225                 :             : 
     226                 :             :     /* Unnecessary but consistent. */
     227         [ +  + ]:        1086 :     if (stopped)
     228                 :           5 :         ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
     229                 :             : 
     230                 :             :     /*
     231                 :             :      * Signal walreceiver process if it was still running.
     232                 :             :      */
     233         [ +  + ]:        1086 :     if (walrcvpid != 0)
     234                 :          42 :         kill(walrcvpid, SIGTERM);
     235                 :             : 
     236                 :             :     /*
     237                 :             :      * Wait for walreceiver to acknowledge its death by setting state to
     238                 :             :      * WALRCV_STOPPED.
     239                 :             :      */
     240                 :        1086 :     ConditionVariablePrepareToSleep(&walrcv->walRcvStoppedCV);
     241         [ +  + ]:        1128 :     while (WalRcvRunning())
     242                 :          42 :         ConditionVariableSleep(&walrcv->walRcvStoppedCV,
     243                 :             :                                WAIT_EVENT_WAL_RECEIVER_EXIT);
     244                 :        1086 :     ConditionVariableCancelSleep();
     245                 :        1086 : }
     246                 :             : 
     247                 :             : /*
     248                 :             :  * Request postmaster to start walreceiver.
     249                 :             :  *
     250                 :             :  * "recptr" indicates the position where streaming should begin.  "conninfo"
     251                 :             :  * is a libpq connection string to use.  "slotname" is, optionally, the name
     252                 :             :  * of a replication slot to acquire.  "create_temp_slot" indicates to create
     253                 :             :  * a temporary slot when no "slotname" is given.
     254                 :             :  *
     255                 :             :  * WAL receivers do not directly load GUC parameters used for the connection
     256                 :             :  * to the primary, and rely on the values passed down by the caller of this
     257                 :             :  * routine instead.  Hence, the addition of any new parameters should happen
     258                 :             :  * through this code path.
     259                 :             :  */
     260                 :             : void
     261                 :         230 : RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo,
     262                 :             :                      const char *slotname, bool create_temp_slot)
     263                 :             : {
     264                 :         230 :     WalRcvData *walrcv = WalRcv;
     265                 :         230 :     bool        launch = false;
     266                 :         230 :     pg_time_t   now = (pg_time_t) time(NULL);
     267                 :             :     ProcNumber  walrcv_proc;
     268                 :             : 
     269                 :             :     /*
     270                 :             :      * We always start at the beginning of the segment. That prevents a broken
     271                 :             :      * segment (i.e., with no records in the first half of a segment) from
     272                 :             :      * being created by XLOG streaming, which might cause trouble later on if
     273                 :             :      * the segment is e.g archived.
     274                 :             :      */
     275         [ +  - ]:         230 :     if (XLogSegmentOffset(recptr, wal_segment_size) != 0)
     276                 :         230 :         recptr -= XLogSegmentOffset(recptr, wal_segment_size);
     277                 :             : 
     278                 :         230 :     SpinLockAcquire(&walrcv->mutex);
     279                 :             : 
     280                 :             :     /* It better be stopped if we try to restart it */
     281                 :             :     Assert(walrcv->walRcvState == WALRCV_STOPPED ||
     282                 :             :            walrcv->walRcvState == WALRCV_WAITING);
     283                 :             : 
     284                 :             :     /*
     285                 :             :      * Use configured replication slot if present, and ignore the value of
     286                 :             :      * create_temp_slot as the slot name should be persistent.  Otherwise, use
     287                 :             :      * create_temp_slot to determine whether this WAL receiver should create a
     288                 :             :      * temporary slot by itself and use it, or not.
     289                 :             :      */
     290   [ +  -  +  + ]:         230 :     if (slotname != NULL && slotname[0] != '\0')
     291                 :             :     {
     292                 :          68 :         strlcpy(walrcv->slotname, slotname, NAMEDATALEN);
     293                 :          68 :         walrcv->is_temp_slot = false;
     294                 :             :     }
     295                 :             :     else
     296                 :             :     {
     297                 :         162 :         walrcv->slotname[0] = '\0';
     298                 :         162 :         walrcv->is_temp_slot = create_temp_slot;
     299                 :             :     }
     300                 :             : 
     301                 :             :     /*
     302                 :             :      * While waiting for instructions, the WAL receiver uses the same
     303                 :             :      * connection, so do not clobber the user-visible conninfo already saved.
     304                 :             :      */
     305         [ +  + ]:         230 :     if (walrcv->walRcvState == WALRCV_STOPPED)
     306                 :             :     {
     307                 :         221 :         launch = true;
     308                 :         221 :         walrcv->walRcvState = WALRCV_STARTING;
     309                 :             : 
     310         [ +  - ]:         221 :         if (conninfo != NULL)
     311                 :         221 :             strlcpy(walrcv->conninfo, conninfo, MAXCONNINFO);
     312                 :             :         else
     313                 :           0 :             walrcv->conninfo[0] = '\0';
     314                 :             :     }
     315                 :             :     else
     316                 :           9 :         walrcv->walRcvState = WALRCV_RESTARTING;
     317                 :         230 :     walrcv->startTime = now;
     318                 :             : 
     319                 :             :     /*
     320                 :             :      * If this is the first startup of walreceiver (on this timeline),
     321                 :             :      * initialize flushedUpto and latestChunkStart to the starting point.
     322                 :             :      */
     323   [ +  +  -  + ]:         230 :     if (!XLogRecPtrIsValid(walrcv->receiveStart) || walrcv->receivedTLI != tli)
     324                 :             :     {
     325                 :         118 :         walrcv->flushedUpto = recptr;
     326                 :         118 :         walrcv->receivedTLI = tli;
     327                 :         118 :         walrcv->latestChunkStart = recptr;
     328                 :             : 
     329                 :             :         /*
     330                 :             :          * Pairs with pg_atomic_read_membarrier_u64() in
     331                 :             :          * GetWalRcvWriteRecPtr().
     332                 :             :          */
     333                 :         118 :         pg_atomic_write_membarrier_u64(&walrcv->writtenUpto, recptr);
     334                 :             :     }
     335                 :         230 :     walrcv->receiveStart = recptr;
     336                 :         230 :     walrcv->receiveStartTLI = tli;
     337                 :             : 
     338                 :         230 :     walrcv_proc = walrcv->procno;
     339                 :             : 
     340                 :         230 :     SpinLockRelease(&walrcv->mutex);
     341                 :             : 
     342         [ +  + ]:         230 :     if (launch)
     343                 :         221 :         SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
     344         [ +  - ]:           9 :     else if (walrcv_proc != INVALID_PROC_NUMBER)
     345                 :           9 :         SetLatch(&GetPGProcByNumber(walrcv_proc)->procLatch);
     346                 :         230 : }
     347                 :             : 
     348                 :             : /*
     349                 :             :  * Returns the last+1 byte position that walreceiver has flushed.
     350                 :             :  *
     351                 :             :  * Optionally, returns the previous chunk start, that is the first byte
     352                 :             :  * written in the most recent walreceiver flush cycle.  Callers not
     353                 :             :  * interested in that value may pass NULL for latestChunkStart. Same for
     354                 :             :  * receiveTLI.
     355                 :             :  */
     356                 :             : XLogRecPtr
     357                 :       26052 : GetWalRcvFlushRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
     358                 :             : {
     359                 :       26052 :     WalRcvData *walrcv = WalRcv;
     360                 :             :     XLogRecPtr  recptr;
     361                 :             : 
     362                 :       26052 :     SpinLockAcquire(&walrcv->mutex);
     363                 :       26052 :     recptr = walrcv->flushedUpto;
     364         [ +  + ]:       26052 :     if (latestChunkStart)
     365                 :       24645 :         *latestChunkStart = walrcv->latestChunkStart;
     366         [ +  + ]:       26052 :     if (receiveTLI)
     367                 :       25798 :         *receiveTLI = walrcv->receivedTLI;
     368                 :       26052 :     SpinLockRelease(&walrcv->mutex);
     369                 :             : 
     370                 :       26052 :     return recptr;
     371                 :             : }
     372                 :             : 
     373                 :             : /*
     374                 :             :  * Returns the last+1 byte position that walreceiver has written.
     375                 :             :  *
     376                 :             :  * Use pg_atomic_read_membarrier_u64() to ensure that callers see up-to-date
     377                 :             :  * shared memory state, matching the barrier semantics provided by the
     378                 :             :  * spinlock in GetWalRcvFlushRecPtr() and other LSN-position functions.
     379                 :             :  */
     380                 :             : XLogRecPtr
     381                 :          50 : GetWalRcvWriteRecPtr(void)
     382                 :             : {
     383                 :          50 :     WalRcvData *walrcv = WalRcv;
     384                 :             : 
     385                 :          50 :     return pg_atomic_read_membarrier_u64(&walrcv->writtenUpto);
     386                 :             : }
     387                 :             : 
     388                 :             : /*
     389                 :             :  * Returns the replication apply delay in ms or -1
     390                 :             :  * if the apply delay info is not available
     391                 :             :  */
     392                 :             : int
     393                 :         416 : GetReplicationApplyDelay(void)
     394                 :             : {
     395                 :         416 :     WalRcvData *walrcv = WalRcv;
     396                 :             :     XLogRecPtr  receivePtr;
     397                 :             :     XLogRecPtr  replayPtr;
     398                 :             :     TimestampTz chunkReplayStartTime;
     399                 :             : 
     400                 :         416 :     SpinLockAcquire(&walrcv->mutex);
     401                 :         416 :     receivePtr = walrcv->flushedUpto;
     402                 :         416 :     SpinLockRelease(&walrcv->mutex);
     403                 :             : 
     404                 :         416 :     replayPtr = GetXLogReplayRecPtr(NULL);
     405                 :             : 
     406         [ +  + ]:         416 :     if (receivePtr == replayPtr)
     407                 :         182 :         return 0;
     408                 :             : 
     409                 :         234 :     chunkReplayStartTime = GetCurrentChunkReplayStartTime();
     410                 :             : 
     411         [ +  + ]:         234 :     if (chunkReplayStartTime == 0)
     412                 :           1 :         return -1;
     413                 :             : 
     414                 :         233 :     return TimestampDifferenceMilliseconds(chunkReplayStartTime,
     415                 :             :                                            GetCurrentTimestamp());
     416                 :             : }
     417                 :             : 
     418                 :             : /*
     419                 :             :  * Returns the network latency in ms, note that this includes any
     420                 :             :  * difference in clock settings between the servers, as well as timezone.
     421                 :             :  */
     422                 :             : int
     423                 :         416 : GetReplicationTransferLatency(void)
     424                 :             : {
     425                 :         416 :     WalRcvData *walrcv = WalRcv;
     426                 :             :     TimestampTz lastMsgSendTime;
     427                 :             :     TimestampTz lastMsgReceiptTime;
     428                 :             : 
     429                 :         416 :     SpinLockAcquire(&walrcv->mutex);
     430                 :         416 :     lastMsgSendTime = walrcv->lastMsgSendTime;
     431                 :         416 :     lastMsgReceiptTime = walrcv->lastMsgReceiptTime;
     432                 :         416 :     SpinLockRelease(&walrcv->mutex);
     433                 :             : 
     434                 :         416 :     return TimestampDifferenceMilliseconds(lastMsgSendTime,
     435                 :             :                                            lastMsgReceiptTime);
     436                 :             : }
        

Generated by: LCOV version 2.0-1