LCOV - code coverage report
Current view: top level - src/backend/replication - walsender.c (source / functions) Hit Total Coverage
Test: PostgreSQL 14devel Lines: 1036 1168 88.7 %
Date: 2020-11-27 12:05:55 Functions: 50 51 98.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * walsender.c
       4             :  *
       5             :  * The WAL sender process (walsender) is new as of Postgres 9.0. It takes
       6             :  * care of sending XLOG from the primary server to a single recipient.
       7             :  * (Note that there can be more than one walsender process concurrently.)
       8             :  * It is started by the postmaster when the walreceiver of a standby server
       9             :  * connects to the primary server and requests XLOG streaming replication.
      10             :  *
      11             :  * A walsender is similar to a regular backend, ie. there is a one-to-one
      12             :  * relationship between a connection and a walsender process, but instead
      13             :  * of processing SQL queries, it understands a small set of special
      14             :  * replication-mode commands. The START_REPLICATION command begins streaming
      15             :  * WAL to the client. While streaming, the walsender keeps reading XLOG
      16             :  * records from the disk and sends them to the standby server over the
      17             :  * COPY protocol, until either side ends the replication by exiting COPY
      18             :  * mode (or until the connection is closed).
      19             :  *
      20             :  * Normal termination is by SIGTERM, which instructs the walsender to
      21             :  * close the connection and exit(0) at the next convenient moment. Emergency
      22             :  * termination is by SIGQUIT; like any backend, the walsender will simply
      23             :  * abort and exit on SIGQUIT. A close of the connection and a FATAL error
      24             :  * are treated as not a crash but approximately normal termination;
      25             :  * the walsender will exit quickly without sending any more XLOG records.
      26             :  *
      27             :  * If the server is shut down, checkpointer sends us
      28             :  * PROCSIG_WALSND_INIT_STOPPING after all regular backends have exited.  If
      29             :  * the backend is idle or runs an SQL query this causes the backend to
      30             :  * shutdown, if logical replication is in progress all existing WAL records
      31             :  * are processed followed by a shutdown.  Otherwise this causes the walsender
      32             :  * to switch to the "stopping" state. In this state, the walsender will reject
      33             :  * any further replication commands. The checkpointer begins the shutdown
      34             :  * checkpoint once all walsenders are confirmed as stopping. When the shutdown
      35             :  * checkpoint finishes, the postmaster sends us SIGUSR2. This instructs
      36             :  * walsender to send any outstanding WAL, including the shutdown checkpoint
      37             :  * record, wait for it to be replicated to the standby, and then exit.
      38             :  *
      39             :  *
      40             :  * Portions Copyright (c) 2010-2020, PostgreSQL Global Development Group
      41             :  *
      42             :  * IDENTIFICATION
      43             :  *    src/backend/replication/walsender.c
      44             :  *
      45             :  *-------------------------------------------------------------------------
      46             :  */
      47             : #include "postgres.h"
      48             : 
      49             : #include <signal.h>
      50             : #include <unistd.h>
      51             : 
      52             : #include "access/printtup.h"
      53             : #include "access/timeline.h"
      54             : #include "access/transam.h"
      55             : #include "access/xact.h"
      56             : #include "access/xlog_internal.h"
      57             : #include "access/xlogreader.h"
      58             : #include "access/xlogutils.h"
      59             : #include "catalog/pg_authid.h"
      60             : #include "catalog/pg_type.h"
      61             : #include "commands/dbcommands.h"
      62             : #include "commands/defrem.h"
      63             : #include "funcapi.h"
      64             : #include "libpq/libpq.h"
      65             : #include "libpq/pqformat.h"
      66             : #include "miscadmin.h"
      67             : #include "nodes/replnodes.h"
      68             : #include "pgstat.h"
      69             : #include "postmaster/interrupt.h"
      70             : #include "replication/basebackup.h"
      71             : #include "replication/decode.h"
      72             : #include "replication/logical.h"
      73             : #include "replication/slot.h"
      74             : #include "replication/snapbuild.h"
      75             : #include "replication/syncrep.h"
      76             : #include "replication/walreceiver.h"
      77             : #include "replication/walsender.h"
      78             : #include "replication/walsender_private.h"
      79             : #include "storage/condition_variable.h"
      80             : #include "storage/fd.h"
      81             : #include "storage/ipc.h"
      82             : #include "storage/pmsignal.h"
      83             : #include "storage/proc.h"
      84             : #include "storage/procarray.h"
      85             : #include "tcop/dest.h"
      86             : #include "tcop/tcopprot.h"
      87             : #include "utils/acl.h"
      88             : #include "utils/builtins.h"
      89             : #include "utils/guc.h"
      90             : #include "utils/memutils.h"
      91             : #include "utils/pg_lsn.h"
      92             : #include "utils/portal.h"
      93             : #include "utils/ps_status.h"
      94             : #include "utils/timeout.h"
      95             : #include "utils/timestamp.h"
      96             : 
      97             : /*
      98             :  * Maximum data payload in a WAL data message.  Must be >= XLOG_BLCKSZ.
      99             :  *
     100             :  * We don't have a good idea of what a good value would be; there's some
     101             :  * overhead per message in both walsender and walreceiver, but on the other
     102             :  * hand sending large batches makes walsender less responsive to signals
     103             :  * because signals are checked only between messages.  128kB (with
     104             :  * default 8k blocks) seems like a reasonable guess for now.
     105             :  */
     106             : #define MAX_SEND_SIZE (XLOG_BLCKSZ * 16)
     107             : 
     108             : /* Array of WalSnds in shared memory */
     109             : WalSndCtlData *WalSndCtl = NULL;
     110             : 
     111             : /* My slot in the shared memory array */
     112             : WalSnd     *MyWalSnd = NULL;
     113             : 
     114             : /* Global state */
     115             : bool        am_walsender = false;   /* Am I a walsender process? */
     116             : bool        am_cascading_walsender = false; /* Am I cascading WAL to another
     117             :                                              * standby? */
     118             : bool        am_db_walsender = false;    /* Connected to a database? */
     119             : 
     120             : /* User-settable parameters for walsender */
     121             : int         max_wal_senders = 0;    /* the maximum number of concurrent
     122             :                                      * walsenders */
     123             : int         wal_sender_timeout = 60 * 1000; /* maximum time to send one WAL
     124             :                                              * data message */
     125             : bool        log_replication_commands = false;
     126             : 
     127             : /*
     128             :  * State for WalSndWakeupRequest
     129             :  */
     130             : bool        wake_wal_senders = false;
     131             : 
     132             : /*
     133             :  * xlogreader used for replication.  Note that a WAL sender doing physical
     134             :  * replication does not need xlogreader to read WAL, but it needs one to
     135             :  * keep a state of its work.
     136             :  */
     137             : static XLogReaderState *xlogreader = NULL;
     138             : 
     139             : /*
     140             :  * These variables keep track of the state of the timeline we're currently
     141             :  * sending. sendTimeLine identifies the timeline. If sendTimeLineIsHistoric,
     142             :  * the timeline is not the latest timeline on this server, and the server's
     143             :  * history forked off from that timeline at sendTimeLineValidUpto.
     144             :  */
     145             : static TimeLineID sendTimeLine = 0;
     146             : static TimeLineID sendTimeLineNextTLI = 0;
     147             : static bool sendTimeLineIsHistoric = false;
     148             : static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr;
     149             : 
     150             : /*
     151             :  * How far have we sent WAL already? This is also advertised in
     152             :  * MyWalSnd->sentPtr.  (Actually, this is the next WAL location to send.)
     153             :  */
     154             : static XLogRecPtr sentPtr = InvalidXLogRecPtr;
     155             : 
     156             : /* Buffers for constructing outgoing messages and processing reply messages. */
     157             : static StringInfoData output_message;
     158             : static StringInfoData reply_message;
     159             : static StringInfoData tmpbuf;
     160             : 
     161             : /* Timestamp of last ProcessRepliesIfAny(). */
     162             : static TimestampTz last_processing = 0;
     163             : 
     164             : /*
     165             :  * Timestamp of last ProcessRepliesIfAny() that saw a reply from the
     166             :  * standby. Set to 0 if wal_sender_timeout doesn't need to be active.
     167             :  */
     168             : static TimestampTz last_reply_timestamp = 0;
     169             : 
     170             : /* Have we sent a heartbeat message asking for reply, since last reply? */
     171             : static bool waiting_for_ping_response = false;
     172             : 
     173             : /*
     174             :  * While streaming WAL in Copy mode, streamingDoneSending is set to true
     175             :  * after we have sent CopyDone. We should not send any more CopyData messages
     176             :  * after that. streamingDoneReceiving is set to true when we receive CopyDone
     177             :  * from the other end. When both become true, it's time to exit Copy mode.
     178             :  */
     179             : static bool streamingDoneSending;
     180             : static bool streamingDoneReceiving;
     181             : 
     182             : /* Are we there yet? */
     183             : static bool WalSndCaughtUp = false;
     184             : 
     185             : /* Flags set by signal handlers for later service in main loop */
     186             : static volatile sig_atomic_t got_SIGUSR2 = false;
     187             : static volatile sig_atomic_t got_STOPPING = false;
     188             : 
     189             : /*
     190             :  * This is set while we are streaming. When not set
     191             :  * PROCSIG_WALSND_INIT_STOPPING signal will be handled like SIGTERM. When set,
     192             :  * the main loop is responsible for checking got_STOPPING and terminating when
     193             :  * it's set (after streaming any remaining WAL).
     194             :  */
     195             : static volatile sig_atomic_t replication_active = false;
     196             : 
     197             : static LogicalDecodingContext *logical_decoding_ctx = NULL;
     198             : 
     199             : /* A sample associating a WAL location with the time it was written. */
     200             : typedef struct
     201             : {
     202             :     XLogRecPtr  lsn;
     203             :     TimestampTz time;
     204             : } WalTimeSample;
     205             : 
     206             : /* The size of our buffer of time samples. */
     207             : #define LAG_TRACKER_BUFFER_SIZE 8192
     208             : 
     209             : /* A mechanism for tracking replication lag. */
     210             : typedef struct
     211             : {
     212             :     XLogRecPtr  last_lsn;
     213             :     WalTimeSample buffer[LAG_TRACKER_BUFFER_SIZE];
     214             :     int         write_head;
     215             :     int         read_heads[NUM_SYNC_REP_WAIT_MODE];
     216             :     WalTimeSample last_read[NUM_SYNC_REP_WAIT_MODE];
     217             : } LagTracker;
     218             : 
     219             : static LagTracker *lag_tracker;
     220             : 
     221             : /* Signal handlers */
     222             : static void WalSndLastCycleHandler(SIGNAL_ARGS);
     223             : 
     224             : /* Prototypes for private functions */
     225             : typedef void (*WalSndSendDataCallback) (void);
     226             : static void WalSndLoop(WalSndSendDataCallback send_data);
     227             : static void InitWalSenderSlot(void);
     228             : static void WalSndKill(int code, Datum arg);
     229             : static void WalSndShutdown(void) pg_attribute_noreturn();
     230             : static void XLogSendPhysical(void);
     231             : static void XLogSendLogical(void);
     232             : static void WalSndDone(WalSndSendDataCallback send_data);
     233             : static XLogRecPtr GetStandbyFlushRecPtr(void);
     234             : static void IdentifySystem(void);
     235             : static void CreateReplicationSlot(CreateReplicationSlotCmd *cmd);
     236             : static void DropReplicationSlot(DropReplicationSlotCmd *cmd);
     237             : static void StartReplication(StartReplicationCmd *cmd);
     238             : static void StartLogicalReplication(StartReplicationCmd *cmd);
     239             : static void ProcessStandbyMessage(void);
     240             : static void ProcessStandbyReplyMessage(void);
     241             : static void ProcessStandbyHSFeedbackMessage(void);
     242             : static void ProcessRepliesIfAny(void);
     243             : static void WalSndKeepalive(bool requestReply);
     244             : static void WalSndKeepaliveIfNecessary(void);
     245             : static void WalSndCheckTimeOut(void);
     246             : static long WalSndComputeSleeptime(TimestampTz now);
     247             : static void WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
     248             : static void WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write);
     249             : static void WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid);
     250             : static XLogRecPtr WalSndWaitForWal(XLogRecPtr loc);
     251             : static void LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time);
     252             : static TimeOffset LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now);
     253             : static bool TransactionIdInRecentPast(TransactionId xid, uint32 epoch);
     254             : 
     255             : static void WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
     256             :                               TimeLineID *tli_p);
     257             : 
     258             : 
     259             : /* Initialize walsender process before entering the main command loop */
     260             : void
     261         692 : InitWalSender(void)
     262             : {
     263         692 :     am_cascading_walsender = RecoveryInProgress();
     264             : 
     265             :     /* Create a per-walsender data structure in shared memory */
     266         692 :     InitWalSenderSlot();
     267             : 
     268             :     /*
     269             :      * We don't currently need any ResourceOwner in a walsender process, but
     270             :      * if we did, we could call CreateAuxProcessResourceOwner here.
     271             :      */
     272             : 
     273             :     /*
     274             :      * Let postmaster know that we're a WAL sender. Once we've declared us as
     275             :      * a WAL sender process, postmaster will let us outlive the bgwriter and
     276             :      * kill us last in the shutdown sequence, so we get a chance to stream all
     277             :      * remaining WAL at shutdown, including the shutdown checkpoint. Note that
     278             :      * there's no going back, and we mustn't write any WAL records after this.
     279             :      */
     280         692 :     MarkPostmasterChildWalSender();
     281         692 :     SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);
     282             : 
     283             :     /* Initialize empty timestamp buffer for lag tracking. */
     284         692 :     lag_tracker = MemoryContextAllocZero(TopMemoryContext, sizeof(LagTracker));
     285         692 : }
     286             : 
     287             : /*
     288             :  * Clean up after an error.
     289             :  *
     290             :  * WAL sender processes don't use transactions like regular backends do.
     291             :  * This function does any cleanup required after an error in a WAL sender
     292             :  * process, similar to what transaction abort does in a regular backend.
     293             :  */
     294             : void
     295          28 : WalSndErrorCleanup(void)
     296             : {
     297          28 :     LWLockReleaseAll();
     298          28 :     ConditionVariableCancelSleep();
     299          28 :     pgstat_report_wait_end();
     300             : 
     301          28 :     if (xlogreader != NULL && xlogreader->seg.ws_file >= 0)
     302           0 :         wal_segment_close(xlogreader);
     303             : 
     304          28 :     if (MyReplicationSlot != NULL)
     305           8 :         ReplicationSlotRelease();
     306             : 
     307          28 :     ReplicationSlotCleanup();
     308             : 
     309          28 :     replication_active = false;
     310             : 
     311             :     /*
     312             :      * If there is a transaction in progress, it will clean up our
     313             :      * ResourceOwner, but if a replication command set up a resource owner
     314             :      * without a transaction, we've got to clean that up now.
     315             :      */
     316          28 :     if (!IsTransactionOrTransactionBlock())
     317          28 :         WalSndResourceCleanup(false);
     318             : 
     319          28 :     if (got_STOPPING || got_SIGUSR2)
     320           0 :         proc_exit(0);
     321             : 
     322             :     /* Revert back to startup state */
     323          28 :     WalSndSetState(WALSNDSTATE_STARTUP);
     324          28 : }
     325             : 
     326             : /*
     327             :  * Clean up any ResourceOwner we created.
     328             :  */
     329             : void
     330         166 : WalSndResourceCleanup(bool isCommit)
     331             : {
     332             :     ResourceOwner resowner;
     333             : 
     334         166 :     if (CurrentResourceOwner == NULL)
     335          16 :         return;
     336             : 
     337             :     /*
     338             :      * Deleting CurrentResourceOwner is not allowed, so we must save a pointer
     339             :      * in a local variable and clear it first.
     340             :      */
     341         150 :     resowner = CurrentResourceOwner;
     342         150 :     CurrentResourceOwner = NULL;
     343             : 
     344             :     /* Now we can release resources and delete it. */
     345         150 :     ResourceOwnerRelease(resowner,
     346             :                          RESOURCE_RELEASE_BEFORE_LOCKS, isCommit, true);
     347         150 :     ResourceOwnerRelease(resowner,
     348             :                          RESOURCE_RELEASE_LOCKS, isCommit, true);
     349         150 :     ResourceOwnerRelease(resowner,
     350             :                          RESOURCE_RELEASE_AFTER_LOCKS, isCommit, true);
     351         150 :     ResourceOwnerDelete(resowner);
     352             : }
     353             : 
     354             : /*
     355             :  * Handle a client's connection abort in an orderly manner.
     356             :  */
     357             : static void
     358           0 : WalSndShutdown(void)
     359             : {
     360             :     /*
     361             :      * Reset whereToSendOutput to prevent ereport from attempting to send any
     362             :      * more messages to the standby.
     363             :      */
     364           0 :     if (whereToSendOutput == DestRemote)
     365           0 :         whereToSendOutput = DestNone;
     366             : 
     367           0 :     proc_exit(0);
     368             :     abort();                    /* keep the compiler quiet */
     369             : }
     370             : 
     371             : /*
     372             :  * Handle the IDENTIFY_SYSTEM command.
     373             :  */
     374             : static void
     375         482 : IdentifySystem(void)
     376             : {
     377             :     char        sysid[32];
     378             :     char        xloc[MAXFNAMELEN];
     379             :     XLogRecPtr  logptr;
     380         482 :     char       *dbname = NULL;
     381             :     DestReceiver *dest;
     382             :     TupOutputState *tstate;
     383             :     TupleDesc   tupdesc;
     384             :     Datum       values[4];
     385             :     bool        nulls[4];
     386             : 
     387             :     /*
     388             :      * Reply with a result set with one row, four columns. First col is system
     389             :      * ID, second is timeline ID, third is current xlog location and the
     390             :      * fourth contains the database name if we are connected to one.
     391             :      */
     392             : 
     393         482 :     snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
     394             :              GetSystemIdentifier());
     395             : 
     396         482 :     am_cascading_walsender = RecoveryInProgress();
     397         482 :     if (am_cascading_walsender)
     398             :     {
     399             :         /* this also updates ThisTimeLineID */
     400          18 :         logptr = GetStandbyFlushRecPtr();
     401             :     }
     402             :     else
     403         464 :         logptr = GetFlushRecPtr();
     404             : 
     405         482 :     snprintf(xloc, sizeof(xloc), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
     406             : 
     407         482 :     if (MyDatabaseId != InvalidOid)
     408             :     {
     409          68 :         MemoryContext cur = CurrentMemoryContext;
     410             : 
     411             :         /* syscache access needs a transaction env. */
     412          68 :         StartTransactionCommand();
     413             :         /* make dbname live outside TX context */
     414          68 :         MemoryContextSwitchTo(cur);
     415          68 :         dbname = get_database_name(MyDatabaseId);
     416          68 :         CommitTransactionCommand();
     417             :         /* CommitTransactionCommand switches to TopMemoryContext */
     418          68 :         MemoryContextSwitchTo(cur);
     419             :     }
     420             : 
     421         482 :     dest = CreateDestReceiver(DestRemoteSimple);
     422         482 :     MemSet(nulls, false, sizeof(nulls));
     423             : 
     424             :     /* need a tuple descriptor representing four columns */
     425         482 :     tupdesc = CreateTemplateTupleDesc(4);
     426         482 :     TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "systemid",
     427             :                               TEXTOID, -1, 0);
     428         482 :     TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "timeline",
     429             :                               INT4OID, -1, 0);
     430         482 :     TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "xlogpos",
     431             :                               TEXTOID, -1, 0);
     432         482 :     TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "dbname",
     433             :                               TEXTOID, -1, 0);
     434             : 
     435             :     /* prepare for projection of tuples */
     436         482 :     tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
     437             : 
     438             :     /* column 1: system identifier */
     439         482 :     values[0] = CStringGetTextDatum(sysid);
     440             : 
     441             :     /* column 2: timeline */
     442         482 :     values[1] = Int32GetDatum(ThisTimeLineID);
     443             : 
     444             :     /* column 3: wal location */
     445         482 :     values[2] = CStringGetTextDatum(xloc);
     446             : 
     447             :     /* column 4: database name, or NULL if none */
     448         482 :     if (dbname)
     449          68 :         values[3] = CStringGetTextDatum(dbname);
     450             :     else
     451         414 :         nulls[3] = true;
     452             : 
     453             :     /* send it to dest */
     454         482 :     do_tup_output(tstate, values, nulls);
     455             : 
     456         482 :     end_tup_output(tstate);
     457         482 : }
     458             : 
     459             : 
     460             : /*
     461             :  * Handle TIMELINE_HISTORY command.
     462             :  */
     463             : static void
     464          10 : SendTimeLineHistory(TimeLineHistoryCmd *cmd)
     465             : {
     466             :     StringInfoData buf;
     467             :     char        histfname[MAXFNAMELEN];
     468             :     char        path[MAXPGPATH];
     469             :     int         fd;
     470             :     off_t       histfilelen;
     471             :     off_t       bytesleft;
     472             :     Size        len;
     473             : 
     474             :     /*
     475             :      * Reply with a result set with one row, and two columns. The first col is
     476             :      * the name of the history file, 2nd is the contents.
     477             :      */
     478             : 
     479          10 :     TLHistoryFileName(histfname, cmd->timeline);
     480          10 :     TLHistoryFilePath(path, cmd->timeline);
     481             : 
     482             :     /* Send a RowDescription message */
     483          10 :     pq_beginmessage(&buf, 'T');
     484          10 :     pq_sendint16(&buf, 2);      /* 2 fields */
     485             : 
     486             :     /* first field */
     487          10 :     pq_sendstring(&buf, "filename");  /* col name */
     488          10 :     pq_sendint32(&buf, 0);      /* table oid */
     489          10 :     pq_sendint16(&buf, 0);      /* attnum */
     490          10 :     pq_sendint32(&buf, TEXTOID);    /* type oid */
     491          10 :     pq_sendint16(&buf, -1);     /* typlen */
     492          10 :     pq_sendint32(&buf, 0);      /* typmod */
     493          10 :     pq_sendint16(&buf, 0);      /* format code */
     494             : 
     495             :     /* second field */
     496          10 :     pq_sendstring(&buf, "content"); /* col name */
     497          10 :     pq_sendint32(&buf, 0);      /* table oid */
     498          10 :     pq_sendint16(&buf, 0);      /* attnum */
     499          10 :     pq_sendint32(&buf, TEXTOID);    /* type oid */
     500          10 :     pq_sendint16(&buf, -1);     /* typlen */
     501          10 :     pq_sendint32(&buf, 0);      /* typmod */
     502          10 :     pq_sendint16(&buf, 0);      /* format code */
     503          10 :     pq_endmessage(&buf);
     504             : 
     505             :     /* Send a DataRow message */
     506          10 :     pq_beginmessage(&buf, 'D');
     507          10 :     pq_sendint16(&buf, 2);      /* # of columns */
     508          10 :     len = strlen(histfname);
     509          10 :     pq_sendint32(&buf, len);    /* col1 len */
     510          10 :     pq_sendbytes(&buf, histfname, len);
     511             : 
     512          10 :     fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
     513          10 :     if (fd < 0)
     514           0 :         ereport(ERROR,
     515             :                 (errcode_for_file_access(),
     516             :                  errmsg("could not open file \"%s\": %m", path)));
     517             : 
     518             :     /* Determine file length and send it to client */
     519          10 :     histfilelen = lseek(fd, 0, SEEK_END);
     520          10 :     if (histfilelen < 0)
     521           0 :         ereport(ERROR,
     522             :                 (errcode_for_file_access(),
     523             :                  errmsg("could not seek to end of file \"%s\": %m", path)));
     524          10 :     if (lseek(fd, 0, SEEK_SET) != 0)
     525           0 :         ereport(ERROR,
     526             :                 (errcode_for_file_access(),
     527             :                  errmsg("could not seek to beginning of file \"%s\": %m", path)));
     528             : 
     529          10 :     pq_sendint32(&buf, histfilelen);    /* col2 len */
     530             : 
     531          10 :     bytesleft = histfilelen;
     532          20 :     while (bytesleft > 0)
     533             :     {
     534             :         PGAlignedBlock rbuf;
     535             :         int         nread;
     536             : 
     537          10 :         pgstat_report_wait_start(WAIT_EVENT_WALSENDER_TIMELINE_HISTORY_READ);
     538          10 :         nread = read(fd, rbuf.data, sizeof(rbuf));
     539          10 :         pgstat_report_wait_end();
     540          10 :         if (nread < 0)
     541           0 :             ereport(ERROR,
     542             :                     (errcode_for_file_access(),
     543             :                      errmsg("could not read file \"%s\": %m",
     544             :                             path)));
     545          10 :         else if (nread == 0)
     546           0 :             ereport(ERROR,
     547             :                     (errcode(ERRCODE_DATA_CORRUPTED),
     548             :                      errmsg("could not read file \"%s\": read %d of %zu",
     549             :                             path, nread, (Size) bytesleft)));
     550             : 
     551          10 :         pq_sendbytes(&buf, rbuf.data, nread);
     552          10 :         bytesleft -= nread;
     553             :     }
     554             : 
     555          10 :     if (CloseTransientFile(fd) != 0)
     556           0 :         ereport(ERROR,
     557             :                 (errcode_for_file_access(),
     558             :                  errmsg("could not close file \"%s\": %m", path)));
     559             : 
     560          10 :     pq_endmessage(&buf);
     561          10 : }
     562             : 
     563             : /*
     564             :  * Handle START_REPLICATION command.
     565             :  *
     566             :  * At the moment, this never returns, but an ereport(ERROR) will take us back
     567             :  * to the main loop.
     568             :  */
     569             : static void
     570         256 : StartReplication(StartReplicationCmd *cmd)
     571             : {
     572             :     StringInfoData buf;
     573             :     XLogRecPtr  FlushPtr;
     574             : 
     575         256 :     if (ThisTimeLineID == 0)
     576           0 :         ereport(ERROR,
     577             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     578             :                  errmsg("IDENTIFY_SYSTEM has not been run before START_REPLICATION")));
     579             : 
     580             :     /* create xlogreader for physical replication */
     581         256 :     xlogreader =
     582         256 :         XLogReaderAllocate(wal_segment_size, NULL,
     583         256 :                            XL_ROUTINE(.segment_open = WalSndSegmentOpen,
     584             :                                       .segment_close = wal_segment_close),
     585             :                            NULL);
     586             : 
     587         256 :     if (!xlogreader)
     588           0 :         ereport(ERROR,
     589             :                 (errcode(ERRCODE_OUT_OF_MEMORY),
     590             :                  errmsg("out of memory")));
     591             : 
     592             :     /*
     593             :      * We assume here that we're logging enough information in the WAL for
     594             :      * log-shipping, since this is checked in PostmasterMain().
     595             :      *
     596             :      * NOTE: wal_level can only change at shutdown, so in most cases it is
     597             :      * difficult for there to be WAL data that we can still see that was
     598             :      * written at wal_level='minimal'.
     599             :      */
     600             : 
     601         256 :     if (cmd->slotname)
     602             :     {
     603         158 :         (void) ReplicationSlotAcquire(cmd->slotname, SAB_Error);
     604         156 :         if (SlotIsLogical(MyReplicationSlot))
     605           0 :             ereport(ERROR,
     606             :                     (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
     607             :                      errmsg("cannot use a logical replication slot for physical replication")));
     608             : 
     609             :         /*
     610             :          * We don't need to verify the slot's restart_lsn here; instead we
     611             :          * rely on the caller requesting the starting point to use.  If the
     612             :          * WAL segment doesn't exist, we'll fail later.
     613             :          */
     614             :     }
     615             : 
     616             :     /*
     617             :      * Select the timeline. If it was given explicitly by the client, use
     618             :      * that. Otherwise use the timeline of the last replayed record, which is
     619             :      * kept in ThisTimeLineID.
     620             :      */
     621         254 :     if (am_cascading_walsender)
     622             :     {
     623             :         /* this also updates ThisTimeLineID */
     624          12 :         FlushPtr = GetStandbyFlushRecPtr();
     625             :     }
     626             :     else
     627         242 :         FlushPtr = GetFlushRecPtr();
     628             : 
     629         254 :     if (cmd->timeline != 0)
     630             :     {
     631             :         XLogRecPtr  switchpoint;
     632             : 
     633         254 :         sendTimeLine = cmd->timeline;
     634         254 :         if (sendTimeLine == ThisTimeLineID)
     635             :         {
     636         244 :             sendTimeLineIsHistoric = false;
     637         244 :             sendTimeLineValidUpto = InvalidXLogRecPtr;
     638             :         }
     639             :         else
     640             :         {
     641             :             List       *timeLineHistory;
     642             : 
     643          10 :             sendTimeLineIsHistoric = true;
     644             : 
     645             :             /*
     646             :              * Check that the timeline the client requested exists, and the
     647             :              * requested start location is on that timeline.
     648             :              */
     649          10 :             timeLineHistory = readTimeLineHistory(ThisTimeLineID);
     650          10 :             switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory,
     651             :                                          &sendTimeLineNextTLI);
     652          10 :             list_free_deep(timeLineHistory);
     653             : 
     654             :             /*
     655             :              * Found the requested timeline in the history. Check that
     656             :              * requested startpoint is on that timeline in our history.
     657             :              *
     658             :              * This is quite loose on purpose. We only check that we didn't
     659             :              * fork off the requested timeline before the switchpoint. We
     660             :              * don't check that we switched *to* it before the requested
     661             :              * starting point. This is because the client can legitimately
     662             :              * request to start replication from the beginning of the WAL
     663             :              * segment that contains switchpoint, but on the new timeline, so
     664             :              * that it doesn't end up with a partial segment. If you ask for
     665             :              * too old a starting point, you'll get an error later when we
     666             :              * fail to find the requested WAL segment in pg_wal.
     667             :              *
     668             :              * XXX: we could be more strict here and only allow a startpoint
     669             :              * that's older than the switchpoint, if it's still in the same
     670             :              * WAL segment.
     671             :              */
     672          10 :             if (!XLogRecPtrIsInvalid(switchpoint) &&
     673          10 :                 switchpoint < cmd->startpoint)
     674             :             {
     675           0 :                 ereport(ERROR,
     676             :                         (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
     677             :                                 (uint32) (cmd->startpoint >> 32),
     678             :                                 (uint32) (cmd->startpoint),
     679             :                                 cmd->timeline),
     680             :                          errdetail("This server's history forked from timeline %u at %X/%X.",
     681             :                                    cmd->timeline,
     682             :                                    (uint32) (switchpoint >> 32),
     683             :                                    (uint32) (switchpoint))));
     684             :             }
     685          10 :             sendTimeLineValidUpto = switchpoint;
     686             :         }
     687             :     }
     688             :     else
     689             :     {
     690           0 :         sendTimeLine = ThisTimeLineID;
     691           0 :         sendTimeLineValidUpto = InvalidXLogRecPtr;
     692           0 :         sendTimeLineIsHistoric = false;
     693             :     }
     694             : 
     695         254 :     streamingDoneSending = streamingDoneReceiving = false;
     696             : 
     697             :     /* If there is nothing to stream, don't even enter COPY mode */
     698         254 :     if (!sendTimeLineIsHistoric || cmd->startpoint < sendTimeLineValidUpto)
     699             :     {
     700             :         /*
     701             :          * When we first start replication the standby will be behind the
     702             :          * primary. For some applications, for example synchronous
     703             :          * replication, it is important to have a clear state for this initial
     704             :          * catchup mode, so we can trigger actions when we change streaming
     705             :          * state later. We may stay in this state for a long time, which is
     706             :          * exactly why we want to be able to monitor whether or not we are
     707             :          * still here.
     708             :          */
     709         254 :         WalSndSetState(WALSNDSTATE_CATCHUP);
     710             : 
     711             :         /* Send a CopyBothResponse message, and start streaming */
     712         254 :         pq_beginmessage(&buf, 'W');
     713         254 :         pq_sendbyte(&buf, 0);
     714         254 :         pq_sendint16(&buf, 0);
     715         254 :         pq_endmessage(&buf);
     716         254 :         pq_flush();
     717             : 
     718             :         /*
     719             :          * Don't allow a request to stream from a future point in WAL that
     720             :          * hasn't been flushed to disk in this server yet.
     721             :          */
     722         254 :         if (FlushPtr < cmd->startpoint)
     723             :         {
     724           0 :             ereport(ERROR,
     725             :                     (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
     726             :                             (uint32) (cmd->startpoint >> 32),
     727             :                             (uint32) (cmd->startpoint),
     728             :                             (uint32) (FlushPtr >> 32),
     729             :                             (uint32) (FlushPtr))));
     730             :         }
     731             : 
     732             :         /* Start streaming from the requested point */
     733         254 :         sentPtr = cmd->startpoint;
     734             : 
     735             :         /* Initialize shared memory status, too */
     736         254 :         SpinLockAcquire(&MyWalSnd->mutex);
     737         254 :         MyWalSnd->sentPtr = sentPtr;
     738         254 :         SpinLockRelease(&MyWalSnd->mutex);
     739             : 
     740         254 :         SyncRepInitConfig();
     741             : 
     742             :         /* Main loop of walsender */
     743         254 :         replication_active = true;
     744             : 
     745         254 :         WalSndLoop(XLogSendPhysical);
     746             : 
     747         144 :         replication_active = false;
     748         144 :         if (got_STOPPING)
     749           0 :             proc_exit(0);
     750         144 :         WalSndSetState(WALSNDSTATE_STARTUP);
     751             : 
     752             :         Assert(streamingDoneSending && streamingDoneReceiving);
     753             :     }
     754             : 
     755         144 :     if (cmd->slotname)
     756         130 :         ReplicationSlotRelease();
     757             : 
     758             :     /*
     759             :      * Copy is finished now. Send a single-row result set indicating the next
     760             :      * timeline.
     761             :      */
     762         144 :     if (sendTimeLineIsHistoric)
     763             :     {
     764             :         char        startpos_str[8 + 1 + 8 + 1];
     765             :         DestReceiver *dest;
     766             :         TupOutputState *tstate;
     767             :         TupleDesc   tupdesc;
     768             :         Datum       values[2];
     769             :         bool        nulls[2];
     770             : 
     771          20 :         snprintf(startpos_str, sizeof(startpos_str), "%X/%X",
     772          10 :                  (uint32) (sendTimeLineValidUpto >> 32),
     773             :                  (uint32) sendTimeLineValidUpto);
     774             : 
     775          10 :         dest = CreateDestReceiver(DestRemoteSimple);
     776          10 :         MemSet(nulls, false, sizeof(nulls));
     777             : 
     778             :         /*
     779             :          * Need a tuple descriptor representing two columns. int8 may seem
     780             :          * like a surprising data type for this, but in theory int4 would not
     781             :          * be wide enough for this, as TimeLineID is unsigned.
     782             :          */
     783          10 :         tupdesc = CreateTemplateTupleDesc(2);
     784          10 :         TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "next_tli",
     785             :                                   INT8OID, -1, 0);
     786          10 :         TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "next_tli_startpos",
     787             :                                   TEXTOID, -1, 0);
     788             : 
     789             :         /* prepare for projection of tuple */
     790          10 :         tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
     791             : 
     792          10 :         values[0] = Int64GetDatum((int64) sendTimeLineNextTLI);
     793          10 :         values[1] = CStringGetTextDatum(startpos_str);
     794             : 
     795             :         /* send it to dest */
     796          10 :         do_tup_output(tstate, values, nulls);
     797             : 
     798          10 :         end_tup_output(tstate);
     799             :     }
     800             : 
     801             :     /* Send CommandComplete message */
     802         144 :     EndReplicationCommand("START_STREAMING");
     803         144 : }
     804             : 
     805             : /*
     806             :  * XLogReaderRoutine->page_read callback for logical decoding contexts, as a
     807             :  * walsender process.
     808             :  *
     809             :  * Inside the walsender we can do better than read_local_xlog_page,
     810             :  * which has to do a plain sleep/busy loop, because the walsender's latch gets
     811             :  * set every time WAL is flushed.
     812             :  */
     813             : static int
     814       15024 : logical_read_xlog_page(XLogReaderState *state, XLogRecPtr targetPagePtr, int reqLen,
     815             :                        XLogRecPtr targetRecPtr, char *cur_page)
     816             : {
     817             :     XLogRecPtr  flushptr;
     818             :     int         count;
     819             :     WALReadError errinfo;
     820             :     XLogSegNo   segno;
     821             : 
     822       15024 :     XLogReadDetermineTimeline(state, targetPagePtr, reqLen);
     823       15024 :     sendTimeLineIsHistoric = (state->currTLI != ThisTimeLineID);
     824       15024 :     sendTimeLine = state->currTLI;
     825       15024 :     sendTimeLineValidUpto = state->currTLIValidUntil;
     826       15024 :     sendTimeLineNextTLI = state->nextTLI;
     827             : 
     828             :     /* make sure we have enough WAL available */
     829       15024 :     flushptr = WalSndWaitForWal(targetPagePtr + reqLen);
     830             : 
     831             :     /* fail if not (implies we are going to shut down) */
     832       14968 :     if (flushptr < targetPagePtr + reqLen)
     833        5650 :         return -1;
     834             : 
     835        9318 :     if (targetPagePtr + XLOG_BLCKSZ <= flushptr)
     836        8522 :         count = XLOG_BLCKSZ;    /* more than one block available */
     837             :     else
     838         796 :         count = flushptr - targetPagePtr;   /* part of the page available */
     839             : 
     840             :     /* now actually read the data, we know it's there */
     841        9318 :     if (!WALRead(state,
     842             :                  cur_page,
     843             :                  targetPagePtr,
     844             :                  XLOG_BLCKSZ,
     845             :                  state->seg.ws_tli, /* Pass the current TLI because only
     846             :                                      * WalSndSegmentOpen controls whether new
     847             :                                      * TLI is needed. */
     848             :                  &errinfo))
     849           0 :         WALReadRaiseError(&errinfo);
     850             : 
     851             :     /*
     852             :      * After reading into the buffer, check that what we read was valid. We do
     853             :      * this after reading, because even though the segment was present when we
     854             :      * opened it, it might get recycled or removed while we read it. The
     855             :      * read() succeeds in that case, but the data we tried to read might
     856             :      * already have been overwritten with new WAL records.
     857             :      */
     858        9318 :     XLByteToSeg(targetPagePtr, segno, state->segcxt.ws_segsize);
     859        9318 :     CheckXLogRemoved(segno, state->seg.ws_tli);
     860             : 
     861        9318 :     return count;
     862             : }
     863             : 
     864             : /*
     865             :  * Process extra options given to CREATE_REPLICATION_SLOT.
     866             :  */
     867             : static void
     868         312 : parseCreateReplSlotOptions(CreateReplicationSlotCmd *cmd,
     869             :                            bool *reserve_wal,
     870             :                            CRSSnapshotAction *snapshot_action)
     871             : {
     872             :     ListCell   *lc;
     873         312 :     bool        snapshot_action_given = false;
     874         312 :     bool        reserve_wal_given = false;
     875             : 
     876             :     /* Parse options */
     877         622 :     foreach(lc, cmd->options)
     878             :     {
     879         310 :         DefElem    *defel = (DefElem *) lfirst(lc);
     880             : 
     881         310 :         if (strcmp(defel->defname, "export_snapshot") == 0)
     882             :         {
     883          54 :             if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
     884           0 :                 ereport(ERROR,
     885             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     886             :                          errmsg("conflicting or redundant options")));
     887             : 
     888          54 :             snapshot_action_given = true;
     889          54 :             *snapshot_action = defGetBoolean(defel) ? CRS_EXPORT_SNAPSHOT :
     890             :                 CRS_NOEXPORT_SNAPSHOT;
     891             :         }
     892         256 :         else if (strcmp(defel->defname, "use_snapshot") == 0)
     893             :         {
     894         120 :             if (snapshot_action_given || cmd->kind != REPLICATION_KIND_LOGICAL)
     895           0 :                 ereport(ERROR,
     896             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     897             :                          errmsg("conflicting or redundant options")));
     898             : 
     899         120 :             snapshot_action_given = true;
     900         120 :             *snapshot_action = CRS_USE_SNAPSHOT;
     901             :         }
     902         136 :         else if (strcmp(defel->defname, "reserve_wal") == 0)
     903             :         {
     904         136 :             if (reserve_wal_given || cmd->kind != REPLICATION_KIND_PHYSICAL)
     905           0 :                 ereport(ERROR,
     906             :                         (errcode(ERRCODE_SYNTAX_ERROR),
     907             :                          errmsg("conflicting or redundant options")));
     908             : 
     909         136 :             reserve_wal_given = true;
     910         136 :             *reserve_wal = true;
     911             :         }
     912             :         else
     913           0 :             elog(ERROR, "unrecognized option: %s", defel->defname);
     914             :     }
     915         312 : }
     916             : 
     917             : /*
     918             :  * Create a new replication slot.
     919             :  */
     920             : static void
     921         312 : CreateReplicationSlot(CreateReplicationSlotCmd *cmd)
     922             : {
     923         312 :     const char *snapshot_name = NULL;
     924             :     char        xloc[MAXFNAMELEN];
     925             :     char       *slot_name;
     926         312 :     bool        reserve_wal = false;
     927         312 :     CRSSnapshotAction snapshot_action = CRS_EXPORT_SNAPSHOT;
     928             :     DestReceiver *dest;
     929             :     TupOutputState *tstate;
     930             :     TupleDesc   tupdesc;
     931             :     Datum       values[4];
     932             :     bool        nulls[4];
     933             : 
     934             :     Assert(!MyReplicationSlot);
     935             : 
     936         312 :     parseCreateReplSlotOptions(cmd, &reserve_wal, &snapshot_action);
     937             : 
     938             :     /* setup state for WalSndSegmentOpen */
     939         312 :     sendTimeLineIsHistoric = false;
     940         312 :     sendTimeLine = ThisTimeLineID;
     941             : 
     942         312 :     if (cmd->kind == REPLICATION_KIND_PHYSICAL)
     943             :     {
     944         138 :         ReplicationSlotCreate(cmd->slotname, false,
     945         138 :                               cmd->temporary ? RS_TEMPORARY : RS_PERSISTENT);
     946             :     }
     947             :     else
     948             :     {
     949         174 :         CheckLogicalDecodingRequirements();
     950             : 
     951             :         /*
     952             :          * Initially create persistent slot as ephemeral - that allows us to
     953             :          * nicely handle errors during initialization because it'll get
     954             :          * dropped if this transaction fails. We'll make it persistent at the
     955             :          * end. Temporary slots can be created as temporary from beginning as
     956             :          * they get dropped on error as well.
     957             :          */
     958         174 :         ReplicationSlotCreate(cmd->slotname, true,
     959         174 :                               cmd->temporary ? RS_TEMPORARY : RS_EPHEMERAL);
     960             :     }
     961             : 
     962         310 :     if (cmd->kind == REPLICATION_KIND_LOGICAL)
     963             :     {
     964             :         LogicalDecodingContext *ctx;
     965         174 :         bool        need_full_snapshot = false;
     966             : 
     967             :         /*
     968             :          * Do options check early so that we can bail before calling the
     969             :          * DecodingContextFindStartpoint which can take long time.
     970             :          */
     971         174 :         if (snapshot_action == CRS_EXPORT_SNAPSHOT)
     972             :         {
     973           0 :             if (IsTransactionBlock())
     974           0 :                 ereport(ERROR,
     975             :                 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
     976             :                         (errmsg("%s must not be called inside a transaction",
     977             :                                 "CREATE_REPLICATION_SLOT ... EXPORT_SNAPSHOT")));
     978             : 
     979           0 :             need_full_snapshot = true;
     980             :         }
     981         174 :         else if (snapshot_action == CRS_USE_SNAPSHOT)
     982             :         {
     983         120 :             if (!IsTransactionBlock())
     984           0 :                 ereport(ERROR,
     985             :                 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
     986             :                         (errmsg("%s must be called inside a transaction",
     987             :                                 "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
     988             : 
     989         120 :             if (XactIsoLevel != XACT_REPEATABLE_READ)
     990           0 :                 ereport(ERROR,
     991             :                 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
     992             :                         (errmsg("%s must be called in REPEATABLE READ isolation mode transaction",
     993             :                                 "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
     994             : 
     995         120 :             if (FirstSnapshotSet)
     996           0 :                 ereport(ERROR,
     997             :                 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
     998             :                         (errmsg("%s must be called before any query",
     999             :                                 "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
    1000             : 
    1001         120 :             if (IsSubTransaction())
    1002           0 :                 ereport(ERROR,
    1003             :                 /*- translator: %s is a CREATE_REPLICATION_SLOT statement */
    1004             :                         (errmsg("%s must not be called in a subtransaction",
    1005             :                                 "CREATE_REPLICATION_SLOT ... USE_SNAPSHOT")));
    1006             : 
    1007         120 :             need_full_snapshot = true;
    1008             :         }
    1009             : 
    1010         174 :         ctx = CreateInitDecodingContext(cmd->plugin, NIL, need_full_snapshot,
    1011             :                                         InvalidXLogRecPtr,
    1012         174 :                                         XL_ROUTINE(.page_read = logical_read_xlog_page,
    1013             :                                                    .segment_open = WalSndSegmentOpen,
    1014             :                                                    .segment_close = wal_segment_close),
    1015             :                                         WalSndPrepareWrite, WalSndWriteData,
    1016             :                                         WalSndUpdateProgress);
    1017             : 
    1018             :         /*
    1019             :          * Signal that we don't need the timeout mechanism. We're just
    1020             :          * creating the replication slot and don't yet accept feedback
    1021             :          * messages or send keepalives. As we possibly need to wait for
    1022             :          * further WAL the walsender would otherwise possibly be killed too
    1023             :          * soon.
    1024             :          */
    1025         174 :         last_reply_timestamp = 0;
    1026             : 
    1027             :         /* build initial snapshot, might take a while */
    1028         174 :         DecodingContextFindStartpoint(ctx);
    1029             : 
    1030             :         /*
    1031             :          * Export or use the snapshot if we've been asked to do so.
    1032             :          *
    1033             :          * NB. We will convert the snapbuild.c kind of snapshot to normal
    1034             :          * snapshot when doing this.
    1035             :          */
    1036         174 :         if (snapshot_action == CRS_EXPORT_SNAPSHOT)
    1037             :         {
    1038           0 :             snapshot_name = SnapBuildExportSnapshot(ctx->snapshot_builder);
    1039             :         }
    1040         174 :         else if (snapshot_action == CRS_USE_SNAPSHOT)
    1041             :         {
    1042             :             Snapshot    snap;
    1043             : 
    1044         120 :             snap = SnapBuildInitialSnapshot(ctx->snapshot_builder);
    1045         120 :             RestoreTransactionSnapshot(snap, MyProc);
    1046             :         }
    1047             : 
    1048             :         /* don't need the decoding context anymore */
    1049         174 :         FreeDecodingContext(ctx);
    1050             : 
    1051         174 :         if (!cmd->temporary)
    1052          54 :             ReplicationSlotPersist();
    1053             :     }
    1054         136 :     else if (cmd->kind == REPLICATION_KIND_PHYSICAL && reserve_wal)
    1055             :     {
    1056         134 :         ReplicationSlotReserveWal();
    1057             : 
    1058         134 :         ReplicationSlotMarkDirty();
    1059             : 
    1060             :         /* Write this slot to disk if it's a permanent one. */
    1061         134 :         if (!cmd->temporary)
    1062           2 :             ReplicationSlotSave();
    1063             :     }
    1064             : 
    1065         620 :     snprintf(xloc, sizeof(xloc), "%X/%X",
    1066         310 :              (uint32) (MyReplicationSlot->data.confirmed_flush >> 32),
    1067         310 :              (uint32) MyReplicationSlot->data.confirmed_flush);
    1068             : 
    1069         310 :     dest = CreateDestReceiver(DestRemoteSimple);
    1070         310 :     MemSet(nulls, false, sizeof(nulls));
    1071             : 
    1072             :     /*----------
    1073             :      * Need a tuple descriptor representing four columns:
    1074             :      * - first field: the slot name
    1075             :      * - second field: LSN at which we became consistent
    1076             :      * - third field: exported snapshot's name
    1077             :      * - fourth field: output plugin
    1078             :      *----------
    1079             :      */
    1080         310 :     tupdesc = CreateTemplateTupleDesc(4);
    1081         310 :     TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 1, "slot_name",
    1082             :                               TEXTOID, -1, 0);
    1083         310 :     TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 2, "consistent_point",
    1084             :                               TEXTOID, -1, 0);
    1085         310 :     TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 3, "snapshot_name",
    1086             :                               TEXTOID, -1, 0);
    1087         310 :     TupleDescInitBuiltinEntry(tupdesc, (AttrNumber) 4, "output_plugin",
    1088             :                               TEXTOID, -1, 0);
    1089             : 
    1090             :     /* prepare for projection of tuples */
    1091         310 :     tstate = begin_tup_output_tupdesc(dest, tupdesc, &TTSOpsVirtual);
    1092             : 
    1093             :     /* slot_name */
    1094         310 :     slot_name = NameStr(MyReplicationSlot->data.name);
    1095         310 :     values[0] = CStringGetTextDatum(slot_name);
    1096             : 
    1097             :     /* consistent wal location */
    1098         310 :     values[1] = CStringGetTextDatum(xloc);
    1099             : 
    1100             :     /* snapshot name, or NULL if none */
    1101         310 :     if (snapshot_name != NULL)
    1102           0 :         values[2] = CStringGetTextDatum(snapshot_name);
    1103             :     else
    1104         310 :         nulls[2] = true;
    1105             : 
    1106             :     /* plugin, or NULL if none */
    1107         310 :     if (cmd->plugin != NULL)
    1108         174 :         values[3] = CStringGetTextDatum(cmd->plugin);
    1109             :     else
    1110         136 :         nulls[3] = true;
    1111             : 
    1112             :     /* send it to dest */
    1113         310 :     do_tup_output(tstate, values, nulls);
    1114         310 :     end_tup_output(tstate);
    1115             : 
    1116         310 :     ReplicationSlotRelease();
    1117         310 : }
    1118             : 
    1119             : /*
    1120             :  * Get rid of a replication slot that is no longer wanted.
    1121             :  */
    1122             : static void
    1123          16 : DropReplicationSlot(DropReplicationSlotCmd *cmd)
    1124             : {
    1125          16 :     ReplicationSlotDrop(cmd->slotname, !cmd->wait);
    1126          16 : }
    1127             : 
    1128             : /*
    1129             :  * Load previously initiated logical slot and prepare for sending data (via
    1130             :  * WalSndLoop).
    1131             :  */
    1132             : static void
    1133         186 : StartLogicalReplication(StartReplicationCmd *cmd)
    1134             : {
    1135             :     StringInfoData buf;
    1136             :     QueryCompletion qc;
    1137             : 
    1138             :     /* make sure that our requirements are still fulfilled */
    1139         186 :     CheckLogicalDecodingRequirements();
    1140             : 
    1141             :     Assert(!MyReplicationSlot);
    1142             : 
    1143         184 :     (void) ReplicationSlotAcquire(cmd->slotname, SAB_Error);
    1144             : 
    1145         184 :     if (XLogRecPtrIsInvalid(MyReplicationSlot->data.restart_lsn))
    1146           0 :         ereport(ERROR,
    1147             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1148             :                  errmsg("cannot read from logical replication slot \"%s\"",
    1149             :                         cmd->slotname),
    1150             :                  errdetail("This slot has been invalidated because it exceeded the maximum reserved size.")));
    1151             : 
    1152             :     /*
    1153             :      * Force a disconnect, so that the decoding code doesn't need to care
    1154             :      * about an eventual switch from running in recovery, to running in a
    1155             :      * normal environment. Client code is expected to handle reconnects.
    1156             :      */
    1157         184 :     if (am_cascading_walsender && !RecoveryInProgress())
    1158             :     {
    1159           0 :         ereport(LOG,
    1160             :                 (errmsg("terminating walsender process after promotion")));
    1161           0 :         got_STOPPING = true;
    1162             :     }
    1163             : 
    1164             :     /*
    1165             :      * Create our decoding context, making it start at the previously ack'ed
    1166             :      * position.
    1167             :      *
    1168             :      * Do this before sending a CopyBothResponse message, so that any errors
    1169             :      * are reported early.
    1170             :      */
    1171         182 :     logical_decoding_ctx =
    1172         184 :         CreateDecodingContext(cmd->startpoint, cmd->options, false,
    1173         184 :                               XL_ROUTINE(.page_read = logical_read_xlog_page,
    1174             :                                          .segment_open = WalSndSegmentOpen,
    1175             :                                          .segment_close = wal_segment_close),
    1176             :                               WalSndPrepareWrite, WalSndWriteData,
    1177             :                               WalSndUpdateProgress);
    1178         182 :     xlogreader = logical_decoding_ctx->reader;
    1179             : 
    1180         182 :     WalSndSetState(WALSNDSTATE_CATCHUP);
    1181             : 
    1182             :     /* Send a CopyBothResponse message, and start streaming */
    1183         182 :     pq_beginmessage(&buf, 'W');
    1184         182 :     pq_sendbyte(&buf, 0);
    1185         182 :     pq_sendint16(&buf, 0);
    1186         182 :     pq_endmessage(&buf);
    1187         182 :     pq_flush();
    1188             : 
    1189             :     /* Start reading WAL from the oldest required WAL. */
    1190         182 :     XLogBeginRead(logical_decoding_ctx->reader,
    1191         182 :                   MyReplicationSlot->data.restart_lsn);
    1192             : 
    1193             :     /*
    1194             :      * Report the location after which we'll send out further commits as the
    1195             :      * current sentPtr.
    1196             :      */
    1197         182 :     sentPtr = MyReplicationSlot->data.confirmed_flush;
    1198             : 
    1199             :     /* Also update the sent position status in shared memory */
    1200         182 :     SpinLockAcquire(&MyWalSnd->mutex);
    1201         182 :     MyWalSnd->sentPtr = MyReplicationSlot->data.restart_lsn;
    1202         182 :     SpinLockRelease(&MyWalSnd->mutex);
    1203             : 
    1204         182 :     replication_active = true;
    1205             : 
    1206         182 :     SyncRepInitConfig();
    1207             : 
    1208             :     /* Main loop of walsender */
    1209         182 :     WalSndLoop(XLogSendLogical);
    1210             : 
    1211         122 :     FreeDecodingContext(logical_decoding_ctx);
    1212         122 :     ReplicationSlotRelease();
    1213             : 
    1214         122 :     replication_active = false;
    1215         122 :     if (got_STOPPING)
    1216           0 :         proc_exit(0);
    1217         122 :     WalSndSetState(WALSNDSTATE_STARTUP);
    1218             : 
    1219             :     /* Get out of COPY mode (CommandComplete). */
    1220         122 :     SetQueryCompletion(&qc, CMDTAG_COPY, 0);
    1221         122 :     EndCommand(&qc, DestRemote, false);
    1222         122 : }
    1223             : 
    1224             : /*
    1225             :  * LogicalDecodingContext 'prepare_write' callback.
    1226             :  *
    1227             :  * Prepare a write into a StringInfo.
    1228             :  *
    1229             :  * Don't do anything lasting in here, it's quite possible that nothing will be done
    1230             :  * with the data.
    1231             :  */
    1232             : static void
    1233       95634 : WalSndPrepareWrite(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid, bool last_write)
    1234             : {
    1235             :     /* can't have sync rep confused by sending the same LSN several times */
    1236       95634 :     if (!last_write)
    1237         188 :         lsn = InvalidXLogRecPtr;
    1238             : 
    1239       95634 :     resetStringInfo(ctx->out);
    1240             : 
    1241       95634 :     pq_sendbyte(ctx->out, 'w');
    1242       95634 :     pq_sendint64(ctx->out, lsn); /* dataStart */
    1243       95634 :     pq_sendint64(ctx->out, lsn); /* walEnd */
    1244             : 
    1245             :     /*
    1246             :      * Fill out the sendtime later, just as it's done in XLogSendPhysical, but
    1247             :      * reserve space here.
    1248             :      */
    1249       95634 :     pq_sendint64(ctx->out, 0);   /* sendtime */
    1250       95634 : }
    1251             : 
    1252             : /*
    1253             :  * LogicalDecodingContext 'write' callback.
    1254             :  *
    1255             :  * Actually write out data previously prepared by WalSndPrepareWrite out to
    1256             :  * the network. Take as long as needed, but process replies from the other
    1257             :  * side and check timeouts during that.
    1258             :  */
    1259             : static void
    1260       95634 : WalSndWriteData(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid,
    1261             :                 bool last_write)
    1262             : {
    1263             :     TimestampTz now;
    1264             : 
    1265             :     /*
    1266             :      * Fill the send timestamp last, so that it is taken as late as possible.
    1267             :      * This is somewhat ugly, but the protocol is set as it's already used for
    1268             :      * several releases by streaming physical replication.
    1269             :      */
    1270       95634 :     resetStringInfo(&tmpbuf);
    1271       95634 :     now = GetCurrentTimestamp();
    1272       95634 :     pq_sendint64(&tmpbuf, now);
    1273       95634 :     memcpy(&ctx->out->data[1 + sizeof(int64) + sizeof(int64)],
    1274       95634 :            tmpbuf.data, sizeof(int64));
    1275             : 
    1276             :     /* output previously gathered data in a CopyData packet */
    1277       95634 :     pq_putmessage_noblock('d', ctx->out->data, ctx->out->len);
    1278             : 
    1279       95634 :     CHECK_FOR_INTERRUPTS();
    1280             : 
    1281             :     /* Try to flush pending output to the client */
    1282       95634 :     if (pq_flush_if_writable() != 0)
    1283           0 :         WalSndShutdown();
    1284             : 
    1285             :     /* Try taking fast path unless we get too close to walsender timeout. */
    1286       95634 :     if (now < TimestampTzPlusMilliseconds(last_reply_timestamp,
    1287       95634 :                                           wal_sender_timeout / 2) &&
    1288       95634 :         !pq_is_send_pending())
    1289             :     {
    1290       95628 :         return;
    1291             :     }
    1292             : 
    1293             :     /* If we have pending write here, go to slow path */
    1294             :     for (;;)
    1295           8 :     {
    1296             :         int         wakeEvents;
    1297             :         long        sleeptime;
    1298             : 
    1299             :         /* Check for input from the client */
    1300          14 :         ProcessRepliesIfAny();
    1301             : 
    1302             :         /* die if timeout was reached */
    1303          14 :         WalSndCheckTimeOut();
    1304             : 
    1305             :         /* Send keepalive if the time has come */
    1306          14 :         WalSndKeepaliveIfNecessary();
    1307             : 
    1308          14 :         if (!pq_is_send_pending())
    1309           6 :             break;
    1310             : 
    1311           8 :         sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
    1312             : 
    1313           8 :         wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH |
    1314             :             WL_SOCKET_WRITEABLE | WL_SOCKET_READABLE | WL_TIMEOUT;
    1315             : 
    1316             :         /* Sleep until something happens or we time out */
    1317           8 :         (void) WaitLatchOrSocket(MyLatch, wakeEvents,
    1318           8 :                                  MyProcPort->sock, sleeptime,
    1319             :                                  WAIT_EVENT_WAL_SENDER_WRITE_DATA);
    1320             : 
    1321             :         /* Clear any already-pending wakeups */
    1322           8 :         ResetLatch(MyLatch);
    1323             : 
    1324           8 :         CHECK_FOR_INTERRUPTS();
    1325             : 
    1326             :         /* Process any requests or signals received recently */
    1327           8 :         if (ConfigReloadPending)
    1328             :         {
    1329           0 :             ConfigReloadPending = false;
    1330           0 :             ProcessConfigFile(PGC_SIGHUP);
    1331           0 :             SyncRepInitConfig();
    1332             :         }
    1333             : 
    1334             :         /* Try to flush pending output to the client */
    1335           8 :         if (pq_flush_if_writable() != 0)
    1336           0 :             WalSndShutdown();
    1337             :     }
    1338             : 
    1339             :     /* reactivate latch so WalSndLoop knows to continue */
    1340           6 :     SetLatch(MyLatch);
    1341             : }
    1342             : 
    1343             : /*
    1344             :  * LogicalDecodingContext 'update_progress' callback.
    1345             :  *
    1346             :  * Write the current position to the lag tracker (see XLogSendPhysical).
    1347             :  */
    1348             : static void
    1349         346 : WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId xid)
    1350             : {
    1351             :     static TimestampTz sendTime = 0;
    1352         346 :     TimestampTz now = GetCurrentTimestamp();
    1353             : 
    1354             :     /*
    1355             :      * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
    1356             :      * avoid flooding the lag tracker when we commit frequently.
    1357             :      */
    1358             : #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS    1000
    1359         346 :     if (!TimestampDifferenceExceeds(sendTime, now,
    1360             :                                     WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
    1361         280 :         return;
    1362             : 
    1363          66 :     LagTrackerWrite(lsn, now);
    1364          66 :     sendTime = now;
    1365             : }
    1366             : 
    1367             : /*
    1368             :  * Wait till WAL < loc is flushed to disk so it can be safely sent to client.
    1369             :  *
    1370             :  * Returns end LSN of flushed WAL.  Normally this will be >= loc, but
    1371             :  * if we detect a shutdown request (either from postmaster or client)
    1372             :  * we will return early, so caller must always check.
    1373             :  */
    1374             : static XLogRecPtr
    1375       15024 : WalSndWaitForWal(XLogRecPtr loc)
    1376             : {
    1377             :     int         wakeEvents;
    1378             :     static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
    1379             : 
    1380             :     /*
    1381             :      * Fast path to avoid acquiring the spinlock in case we already know we
    1382             :      * have enough WAL available. This is particularly interesting if we're
    1383             :      * far behind.
    1384             :      */
    1385       15024 :     if (RecentFlushPtr != InvalidXLogRecPtr &&
    1386       14784 :         loc <= RecentFlushPtr)
    1387        8602 :         return RecentFlushPtr;
    1388             : 
    1389             :     /* Get a more recent flush pointer. */
    1390        6422 :     if (!RecoveryInProgress())
    1391        6422 :         RecentFlushPtr = GetFlushRecPtr();
    1392             :     else
    1393           0 :         RecentFlushPtr = GetXLogReplayRecPtr(NULL);
    1394             : 
    1395             :     for (;;)
    1396        1162 :     {
    1397             :         long        sleeptime;
    1398             : 
    1399             :         /* Clear any already-pending wakeups */
    1400        7584 :         ResetLatch(MyLatch);
    1401             : 
    1402        7584 :         CHECK_FOR_INTERRUPTS();
    1403             : 
    1404             :         /* Process any requests or signals received recently */
    1405        7584 :         if (ConfigReloadPending)
    1406             :         {
    1407           0 :             ConfigReloadPending = false;
    1408           0 :             ProcessConfigFile(PGC_SIGHUP);
    1409           0 :             SyncRepInitConfig();
    1410             :         }
    1411             : 
    1412             :         /* Check for input from the client */
    1413        7584 :         ProcessRepliesIfAny();
    1414             : 
    1415             :         /*
    1416             :          * If we're shutting down, trigger pending WAL to be written out,
    1417             :          * otherwise we'd possibly end up waiting for WAL that never gets
    1418             :          * written, because walwriter has shut down already.
    1419             :          */
    1420        7528 :         if (got_STOPPING)
    1421        5556 :             XLogBackgroundFlush();
    1422             : 
    1423             :         /* Update our idea of the currently flushed position. */
    1424        7528 :         if (!RecoveryInProgress())
    1425        7528 :             RecentFlushPtr = GetFlushRecPtr();
    1426             :         else
    1427           0 :             RecentFlushPtr = GetXLogReplayRecPtr(NULL);
    1428             : 
    1429             :         /*
    1430             :          * If postmaster asked us to stop, don't wait anymore.
    1431             :          *
    1432             :          * It's important to do this check after the recomputation of
    1433             :          * RecentFlushPtr, so we can send all remaining data before shutting
    1434             :          * down.
    1435             :          */
    1436        7528 :         if (got_STOPPING)
    1437        5556 :             break;
    1438             : 
    1439             :         /*
    1440             :          * We only send regular messages to the client for full decoded
    1441             :          * transactions, but a synchronous replication and walsender shutdown
    1442             :          * possibly are waiting for a later location. So, before sleeping, we
    1443             :          * send a ping containing the flush location. If the receiver is
    1444             :          * otherwise idle, this keepalive will trigger a reply. Processing the
    1445             :          * reply will update these MyWalSnd locations.
    1446             :          */
    1447        1972 :         if (MyWalSnd->flush < sentPtr &&
    1448        1108 :             MyWalSnd->write < sentPtr &&
    1449         640 :             !waiting_for_ping_response)
    1450         640 :             WalSndKeepalive(false);
    1451             : 
    1452             :         /* check whether we're done */
    1453        1972 :         if (loc <= RecentFlushPtr)
    1454         716 :             break;
    1455             : 
    1456             :         /* Waiting for new WAL. Since we need to wait, we're now caught up. */
    1457        1256 :         WalSndCaughtUp = true;
    1458             : 
    1459             :         /*
    1460             :          * Try to flush any pending output to the client.
    1461             :          */
    1462        1256 :         if (pq_flush_if_writable() != 0)
    1463           0 :             WalSndShutdown();
    1464             : 
    1465             :         /*
    1466             :          * If we have received CopyDone from the client, sent CopyDone
    1467             :          * ourselves, and the output buffer is empty, it's time to exit
    1468             :          * streaming, so fail the current WAL fetch request.
    1469             :          */
    1470        1256 :         if (streamingDoneReceiving && streamingDoneSending &&
    1471          94 :             !pq_is_send_pending())
    1472          94 :             break;
    1473             : 
    1474             :         /* die if timeout was reached */
    1475        1162 :         WalSndCheckTimeOut();
    1476             : 
    1477             :         /* Send keepalive if the time has come */
    1478        1162 :         WalSndKeepaliveIfNecessary();
    1479             : 
    1480             :         /*
    1481             :          * Sleep until something happens or we time out.  Also wait for the
    1482             :          * socket becoming writable, if there's still pending output.
    1483             :          * Otherwise we might sit on sendable output data while waiting for
    1484             :          * new WAL to be generated.  (But if we have nothing to send, we don't
    1485             :          * want to wake on socket-writable.)
    1486             :          */
    1487        1162 :         sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
    1488             : 
    1489        1162 :         wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH |
    1490             :             WL_SOCKET_READABLE | WL_TIMEOUT;
    1491             : 
    1492        1162 :         if (pq_is_send_pending())
    1493           0 :             wakeEvents |= WL_SOCKET_WRITEABLE;
    1494             : 
    1495        1162 :         (void) WaitLatchOrSocket(MyLatch, wakeEvents,
    1496        1162 :                                  MyProcPort->sock, sleeptime,
    1497             :                                  WAIT_EVENT_WAL_SENDER_WAIT_WAL);
    1498             :     }
    1499             : 
    1500             :     /* reactivate latch so WalSndLoop knows to continue */
    1501        6366 :     SetLatch(MyLatch);
    1502        6366 :     return RecentFlushPtr;
    1503             : }
    1504             : 
    1505             : /*
    1506             :  * Execute an incoming replication command.
    1507             :  *
    1508             :  * Returns true if the cmd_string was recognized as WalSender command, false
    1509             :  * if not.
    1510             :  */
    1511             : bool
    1512        2826 : exec_replication_command(const char *cmd_string)
    1513             : {
    1514             :     int         parse_rc;
    1515             :     Node       *cmd_node;
    1516             :     const char *cmdtag;
    1517             :     MemoryContext cmd_context;
    1518             :     MemoryContext old_context;
    1519             : 
    1520             :     /*
    1521             :      * If WAL sender has been told that shutdown is getting close, switch its
    1522             :      * status accordingly to handle the next replication commands correctly.
    1523             :      */
    1524        2826 :     if (got_STOPPING)
    1525           0 :         WalSndSetState(WALSNDSTATE_STOPPING);
    1526             : 
    1527             :     /*
    1528             :      * Throw error if in stopping mode.  We need prevent commands that could
    1529             :      * generate WAL while the shutdown checkpoint is being written.  To be
    1530             :      * safe, we just prohibit all new commands.
    1531             :      */
    1532        2826 :     if (MyWalSnd->state == WALSNDSTATE_STOPPING)
    1533           0 :         ereport(ERROR,
    1534             :                 (errmsg("cannot execute new commands while WAL sender is in stopping mode")));
    1535             : 
    1536             :     /*
    1537             :      * CREATE_REPLICATION_SLOT ... LOGICAL exports a snapshot until the next
    1538             :      * command arrives. Clean up the old stuff if there's anything.
    1539             :      */
    1540        2826 :     SnapBuildClearExportedSnapshot();
    1541             : 
    1542        2826 :     CHECK_FOR_INTERRUPTS();
    1543             : 
    1544             :     /*
    1545             :      * Parse the command.
    1546             :      */
    1547        2826 :     cmd_context = AllocSetContextCreate(CurrentMemoryContext,
    1548             :                                         "Replication command context",
    1549             :                                         ALLOCSET_DEFAULT_SIZES);
    1550        2826 :     old_context = MemoryContextSwitchTo(cmd_context);
    1551             : 
    1552        2826 :     replication_scanner_init(cmd_string);
    1553        2826 :     parse_rc = replication_yyparse();
    1554        2826 :     if (parse_rc != 0)
    1555           0 :         ereport(ERROR,
    1556             :                 (errcode(ERRCODE_SYNTAX_ERROR),
    1557             :                  errmsg_internal("replication command parser returned %d",
    1558             :                                  parse_rc)));
    1559        2826 :     replication_scanner_finish();
    1560             : 
    1561        2826 :     cmd_node = replication_parse_result;
    1562             : 
    1563             :     /*
    1564             :      * If it's a SQL command, just clean up our mess and return false; the
    1565             :      * caller will take care of executing it.
    1566             :      */
    1567        2826 :     if (IsA(cmd_node, SQLCmd))
    1568             :     {
    1569         930 :         if (MyDatabaseId == InvalidOid)
    1570           0 :             ereport(ERROR,
    1571             :                     (errmsg("cannot execute SQL commands in WAL sender for physical replication")));
    1572             : 
    1573         930 :         MemoryContextSwitchTo(old_context);
    1574         930 :         MemoryContextDelete(cmd_context);
    1575             : 
    1576             :         /* Tell the caller that this wasn't a WalSender command. */
    1577         930 :         return false;
    1578             :     }
    1579             : 
    1580             :     /*
    1581             :      * Report query to various monitoring facilities.  For this purpose, we
    1582             :      * report replication commands just like SQL commands.
    1583             :      */
    1584        1896 :     debug_query_string = cmd_string;
    1585             : 
    1586        1896 :     pgstat_report_activity(STATE_RUNNING, cmd_string);
    1587             : 
    1588             :     /*
    1589             :      * Log replication command if log_replication_commands is enabled. Even
    1590             :      * when it's disabled, log the command with DEBUG1 level for backward
    1591             :      * compatibility.
    1592             :      */
    1593        1896 :     ereport(log_replication_commands ? LOG : DEBUG1,
    1594             :             (errmsg("received replication command: %s", cmd_string)));
    1595             : 
    1596             :     /*
    1597             :      * Disallow replication commands in aborted transaction blocks.
    1598             :      */
    1599        1896 :     if (IsAbortedTransactionBlockState())
    1600           0 :         ereport(ERROR,
    1601             :                 (errcode(ERRCODE_IN_FAILED_SQL_TRANSACTION),
    1602             :                  errmsg("current transaction is aborted, "
    1603             :                         "commands ignored until end of transaction block")));
    1604             : 
    1605        1896 :     CHECK_FOR_INTERRUPTS();
    1606             : 
    1607             :     /*
    1608             :      * Allocate buffers that will be used for each outgoing and incoming
    1609             :      * message.  We do this just once per command to reduce palloc overhead.
    1610             :      */
    1611        1896 :     initStringInfo(&output_message);
    1612        1896 :     initStringInfo(&reply_message);
    1613        1896 :     initStringInfo(&tmpbuf);
    1614             : 
    1615        1896 :     switch (cmd_node->type)
    1616             :     {
    1617         482 :         case T_IdentifySystemCmd:
    1618         482 :             cmdtag = "IDENTIFY_SYSTEM";
    1619         482 :             set_ps_display(cmdtag);
    1620         482 :             IdentifySystem();
    1621         482 :             EndReplicationCommand(cmdtag);
    1622         482 :             break;
    1623             : 
    1624         152 :         case T_BaseBackupCmd:
    1625         152 :             cmdtag = "BASE_BACKUP";
    1626         152 :             set_ps_display(cmdtag);
    1627         152 :             PreventInTransactionBlock(true, cmdtag);
    1628         152 :             SendBaseBackup((BaseBackupCmd *) cmd_node);
    1629         138 :             EndReplicationCommand(cmdtag);
    1630         138 :             break;
    1631             : 
    1632         312 :         case T_CreateReplicationSlotCmd:
    1633         312 :             cmdtag = "CREATE_REPLICATION_SLOT";
    1634         312 :             set_ps_display(cmdtag);
    1635         312 :             CreateReplicationSlot((CreateReplicationSlotCmd *) cmd_node);
    1636         310 :             EndReplicationCommand(cmdtag);
    1637         310 :             break;
    1638             : 
    1639          16 :         case T_DropReplicationSlotCmd:
    1640          16 :             cmdtag = "DROP_REPLICATION_SLOT";
    1641          16 :             set_ps_display(cmdtag);
    1642          16 :             DropReplicationSlot((DropReplicationSlotCmd *) cmd_node);
    1643          16 :             EndReplicationCommand(cmdtag);
    1644          16 :             break;
    1645             : 
    1646         442 :         case T_StartReplicationCmd:
    1647             :             {
    1648         442 :                 StartReplicationCmd *cmd = (StartReplicationCmd *) cmd_node;
    1649             : 
    1650         442 :                 cmdtag = "START_REPLICATION";
    1651         442 :                 set_ps_display(cmdtag);
    1652         442 :                 PreventInTransactionBlock(true, cmdtag);
    1653             : 
    1654         442 :                 if (cmd->kind == REPLICATION_KIND_PHYSICAL)
    1655         256 :                     StartReplication(cmd);
    1656             :                 else
    1657         186 :                     StartLogicalReplication(cmd);
    1658             : 
    1659             :                 /* dupe, but necessary per libpqrcv_endstreaming */
    1660         266 :                 EndReplicationCommand(cmdtag);
    1661             : 
    1662             :                 Assert(xlogreader != NULL);
    1663         266 :                 break;
    1664             :             }
    1665             : 
    1666          10 :         case T_TimeLineHistoryCmd:
    1667          10 :             cmdtag = "TIMELINE_HISTORY";
    1668          10 :             set_ps_display(cmdtag);
    1669          10 :             PreventInTransactionBlock(true, cmdtag);
    1670          10 :             SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
    1671          10 :             EndReplicationCommand(cmdtag);
    1672          10 :             break;
    1673             : 
    1674         482 :         case T_VariableShowStmt:
    1675             :             {
    1676         482 :                 DestReceiver *dest = CreateDestReceiver(DestRemoteSimple);
    1677         482 :                 VariableShowStmt *n = (VariableShowStmt *) cmd_node;
    1678             : 
    1679         482 :                 cmdtag = "SHOW";
    1680         482 :                 set_ps_display(cmdtag);
    1681             : 
    1682             :                 /* syscache access needs a transaction environment */
    1683         482 :                 StartTransactionCommand();
    1684         482 :                 GetPGVariable(n->name, dest);
    1685         482 :                 CommitTransactionCommand();
    1686         482 :                 EndReplicationCommand(cmdtag);
    1687             :             }
    1688         482 :             break;
    1689             : 
    1690           0 :         default:
    1691           0 :             elog(ERROR, "unrecognized replication command node tag: %u",
    1692             :                  cmd_node->type);
    1693             :     }
    1694             : 
    1695             :     /* done */
    1696        1704 :     MemoryContextSwitchTo(old_context);
    1697        1704 :     MemoryContextDelete(cmd_context);
    1698             : 
    1699             :     /*
    1700             :      * We need not update ps display or pg_stat_activity, because PostgresMain
    1701             :      * will reset those to "idle".  But we must reset debug_query_string to
    1702             :      * ensure it doesn't become a dangling pointer.
    1703             :      */
    1704        1704 :     debug_query_string = NULL;
    1705             : 
    1706        1704 :     return true;
    1707             : }
    1708             : 
    1709             : /*
    1710             :  * Process any incoming messages while streaming. Also checks if the remote
    1711             :  * end has closed the connection.
    1712             :  */
    1713             : static void
    1714      271312 : ProcessRepliesIfAny(void)
    1715             : {
    1716             :     unsigned char firstchar;
    1717             :     int         r;
    1718      271312 :     bool        received = false;
    1719             : 
    1720      271312 :     last_processing = GetCurrentTimestamp();
    1721             : 
    1722             :     for (;;)
    1723             :     {
    1724      291594 :         pq_startmsgread();
    1725      291594 :         r = pq_getbyte_if_available(&firstchar);
    1726      291594 :         if (r < 0)
    1727             :         {
    1728             :             /* unexpected error or EOF */
    1729          20 :             ereport(COMMERROR,
    1730             :                     (errcode(ERRCODE_PROTOCOL_VIOLATION),
    1731             :                      errmsg("unexpected EOF on standby connection")));
    1732          20 :             proc_exit(0);
    1733             :         }
    1734      291574 :         if (r == 0)
    1735             :         {
    1736             :             /* no data available without blocking */
    1737      271184 :             pq_endmsgread();
    1738      271184 :             break;
    1739             :         }
    1740             : 
    1741             :         /* Read the message contents */
    1742       20390 :         resetStringInfo(&reply_message);
    1743       20390 :         if (pq_getmessage(&reply_message, 0))
    1744             :         {
    1745           0 :             ereport(COMMERROR,
    1746             :                     (errcode(ERRCODE_PROTOCOL_VIOLATION),
    1747             :                      errmsg("unexpected EOF on standby connection")));
    1748           0 :             proc_exit(0);
    1749             :         }
    1750             : 
    1751             :         /*
    1752             :          * If we already received a CopyDone from the frontend, the frontend
    1753             :          * should not send us anything until we've closed our end of the COPY.
    1754             :          * XXX: In theory, the frontend could already send the next command
    1755             :          * before receiving the CopyDone, but libpq doesn't currently allow
    1756             :          * that.
    1757             :          */
    1758       20390 :         if (streamingDoneReceiving && firstchar != 'X')
    1759           0 :             ereport(FATAL,
    1760             :                     (errcode(ERRCODE_PROTOCOL_VIOLATION),
    1761             :                      errmsg("unexpected standby message type \"%c\", after receiving CopyDone",
    1762             :                             firstchar)));
    1763             : 
    1764             :         /* Handle the very limited subset of commands expected in this phase */
    1765       20390 :         switch (firstchar)
    1766             :         {
    1767             :                 /*
    1768             :                  * 'd' means a standby reply wrapped in a CopyData packet.
    1769             :                  */
    1770       20016 :             case 'd':
    1771       20016 :                 ProcessStandbyMessage();
    1772       20016 :                 received = true;
    1773       20016 :                 break;
    1774             : 
    1775             :                 /*
    1776             :                  * CopyDone means the standby requested to finish streaming.
    1777             :                  * Reply with CopyDone, if we had not sent that already.
    1778             :                  */
    1779         266 :             case 'c':
    1780         266 :                 if (!streamingDoneSending)
    1781             :                 {
    1782         256 :                     pq_putmessage_noblock('c', NULL, 0);
    1783         256 :                     streamingDoneSending = true;
    1784             :                 }
    1785             : 
    1786         266 :                 streamingDoneReceiving = true;
    1787         266 :                 received = true;
    1788         266 :                 break;
    1789             : 
    1790             :                 /*
    1791             :                  * 'X' means that the standby is closing down the socket.
    1792             :                  */
    1793         108 :             case 'X':
    1794         108 :                 proc_exit(0);
    1795             : 
    1796           0 :             default:
    1797           0 :                 ereport(FATAL,
    1798             :                         (errcode(ERRCODE_PROTOCOL_VIOLATION),
    1799             :                          errmsg("invalid standby message type \"%c\"",
    1800             :                                 firstchar)));
    1801             :         }
    1802             :     }
    1803             : 
    1804             :     /*
    1805             :      * Save the last reply timestamp if we've received at least one reply.
    1806             :      */
    1807      271184 :     if (received)
    1808             :     {
    1809        3920 :         last_reply_timestamp = last_processing;
    1810        3920 :         waiting_for_ping_response = false;
    1811             :     }
    1812      271184 : }
    1813             : 
    1814             : /*
    1815             :  * Process a status update message received from standby.
    1816             :  */
    1817             : static void
    1818       20016 : ProcessStandbyMessage(void)
    1819             : {
    1820             :     char        msgtype;
    1821             : 
    1822             :     /*
    1823             :      * Check message type from the first byte.
    1824             :      */
    1825       20016 :     msgtype = pq_getmsgbyte(&reply_message);
    1826             : 
    1827       20016 :     switch (msgtype)
    1828             :     {
    1829       19904 :         case 'r':
    1830       19904 :             ProcessStandbyReplyMessage();
    1831       19904 :             break;
    1832             : 
    1833         112 :         case 'h':
    1834         112 :             ProcessStandbyHSFeedbackMessage();
    1835         112 :             break;
    1836             : 
    1837           0 :         default:
    1838           0 :             ereport(COMMERROR,
    1839             :                     (errcode(ERRCODE_PROTOCOL_VIOLATION),
    1840             :                      errmsg("unexpected message type \"%c\"", msgtype)));
    1841           0 :             proc_exit(0);
    1842             :     }
    1843       20016 : }
    1844             : 
    1845             : /*
    1846             :  * Remember that a walreceiver just confirmed receipt of lsn `lsn`.
    1847             :  */
    1848             : static void
    1849         424 : PhysicalConfirmReceivedLocation(XLogRecPtr lsn)
    1850             : {
    1851         424 :     bool        changed = false;
    1852         424 :     ReplicationSlot *slot = MyReplicationSlot;
    1853             : 
    1854             :     Assert(lsn != InvalidXLogRecPtr);
    1855         424 :     SpinLockAcquire(&slot->mutex);
    1856         424 :     if (slot->data.restart_lsn != lsn)
    1857             :     {
    1858         258 :         changed = true;
    1859         258 :         slot->data.restart_lsn = lsn;
    1860             :     }
    1861         424 :     SpinLockRelease(&slot->mutex);
    1862             : 
    1863         424 :     if (changed)
    1864             :     {
    1865         258 :         ReplicationSlotMarkDirty();
    1866         258 :         ReplicationSlotsComputeRequiredLSN();
    1867             :     }
    1868             : 
    1869             :     /*
    1870             :      * One could argue that the slot should be saved to disk now, but that'd
    1871             :      * be energy wasted - the worst lost information can do here is give us
    1872             :      * wrong information in a statistics view - we'll just potentially be more
    1873             :      * conservative in removing files.
    1874             :      */
    1875         424 : }
    1876             : 
    1877             : /*
    1878             :  * Regular reply from standby advising of WAL locations on standby server.
    1879             :  */
    1880             : static void
    1881       19904 : ProcessStandbyReplyMessage(void)
    1882             : {
    1883             :     XLogRecPtr  writePtr,
    1884             :                 flushPtr,
    1885             :                 applyPtr;
    1886             :     bool        replyRequested;
    1887             :     TimeOffset  writeLag,
    1888             :                 flushLag,
    1889             :                 applyLag;
    1890             :     bool        clearLagTimes;
    1891             :     TimestampTz now;
    1892             :     TimestampTz replyTime;
    1893             : 
    1894             :     static bool fullyAppliedLastTime = false;
    1895             : 
    1896             :     /* the caller already consumed the msgtype byte */
    1897       19904 :     writePtr = pq_getmsgint64(&reply_message);
    1898       19904 :     flushPtr = pq_getmsgint64(&reply_message);
    1899       19904 :     applyPtr = pq_getmsgint64(&reply_message);
    1900       19904 :     replyTime = pq_getmsgint64(&reply_message);
    1901       19904 :     replyRequested = pq_getmsgbyte(&reply_message);
    1902             : 
    1903       19904 :     if (message_level_is_interesting(DEBUG2))
    1904             :     {
    1905             :         char       *replyTimeStr;
    1906             : 
    1907             :         /* Copy because timestamptz_to_str returns a static buffer */
    1908           0 :         replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
    1909             : 
    1910           0 :         elog(DEBUG2, "write %X/%X flush %X/%X apply %X/%X%s reply_time %s",
    1911             :              (uint32) (writePtr >> 32), (uint32) writePtr,
    1912             :              (uint32) (flushPtr >> 32), (uint32) flushPtr,
    1913             :              (uint32) (applyPtr >> 32), (uint32) applyPtr,
    1914             :              replyRequested ? " (reply requested)" : "",
    1915             :              replyTimeStr);
    1916             : 
    1917           0 :         pfree(replyTimeStr);
    1918             :     }
    1919             : 
    1920             :     /* See if we can compute the round-trip lag for these positions. */
    1921       19904 :     now = GetCurrentTimestamp();
    1922       19904 :     writeLag = LagTrackerRead(SYNC_REP_WAIT_WRITE, writePtr, now);
    1923       19904 :     flushLag = LagTrackerRead(SYNC_REP_WAIT_FLUSH, flushPtr, now);
    1924       19904 :     applyLag = LagTrackerRead(SYNC_REP_WAIT_APPLY, applyPtr, now);
    1925             : 
    1926             :     /*
    1927             :      * If the standby reports that it has fully replayed the WAL in two
    1928             :      * consecutive reply messages, then the second such message must result
    1929             :      * from wal_receiver_status_interval expiring on the standby.  This is a
    1930             :      * convenient time to forget the lag times measured when it last
    1931             :      * wrote/flushed/applied a WAL record, to avoid displaying stale lag data
    1932             :      * until more WAL traffic arrives.
    1933             :      */
    1934       19904 :     clearLagTimes = false;
    1935       19904 :     if (applyPtr == sentPtr)
    1936             :     {
    1937        1060 :         if (fullyAppliedLastTime)
    1938         482 :             clearLagTimes = true;
    1939        1060 :         fullyAppliedLastTime = true;
    1940             :     }
    1941             :     else
    1942       18844 :         fullyAppliedLastTime = false;
    1943             : 
    1944             :     /* Send a reply if the standby requested one. */
    1945       19904 :     if (replyRequested)
    1946           0 :         WalSndKeepalive(false);
    1947             : 
    1948             :     /*
    1949             :      * Update shared state for this WalSender process based on reply data from
    1950             :      * standby.
    1951             :      */
    1952             :     {
    1953       19904 :         WalSnd     *walsnd = MyWalSnd;
    1954             : 
    1955       19904 :         SpinLockAcquire(&walsnd->mutex);
    1956       19904 :         walsnd->write = writePtr;
    1957       19904 :         walsnd->flush = flushPtr;
    1958       19904 :         walsnd->apply = applyPtr;
    1959       19904 :         if (writeLag != -1 || clearLagTimes)
    1960        1750 :             walsnd->writeLag = writeLag;
    1961       19904 :         if (flushLag != -1 || clearLagTimes)
    1962        3642 :             walsnd->flushLag = flushLag;
    1963       19904 :         if (applyLag != -1 || clearLagTimes)
    1964        2190 :             walsnd->applyLag = applyLag;
    1965       19904 :         walsnd->replyTime = replyTime;
    1966       19904 :         SpinLockRelease(&walsnd->mutex);
    1967             :     }
    1968             : 
    1969       19904 :     if (!am_cascading_walsender)
    1970       19768 :         SyncRepReleaseWaiters();
    1971             : 
    1972             :     /*
    1973             :      * Advance our local xmin horizon when the client confirmed a flush.
    1974             :      */
    1975       19904 :     if (MyReplicationSlot && flushPtr != InvalidXLogRecPtr)
    1976             :     {
    1977       18584 :         if (SlotIsLogical(MyReplicationSlot))
    1978       18160 :             LogicalConfirmReceivedLocation(flushPtr);
    1979             :         else
    1980         424 :             PhysicalConfirmReceivedLocation(flushPtr);
    1981             :     }
    1982       19904 : }
    1983             : 
    1984             : /* compute new replication slot xmin horizon if needed */
    1985             : static void
    1986          34 : PhysicalReplicationSlotNewXmin(TransactionId feedbackXmin, TransactionId feedbackCatalogXmin)
    1987             : {
    1988          34 :     bool        changed = false;
    1989          34 :     ReplicationSlot *slot = MyReplicationSlot;
    1990             : 
    1991          34 :     SpinLockAcquire(&slot->mutex);
    1992          34 :     MyProc->xmin = InvalidTransactionId;
    1993             : 
    1994             :     /*
    1995             :      * For physical replication we don't need the interlock provided by xmin
    1996             :      * and effective_xmin since the consequences of a missed increase are
    1997             :      * limited to query cancellations, so set both at once.
    1998             :      */
    1999          34 :     if (!TransactionIdIsNormal(slot->data.xmin) ||
    2000          12 :         !TransactionIdIsNormal(feedbackXmin) ||
    2001          12 :         TransactionIdPrecedes(slot->data.xmin, feedbackXmin))
    2002             :     {
    2003          26 :         changed = true;
    2004          26 :         slot->data.xmin = feedbackXmin;
    2005          26 :         slot->effective_xmin = feedbackXmin;
    2006             :     }
    2007          34 :     if (!TransactionIdIsNormal(slot->data.catalog_xmin) ||
    2008           0 :         !TransactionIdIsNormal(feedbackCatalogXmin) ||
    2009           0 :         TransactionIdPrecedes(slot->data.catalog_xmin, feedbackCatalogXmin))
    2010             :     {
    2011          34 :         changed = true;
    2012          34 :         slot->data.catalog_xmin = feedbackCatalogXmin;
    2013          34 :         slot->effective_catalog_xmin = feedbackCatalogXmin;
    2014             :     }
    2015          34 :     SpinLockRelease(&slot->mutex);
    2016             : 
    2017          34 :     if (changed)
    2018             :     {
    2019          34 :         ReplicationSlotMarkDirty();
    2020          34 :         ReplicationSlotsComputeRequiredXmin(false);
    2021             :     }
    2022          34 : }
    2023             : 
    2024             : /*
    2025             :  * Check that the provided xmin/epoch are sane, that is, not in the future
    2026             :  * and not so far back as to be already wrapped around.
    2027             :  *
    2028             :  * Epoch of nextXid should be same as standby, or if the counter has
    2029             :  * wrapped, then one greater than standby.
    2030             :  *
    2031             :  * This check doesn't care about whether clog exists for these xids
    2032             :  * at all.
    2033             :  */
    2034             : static bool
    2035          16 : TransactionIdInRecentPast(TransactionId xid, uint32 epoch)
    2036             : {
    2037             :     FullTransactionId nextFullXid;
    2038             :     TransactionId nextXid;
    2039             :     uint32      nextEpoch;
    2040             : 
    2041          16 :     nextFullXid = ReadNextFullTransactionId();
    2042          16 :     nextXid = XidFromFullTransactionId(nextFullXid);
    2043          16 :     nextEpoch = EpochFromFullTransactionId(nextFullXid);
    2044             : 
    2045          16 :     if (xid <= nextXid)
    2046             :     {
    2047          16 :         if (epoch != nextEpoch)
    2048           0 :             return false;
    2049             :     }
    2050             :     else
    2051             :     {
    2052           0 :         if (epoch + 1 != nextEpoch)
    2053           0 :             return false;
    2054             :     }
    2055             : 
    2056          16 :     if (!TransactionIdPrecedesOrEquals(xid, nextXid))
    2057           0 :         return false;           /* epoch OK, but it's wrapped around */
    2058             : 
    2059          16 :     return true;
    2060             : }
    2061             : 
    2062             : /*
    2063             :  * Hot Standby feedback
    2064             :  */
    2065             : static void
    2066         112 : ProcessStandbyHSFeedbackMessage(void)
    2067             : {
    2068             :     TransactionId feedbackXmin;
    2069             :     uint32      feedbackEpoch;
    2070             :     TransactionId feedbackCatalogXmin;
    2071             :     uint32      feedbackCatalogEpoch;
    2072             :     TimestampTz replyTime;
    2073             : 
    2074             :     /*
    2075             :      * Decipher the reply message. The caller already consumed the msgtype
    2076             :      * byte. See XLogWalRcvSendHSFeedback() in walreceiver.c for the creation
    2077             :      * of this message.
    2078             :      */
    2079         112 :     replyTime = pq_getmsgint64(&reply_message);
    2080         112 :     feedbackXmin = pq_getmsgint(&reply_message, 4);
    2081         112 :     feedbackEpoch = pq_getmsgint(&reply_message, 4);
    2082         112 :     feedbackCatalogXmin = pq_getmsgint(&reply_message, 4);
    2083         112 :     feedbackCatalogEpoch = pq_getmsgint(&reply_message, 4);
    2084             : 
    2085         112 :     if (message_level_is_interesting(DEBUG2))
    2086             :     {
    2087             :         char       *replyTimeStr;
    2088             : 
    2089             :         /* Copy because timestamptz_to_str returns a static buffer */
    2090           0 :         replyTimeStr = pstrdup(timestamptz_to_str(replyTime));
    2091             : 
    2092           0 :         elog(DEBUG2, "hot standby feedback xmin %u epoch %u, catalog_xmin %u epoch %u reply_time %s",
    2093             :              feedbackXmin,
    2094             :              feedbackEpoch,
    2095             :              feedbackCatalogXmin,
    2096             :              feedbackCatalogEpoch,
    2097             :              replyTimeStr);
    2098             : 
    2099           0 :         pfree(replyTimeStr);
    2100             :     }
    2101             : 
    2102             :     /*
    2103             :      * Update shared state for this WalSender process based on reply data from
    2104             :      * standby.
    2105             :      */
    2106             :     {
    2107         112 :         WalSnd     *walsnd = MyWalSnd;
    2108             : 
    2109         112 :         SpinLockAcquire(&walsnd->mutex);
    2110         112 :         walsnd->replyTime = replyTime;
    2111         112 :         SpinLockRelease(&walsnd->mutex);
    2112             :     }
    2113             : 
    2114             :     /*
    2115             :      * Unset WalSender's xmins if the feedback message values are invalid.
    2116             :      * This happens when the downstream turned hot_standby_feedback off.
    2117             :      */
    2118         112 :     if (!TransactionIdIsNormal(feedbackXmin)
    2119          96 :         && !TransactionIdIsNormal(feedbackCatalogXmin))
    2120             :     {
    2121          96 :         MyProc->xmin = InvalidTransactionId;
    2122          96 :         if (MyReplicationSlot != NULL)
    2123          18 :             PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
    2124          96 :         return;
    2125             :     }
    2126             : 
    2127             :     /*
    2128             :      * Check that the provided xmin/epoch are sane, that is, not in the future
    2129             :      * and not so far back as to be already wrapped around.  Ignore if not.
    2130             :      */
    2131          16 :     if (TransactionIdIsNormal(feedbackXmin) &&
    2132          16 :         !TransactionIdInRecentPast(feedbackXmin, feedbackEpoch))
    2133           0 :         return;
    2134             : 
    2135          16 :     if (TransactionIdIsNormal(feedbackCatalogXmin) &&
    2136           0 :         !TransactionIdInRecentPast(feedbackCatalogXmin, feedbackCatalogEpoch))
    2137           0 :         return;
    2138             : 
    2139             :     /*
    2140             :      * Set the WalSender's xmin equal to the standby's requested xmin, so that
    2141             :      * the xmin will be taken into account by GetSnapshotData() /
    2142             :      * ComputeXidHorizons().  This will hold back the removal of dead rows and
    2143             :      * thereby prevent the generation of cleanup conflicts on the standby
    2144             :      * server.
    2145             :      *
    2146             :      * There is a small window for a race condition here: although we just
    2147             :      * checked that feedbackXmin precedes nextXid, the nextXid could have
    2148             :      * gotten advanced between our fetching it and applying the xmin below,
    2149             :      * perhaps far enough to make feedbackXmin wrap around.  In that case the
    2150             :      * xmin we set here would be "in the future" and have no effect.  No point
    2151             :      * in worrying about this since it's too late to save the desired data
    2152             :      * anyway.  Assuming that the standby sends us an increasing sequence of
    2153             :      * xmins, this could only happen during the first reply cycle, else our
    2154             :      * own xmin would prevent nextXid from advancing so far.
    2155             :      *
    2156             :      * We don't bother taking the ProcArrayLock here.  Setting the xmin field
    2157             :      * is assumed atomic, and there's no real need to prevent concurrent
    2158             :      * horizon determinations.  (If we're moving our xmin forward, this is
    2159             :      * obviously safe, and if we're moving it backwards, well, the data is at
    2160             :      * risk already since a VACUUM could already have determined the horizon.)
    2161             :      *
    2162             :      * If we're using a replication slot we reserve the xmin via that,
    2163             :      * otherwise via the walsender's PGPROC entry. We can only track the
    2164             :      * catalog xmin separately when using a slot, so we store the least of the
    2165             :      * two provided when not using a slot.
    2166             :      *
    2167             :      * XXX: It might make sense to generalize the ephemeral slot concept and
    2168             :      * always use the slot mechanism to handle the feedback xmin.
    2169             :      */
    2170          16 :     if (MyReplicationSlot != NULL)  /* XXX: persistency configurable? */
    2171          16 :         PhysicalReplicationSlotNewXmin(feedbackXmin, feedbackCatalogXmin);
    2172             :     else
    2173             :     {
    2174           0 :         if (TransactionIdIsNormal(feedbackCatalogXmin)
    2175           0 :             && TransactionIdPrecedes(feedbackCatalogXmin, feedbackXmin))
    2176           0 :             MyProc->xmin = feedbackCatalogXmin;
    2177             :         else
    2178           0 :             MyProc->xmin = feedbackXmin;
    2179             :     }
    2180             : }
    2181             : 
    2182             : /*
    2183             :  * Compute how long send/receive loops should sleep.
    2184             :  *
    2185             :  * If wal_sender_timeout is enabled we want to wake up in time to send
    2186             :  * keepalives and to abort the connection if wal_sender_timeout has been
    2187             :  * reached.
    2188             :  */
    2189             : static long
    2190        6366 : WalSndComputeSleeptime(TimestampTz now)
    2191             : {
    2192        6366 :     long        sleeptime = 10000;  /* 10 s */
    2193             : 
    2194        6366 :     if (wal_sender_timeout > 0 && last_reply_timestamp > 0)
    2195             :     {
    2196             :         TimestampTz wakeup_time;
    2197             : 
    2198             :         /*
    2199             :          * At the latest stop sleeping once wal_sender_timeout has been
    2200             :          * reached.
    2201             :          */
    2202        6366 :         wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
    2203             :                                                   wal_sender_timeout);
    2204             : 
    2205             :         /*
    2206             :          * If no ping has been sent yet, wakeup when it's time to do so.
    2207             :          * WalSndKeepaliveIfNecessary() wants to send a keepalive once half of
    2208             :          * the timeout passed without a response.
    2209             :          */
    2210        6366 :         if (!waiting_for_ping_response)
    2211        4834 :             wakeup_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
    2212             :                                                       wal_sender_timeout / 2);
    2213             : 
    2214             :         /* Compute relative time until wakeup. */
    2215        6366 :         sleeptime = TimestampDifferenceMilliseconds(now, wakeup_time);
    2216             :     }
    2217             : 
    2218        6366 :     return sleeptime;
    2219             : }
    2220             : 
    2221             : /*
    2222             :  * Check whether there have been responses by the client within
    2223             :  * wal_sender_timeout and shutdown if not.  Using last_processing as the
    2224             :  * reference point avoids counting server-side stalls against the client.
    2225             :  * However, a long server-side stall can make WalSndKeepaliveIfNecessary()
    2226             :  * postdate last_processing by more than wal_sender_timeout.  If that happens,
    2227             :  * the client must reply almost immediately to avoid a timeout.  This rarely
    2228             :  * affects the default configuration, under which clients spontaneously send a
    2229             :  * message every standby_message_timeout = wal_sender_timeout/6 = 10s.  We
    2230             :  * could eliminate that problem by recognizing timeout expiration at
    2231             :  * wal_sender_timeout/2 after the keepalive.
    2232             :  */
    2233             : static void
    2234      264454 : WalSndCheckTimeOut(void)
    2235             : {
    2236             :     TimestampTz timeout;
    2237             : 
    2238             :     /* don't bail out if we're doing something that doesn't require timeouts */
    2239      264454 :     if (last_reply_timestamp <= 0)
    2240           0 :         return;
    2241             : 
    2242      264454 :     timeout = TimestampTzPlusMilliseconds(last_reply_timestamp,
    2243             :                                           wal_sender_timeout);
    2244             : 
    2245      264454 :     if (wal_sender_timeout > 0 && last_processing >= timeout)
    2246             :     {
    2247             :         /*
    2248             :          * Since typically expiration of replication timeout means
    2249             :          * communication problem, we don't send the error message to the
    2250             :          * standby.
    2251             :          */
    2252           0 :         ereport(COMMERROR,
    2253             :                 (errmsg("terminating walsender process due to replication timeout")));
    2254             : 
    2255           0 :         WalSndShutdown();
    2256             :     }
    2257             : }
    2258             : 
    2259             : /* Main loop of walsender process that streams the WAL over Copy messages. */
    2260             : static void
    2261         436 : WalSndLoop(WalSndSendDataCallback send_data)
    2262             : {
    2263             :     /*
    2264             :      * Initialize the last reply timestamp. That enables timeout processing
    2265             :      * from hereon.
    2266             :      */
    2267         436 :     last_reply_timestamp = GetCurrentTimestamp();
    2268         436 :     waiting_for_ping_response = false;
    2269             : 
    2270             :     /*
    2271             :      * Loop until we reach the end of this timeline or the client requests to
    2272             :      * stop streaming.
    2273             :      */
    2274             :     for (;;)
    2275             :     {
    2276             :         /* Clear any already-pending wakeups */
    2277      263714 :         ResetLatch(MyLatch);
    2278             : 
    2279      263714 :         CHECK_FOR_INTERRUPTS();
    2280             : 
    2281             :         /* Process any requests or signals received recently */
    2282      263714 :         if (ConfigReloadPending)
    2283             :         {
    2284          28 :             ConfigReloadPending = false;
    2285          28 :             ProcessConfigFile(PGC_SIGHUP);
    2286          28 :             SyncRepInitConfig();
    2287             :         }
    2288             : 
    2289             :         /* Check for input from the client */
    2290      263714 :         ProcessRepliesIfAny();
    2291             : 
    2292             :         /*
    2293             :          * If we have received CopyDone from the client, sent CopyDone
    2294             :          * ourselves, and the output buffer is empty, it's time to exit
    2295             :          * streaming.
    2296             :          */
    2297      263642 :         if (streamingDoneReceiving && streamingDoneSending &&
    2298         442 :             !pq_is_send_pending())
    2299         266 :             break;
    2300             : 
    2301             :         /*
    2302             :          * If we don't have any pending data in the output buffer, try to send
    2303             :          * some more.  If there is some, we don't bother to call send_data
    2304             :          * again until we've flushed it ... but we'd better assume we are not
    2305             :          * caught up.
    2306             :          */
    2307      263376 :         if (!pq_is_send_pending())
    2308      259702 :             send_data();
    2309             :         else
    2310        3674 :             WalSndCaughtUp = false;
    2311             : 
    2312             :         /* Try to flush pending output to the client */
    2313      263314 :         if (pq_flush_if_writable() != 0)
    2314           0 :             WalSndShutdown();
    2315             : 
    2316             :         /* If nothing remains to be sent right now ... */
    2317      263314 :         if (WalSndCaughtUp && !pq_is_send_pending())
    2318             :         {
    2319             :             /*
    2320             :              * If we're in catchup state, move to streaming.  This is an
    2321             :              * important state change for users to know about, since before
    2322             :              * this point data loss might occur if the primary dies and we
    2323             :              * need to failover to the standby. The state change is also
    2324             :              * important for synchronous replication, since commits that
    2325             :              * started to wait at that point might wait for some time.
    2326             :              */
    2327        8280 :             if (MyWalSnd->state == WALSNDSTATE_CATCHUP)
    2328             :             {
    2329         402 :                 ereport(DEBUG1,
    2330             :                         (errmsg("\"%s\" has now caught up with upstream server",
    2331             :                                 application_name)));
    2332         402 :                 WalSndSetState(WALSNDSTATE_STREAMING);
    2333             :             }
    2334             : 
    2335             :             /*
    2336             :              * When SIGUSR2 arrives, we send any outstanding logs up to the
    2337             :              * shutdown checkpoint record (i.e., the latest record), wait for
    2338             :              * them to be replicated to the standby, and exit. This may be a
    2339             :              * normal termination at shutdown, or a promotion, the walsender
    2340             :              * is not sure which.
    2341             :              */
    2342        8280 :             if (got_SIGUSR2)
    2343        2818 :                 WalSndDone(send_data);
    2344             :         }
    2345             : 
    2346             :         /* Check for replication timeout. */
    2347      263278 :         WalSndCheckTimeOut();
    2348             : 
    2349             :         /* Send keepalive if the time has come */
    2350      263278 :         WalSndKeepaliveIfNecessary();
    2351             : 
    2352             :         /*
    2353             :          * Block if we have unsent data.  XXX For logical replication, let
    2354             :          * WalSndWaitForWal() handle any other blocking; idle receivers need
    2355             :          * its additional actions.  For physical replication, also block if
    2356             :          * caught up; its send_data does not block.
    2357             :          */
    2358      263278 :         if ((WalSndCaughtUp && send_data != XLogSendLogical &&
    2359      266124 :              !streamingDoneSending) ||
    2360      261682 :             pq_is_send_pending())
    2361             :         {
    2362             :             long        sleeptime;
    2363             :             int         wakeEvents;
    2364             : 
    2365        5196 :             wakeEvents = WL_LATCH_SET | WL_EXIT_ON_PM_DEATH | WL_TIMEOUT |
    2366             :                 WL_SOCKET_READABLE;
    2367             : 
    2368             :             /*
    2369             :              * Use fresh timestamp, not last_processing, to reduce the chance
    2370             :              * of reaching wal_sender_timeout before sending a keepalive.
    2371             :              */
    2372        5196 :             sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
    2373             : 
    2374        5196 :             if (pq_is_send_pending())
    2375        3626 :                 wakeEvents |= WL_SOCKET_WRITEABLE;
    2376             : 
    2377             :             /* Sleep until something happens or we time out */
    2378        5196 :             (void) WaitLatchOrSocket(MyLatch, wakeEvents,
    2379        5196 :                                      MyProcPort->sock, sleeptime,
    2380             :                                      WAIT_EVENT_WAL_SENDER_MAIN);
    2381             :         }
    2382             :     }
    2383         266 : }
    2384             : 
    2385             : /* Initialize a per-walsender data structure for this walsender process */
    2386             : static void
    2387         692 : InitWalSenderSlot(void)
    2388             : {
    2389             :     int         i;
    2390             : 
    2391             :     /*
    2392             :      * WalSndCtl should be set up already (we inherit this by fork() or
    2393             :      * EXEC_BACKEND mechanism from the postmaster).
    2394             :      */
    2395             :     Assert(WalSndCtl != NULL);
    2396             :     Assert(MyWalSnd == NULL);
    2397             : 
    2398             :     /*
    2399             :      * Find a free walsender slot and reserve it. This must not fail due to
    2400             :      * the prior check for free WAL senders in InitProcess().
    2401             :      */
    2402        1038 :     for (i = 0; i < max_wal_senders; i++)
    2403             :     {
    2404        1038 :         WalSnd     *walsnd = &WalSndCtl->walsnds[i];
    2405             : 
    2406        1038 :         SpinLockAcquire(&walsnd->mutex);
    2407             : 
    2408        1038 :         if (walsnd->pid != 0)
    2409             :         {
    2410         346 :             SpinLockRelease(&walsnd->mutex);
    2411         346 :             continue;
    2412             :         }
    2413             :         else
    2414             :         {
    2415             :             /*
    2416             :              * Found a free slot. Reserve it for us.
    2417             :              */
    2418         692 :             walsnd->pid = MyProcPid;
    2419         692 :             walsnd->state = WALSNDSTATE_STARTUP;
    2420         692 :             walsnd->sentPtr = InvalidXLogRecPtr;
    2421         692 :             walsnd->needreload = false;
    2422         692 :             walsnd->write = InvalidXLogRecPtr;
    2423         692 :             walsnd->flush = InvalidXLogRecPtr;
    2424         692 :             walsnd->apply = InvalidXLogRecPtr;
    2425         692 :             walsnd->writeLag = -1;
    2426         692 :             walsnd->flushLag = -1;
    2427         692 :             walsnd->applyLag = -1;
    2428         692 :             walsnd->sync_standby_priority = 0;
    2429         692 :             walsnd->latch = &MyProc->procLatch;
    2430         692 :             walsnd->replyTime = 0;
    2431         692 :             SpinLockRelease(&walsnd->mutex);
    2432             :             /* don't need the lock anymore */
    2433         692 :             MyWalSnd = (WalSnd *) walsnd;
    2434             : 
    2435         692 :             break;
    2436             :         }
    2437             :     }
    2438             : 
    2439             :     Assert(MyWalSnd != NULL);
    2440             : 
    2441             :     /* Arrange to clean up at walsender exit */
    2442         692 :     on_shmem_exit(WalSndKill, 0);
    2443         692 : }
    2444             : 
    2445             : /* Destroy the per-walsender data structure for this walsender process */
    2446             : static void
    2447         692 : WalSndKill(int code, Datum arg)
    2448             : {
    2449         692 :     WalSnd     *walsnd = MyWalSnd;
    2450             : 
    2451             :     Assert(walsnd != NULL);
    2452             : 
    2453         692 :     MyWalSnd = NULL;
    2454             : 
    2455         692 :     SpinLockAcquire(&walsnd->mutex);
    2456             :     /* clear latch while holding the spinlock, so it can safely be read */
    2457         692 :     walsnd->latch = NULL;
    2458             :     /* Mark WalSnd struct as no longer being in use. */
    2459         692 :     walsnd->pid = 0;
    2460         692 :     SpinLockRelease(&walsnd->mutex);
    2461         692 : }
    2462             : 
    2463             : /* XLogReaderRoutine->segment_open callback */
    2464             : static void
    2465        6160 : WalSndSegmentOpen(XLogReaderState *state, XLogSegNo nextSegNo,
    2466             :                   TimeLineID *tli_p)
    2467             : {
    2468             :     char        path[MAXPGPATH];
    2469             : 
    2470             :     /*-------
    2471             :      * When reading from a historic timeline, and there is a timeline switch
    2472             :      * within this segment, read from the WAL segment belonging to the new
    2473             :      * timeline.
    2474             :      *
    2475             :      * For example, imagine that this server is currently on timeline 5, and
    2476             :      * we're streaming timeline 4. The switch from timeline 4 to 5 happened at
    2477             :      * 0/13002088. In pg_wal, we have these files:
    2478             :      *
    2479             :      * ...
    2480             :      * 000000040000000000000012
    2481             :      * 000000040000000000000013
    2482             :      * 000000050000000000000013
    2483             :      * 000000050000000000000014
    2484             :      * ...
    2485             :      *
    2486             :      * In this situation, when requested to send the WAL from segment 0x13, on
    2487             :      * timeline 4, we read the WAL from file 000000050000000000000013. Archive
    2488             :      * recovery prefers files from newer timelines, so if the segment was
    2489             :      * restored from the archive on this server, the file belonging to the old
    2490             :      * timeline, 000000040000000000000013, might not exist. Their contents are
    2491             :      * equal up to the switchpoint, because at a timeline switch, the used
    2492             :      * portion of the old segment is copied to the new file.  -------
    2493             :      */
    2494        6160 :     *tli_p = sendTimeLine;
    2495        6160 :     if (sendTimeLineIsHistoric)
    2496             :     {
    2497             :         XLogSegNo   endSegNo;
    2498             : 
    2499          10 :         XLByteToSeg(sendTimeLineValidUpto, endSegNo, state->segcxt.ws_segsize);
    2500          10 :         if (state->seg.ws_segno == endSegNo)
    2501           0 :             *tli_p = sendTimeLineNextTLI;
    2502             :     }
    2503             : 
    2504        6160 :     XLogFilePath(path, *tli_p, nextSegNo, state->segcxt.ws_segsize);
    2505        6160 :     state->seg.ws_file = BasicOpenFile(path, O_RDONLY | PG_BINARY);
    2506        6160 :     if (state->seg.ws_file >= 0)
    2507        6154 :         return;
    2508             : 
    2509             :     /*
    2510             :      * If the file is not found, assume it's because the standby asked for a
    2511             :      * too old WAL segment that has already been removed or recycled.
    2512             :      */
    2513           6 :     if (errno == ENOENT)
    2514             :     {
    2515             :         char        xlogfname[MAXFNAMELEN];
    2516           6 :         int         save_errno = errno;
    2517             : 
    2518           6 :         XLogFileName(xlogfname, *tli_p, nextSegNo, wal_segment_size);
    2519           6 :         errno = save_errno;
    2520           6 :         ereport(ERROR,
    2521             :                 (errcode_for_file_access(),
    2522             :                  errmsg("requested WAL segment %s has already been removed",
    2523             :                         xlogfname)));
    2524             :     }
    2525             :     else
    2526           0 :         ereport(ERROR,
    2527             :                 (errcode_for_file_access(),
    2528             :                  errmsg("could not open file \"%s\": %m",
    2529             :                         path)));
    2530             : }
    2531             : 
    2532             : /*
    2533             :  * Send out the WAL in its normal physical/stored form.
    2534             :  *
    2535             :  * Read up to MAX_SEND_SIZE bytes of WAL that's been flushed to disk,
    2536             :  * but not yet sent to the client, and buffer it in the libpq output
    2537             :  * buffer.
    2538             :  *
    2539             :  * If there is no unsent WAL remaining, WalSndCaughtUp is set to true,
    2540             :  * otherwise WalSndCaughtUp is set to false.
    2541             :  */
    2542             : static void
    2543        9408 : XLogSendPhysical(void)
    2544             : {
    2545             :     XLogRecPtr  SendRqstPtr;
    2546             :     XLogRecPtr  startptr;
    2547             :     XLogRecPtr  endptr;
    2548             :     Size        nbytes;
    2549             :     XLogSegNo   segno;
    2550             :     WALReadError errinfo;
    2551             : 
    2552             :     /* If requested switch the WAL sender to the stopping state. */
    2553        9408 :     if (got_STOPPING)
    2554         200 :         WalSndSetState(WALSNDSTATE_STOPPING);
    2555             : 
    2556        9408 :     if (streamingDoneSending)
    2557             :     {
    2558        2836 :         WalSndCaughtUp = true;
    2559        3976 :         return;
    2560             :     }
    2561             : 
    2562             :     /* Figure out how far we can safely send the WAL. */
    2563        6572 :     if (sendTimeLineIsHistoric)
    2564             :     {
    2565             :         /*
    2566             :          * Streaming an old timeline that's in this server's history, but is
    2567             :          * not the one we're currently inserting or replaying. It can be
    2568             :          * streamed up to the point where we switched off that timeline.
    2569             :          */
    2570          24 :         SendRqstPtr = sendTimeLineValidUpto;
    2571             :     }
    2572        6548 :     else if (am_cascading_walsender)
    2573             :     {
    2574             :         /*
    2575             :          * Streaming the latest timeline on a standby.
    2576             :          *
    2577             :          * Attempt to send all WAL that has already been replayed, so that we
    2578             :          * know it's valid. If we're receiving WAL through streaming
    2579             :          * replication, it's also OK to send any WAL that has been received
    2580             :          * but not replayed.
    2581             :          *
    2582             :          * The timeline we're recovering from can change, or we can be
    2583             :          * promoted. In either case, the current timeline becomes historic. We
    2584             :          * need to detect that so that we don't try to stream past the point
    2585             :          * where we switched to another timeline. We check for promotion or
    2586             :          * timeline switch after calculating FlushPtr, to avoid a race
    2587             :          * condition: if the timeline becomes historic just after we checked
    2588             :          * that it was still current, it's still be OK to stream it up to the
    2589             :          * FlushPtr that was calculated before it became historic.
    2590             :          */
    2591         662 :         bool        becameHistoric = false;
    2592             : 
    2593         662 :         SendRqstPtr = GetStandbyFlushRecPtr();
    2594             : 
    2595         662 :         if (!RecoveryInProgress())
    2596             :         {
    2597             :             /*
    2598             :              * We have been promoted. RecoveryInProgress() updated
    2599             :              * ThisTimeLineID to the new current timeline.
    2600             :              */
    2601           0 :             am_cascading_walsender = false;
    2602           0 :             becameHistoric = true;
    2603             :         }
    2604             :         else
    2605             :         {
    2606             :             /*
    2607             :              * Still a cascading standby. But is the timeline we're sending
    2608             :              * still the one recovery is recovering from? ThisTimeLineID was
    2609             :              * updated by the GetStandbyFlushRecPtr() call above.
    2610             :              */
    2611         662 :             if (sendTimeLine != ThisTimeLineID)
    2612           0 :                 becameHistoric = true;
    2613             :         }
    2614             : 
    2615         662 :         if (becameHistoric)
    2616             :         {
    2617             :             /*
    2618             :              * The timeline we were sending has become historic. Read the
    2619             :              * timeline history file of the new timeline to see where exactly
    2620             :              * we forked off from the timeline we were sending.
    2621             :              */
    2622             :             List       *history;
    2623             : 
    2624           0 :             history = readTimeLineHistory(ThisTimeLineID);
    2625           0 :             sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history, &sendTimeLineNextTLI);
    2626             : 
    2627             :             Assert(sendTimeLine < sendTimeLineNextTLI);
    2628           0 :             list_free_deep(history);
    2629             : 
    2630           0 :             sendTimeLineIsHistoric = true;
    2631             : 
    2632           0 :             SendRqstPtr = sendTimeLineValidUpto;
    2633             :         }
    2634             :     }
    2635             :     else
    2636             :     {
    2637             :         /*
    2638             :          * Streaming the current timeline on a primary.
    2639             :          *
    2640             :          * Attempt to send all data that's already been written out and
    2641             :          * fsync'd to disk.  We cannot go further than what's been written out
    2642             :          * given the current implementation of WALRead().  And in any case
    2643             :          * it's unsafe to send WAL that is not securely down to disk on the
    2644             :          * primary: if the primary subsequently crashes and restarts, standbys
    2645             :          * must not have applied any WAL that got lost on the primary.
    2646             :          */
    2647        5886 :         SendRqstPtr = GetFlushRecPtr();
    2648             :     }
    2649             : 
    2650             :     /*
    2651             :      * Record the current system time as an approximation of the time at which
    2652             :      * this WAL location was written for the purposes of lag tracking.
    2653             :      *
    2654             :      * In theory we could make XLogFlush() record a time in shmem whenever WAL
    2655             :      * is flushed and we could get that time as well as the LSN when we call
    2656             :      * GetFlushRecPtr() above (and likewise for the cascading standby
    2657             :      * equivalent), but rather than putting any new code into the hot WAL path
    2658             :      * it seems good enough to capture the time here.  We should reach this
    2659             :      * after XLogFlush() runs WalSndWakeupProcessRequests(), and although that
    2660             :      * may take some time, we read the WAL flush pointer and take the time
    2661             :      * very close to together here so that we'll get a later position if it is
    2662             :      * still moving.
    2663             :      *
    2664             :      * Because LagTrackerWrite ignores samples when the LSN hasn't advanced,
    2665             :      * this gives us a cheap approximation for the WAL flush time for this
    2666             :      * LSN.
    2667             :      *
    2668             :      * Note that the LSN is not necessarily the LSN for the data contained in
    2669             :      * the present message; it's the end of the WAL, which might be further
    2670             :      * ahead.  All the lag tracking machinery cares about is finding out when
    2671             :      * that arbitrary LSN is eventually reported as written, flushed and
    2672             :      * applied, so that it can measure the elapsed time.
    2673             :      */
    2674        6572 :     LagTrackerWrite(SendRqstPtr, GetCurrentTimestamp());
    2675             : 
    2676             :     /*
    2677             :      * If this is a historic timeline and we've reached the point where we
    2678             :      * forked to the next timeline, stop streaming.
    2679             :      *
    2680             :      * Note: We might already have sent WAL > sendTimeLineValidUpto. The
    2681             :      * startup process will normally replay all WAL that has been received
    2682             :      * from the primary, before promoting, but if the WAL streaming is
    2683             :      * terminated at a WAL page boundary, the valid portion of the timeline
    2684             :      * might end in the middle of a WAL record. We might've already sent the
    2685             :      * first half of that partial WAL record to the cascading standby, so that
    2686             :      * sentPtr > sendTimeLineValidUpto. That's OK; the cascading standby can't
    2687             :      * replay the partial WAL record either, so it can still follow our
    2688             :      * timeline switch.
    2689             :      */
    2690        6572 :     if (sendTimeLineIsHistoric && sendTimeLineValidUpto <= sentPtr)
    2691             :     {
    2692             :         /* close the current file. */
    2693          10 :         if (xlogreader->seg.ws_file >= 0)
    2694          10 :             wal_segment_close(xlogreader);
    2695             : 
    2696             :         /* Send CopyDone */
    2697          10 :         pq_putmessage_noblock('c', NULL, 0);
    2698          10 :         streamingDoneSending = true;
    2699             : 
    2700          10 :         WalSndCaughtUp = true;
    2701             : 
    2702          10 :         elog(DEBUG1, "walsender reached end of timeline at %X/%X (sent up to %X/%X)",
    2703             :              (uint32) (sendTimeLineValidUpto >> 32), (uint32) sendTimeLineValidUpto,
    2704             :              (uint32) (sentPtr >> 32), (uint32) sentPtr);
    2705          10 :         return;
    2706             :     }
    2707             : 
    2708             :     /* Do we have any work to do? */
    2709             :     Assert(sentPtr <= SendRqstPtr);
    2710        6562 :     if (SendRqstPtr <= sentPtr)
    2711             :     {
    2712        1130 :         WalSndCaughtUp = true;
    2713        1130 :         return;
    2714             :     }
    2715             : 
    2716             :     /*
    2717             :      * Figure out how much to send in one message. If there's no more than
    2718             :      * MAX_SEND_SIZE bytes to send, send everything. Otherwise send
    2719             :      * MAX_SEND_SIZE bytes, but round back to logfile or page boundary.
    2720             :      *
    2721             :      * The rounding is not only for performance reasons. Walreceiver relies on
    2722             :      * the fact that we never split a WAL record across two messages. Since a
    2723             :      * long WAL record is split at page boundary into continuation records,
    2724             :      * page boundary is always a safe cut-off point. We also assume that
    2725             :      * SendRqstPtr never points to the middle of a WAL record.
    2726             :      */
    2727        5432 :     startptr = sentPtr;
    2728        5432 :     endptr = startptr;
    2729        5432 :     endptr += MAX_SEND_SIZE;
    2730             : 
    2731             :     /* if we went beyond SendRqstPtr, back off */
    2732        5432 :     if (SendRqstPtr <= endptr)
    2733             :     {
    2734         548 :         endptr = SendRqstPtr;
    2735         548 :         if (sendTimeLineIsHistoric)
    2736          10 :             WalSndCaughtUp = false;
    2737             :         else
    2738         538 :             WalSndCaughtUp = true;
    2739             :     }
    2740             :     else
    2741             :     {
    2742             :         /* round down to page boundary. */
    2743        4884 :         endptr -= (endptr % XLOG_BLCKSZ);
    2744        4884 :         WalSndCaughtUp = false;
    2745             :     }
    2746             : 
    2747        5432 :     nbytes = endptr - startptr;
    2748             :     Assert(nbytes <= MAX_SEND_SIZE);
    2749             : 
    2750             :     /*
    2751             :      * OK to read and send the slice.
    2752             :      */
    2753        5432 :     resetStringInfo(&output_message);
    2754        5432 :     pq_sendbyte(&output_message, 'w');
    2755             : 
    2756        5432 :     pq_sendint64(&output_message, startptr);    /* dataStart */
    2757        5432 :     pq_sendint64(&output_message, SendRqstPtr); /* walEnd */
    2758        5432 :     pq_sendint64(&output_message, 0);   /* sendtime, filled in last */
    2759             : 
    2760             :     /*
    2761             :      * Read the log directly into the output buffer to avoid extra memcpy
    2762             :      * calls.
    2763             :      */
    2764        5432 :     enlargeStringInfo(&output_message, nbytes);
    2765             : 
    2766        5432 : retry:
    2767        5426 :     if (!WALRead(xlogreader,
    2768        5432 :                  &output_message.data[output_message.len],
    2769             :                  startptr,
    2770             :                  nbytes,
    2771        5432 :                  xlogreader->seg.ws_tli, /* Pass the current TLI because
    2772             :                                              * only WalSndSegmentOpen controls
    2773             :                                              * whether new TLI is needed. */
    2774             :                  &errinfo))
    2775           0 :         WALReadRaiseError(&errinfo);
    2776             : 
    2777             :     /* See logical_read_xlog_page(). */
    2778        5426 :     XLByteToSeg(startptr, segno, xlogreader->segcxt.ws_segsize);
    2779        5426 :     CheckXLogRemoved(segno, xlogreader->seg.ws_tli);
    2780             : 
    2781             :     /*
    2782             :      * During recovery, the currently-open WAL file might be replaced with the
    2783             :      * file of the same name retrieved from archive. So we always need to
    2784             :      * check what we read was valid after reading into the buffer. If it's
    2785             :      * invalid, we try to open and read the file again.
    2786             :      */
    2787        5426 :     if (am_cascading_walsender)
    2788             :     {
    2789         560 :         WalSnd     *walsnd = MyWalSnd;
    2790             :         bool        reload;
    2791             : 
    2792         560 :         SpinLockAcquire(&walsnd->mutex);
    2793         560 :         reload = walsnd->needreload;
    2794         560 :         walsnd->needreload = false;
    2795         560 :         SpinLockRelease(&walsnd->mutex);
    2796             : 
    2797         560 :         if (reload && xlogreader->seg.ws_file >= 0)
    2798             :         {
    2799           0 :             wal_segment_close(xlogreader);
    2800             : 
    2801           0 :             goto retry;
    2802             :         }
    2803             :     }
    2804             : 
    2805        5426 :     output_message.len += nbytes;
    2806        5426 :     output_message.data[output_message.len] = '\0';
    2807             : 
    2808             :     /*
    2809             :      * Fill the send timestamp last, so that it is taken as late as possible.
    2810             :      */
    2811        5426 :     resetStringInfo(&tmpbuf);
    2812        5426 :     pq_sendint64(&tmpbuf, GetCurrentTimestamp());
    2813        5426 :     memcpy(&output_message.data[1 + sizeof(int64) + sizeof(int64)],
    2814        5426 :            tmpbuf.data, sizeof(int64));
    2815             : 
    2816        5426 :     pq_putmessage_noblock('d', output_message.data, output_message.len);
    2817             : 
    2818        5426 :     sentPtr = endptr;
    2819             : 
    2820             :     /* Update shared memory status */
    2821             :     {
    2822        5426 :         WalSnd     *walsnd = MyWalSnd;
    2823             : 
    2824        5426 :         SpinLockAcquire(&walsnd->mutex);
    2825        5426 :         walsnd->sentPtr = sentPtr;
    2826        5426 :         SpinLockRelease(&walsnd->mutex);
    2827             :     }
    2828             : 
    2829             :     /* Report progress of XLOG streaming in PS display */
    2830        5426 :     if (update_process_title)
    2831             :     {
    2832             :         char        activitymsg[50];
    2833             : 
    2834       10852 :         snprintf(activitymsg, sizeof(activitymsg), "streaming %X/%X",
    2835        5426 :                  (uint32) (sentPtr >> 32), (uint32) sentPtr);
    2836        5426 :         set_ps_display(activitymsg);
    2837             :     }
    2838             : }
    2839             : 
    2840             : /*
    2841             :  * Stream out logically decoded data.
    2842             :  */
    2843             : static void
    2844      253112 : XLogSendLogical(void)
    2845             : {
    2846             :     XLogRecord *record;
    2847             :     char       *errm;
    2848             : 
    2849             :     /*
    2850             :      * We'll use the current flush point to determine whether we've caught up.
    2851             :      * This variable is static in order to cache it across calls.  Caching is
    2852             :      * helpful because GetFlushRecPtr() needs to acquire a heavily-contended
    2853             :      * spinlock.
    2854             :      */
    2855             :     static XLogRecPtr flushPtr = InvalidXLogRecPtr;
    2856             : 
    2857             :     /*
    2858             :      * Don't know whether we've caught up yet. We'll set WalSndCaughtUp to
    2859             :      * true in WalSndWaitForWal, if we're actually waiting. We also set to
    2860             :      * true if XLogReadRecord() had to stop reading but WalSndWaitForWal
    2861             :      * didn't wait - i.e. when we're shutting down.
    2862             :      */
    2863      253112 :     WalSndCaughtUp = false;
    2864             : 
    2865      253112 :     record = XLogReadRecord(logical_decoding_ctx->reader, &errm);
    2866             : 
    2867             :     /* xlog record was invalid */
    2868      253056 :     if (errm != NULL)
    2869           0 :         elog(ERROR, "%s", errm);
    2870             : 
    2871      253056 :     if (record != NULL)
    2872             :     {
    2873             :         /*
    2874             :          * Note the lack of any call to LagTrackerWrite() which is handled by
    2875             :          * WalSndUpdateProgress which is called by output plugin through
    2876             :          * logical decoding write api.
    2877             :          */
    2878      247406 :         LogicalDecodingProcessRecord(logical_decoding_ctx, logical_decoding_ctx->reader);
    2879             : 
    2880      247406 :         sentPtr = logical_decoding_ctx->reader->EndRecPtr;
    2881             :     }
    2882             : 
    2883             :     /*
    2884             :      * If first time through in this session, initialize flushPtr.  Otherwise,
    2885             :      * we only need to update flushPtr if EndRecPtr is past it.
    2886             :      */
    2887      253056 :     if (flushPtr == InvalidXLogRecPtr)
    2888         158 :         flushPtr = GetFlushRecPtr();
    2889      252898 :     else if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
    2890        6488 :         flushPtr = GetFlushRecPtr();
    2891             : 
    2892             :     /* If EndRecPtr is still past our flushPtr, it means we caught up. */
    2893      253056 :     if (logical_decoding_ctx->reader->EndRecPtr >= flushPtr)
    2894        6256 :         WalSndCaughtUp = true;
    2895             : 
    2896             :     /*
    2897             :      * If we're caught up and have been requested to stop, have WalSndLoop()
    2898             :      * terminate the connection in an orderly manner, after writing out all
    2899             :      * the pending data.
    2900             :      */
    2901      253056 :     if (WalSndCaughtUp && got_STOPPING)
    2902        5556 :         got_SIGUSR2 = true;
    2903             : 
    2904             :     /* Update shared memory status */
    2905             :     {
    2906      253056 :         WalSnd     *walsnd = MyWalSnd;
    2907             : 
    2908      253056 :         SpinLockAcquire(&walsnd->mutex);
    2909      253056 :         walsnd->sentPtr = sentPtr;
    2910      253056 :         SpinLockRelease(&walsnd->mutex);
    2911             :     }
    2912      253056 : }
    2913             : 
    2914             : /*
    2915             :  * Shutdown if the sender is caught up.
    2916             :  *
    2917             :  * NB: This should only be called when the shutdown signal has been received
    2918             :  * from postmaster.
    2919             :  *
    2920             :  * Note that if we determine that there's still more data to send, this
    2921             :  * function will return control to the caller.
    2922             :  */
    2923             : static void
    2924        2818 : WalSndDone(WalSndSendDataCallback send_data)
    2925             : {
    2926             :     XLogRecPtr  replicatedPtr;
    2927             : 
    2928             :     /* ... let's just be real sure we're caught up ... */
    2929        2818 :     send_data();
    2930             : 
    2931             :     /*
    2932             :      * To figure out whether all WAL has successfully been replicated, check
    2933             :      * flush location if valid, write otherwise. Tools like pg_receivewal will
    2934             :      * usually (unless in synchronous mode) return an invalid flush location.
    2935             :      */
    2936        5636 :     replicatedPtr = XLogRecPtrIsInvalid(MyWalSnd->flush) ?
    2937        2818 :         MyWalSnd->write : MyWalSnd->flush;
    2938             : 
    2939        2818 :     if (WalSndCaughtUp && sentPtr == replicatedPtr &&
    2940          36 :         !pq_is_send_pending())
    2941             :     {
    2942             :         QueryCompletion qc;
    2943             : 
    2944             :         /* Inform the standby that XLOG streaming is done */
    2945          36 :         SetQueryCompletion(&qc, CMDTAG_COPY, 0);
    2946          36 :         EndCommand(&qc, DestRemote, false);
    2947          36 :         pq_flush();
    2948             : 
    2949          36 :         proc_exit(0);
    2950             :     }
    2951        2782 :     if (!waiting_for_ping_response)
    2952        1528 :         WalSndKeepalive(true);
    2953        2782 : }
    2954             : 
    2955             : /*
    2956             :  * Returns the latest point in WAL that has been safely flushed to disk, and
    2957             :  * can be sent to the standby. This should only be called when in recovery,
    2958             :  * ie. we're streaming to a cascaded standby.
    2959             :  *
    2960             :  * As a side-effect, ThisTimeLineID is updated to the TLI of the last
    2961             :  * replayed WAL record.
    2962             :  */
    2963             : static XLogRecPtr
    2964         692 : GetStandbyFlushRecPtr(void)
    2965             : {
    2966             :     XLogRecPtr  replayPtr;
    2967             :     TimeLineID  replayTLI;
    2968             :     XLogRecPtr  receivePtr;
    2969             :     TimeLineID  receiveTLI;
    2970             :     XLogRecPtr  result;
    2971             : 
    2972             :     /*
    2973             :      * We can safely send what's already been replayed. Also, if walreceiver
    2974             :      * is streaming WAL from the same timeline, we can send anything that it
    2975             :      * has streamed, but hasn't been replayed yet.
    2976             :      */
    2977             : 
    2978         692 :     receivePtr = GetWalRcvFlushRecPtr(NULL, &receiveTLI);
    2979         692 :     replayPtr = GetXLogReplayRecPtr(&replayTLI);
    2980             : 
    2981         692 :     ThisTimeLineID = replayTLI;
    2982             : 
    2983         692 :     result = replayPtr;
    2984         692 :     if (receiveTLI == ThisTimeLineID && receivePtr > replayPtr)
    2985          36 :         result = receivePtr;
    2986             : 
    2987         692 :     return result;
    2988             : }
    2989             : 
    2990             : /*
    2991             :  * Request walsenders to reload the currently-open WAL file
    2992             :  */
    2993             : void
    2994          12 : WalSndRqstFileReload(void)
    2995             : {
    2996             :     int         i;
    2997             : 
    2998         116 :     for (i = 0; i < max_wal_senders; i++)
    2999             :     {
    3000         104 :         WalSnd     *walsnd = &WalSndCtl->walsnds[i];
    3001             : 
    3002         104 :         SpinLockAcquire(&walsnd->mutex);
    3003         104 :         if (walsnd->pid == 0)
    3004             :         {
    3005         104 :             SpinLockRelease(&walsnd->mutex);
    3006         104 :             continue;
    3007             :         }
    3008           0 :         walsnd->needreload = true;
    3009           0 :         SpinLockRelease(&walsnd->mutex);
    3010             :     }
    3011          12 : }
    3012             : 
    3013             : /*
    3014             :  * Handle PROCSIG_WALSND_INIT_STOPPING signal.
    3015             :  */
    3016             : void
    3017          36 : HandleWalSndInitStopping(void)
    3018             : {
    3019             :     Assert(am_walsender);
    3020             : 
    3021             :     /*
    3022             :      * If replication has not yet started, die like with SIGTERM. If
    3023             :      * replication is active, only set a flag and wake up the main loop. It
    3024             :      * will send any outstanding WAL, wait for it to be replicated to the
    3025             :      * standby, and then exit gracefully.
    3026             :      */
    3027          36 :     if (!replication_active)
    3028           0 :         kill(MyProcPid, SIGTERM);
    3029             :     else
    3030          36 :         got_STOPPING = true;
    3031          36 : }
    3032             : 
    3033             : /*
    3034             :  * SIGUSR2: set flag to do a last cycle and shut down afterwards. The WAL
    3035             :  * sender should already have been switched to WALSNDSTATE_STOPPING at
    3036             :  * this point.
    3037             :  */
    3038             : static void
    3039          32 : WalSndLastCycleHandler(SIGNAL_ARGS)
    3040             : {
    3041          32 :     int         save_errno = errno;
    3042             : 
    3043          32 :     got_SIGUSR2 = true;
    3044          32 :     SetLatch(MyLatch);
    3045             : 
    3046          32 :     errno = save_errno;
    3047          32 : }
    3048             : 
    3049             : /* Set up signal handlers */
    3050             : void
    3051         696 : WalSndSignals(void)
    3052             : {
    3053             :     /* Set up signal handlers */
    3054         696 :     pqsignal(SIGHUP, SignalHandlerForConfigReload);
    3055         696 :     pqsignal(SIGINT, StatementCancelHandler);   /* query cancel */
    3056         696 :     pqsignal(SIGTERM, die);     /* request shutdown */
    3057             :     /* SIGQUIT handler was already set up by InitPostmasterChild */
    3058         696 :     InitializeTimeouts();       /* establishes SIGALRM handler */
    3059         696 :     pqsignal(SIGPIPE, SIG_IGN);
    3060         696 :     pqsignal(SIGUSR1, procsignal_sigusr1_handler);
    3061         696 :     pqsignal(SIGUSR2, WalSndLastCycleHandler);  /* request a last cycle and
    3062             :                                                  * shutdown */
    3063             : 
    3064             :     /* Reset some signals that are accepted by postmaster but not here */
    3065         696 :     pqsignal(SIGCHLD, SIG_DFL);
    3066         696 : }
    3067             : 
    3068             : /* Report shared-memory space needed by WalSndShmemInit */
    3069             : Size
    3070        7120 : WalSndShmemSize(void)
    3071             : {
    3072        7120 :     Size        size = 0;
    3073             : 
    3074        7120 :     size = offsetof(WalSndCtlData, walsnds);
    3075        7120 :     size = add_size(size, mul_size(max_wal_senders, sizeof(WalSnd)));
    3076             : 
    3077        7120 :     return size;
    3078             : }
    3079             : 
    3080             : /* Allocate and initialize walsender-related shared memory */
    3081             : void
    3082        2372 : WalSndShmemInit(void)
    3083             : {
    3084             :     bool        found;
    3085             :     int         i;
    3086             : 
    3087        2372 :     WalSndCtl = (WalSndCtlData *)
    3088        2372 :         ShmemInitStruct("Wal Sender Ctl", WalSndShmemSize(), &found);
    3089             : 
    3090        2372 :     if (!found)
    3091             :     {
    3092             :         /* First time through, so initialize */
    3093        6084 :         MemSet(WalSndCtl, 0, WalSndShmemSize());
    3094             : 
    3095        9488 :         for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; i++)
    3096        7116 :             SHMQueueInit(&(WalSndCtl->SyncRepQueue[i]));
    3097             : 
    3098       23758 :         for (i = 0; i < max_wal_senders; i++)
    3099             :         {
    3100       21386 :             WalSnd     *walsnd = &WalSndCtl->walsnds[i];
    3101             : 
    3102       21386 :             SpinLockInit(&walsnd->mutex);
    3103             :         }
    3104             :     }
    3105        2372 : }
    3106             : 
    3107             : /*
    3108             :  * Wake up all walsenders
    3109             :  *
    3110             :  * This will be called inside critical sections, so throwing an error is not
    3111             :  * advisable.
    3112             :  */
    3113             : void
    3114      286748 : WalSndWakeup(void)
    3115             : {
    3116             :     int         i;
    3117             : 
    3118     3150872 :     for (i = 0; i < max_wal_senders; i++)
    3119             :     {
    3120             :         Latch      *latch;
    3121     2864124 :         WalSnd     *walsnd = &WalSndCtl->walsnds[i];
    3122             : 
    3123             :         /*
    3124             :          * Get latch pointer with spinlock held, for the unlikely case that
    3125             :          * pointer reads aren't atomic (as they're 8 bytes).
    3126             :          */
    3127     2864124 :         SpinLockAcquire(&walsnd->mutex);
    3128     2864124 :         latch = walsnd->latch;
    3129     2864124 :         SpinLockRelease(&walsnd->mutex);
    3130             : 
    3131     2864124 :         if (latch != NULL)
    3132        1858 :             SetLatch(latch);
    3133             :     }
    3134      286748 : }
    3135             : 
    3136             : /*
    3137             :  * Signal all walsenders to move to stopping state.
    3138             :  *
    3139             :  * This will trigger walsenders to move to a state where no further WAL can be
    3140             :  * generated. See this file's header for details.
    3141             :  */
    3142             : void
    3143        1242 : WalSndInitStopping(void)
    3144             : {
    3145             :     int         i;
    3146             : 
    3147       12692 :     for (i = 0; i < max_wal_senders; i++)
    3148             :     {
    3149       11450 :         WalSnd     *walsnd = &WalSndCtl->walsnds[i];
    3150             :         pid_t       pid;
    3151             : 
    3152       11450 :         SpinLockAcquire(&walsnd->mutex);
    3153       11450 :         pid = walsnd->pid;
    3154       11450 :         SpinLockRelease(&walsnd->mutex);
    3155             : 
    3156       11450 :         if (pid == 0)
    3157       11414 :             continue;
    3158             : 
    3159          36 :         SendProcSignal(pid, PROCSIG_WALSND_INIT_STOPPING, InvalidBackendId);
    3160             :     }
    3161        1242 : }
    3162             : 
    3163             : /*
    3164             :  * Wait that all the WAL senders have quit or reached the stopping state. This
    3165             :  * is used by the checkpointer to control when the shutdown checkpoint can
    3166             :  * safely be performed.
    3167             :  */
    3168             : void
    3169        1286 : WalSndWaitStopping(void)
    3170             : {
    3171             :     for (;;)
    3172          44 :     {
    3173             :         int         i;
    3174        1286 :         bool        all_stopped = true;
    3175             : 
    3176       12736 :         for (i = 0; i < max_wal_senders; i++)
    3177             :         {
    3178       11494 :             WalSnd     *walsnd = &WalSndCtl->walsnds[i];
    3179             : 
    3180       11494 :             SpinLockAcquire(&walsnd->mutex);
    3181             : 
    3182       11494 :             if (walsnd->pid == 0)
    3183             :             {
    3184       11418 :                 SpinLockRelease(&walsnd->mutex);
    3185       11418 :                 continue;
    3186             :             }
    3187             : 
    3188          76 :             if (walsnd->state != WALSNDSTATE_STOPPING)
    3189             :             {
    3190          44 :                 all_stopped = false;
    3191          44 :                 SpinLockRelease(&walsnd->mutex);
    3192          44 :                 break;
    3193             :             }
    3194          32 :             SpinLockRelease(&walsnd->mutex);
    3195             :         }
    3196             : 
    3197             :         /* safe to leave if confirmation is done for all WAL senders */
    3198        1286 :         if (all_stopped)
    3199        1242 :             return;
    3200             : 
    3201          44 :         pg_usleep(10000L);      /* wait for 10 msec */
    3202             :     }
    3203             : }
    3204             : 
    3205             : /* Set state for current walsender (only called in walsender) */
    3206             : void
    3207        1482 : WalSndSetState(WalSndState state)
    3208             : {
    3209        1482 :     WalSnd     *walsnd = MyWalSnd;
    3210             : 
    3211             :     Assert(am_walsender);
    3212             : 
    3213        1482 :     if (walsnd->state == state)
    3214         178 :         return;
    3215             : 
    3216        1304 :     SpinLockAcquire(&walsnd->mutex);
    3217        1304 :     walsnd->state = state;
    3218        1304 :     SpinLockRelease(&walsnd->mutex);
    3219             : }
    3220             : 
    3221             : /*
    3222             :  * Return a string constant representing the state. This is used
    3223             :  * in system views, and should *not* be translated.
    3224             :  */
    3225             : static const char *
    3226         502 : WalSndGetStateString(WalSndState state)
    3227             : {
    3228         502 :     switch (state)
    3229             :     {
    3230           0 :         case WALSNDSTATE_STARTUP:
    3231           0 :             return "startup";
    3232           0 :         case WALSNDSTATE_BACKUP:
    3233           0 :             return "backup";
    3234           2 :         case WALSNDSTATE_CATCHUP:
    3235           2 :             return "catchup";
    3236         500 :         case WALSNDSTATE_STREAMING:
    3237         500 :             return "streaming";
    3238           0 :         case WALSNDSTATE_STOPPING:
    3239           0 :             return "stopping";
    3240             :     }
    3241           0 :     return "UNKNOWN";
    3242             : }
    3243             : 
    3244             : static Interval *
    3245         786 : offset_to_interval(TimeOffset offset)
    3246             : {
    3247         786 :     Interval   *result = palloc(sizeof(Interval));
    3248             : 
    3249         786 :     result->month = 0;
    3250         786 :     result->day = 0;
    3251         786 :     result->time = offset;
    3252             : 
    3253         786 :     return result;
    3254             : }
    3255             : 
    3256             : /*
    3257             :  * Returns activity of walsenders, including pids and xlog locations sent to
    3258             :  * standby servers.
    3259             :  */
    3260             : Datum
    3261         420 : pg_stat_get_wal_senders(PG_FUNCTION_ARGS)
    3262             : {
    3263             : #define PG_STAT_GET_WAL_SENDERS_COLS    12
    3264         420 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    3265             :     TupleDesc   tupdesc;
    3266             :     Tuplestorestate *tupstore;
    3267             :     MemoryContext per_query_ctx;
    3268             :     MemoryContext oldcontext;
    3269             :     SyncRepStandbyData *sync_standbys;
    3270             :     int         num_standbys;
    3271             :     int         i;
    3272             : 
    3273             :     /* check to see if caller supports us returning a tuplestore */
    3274         420 :     if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
    3275           0 :         ereport(ERROR,
    3276             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    3277             :                  errmsg("set-valued function called in context that cannot accept a set")));
    3278         420 :     if (!(rsinfo->allowedModes & SFRM_Materialize))
    3279           0 :         ereport(ERROR,
    3280             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    3281             :                  errmsg("materialize mode required, but it is not allowed in this context")));
    3282             : 
    3283             :     /* Build a tuple descriptor for our result type */
    3284         420 :     if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
    3285           0 :         elog(ERROR, "return type must be a row type");
    3286             : 
    3287         420 :     per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
    3288         420 :     oldcontext = MemoryContextSwitchTo(per_query_ctx);
    3289             : 
    3290         420 :     tupstore = tuplestore_begin_heap(true, false, work_mem);
    3291         420 :     rsinfo->returnMode = SFRM_Materialize;
    3292         420 :     rsinfo->setResult = tupstore;
    3293         420 :     rsinfo->setDesc = tupdesc;
    3294             : 
    3295         420 :     MemoryContextSwitchTo(oldcontext);
    3296             : 
    3297             :     /*
    3298             :      * Get the currently active synchronous standbys.  This could be out of
    3299             :      * date before we're done, but we'll use the data anyway.
    3300             :      */
    3301         420 :     num_standbys = SyncRepGetCandidateStandbys(&sync_standbys);
    3302             : 
    3303        4604 :     for (i = 0; i < max_wal_senders; i++)
    3304             :     {
    3305        4184 :         WalSnd     *walsnd = &WalSndCtl->walsnds[i];
    3306             :         XLogRecPtr  sentPtr;
    3307             :         XLogRecPtr  write;
    3308             :         XLogRecPtr  flush;
    3309             :         XLogRecPtr  apply;
    3310             :         TimeOffset  writeLag;
    3311             :         TimeOffset  flushLag;
    3312             :         TimeOffset  applyLag;
    3313             :         int         priority;
    3314             :         int         pid;
    3315             :         WalSndState state;
    3316             :         TimestampTz replyTime;
    3317             :         bool        is_sync_standby;
    3318             :         Datum       values[PG_STAT_GET_WAL_SENDERS_COLS];
    3319             :         bool        nulls[PG_STAT_GET_WAL_SENDERS_COLS];
    3320             :         int         j;
    3321             : 
    3322             :         /* Collect data from shared memory */
    3323        4184 :         SpinLockAcquire(&walsnd->mutex);
    3324        4184 :         if (walsnd->pid == 0)
    3325             :         {
    3326        3682 :             SpinLockRelease(&walsnd->mutex);
    3327        3682 :             continue;
    3328             :         }
    3329         502 :         pid = walsnd->pid;
    3330         502 :         sentPtr = walsnd->sentPtr;
    3331         502 :         state = walsnd->state;
    3332         502 :         write = walsnd->write;
    3333         502 :         flush = walsnd->flush;
    3334         502 :         apply = walsnd->apply;
    3335         502 :         writeLag = walsnd->writeLag;
    3336         502 :         flushLag = walsnd->flushLag;
    3337         502 :         applyLag = walsnd->applyLag;
    3338         502 :         priority = walsnd->sync_standby_priority;
    3339         502 :         replyTime = walsnd->replyTime;
    3340         502 :         SpinLockRelease(&walsnd->mutex);
    3341             : 
    3342             :         /*
    3343             :          * Detect whether walsender is/was considered synchronous.  We can
    3344             :          * provide some protection against stale data by checking the PID
    3345             :          * along with walsnd_index.
    3346             :          */
    3347         502 :         is_sync_standby = false;
    3348         582 :         for (j = 0; j < num_standbys; j++)
    3349             :         {
    3350         132 :             if (sync_standbys[j].walsnd_index == i &&
    3351          52 :                 sync_standbys[j].pid == pid)
    3352             :             {
    3353          52 :                 is_sync_standby = true;
    3354          52 :                 break;
    3355             :             }
    3356             :         }
    3357             : 
    3358         502 :         memset(nulls, 0, sizeof(nulls));
    3359         502 :         values[0] = Int32GetDatum(pid);
    3360             : 
    3361         502 :         if (!is_member_of_role(GetUserId(), DEFAULT_ROLE_READ_ALL_STATS))
    3362             :         {
    3363             :             /*
    3364             :              * Only superusers and members of pg_read_all_stats can see
    3365             :              * details. Other users only get the pid value to know it's a
    3366             :              * walsender, but no details.
    3367             :              */
    3368           0 :             MemSet(&nulls[1], true, PG_STAT_GET_WAL_SENDERS_COLS - 1);
    3369             :         }
    3370             :         else
    3371             :         {
    3372         502 :             values[1] = CStringGetTextDatum(WalSndGetStateString(state));
    3373             : 
    3374         502 :             if (XLogRecPtrIsInvalid(sentPtr))
    3375           0 :                 nulls[2] = true;
    3376         502 :             values[2] = LSNGetDatum(sentPtr);
    3377             : 
    3378         502 :             if (XLogRecPtrIsInvalid(write))
    3379          18 :                 nulls[3] = true;
    3380         502 :             values[3] = LSNGetDatum(write);
    3381             : 
    3382         502 :             if (XLogRecPtrIsInvalid(flush))
    3383          18 :                 nulls[4] = true;
    3384         502 :             values[4] = LSNGetDatum(flush);
    3385             : 
    3386         502 :             if (XLogRecPtrIsInvalid(apply))
    3387          18 :                 nulls[5] = true;
    3388         502 :             values[5] = LSNGetDatum(apply);
    3389             : 
    3390             :             /*
    3391             :              * Treat a standby such as a pg_basebackup background process
    3392             :              * which always returns an invalid flush location, as an
    3393             :              * asynchronous standby.
    3394             :              */
    3395         502 :             priority = XLogRecPtrIsInvalid(flush) ? 0 : priority;
    3396             : 
    3397         502 :             if (writeLag < 0)
    3398         258 :                 nulls[6] = true;
    3399             :             else
    3400         244 :                 values[6] = IntervalPGetDatum(offset_to_interval(writeLag));
    3401             : 
    3402         502 :             if (flushLag < 0)
    3403         204 :                 nulls[7] = true;
    3404             :             else
    3405         298 :                 values[7] = IntervalPGetDatum(offset_to_interval(flushLag));
    3406             : 
    3407         502 :             if (applyLag < 0)
    3408         258 :                 nulls[8] = true;
    3409             :             else
    3410         244 :                 values[8] = IntervalPGetDatum(offset_to_interval(applyLag));
    3411             : 
    3412         502 :             values[9] = Int32GetDatum(priority);
    3413             : 
    3414             :             /*
    3415             :              * More easily understood version of standby state. This is purely
    3416             :              * informational.
    3417             :              *
    3418             :              * In quorum-based sync replication, the role of each standby
    3419             :              * listed in synchronous_standby_names can be changing very
    3420             :              * frequently. Any standbys considered as "sync" at one moment can
    3421             :              * be switched to "potential" ones at the next moment. So, it's
    3422             :              * basically useless to report "sync" or "potential" as their sync
    3423             :              * states. We report just "quorum" for them.
    3424             :              */
    3425         502 :             if (priority == 0)
    3426         430 :                 values[10] = CStringGetTextDatum("async");
    3427          72 :             else if (is_sync_standby)
    3428          52 :                 values[10] = SyncRepConfig->syncrep_method == SYNC_REP_PRIORITY ?
    3429          52 :                     CStringGetTextDatum("sync") : CStringGetTextDatum("quorum");
    3430             :             else
    3431          20 :                 values[10] = CStringGetTextDatum("potential");
    3432             : 
    3433         502 :             if (replyTime == 0)
    3434          16 :                 nulls[11] = true;
    3435             :             else
    3436         486 :                 values[11] = TimestampTzGetDatum(replyTime);
    3437             :         }
    3438             : 
    3439         502 :         tuplestore_putvalues(tupstore, tupdesc, values, nulls);
    3440             :     }
    3441             : 
    3442             :     /* clean up and return the tuplestore */
    3443             :     tuplestore_donestoring(tupstore);
    3444             : 
    3445         420 :     return (Datum) 0;
    3446             : }
    3447             : 
    3448             : /*
    3449             :  * Send a keepalive message to standby.
    3450             :  *
    3451             :  * If requestReply is set, the message requests the other party to send
    3452             :  * a message back to us, for heartbeat purposes.  We also set a flag to
    3453             :  * let nearby code that we're waiting for that response, to avoid
    3454             :  * repeated requests.
    3455             :  */
    3456             : static void
    3457        2168 : WalSndKeepalive(bool requestReply)
    3458             : {
    3459        2168 :     elog(DEBUG2, "sending replication keepalive");
    3460             : 
    3461             :     /* construct the message... */
    3462        2168 :     resetStringInfo(&output_message);
    3463        2168 :     pq_sendbyte(&output_message, 'k');
    3464        2168 :     pq_sendint64(&output_message, sentPtr);
    3465        2168 :     pq_sendint64(&output_message, GetCurrentTimestamp());
    3466        2168 :     pq_sendbyte(&output_message, requestReply ? 1 : 0);
    3467             : 
    3468             :     /* ... and send it wrapped in CopyData */
    3469        2168 :     pq_putmessage_noblock('d', output_message.data, output_message.len);
    3470             : 
    3471             :     /* Set local flag */
    3472        2168 :     if (requestReply)
    3473        1528 :         waiting_for_ping_response = true;
    3474        2168 : }
    3475             : 
    3476             : /*
    3477             :  * Send keepalive message if too much time has elapsed.
    3478             :  */
    3479             : static void
    3480      264454 : WalSndKeepaliveIfNecessary(void)
    3481             : {
    3482             :     TimestampTz ping_time;
    3483             : 
    3484             :     /*
    3485             :      * Don't send keepalive messages if timeouts are globally disabled or
    3486             :      * we're doing something not partaking in timeouts.
    3487             :      */
    3488      264454 :     if (wal_sender_timeout <= 0 || last_reply_timestamp <= 0)
    3489           0 :         return;
    3490             : 
    3491      264454 :     if (waiting_for_ping_response)
    3492        4310 :         return;
    3493             : 
    3494             :     /*
    3495             :      * If half of wal_sender_timeout has lapsed without receiving any reply
    3496             :      * from the standby, send a keep-alive message to the standby requesting
    3497             :      * an immediate reply.
    3498             :      */
    3499      260144 :     ping_time = TimestampTzPlusMilliseconds(last_reply_timestamp,
    3500             :                                             wal_sender_timeout / 2);
    3501      260144 :     if (last_processing >= ping_time)
    3502             :     {
    3503           0 :         WalSndKeepalive(true);
    3504             : 
    3505             :         /* Try to flush pending output to the client */
    3506           0 :         if (pq_flush_if_writable() != 0)
    3507           0 :             WalSndShutdown();
    3508             :     }
    3509             : }
    3510             : 
    3511             : /*
    3512             :  * Record the end of the WAL and the time it was flushed locally, so that
    3513             :  * LagTrackerRead can compute the elapsed time (lag) when this WAL location is
    3514             :  * eventually reported to have been written, flushed and applied by the
    3515             :  * standby in a reply message.
    3516             :  */
    3517             : static void
    3518        6638 : LagTrackerWrite(XLogRecPtr lsn, TimestampTz local_flush_time)
    3519             : {
    3520             :     bool        buffer_full;
    3521             :     int         new_write_head;
    3522             :     int         i;
    3523             : 
    3524        6638 :     if (!am_walsender)
    3525           0 :         return;
    3526             : 
    3527             :     /*
    3528             :      * If the lsn hasn't advanced since last time, then do nothing.  This way
    3529             :      * we only record a new sample when new WAL has been written.
    3530             :      */
    3531        6638 :     if (lag_tracker->last_lsn == lsn)
    3532        5842 :         return;
    3533         796 :     lag_tracker->last_lsn = lsn;
    3534             : 
    3535             :     /*
    3536             :      * If advancing the write head of the circular buffer would crash into any
    3537             :      * of the read heads, then the buffer is full.  In other words, the
    3538             :      * slowest reader (presumably apply) is the one that controls the release
    3539             :      * of space.
    3540             :      */
    3541         796 :     new_write_head = (lag_tracker->write_head + 1) % LAG_TRACKER_BUFFER_SIZE;
    3542         796 :     buffer_full = false;
    3543        3184 :     for (i = 0; i < NUM_SYNC_REP_WAIT_MODE; ++i)
    3544             :     {
    3545        2388 :         if (new_write_head == lag_tracker->read_heads[i])
    3546           0 :             buffer_full = true;
    3547             :     }
    3548             : 
    3549             :     /*
    3550             :      * If the buffer is full, for now we just rewind by one slot and overwrite
    3551             :      * the last sample, as a simple (if somewhat uneven) way to lower the
    3552             :      * sampling rate.  There may be better adaptive compaction algorithms.
    3553             :      */
    3554         796 :     if (buffer_full)
    3555             :     {
    3556           0 :         new_write_head = lag_tracker->write_head;
    3557           0 :         if (lag_tracker->write_head > 0)
    3558           0 :             lag_tracker->write_head--;
    3559             :         else
    3560           0 :             lag_tracker->write_head = LAG_TRACKER_BUFFER_SIZE - 1;
    3561             :     }
    3562             : 
    3563             :     /* Store a sample at the current write head position. */
    3564         796 :     lag_tracker->buffer[lag_tracker->write_head].lsn = lsn;
    3565         796 :     lag_tracker->buffer[lag_tracker->write_head].time = local_flush_time;
    3566         796 :     lag_tracker->write_head = new_write_head;
    3567             : }
    3568             : 
    3569             : /*
    3570             :  * Find out how much time has elapsed between the moment WAL location 'lsn'
    3571             :  * (or the highest known earlier LSN) was flushed locally and the time 'now'.
    3572             :  * We have a separate read head for each of the reported LSN locations we
    3573             :  * receive in replies from standby; 'head' controls which read head is
    3574             :  * used.  Whenever a read head crosses an LSN which was written into the
    3575             :  * lag buffer with LagTrackerWrite, we can use the associated timestamp to
    3576             :  * find out the time this LSN (or an earlier one) was flushed locally, and
    3577             :  * therefore compute the lag.
    3578             :  *
    3579             :  * Return -1 if no new sample data is available, and otherwise the elapsed
    3580             :  * time in microseconds.
    3581             :  */
    3582             : static TimeOffset
    3583       59712 : LagTrackerRead(int head, XLogRecPtr lsn, TimestampTz now)
    3584             : {
    3585       59712 :     TimestampTz time = 0;
    3586             : 
    3587             :     /* Read all unread samples up to this LSN or end of buffer. */
    3588       61196 :     while (lag_tracker->read_heads[head] != lag_tracker->write_head &&
    3589        6456 :            lag_tracker->buffer[lag_tracker->read_heads[head]].lsn <= lsn)
    3590             :     {
    3591        1484 :         time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
    3592        1484 :         lag_tracker->last_read[head] =
    3593        1484 :             lag_tracker->buffer[lag_tracker->read_heads[head]];
    3594        2968 :         lag_tracker->read_heads[head] =
    3595        1484 :             (lag_tracker->read_heads[head] + 1) % LAG_TRACKER_BUFFER_SIZE;
    3596             :     }
    3597             : 
    3598             :     /*
    3599             :      * If the lag tracker is empty, that means the standby has processed
    3600             :      * everything we've ever sent so we should now clear 'last_read'.  If we
    3601             :      * didn't do that, we'd risk using a stale and irrelevant sample for
    3602             :      * interpolation at the beginning of the next burst of WAL after a period
    3603             :      * of idleness.
    3604             :      */
    3605       59712 :     if (lag_tracker->read_heads[head] == lag_tracker->write_head)
    3606       54740 :         lag_tracker->last_read[head].time = 0;
    3607             : 
    3608       59712 :     if (time > now)
    3609             :     {
    3610             :         /* If the clock somehow went backwards, treat as not found. */
    3611           0 :         return -1;
    3612             :     }
    3613       59712 :     else if (time == 0)
    3614             :     {
    3615             :         /*
    3616             :          * We didn't cross a time.  If there is a future sample that we
    3617             :          * haven't reached yet, and we've already reached at least one sample,
    3618             :          * let's interpolate the local flushed time.  This is mainly useful
    3619             :          * for reporting a completely stuck apply position as having
    3620             :          * increasing lag, since otherwise we'd have to wait for it to
    3621             :          * eventually start moving again and cross one of our samples before
    3622             :          * we can show the lag increasing.
    3623             :          */
    3624       58312 :         if (lag_tracker->read_heads[head] == lag_tracker->write_head)
    3625             :         {
    3626             :             /* There are no future samples, so we can't interpolate. */
    3627       53390 :             return -1;
    3628             :         }
    3629        4922 :         else if (lag_tracker->last_read[head].time != 0)
    3630             :         {
    3631             :             /* We can interpolate between last_read and the next sample. */
    3632             :             double      fraction;
    3633          28 :             WalTimeSample prev = lag_tracker->last_read[head];
    3634          28 :             WalTimeSample next = lag_tracker->buffer[lag_tracker->read_heads[head]];
    3635             : 
    3636          28 :             if (lsn < prev.lsn)
    3637             :             {
    3638             :                 /*
    3639             :                  * Reported LSNs shouldn't normally go backwards, but it's
    3640             :                  * possible when there is a timeline change.  Treat as not
    3641             :                  * found.
    3642             :                  */
    3643           0 :                 return -1;
    3644             :             }
    3645             : 
    3646             :             Assert(prev.lsn < next.lsn);
    3647             : 
    3648          28 :             if (prev.time > next.time)
    3649             :             {
    3650             :                 /* If the clock somehow went backwards, treat as not found. */
    3651           0 :                 return -1;
    3652             :             }
    3653             : 
    3654             :             /* See how far we are between the previous and next samples. */
    3655          28 :             fraction =
    3656          28 :                 (double) (lsn - prev.lsn) / (double) (next.lsn - prev.lsn);
    3657             : 
    3658             :             /* Scale the local flush time proportionally. */
    3659          28 :             time = (TimestampTz)
    3660          28 :                 ((double) prev.time + (next.time - prev.time) * fraction);
    3661             :         }
    3662             :         else
    3663             :         {
    3664             :             /*
    3665             :              * We have only a future sample, implying that we were entirely
    3666             :              * caught up but and now there is a new burst of WAL and the
    3667             :              * standby hasn't processed the first sample yet.  Until the
    3668             :              * standby reaches the future sample the best we can do is report
    3669             :              * the hypothetical lag if that sample were to be replayed now.
    3670             :              */
    3671        4894 :             time = lag_tracker->buffer[lag_tracker->read_heads[head]].time;
    3672             :         }
    3673             :     }
    3674             : 
    3675             :     /* Return the elapsed time since local flush time in microseconds. */
    3676             :     Assert(time != 0);
    3677        6322 :     return now - time;
    3678             : }

Generated by: LCOV version 1.13