LCOV - differential code coverage report
Current view: top level - src/bin/pg_basebackup - pg_recvlogical.c (source / functions) Coverage Total Hit UNC UBC GBC GNC CBC DUB DCB
Current: d36b728949bf4e37ada1cd23e0f2aaa94f609a70 vs 52e118fe2f7e3381bdaa479816a7f72eda2ae517 Lines: 78.4 % 486 381 6 99 26 23 332 10 22
Current Date: 2026-06-29 16:15:13 +0200 Functions: 90.0 % 10 9 1 6 3
Baseline: lcov-20260630-baseline Branches: 71.9 % 292 210 13 69 16 23 171
Baseline Date: 2026-06-29 13:01:57 +0200 Line coverage date bins:
Legend: Lines:     hit not hit
Branches: + taken - not taken # not executed
(7,30] days: 100.0 % 7 7 1 6
(30,360] days: 78.6 % 28 22 6 22
(360..) days: 78.0 % 451 352 99 26 326
Function coverage date bins:
(360..) days: 90.0 % 10 9 1 6 3
Branch coverage date bins:
(7,30] days: 50.0 % 2 1 1 1
(30,360] days: 63.9 % 36 23 13 23
(360..) days: 73.2 % 254 186 68 16 170

 Age         Owner                    Branch data    TLA  Line data    Source code
                                  1                 :                : /*-------------------------------------------------------------------------
                                  2                 :                :  *
                                  3                 :                :  * pg_recvlogical.c - receive data from a logical decoding slot in a streaming
                                  4                 :                :  *                    fashion and write it to a local file.
                                  5                 :                :  *
                                  6                 :                :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
                                  7                 :                :  *
                                  8                 :                :  * IDENTIFICATION
                                  9                 :                :  *        src/bin/pg_basebackup/pg_recvlogical.c
                                 10                 :                :  *-------------------------------------------------------------------------
                                 11                 :                :  */
                                 12                 :                : 
                                 13                 :                : #include "postgres_fe.h"
                                 14                 :                : 
                                 15                 :                : #include <dirent.h>
                                 16                 :                : #include <limits.h>
                                 17                 :                : #include <sys/select.h>
                                 18                 :                : #include <sys/stat.h>
                                 19                 :                : #include <unistd.h>
                                 20                 :                : 
                                 21                 :                : #include "common/file_perm.h"
                                 22                 :                : #include "common/logging.h"
                                 23                 :                : #include "fe_utils/option_utils.h"
                                 24                 :                : #include "getopt_long.h"
                                 25                 :                : #include "libpq-fe.h"
                                 26                 :                : #include "libpq/pqsignal.h"
                                 27                 :                : #include "libpq/protocol.h"
                                 28                 :                : #include "pqexpbuffer.h"
                                 29                 :                : #include "streamutil.h"
                                 30                 :                : 
                                 31                 :                : /* Time to sleep between reconnection attempts */
                                 32                 :                : #define RECONNECT_SLEEP_TIME 5
                                 33                 :                : 
                                 34                 :                : typedef enum
                                 35                 :                : {
                                 36                 :                :     STREAM_STOP_NONE,
                                 37                 :                :     STREAM_STOP_END_OF_WAL,
                                 38                 :                :     STREAM_STOP_KEEPALIVE,
                                 39                 :                :     STREAM_STOP_SIGNAL
                                 40                 :                : } StreamStopReason;
                                 41                 :                : 
                                 42                 :                : /* Global Options */
                                 43                 :                : static char *outfile = NULL;
                                 44                 :                : static int  verbose = 0;
                                 45                 :                : static bool two_phase = false;  /* enable-two-phase option */
                                 46                 :                : static bool failover = false;   /* enable-failover option */
                                 47                 :                : static int  noloop = 0;
                                 48                 :                : static int  standby_message_timeout = 10 * 1000;    /* 10 sec = default */
                                 49                 :                : static int  fsync_interval = 10 * 1000; /* 10 sec = default */
                                 50                 :                : static XLogRecPtr startpos = InvalidXLogRecPtr;
                                 51                 :                : static XLogRecPtr endpos = InvalidXLogRecPtr;
                                 52                 :                : static bool do_create_slot = false;
                                 53                 :                : static bool slot_exists_ok = false;
                                 54                 :                : static bool do_start_slot = false;
                                 55                 :                : static bool do_drop_slot = false;
                                 56                 :                : static char *replication_slot = NULL;
                                 57                 :                : 
                                 58                 :                : /* filled pairwise with option, value. value may be NULL */
                                 59                 :                : static char **options;
                                 60                 :                : static size_t noptions = 0;
                                 61                 :                : static const char *plugin = "test_decoding";
                                 62                 :                : 
                                 63                 :                : /* Global State */
                                 64                 :                : static int  outfd = -1;
                                 65                 :                : static volatile sig_atomic_t time_to_abort = false;
                                 66                 :                : static volatile sig_atomic_t stop_reason = STREAM_STOP_NONE;
                                 67                 :                : static volatile sig_atomic_t output_reopen = false;
                                 68                 :                : static bool output_isfile;
                                 69                 :                : static TimestampTz output_last_fsync = -1;
                                 70                 :                : static bool output_needs_fsync = false;
                                 71                 :                : static XLogRecPtr output_written_lsn = InvalidXLogRecPtr;
                                 72                 :                : static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr;
                                 73                 :                : 
                                 74                 :                : static void usage(void);
                                 75                 :                : static void StreamLogicalLog(void);
                                 76                 :                : static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now);
                                 77                 :                : static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos,
                                 78                 :                :                                StreamStopReason reason,
                                 79                 :                :                                XLogRecPtr lsn);
                                 80                 :                : 
                                 81                 :                : static void
 4487 rhaas@postgresql.org       82                 :CBC           1 : usage(void)
                                 83                 :                : {
 4120 bruce@momjian.us           84                 :              1 :     printf(_("%s controls PostgreSQL logical decoding streams.\n\n"),
                                 85                 :                :            progname);
 4487 rhaas@postgresql.org       86                 :              1 :     printf(_("Usage:\n"));
                                 87                 :              1 :     printf(_("  %s [OPTION]...\n"), progname);
 4279 peter_e@gmx.net            88                 :              1 :     printf(_("\nAction to be performed:\n"));
                                 89                 :              1 :     printf(_("      --create-slot      create a new replication slot (for the slot's name see --slot)\n"));
                                 90                 :              1 :     printf(_("      --drop-slot        drop the replication slot (for the slot's name see --slot)\n"));
                                 91                 :              1 :     printf(_("      --start            start streaming in a replication slot (for the slot's name see --slot)\n"));
 4487 rhaas@postgresql.org       92                 :              1 :     printf(_("\nOptions:\n"));
  366 peter@eisentraut.org       93                 :              1 :     printf(_("      --enable-failover  enable replication slot synchronization to standby servers when\n"
                                 94                 :                :              "                         creating a replication slot\n"));
                                 95                 :              1 :     printf(_("  -E, --endpos=LSN       exit after receiving the specified LSN\n"));
 4279 peter_e@gmx.net            96                 :              1 :     printf(_("  -f, --file=FILE        receive log into this file, - for stdout\n"));
 4419 andres@anarazel.de         97                 :              1 :     printf(_("  -F  --fsync-interval=SECS\n"
                                 98                 :                :              "                         time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000));
 3940 peter_e@gmx.net            99                 :              1 :     printf(_("      --if-not-exists    do not error if slot already exists when creating a slot\n"));
 4279                           100                 :              1 :     printf(_("  -I, --startpos=LSN     where in an existing slot should the streaming start\n"));
 4487 rhaas@postgresql.org      101                 :              1 :     printf(_("  -n, --no-loop          do not loop on connection lost\n"));
 4279 peter_e@gmx.net           102                 :              1 :     printf(_("  -o, --option=NAME[=VALUE]\n"
                                103                 :                :              "                         pass option NAME with optional value VALUE to the\n"
                                104                 :                :              "                         output plugin\n"));
                                105                 :              1 :     printf(_("  -P, --plugin=PLUGIN    use output plugin PLUGIN (default: %s)\n"), plugin);
                                106                 :              1 :     printf(_("  -s, --status-interval=SECS\n"
                                107                 :                :              "                         time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
                                108                 :              1 :     printf(_("  -S, --slot=SLOTNAME    name of the logical replication slot\n"));
  366 peter@eisentraut.org      109                 :              1 :     printf(_("  -t, --enable-two-phase enable decoding of prepared transactions when creating a slot\n"));
                                110                 :              1 :     printf(_("      --two-phase        (same as --enable-two-phase, deprecated)\n"));
 4487 rhaas@postgresql.org      111                 :              1 :     printf(_("  -v, --verbose          output verbose messages\n"));
                                112                 :              1 :     printf(_("  -V, --version          output version information, then exit\n"));
                                113                 :              1 :     printf(_("  -?, --help             show this help, then exit\n"));
                                114                 :              1 :     printf(_("\nConnection options:\n"));
                                115                 :              1 :     printf(_("  -d, --dbname=DBNAME    database to connect to\n"));
                                116                 :              1 :     printf(_("  -h, --host=HOSTNAME    database server host or socket directory\n"));
                                117                 :              1 :     printf(_("  -p, --port=PORT        database server port number\n"));
                                118                 :              1 :     printf(_("  -U, --username=NAME    connect as specified database user\n"));
                                119                 :              1 :     printf(_("  -w, --no-password      never prompt for password\n"));
                                120                 :              1 :     printf(_("  -W, --password         force password prompt (should happen automatically)\n"));
 2314 peter@eisentraut.org      121                 :              1 :     printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
                                122                 :              1 :     printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
 4487 rhaas@postgresql.org      123                 :              1 : }
                                124                 :                : 
                                125                 :                : /*
                                126                 :                :  * Send a Standby Status Update message to server.
                                127                 :                :  */
                                128                 :                : static bool
 3414 tgl@sss.pgh.pa.us         129                 :             29 : sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested)
                                130                 :                : {
                                131                 :                :     static XLogRecPtr last_written_lsn = InvalidXLogRecPtr;
                                132                 :                :     static XLogRecPtr last_fsync_lsn = InvalidXLogRecPtr;
                                133                 :                : 
                                134                 :                :     char        replybuf[1 + 8 + 8 + 8 + 8 + 1];
 4487 rhaas@postgresql.org      135                 :             29 :     int         len = 0;
                                136                 :                : 
                                137                 :                :     /*
                                138                 :                :      * we normally don't want to send superfluous feedback, but if it's
                                139                 :                :      * because of a timeout we need to, otherwise wal_sender_timeout will kill
                                140                 :                :      * us.
                                141                 :                :      */
                                142         [ -  + ]:             29 :     if (!force &&
 4487 rhaas@postgresql.org      143         [ #  # ]:UBC           0 :         last_written_lsn == output_written_lsn &&
 2239 noah@leadboat.com         144         [ #  # ]:              0 :         last_fsync_lsn == output_fsync_lsn)
 4487 rhaas@postgresql.org      145                 :              0 :         return true;
                                146                 :                : 
 4487 rhaas@postgresql.org      147         [ +  + ]:CBC          29 :     if (verbose)
  358 alvherre@kurilemu.de      148                 :GNC           2 :         pg_log_info("confirming write up to %X/%08X, flush to %X/%08X (slot %s)",
                                149                 :                :                     LSN_FORMAT_ARGS(output_written_lsn),
                                150                 :                :                     LSN_FORMAT_ARGS(output_fsync_lsn),
                                151                 :                :                     replication_slot);
                                152                 :                : 
  328 nathan@postgresql.or      153                 :             29 :     replybuf[len] = PqReplMsg_StandbyStatusUpdate;
 4487 rhaas@postgresql.org      154                 :CBC          29 :     len += 1;
 4438 bruce@momjian.us          155                 :             29 :     fe_sendint64(output_written_lsn, &replybuf[len]);   /* write */
 4487 rhaas@postgresql.org      156                 :             29 :     len += 8;
 3296 tgl@sss.pgh.pa.us         157                 :             29 :     fe_sendint64(output_fsync_lsn, &replybuf[len]); /* flush */
 4487 rhaas@postgresql.org      158                 :             29 :     len += 8;
 4438 bruce@momjian.us          159                 :             29 :     fe_sendint64(InvalidXLogRecPtr, &replybuf[len]);    /* apply */
 4487 rhaas@postgresql.org      160                 :             29 :     len += 8;
 4438 bruce@momjian.us          161                 :             29 :     fe_sendint64(now, &replybuf[len]);  /* sendTime */
 4487 rhaas@postgresql.org      162                 :             29 :     len += 8;
 3296 tgl@sss.pgh.pa.us         163                 :             29 :     replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
 4487 rhaas@postgresql.org      164                 :             29 :     len += 1;
                                165                 :                : 
                                166                 :             29 :     startpos = output_written_lsn;
                                167                 :             29 :     last_written_lsn = output_written_lsn;
                                168                 :             29 :     last_fsync_lsn = output_fsync_lsn;
                                169                 :                : 
                                170   [ +  -  -  + ]:             29 :     if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
                                171                 :                :     {
 2647 peter@eisentraut.org      172                 :UBC           0 :         pg_log_error("could not send feedback packet: %s",
                                173                 :                :                      PQerrorMessage(conn));
 4487 rhaas@postgresql.org      174                 :              0 :         return false;
                                175                 :                :     }
                                176                 :                : 
 4487 rhaas@postgresql.org      177                 :CBC          29 :     return true;
                                178                 :                : }
                                179                 :                : 
                                180                 :                : static void
 2740 peter@eisentraut.org      181                 :             60 : disconnect_atexit(void)
                                182                 :                : {
 4487 rhaas@postgresql.org      183         [ +  + ]:             60 :     if (conn != NULL)
                                184                 :             34 :         PQfinish(conn);
                                185                 :             60 : }
                                186                 :                : 
                                187                 :                : static void
 3414 tgl@sss.pgh.pa.us         188                 :             41 : OutputFsync(TimestampTz now)
                                189                 :                : {
 4487 rhaas@postgresql.org      190                 :             41 :     output_last_fsync = now;
                                191                 :                : 
                                192                 :             41 :     output_fsync_lsn = output_written_lsn;
                                193                 :                : 
                                194                 :                :     /*
                                195                 :                :      * Save the last flushed position as the replication start point. On
                                196                 :                :      * reconnect, replication resumes from there to avoid re-sending flushed
                                197                 :                :      * data.
                                198                 :                :      */
  165 fujii@postgresql.org      199                 :GNC          41 :     startpos = output_fsync_lsn;
                                200                 :                : 
 4487 rhaas@postgresql.org      201         [ -  + ]:CBC          41 :     if (fsync_interval <= 0)
  165 fujii@postgresql.org      202                 :UNC           0 :         return;
                                203                 :                : 
 4429 heikki.linnakangas@i      204         [ +  + ]:CBC          41 :     if (!output_needs_fsync)
  165 fujii@postgresql.org      205                 :GNC          27 :         return;
                                206                 :                : 
 4429 heikki.linnakangas@i      207                 :CBC          14 :     output_needs_fsync = false;
                                208                 :                : 
                                209                 :                :     /* can only fsync if it's a regular file */
 4011 andres@anarazel.de        210         [ +  + ]:             14 :     if (!output_isfile)
  165 fujii@postgresql.org      211                 :GNC          11 :         return;
                                212                 :                : 
 4011 andres@anarazel.de        213         [ -  + ]:CBC           3 :     if (fsync(outfd) != 0)
 1544 tgl@sss.pgh.pa.us         214                 :UBC           0 :         pg_fatal("could not fsync file \"%s\": %m", outfile);
                                215                 :                : }
                                216                 :                : 
                                217                 :                : /*
                                218                 :                :  * Start the log streaming
                                219                 :                :  */
                                220                 :                : static void
 4292 andres@anarazel.de        221                 :CBC          27 : StreamLogicalLog(void)
                                222                 :                : {
                                223                 :                :     PGresult   *res;
 4487 rhaas@postgresql.org      224                 :             27 :     char       *copybuf = NULL;
 3414 tgl@sss.pgh.pa.us         225                 :             27 :     TimestampTz last_status = -1;
                                226                 :                :     int         i;
                                227                 :                :     PQExpBuffer query;
                                228                 :                :     XLogRecPtr  cur_record_lsn;
                                229                 :                : 
 1076 michael@paquier.xyz       230                 :             27 :     cur_record_lsn = InvalidXLogRecPtr;
                                231                 :                : 
                                232                 :                :     /*
                                233                 :                :      * Connect in replication mode to the server
                                234                 :                :      */
 4487 rhaas@postgresql.org      235         [ +  + ]:             27 :     if (!conn)
 4487 rhaas@postgresql.org      236                 :GBC           1 :         conn = GetConnection();
 4487 rhaas@postgresql.org      237         [ -  + ]:CBC          27 :     if (!conn)
                                238                 :                :         /* Error message already written in GetConnection() */
 4487 rhaas@postgresql.org      239                 :UBC           0 :         return;
                                240                 :                : 
                                241                 :                :     /*
                                242                 :                :      * Start the replication
                                243                 :                :      */
 4487 rhaas@postgresql.org      244         [ +  + ]:CBC          27 :     if (verbose)
  358 alvherre@kurilemu.de      245                 :GNC           2 :         pg_log_info("starting log streaming at %X/%08X (slot %s)",
                                246                 :                :                     LSN_FORMAT_ARGS(startpos),
                                247                 :                :                     replication_slot);
                                248                 :                : 
                                249                 :                :     /* Initiate the replication stream at specified location */
 1682 tgl@sss.pgh.pa.us         250                 :CBC          27 :     query = createPQExpBuffer();
   15                           251                 :             27 :     appendPQExpBufferStr(query, "START_REPLICATION SLOT ");
                                252                 :             27 :     AppendQuotedIdentifier(query, replication_slot);
   15 tgl@sss.pgh.pa.us         253                 :GNC          27 :     appendPQExpBuffer(query, " LOGICAL %X/%08X", LSN_FORMAT_ARGS(startpos));
                                254                 :                : 
                                255                 :                :     /* print options if there are any */
 4487 rhaas@postgresql.org      256         [ +  + ]:CBC          27 :     if (noptions)
                                257                 :             21 :         appendPQExpBufferStr(query, " (");
                                258                 :                : 
                                259         [ +  + ]:             69 :     for (i = 0; i < noptions; i++)
                                260                 :                :     {
                                261                 :                :         /* separator */
                                262         [ +  + ]:             42 :         if (i > 0)
                                263                 :             21 :             appendPQExpBufferStr(query, ", ");
                                264                 :                : 
                                265                 :                :         /* write option name */
   15 tgl@sss.pgh.pa.us         266                 :             42 :         AppendQuotedIdentifier(query, options[i * 2]);
                                267                 :                : 
                                268                 :                :         /* write option value if specified */
                                269         [ +  - ]:             42 :         if (options[i * 2 + 1] != NULL)
                                270                 :                :         {
                                271                 :             42 :             appendPQExpBufferChar(query, ' ');
                                272                 :             42 :             AppendQuotedLiteral(query, options[i * 2 + 1]);
                                273                 :                :         }
                                274                 :                :     }
                                275                 :                : 
 4487 rhaas@postgresql.org      276         [ +  + ]:             27 :     if (noptions)
                                277                 :             21 :         appendPQExpBufferChar(query, ')');
                                278                 :                : 
                                279                 :             27 :     res = PQexec(conn, query->data);
                                280         [ +  + ]:             27 :     if (PQresultStatus(res) != PGRES_COPY_BOTH)
                                281                 :                :     {
 2647 peter@eisentraut.org      282                 :              6 :         pg_log_error("could not send replication command \"%s\": %s",
                                283                 :                :                      query->data, PQresultErrorMessage(res));
 4487 rhaas@postgresql.org      284                 :              6 :         PQclear(res);
                                285                 :              6 :         goto error;
                                286                 :                :     }
                                287                 :             21 :     PQclear(res);
                                288                 :             21 :     resetPQExpBuffer(query);
                                289                 :                : 
                                290         [ +  + ]:             21 :     if (verbose)
 2647 peter@eisentraut.org      291                 :GBC           2 :         pg_log_info("streaming initiated");
                                292                 :                : 
 4487 rhaas@postgresql.org      293         [ +  + ]:CBC         552 :     while (!time_to_abort)
                                294                 :                :     {
                                295                 :                :         int         r;
                                296                 :                :         int         bytes_left;
                                297                 :                :         int         bytes_written;
                                298                 :                :         TimestampTz now;
                                299                 :                :         int         hdr_len;
                                300                 :                : 
 1076 michael@paquier.xyz       301                 :            549 :         cur_record_lsn = InvalidXLogRecPtr;
                                302                 :                : 
 4487 rhaas@postgresql.org      303         [ +  + ]:            549 :         if (copybuf != NULL)
                                304                 :                :         {
                                305                 :            332 :             PQfreemem(copybuf);
                                306                 :            332 :             copybuf = NULL;
                                307                 :                :         }
                                308                 :                : 
                                309                 :                :         /*
                                310                 :                :          * Potentially send a status message to the primary.
                                311                 :                :          */
                                312                 :            549 :         now = feGetCurrentTimestamp();
                                313                 :                : 
                                314   [ +  +  +  + ]:           1078 :         if (outfd != -1 &&
                                315                 :            529 :             feTimestampDifferenceExceeds(output_last_fsync, now,
                                316                 :                :                                          fsync_interval))
  165 fujii@postgresql.org      317                 :GNC          21 :             OutputFsync(now);
                                318                 :                : 
 4487 rhaas@postgresql.org      319   [ +  -  +  + ]:CBC        1098 :         if (standby_message_timeout > 0 &&
                                320                 :            549 :             feTimestampDifferenceExceeds(last_status, now,
                                321                 :                :                                          standby_message_timeout))
                                322                 :                :         {
                                323                 :                :             /* Time to send feedback! */
                                324         [ -  + ]:             21 :             if (!sendFeedback(conn, now, true, false))
 4487 rhaas@postgresql.org      325                 :GBC           3 :                 goto error;
                                326                 :                : 
 4487 rhaas@postgresql.org      327                 :CBC          21 :             last_status = now;
                                328                 :                :         }
                                329                 :                : 
                                330                 :                :         /* got SIGHUP, close output file */
 4429 heikki.linnakangas@i      331   [ +  +  -  +  :            549 :         if (outfd != -1 && output_reopen && strcmp(outfile, "-") != 0)
                                              -  - ]
                                332                 :                :         {
 4429 heikki.linnakangas@i      333                 :UBC           0 :             now = feGetCurrentTimestamp();
  165 fujii@postgresql.org      334                 :UNC           0 :             OutputFsync(now);
 4429 heikki.linnakangas@i      335                 :UBC           0 :             close(outfd);
                                336                 :              0 :             outfd = -1;
                                337                 :                :         }
 4429 heikki.linnakangas@i      338                 :CBC         549 :         output_reopen = false;
                                339                 :                : 
                                340                 :                :         /* open the output file, if not open yet */
 4428                           341         [ +  + ]:            549 :         if (outfd == -1)
                                342                 :                :         {
                                343                 :                :             struct stat statbuf;
                                344                 :                : 
                                345         [ +  + ]:             20 :             if (strcmp(outfile, "-") == 0)
                                346                 :             18 :                 outfd = fileno(stdout);
                                347                 :                :             else
 4428 heikki.linnakangas@i      348                 :GBC           2 :                 outfd = open(outfile, O_CREAT | O_APPEND | O_WRONLY | PG_BINARY,
                                349                 :                :                              pg_file_create_mode);
 4428 heikki.linnakangas@i      350         [ -  + ]:CBC          20 :             if (outfd == -1)
                                351                 :                :             {
 2647 peter@eisentraut.org      352                 :UBC           0 :                 pg_log_error("could not open log file \"%s\": %m", outfile);
 4428 heikki.linnakangas@i      353                 :              0 :                 goto error;
                                354                 :                :             }
                                355                 :                : 
 4011 andres@anarazel.de        356         [ -  + ]:CBC          20 :             if (fstat(outfd, &statbuf) != 0)
                                357                 :                :             {
 2647 peter@eisentraut.org      358                 :UBC           0 :                 pg_log_error("could not stat file \"%s\": %m", outfile);
 1797 michael@paquier.xyz       359                 :              0 :                 goto error;
                                360                 :                :             }
                                361                 :                : 
 4011 andres@anarazel.de        362   [ +  +  +  - ]:CBC          20 :             output_isfile = S_ISREG(statbuf.st_mode) && !isatty(outfd);
                                363                 :                :         }
                                364                 :                : 
 4487 rhaas@postgresql.org      365                 :            549 :         r = PQgetCopyData(conn, &copybuf, 1);
                                366         [ +  + ]:            549 :         if (r == 0)
                                367                 :            196 :         {
                                368                 :                :             /*
                                369                 :                :              * In async mode, and no data available. We block on reading but
                                370                 :                :              * not more than the specified timeout, so that we can send a
                                371                 :                :              * response back to the client.
                                372                 :                :              */
                                373                 :                :             fd_set      input_mask;
 3414 tgl@sss.pgh.pa.us         374                 :            202 :             TimestampTz message_target = 0;
                                375                 :            202 :             TimestampTz fsync_target = 0;
                                376                 :                :             struct timeval timeout;
 4487 rhaas@postgresql.org      377                 :            202 :             struct timeval *timeoutptr = NULL;
                                378                 :                : 
 3766 peter_e@gmx.net           379         [ -  + ]:            202 :             if (PQsocket(conn) < 0)
                                380                 :                :             {
 2647 peter@eisentraut.org      381                 :UBC           0 :                 pg_log_error("invalid socket: %s", PQerrorMessage(conn));
 3766 peter_e@gmx.net           382                 :CBC           3 :                 goto error;
                                383                 :                :             }
                                384                 :                : 
 4487 rhaas@postgresql.org      385         [ +  + ]:           3434 :             FD_ZERO(&input_mask);
                                386                 :            202 :             FD_SET(PQsocket(conn), &input_mask);
                                387                 :                : 
                                388                 :                :             /* Compute when we need to wakeup to send a keepalive message. */
                                389         [ +  - ]:            202 :             if (standby_message_timeout)
                                390                 :            202 :                 message_target = last_status + (standby_message_timeout - 1) *
                                391                 :                :                     ((int64) 1000);
                                392                 :                : 
                                393                 :                :             /* Compute when we need to wakeup to fsync the output file. */
 4429 heikki.linnakangas@i      394   [ +  -  +  + ]:            202 :             if (fsync_interval > 0 && output_needs_fsync)
 4487 rhaas@postgresql.org      395                 :             93 :                 fsync_target = output_last_fsync + (fsync_interval - 1) *
                                396                 :                :                     ((int64) 1000);
                                397                 :                : 
                                398                 :                :             /* Now compute when to wakeup. */
                                399   [ -  +  -  - ]:            202 :             if (message_target > 0 || fsync_target > 0)
                                400                 :                :             {
                                401                 :                :                 TimestampTz targettime;
                                402                 :                :                 long        secs;
                                403                 :                :                 int         usecs;
                                404                 :                : 
                                405                 :            202 :                 targettime = message_target;
                                406                 :                : 
                                407   [ +  +  +  + ]:            202 :                 if (fsync_target > 0 && fsync_target < targettime)
 4487 rhaas@postgresql.org      408                 :GBC          11 :                     targettime = fsync_target;
                                409                 :                : 
 4487 rhaas@postgresql.org      410                 :CBC         202 :                 feTimestampDifference(now,
                                411                 :                :                                       targettime,
                                412                 :                :                                       &secs,
                                413                 :                :                                       &usecs);
                                414         [ +  + ]:            202 :                 if (secs <= 0)
 4487 rhaas@postgresql.org      415                 :GBC          11 :                     timeout.tv_sec = 1; /* Always sleep at least 1 sec */
                                416                 :                :                 else
 4487 rhaas@postgresql.org      417                 :CBC         191 :                     timeout.tv_sec = secs;
                                418                 :            202 :                 timeout.tv_usec = usecs;
                                419                 :            202 :                 timeoutptr = &timeout;
                                420                 :                :             }
                                421                 :                : 
                                422                 :            202 :             r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
                                423   [ +  -  +  +  :            202 :             if (r == 0 || (r < 0 && errno == EINTR))
                                              +  - ]
                                424                 :                :             {
                                425                 :                :                 /*
                                426                 :                :                  * Got a timeout or signal. Continue the loop and either
                                427                 :                :                  * deliver a status packet to the server or just go back into
                                428                 :                :                  * blocking.
                                429                 :                :                  */
                                430                 :            199 :                 continue;
                                431                 :                :             }
                                432         [ -  + ]:            199 :             else if (r < 0)
                                433                 :                :             {
 1894 peter@eisentraut.org      434                 :UBC           0 :                 pg_log_error("%s() failed: %m", "select");
 4487 rhaas@postgresql.org      435                 :              0 :                 goto error;
                                436                 :                :             }
                                437                 :                : 
                                438                 :                :             /* Else there is actually data on the socket */
 4487 rhaas@postgresql.org      439         [ +  + ]:CBC         199 :             if (PQconsumeInput(conn) == 0)
                                440                 :                :             {
 2647 peter@eisentraut.org      441                 :              3 :                 pg_log_error("could not receive data from WAL stream: %s",
                                442                 :                :                              PQerrorMessage(conn));
 4487 rhaas@postgresql.org      443                 :              3 :                 goto error;
                                444                 :                :             }
                                445                 :            196 :             continue;
                                446                 :                :         }
                                447                 :                : 
                                448                 :                :         /* End of copy stream */
                                449         [ +  + ]:            347 :         if (r == -1)
                                450                 :             15 :             break;
                                451                 :                : 
                                452                 :                :         /* Failure while reading the copy stream */
                                453         [ -  + ]:            339 :         if (r == -2)
                                454                 :                :         {
 2647 peter@eisentraut.org      455                 :UBC           0 :             pg_log_error("could not read COPY data: %s",
                                456                 :                :                          PQerrorMessage(conn));
 4487 rhaas@postgresql.org      457                 :              0 :             goto error;
                                458                 :                :         }
                                459                 :                : 
                                460                 :                :         /* Check the message type. */
  328 nathan@postgresql.or      461         [ +  + ]:GNC         339 :         if (copybuf[0] == PqReplMsg_Keepalive)
 4487 rhaas@postgresql.org      462                 :CBC         216 :         {
                                463                 :                :             int         pos;
                                464                 :                :             bool        replyRequested;
                                465                 :                :             XLogRecPtr  walEnd;
 3464 simon@2ndQuadrant.co      466                 :            218 :             bool        endposReached = false;
                                467                 :                : 
                                468                 :                :             /*
                                469                 :                :              * Parse the keepalive message, enclosed in the CopyData message.
                                470                 :                :              * We just check if the server requested a reply, and ignore the
                                471                 :                :              * rest.
                                472                 :                :              */
  328 nathan@postgresql.or      473                 :GNC         218 :             pos = 1;            /* skip msgtype PqReplMsg_Keepalive */
 4487 rhaas@postgresql.org      474                 :CBC         218 :             walEnd = fe_recvint64(&copybuf[pos]);
                                475                 :            218 :             output_written_lsn = Max(walEnd, output_written_lsn);
                                476                 :                : 
                                477                 :            218 :             pos += 8;           /* read walEnd */
                                478                 :                : 
                                479                 :            218 :             pos += 8;           /* skip sendTime */
                                480                 :                : 
                                481         [ -  + ]:            218 :             if (r < pos + 1)
                                482                 :                :             {
 2647 peter@eisentraut.org      483                 :UBC           0 :                 pg_log_error("streaming header too small: %d", r);
 4487 rhaas@postgresql.org      484                 :              0 :                 goto error;
                                485                 :                :             }
 4487 rhaas@postgresql.org      486                 :CBC         218 :             replyRequested = copybuf[pos];
                                487                 :                : 
  236 alvherre@kurilemu.de      488   [ +  +  +  + ]:GNC         218 :             if (XLogRecPtrIsValid(endpos) && walEnd >= endpos)
                                489                 :                :             {
                                490                 :                :                 /*
                                491                 :                :                  * If there's nothing to read on the socket until a keepalive
                                492                 :                :                  * we know that the server has nothing to send us; and if
                                493                 :                :                  * walEnd has passed endpos, we know nothing else can have
                                494                 :                :                  * committed before endpos.  So we can bail out now.
                                495                 :                :                  */
 3464 simon@2ndQuadrant.co      496                 :CBC           2 :                 endposReached = true;
                                497                 :                :             }
                                498                 :                : 
                                499                 :                :             /* Send a reply, if necessary */
                                500   [ +  +  +  + ]:            218 :             if (replyRequested || endposReached)
                                501                 :                :             {
                                502         [ -  + ]:              3 :                 if (!flushAndSendFeedback(conn, &now))
 4487 rhaas@postgresql.org      503                 :UBC           0 :                     goto error;
 4487 rhaas@postgresql.org      504                 :CBC           3 :                 last_status = now;
                                505                 :                :             }
                                506                 :                : 
 3464 simon@2ndQuadrant.co      507         [ +  + ]:            218 :             if (endposReached)
                                508                 :                :             {
 1076 michael@paquier.xyz       509                 :              2 :                 stop_reason = STREAM_STOP_KEEPALIVE;
 3464 simon@2ndQuadrant.co      510                 :              2 :                 time_to_abort = true;
                                511                 :              2 :                 break;
                                512                 :                :             }
                                513                 :                : 
 4487 rhaas@postgresql.org      514                 :            216 :             continue;
                                515                 :                :         }
  328 nathan@postgresql.or      516         [ -  + ]:GNC         121 :         else if (copybuf[0] != PqReplMsg_WALData)
                                517                 :                :         {
 2647 peter@eisentraut.org      518                 :UBC           0 :             pg_log_error("unrecognized streaming header: \"%c\"",
                                519                 :                :                          copybuf[0]);
 4487 rhaas@postgresql.org      520                 :              0 :             goto error;
                                521                 :                :         }
                                522                 :                : 
                                523                 :                :         /*
                                524                 :                :          * Read the header of the WALData message, enclosed in the CopyData
                                525                 :                :          * message. We only need the WAL location field (dataStart), the rest
                                526                 :                :          * of the header is ignored.
                                527                 :                :          */
  328 nathan@postgresql.or      528                 :GNC         121 :         hdr_len = 1;            /* msgtype PqReplMsg_WALData */
 4487 rhaas@postgresql.org      529                 :CBC         121 :         hdr_len += 8;           /* dataStart */
                                530                 :            121 :         hdr_len += 8;           /* walEnd */
                                531                 :            121 :         hdr_len += 8;           /* sendTime */
                                532         [ -  + ]:            121 :         if (r < hdr_len + 1)
                                533                 :                :         {
 2647 peter@eisentraut.org      534                 :UBC           0 :             pg_log_error("streaming header too small: %d", r);
 4487 rhaas@postgresql.org      535                 :              0 :             goto error;
                                536                 :                :         }
                                537                 :                : 
                                538                 :                :         /* Extract WAL location for this block */
 3464 simon@2ndQuadrant.co      539                 :CBC         121 :         cur_record_lsn = fe_recvint64(&copybuf[1]);
                                540                 :                : 
  236 alvherre@kurilemu.de      541   [ +  +  -  + ]:GNC         121 :         if (XLogRecPtrIsValid(endpos) && cur_record_lsn > endpos)
                                542                 :                :         {
                                543                 :                :             /*
                                544                 :                :              * We've read past our endpoint, so prepare to go away being
                                545                 :                :              * cautious about what happens to our output data.
                                546                 :                :              */
 3464 simon@2ndQuadrant.co      547         [ #  # ]:UBC           0 :             if (!flushAndSendFeedback(conn, &now))
                                548                 :              0 :                 goto error;
 1076 michael@paquier.xyz       549                 :              0 :             stop_reason = STREAM_STOP_END_OF_WAL;
 3464 simon@2ndQuadrant.co      550                 :              0 :             time_to_abort = true;
                                551                 :              0 :             break;
                                552                 :                :         }
                                553                 :                : 
 3464 simon@2ndQuadrant.co      554                 :CBC         121 :         output_written_lsn = Max(cur_record_lsn, output_written_lsn);
                                555                 :                : 
 4487 rhaas@postgresql.org      556                 :            121 :         bytes_left = r - hdr_len;
                                557                 :            121 :         bytes_written = 0;
                                558                 :                : 
                                559                 :                :         /* signal that a fsync is needed */
 4429 heikki.linnakangas@i      560                 :            121 :         output_needs_fsync = true;
                                561                 :                : 
 4487 rhaas@postgresql.org      562         [ +  + ]:            242 :         while (bytes_left)
                                563                 :                :         {
                                564                 :                :             int         ret;
                                565                 :                : 
                                566                 :            242 :             ret = write(outfd,
                                567                 :            121 :                         copybuf + hdr_len + bytes_written,
                                568                 :                :                         bytes_left);
                                569                 :                : 
                                570         [ -  + ]:            121 :             if (ret < 0)
                                571                 :                :             {
 1686 peter@eisentraut.org      572                 :UBC           0 :                 pg_log_error("could not write %d bytes to log file \"%s\": %m",
                                573                 :                :                              bytes_left, outfile);
 4487 rhaas@postgresql.org      574                 :              0 :                 goto error;
                                575                 :                :             }
                                576                 :                : 
                                577                 :                :             /* Write was successful, advance our position */
 4487 rhaas@postgresql.org      578                 :CBC         121 :             bytes_written += ret;
                                579                 :            121 :             bytes_left -= ret;
                                580                 :                :         }
                                581                 :                : 
                                582         [ -  + ]:            121 :         if (write(outfd, "\n", 1) != 1)
                                583                 :                :         {
 1686 peter@eisentraut.org      584                 :UBC           0 :             pg_log_error("could not write %d bytes to log file \"%s\": %m",
                                585                 :                :                          1, outfile);
 4487 rhaas@postgresql.org      586                 :              0 :             goto error;
                                587                 :                :         }
                                588                 :                : 
  236 alvherre@kurilemu.de      589   [ +  +  +  + ]:GNC         121 :         if (XLogRecPtrIsValid(endpos) && cur_record_lsn == endpos)
                                590                 :                :         {
                                591                 :                :             /* endpos was exactly the record we just processed, we're done */
 3464 simon@2ndQuadrant.co      592         [ -  + ]:CBC           5 :             if (!flushAndSendFeedback(conn, &now))
 3464 simon@2ndQuadrant.co      593                 :UBC           0 :                 goto error;
 1076 michael@paquier.xyz       594                 :CBC           5 :             stop_reason = STREAM_STOP_END_OF_WAL;
 3464 simon@2ndQuadrant.co      595                 :              5 :             time_to_abort = true;
                                596                 :              5 :             break;
                                597                 :                :         }
                                598                 :                :     }
                                599                 :                : 
                                600                 :                :     /* Clean up connection state if stream has been aborted */
 1076 michael@paquier.xyz       601         [ +  + ]:             18 :     if (time_to_abort)
                                602                 :             10 :         prepareToTerminate(conn, endpos, stop_reason, cur_record_lsn);
                                603                 :                : 
 4487 rhaas@postgresql.org      604                 :             18 :     res = PQgetResult(conn);
 3464 simon@2ndQuadrant.co      605         [ +  + ]:             18 :     if (PQresultStatus(res) == PGRES_COPY_OUT)
                                606                 :                :     {
 2239 noah@leadboat.com         607                 :             10 :         PQclear(res);
                                608                 :                : 
                                609                 :                :         /*
                                610                 :                :          * We're doing a client-initiated clean exit and have sent CopyDone to
                                611                 :                :          * the server. Drain any messages, so we don't miss a last-minute
                                612                 :                :          * ErrorResponse. The walsender stops generating WALData records once
                                613                 :                :          * it sees CopyDone, so expect this to finish quickly. After CopyDone,
                                614                 :                :          * it's too late for sendFeedback(), even if this were to take a long
                                615                 :                :          * time. Hence, use synchronous-mode PQgetCopyData().
                                616                 :                :          */
                                617                 :                :         while (1)
                                618                 :             52 :         {
                                619                 :                :             int         r;
                                620                 :                : 
                                621         [ +  + ]:             62 :             if (copybuf != NULL)
                                622                 :                :             {
                                623                 :             59 :                 PQfreemem(copybuf);
                                624                 :             59 :                 copybuf = NULL;
                                625                 :                :             }
                                626                 :             62 :             r = PQgetCopyData(conn, &copybuf, 0);
                                627         [ +  + ]:             62 :             if (r == -1)
                                628                 :             10 :                 break;
                                629         [ -  + ]:             52 :             if (r == -2)
                                630                 :                :             {
 2239 noah@leadboat.com         631                 :UBC           0 :                 pg_log_error("could not read COPY data: %s",
                                632                 :                :                              PQerrorMessage(conn));
                                633                 :              0 :                 time_to_abort = false;  /* unclean exit */
                                634                 :              0 :                 goto error;
                                635                 :                :             }
                                636                 :                :         }
                                637                 :                : 
 2239 noah@leadboat.com         638                 :CBC          10 :         res = PQgetResult(conn);
                                639                 :                :     }
                                640         [ +  + ]:             18 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
                                641                 :                :     {
 2647 peter@eisentraut.org      642                 :              7 :         pg_log_error("unexpected termination of replication stream: %s",
                                643                 :                :                      PQresultErrorMessage(res));
  462 dgustafsson@postgres      644                 :              7 :         PQclear(res);
 4487 rhaas@postgresql.org      645                 :              7 :         goto error;
                                646                 :                :     }
                                647                 :             11 :     PQclear(res);
                                648                 :                : 
                                649   [ +  -  +  + ]:             11 :     if (outfd != -1 && strcmp(outfile, "-") != 0)
                                650                 :                :     {
 3414 tgl@sss.pgh.pa.us         651                 :GBC           2 :         TimestampTz t = feGetCurrentTimestamp();
                                652                 :                : 
 4487 rhaas@postgresql.org      653                 :              2 :         OutputFsync(t);
                                654         [ -  + ]:              2 :         if (close(outfd) != 0)
 2647 peter@eisentraut.org      655                 :UBC           0 :             pg_log_error("could not close file \"%s\": %m", outfile);
                                656                 :                :     }
 4487 rhaas@postgresql.org      657                 :CBC          11 :     outfd = -1;
                                658                 :             27 : error:
 4439 heikki.linnakangas@i      659         [ -  + ]:             27 :     if (copybuf != NULL)
                                660                 :                :     {
 4439 heikki.linnakangas@i      661                 :UBC           0 :         PQfreemem(copybuf);
                                662                 :              0 :         copybuf = NULL;
                                663                 :                :     }
 4487 rhaas@postgresql.org      664                 :CBC          27 :     destroyPQExpBuffer(query);
                                665                 :             27 :     PQfinish(conn);
                                666                 :             27 :     conn = NULL;
                                667                 :                : }
                                668                 :                : 
                                669                 :                : /*
                                670                 :                :  * Unfortunately we can't do sensible signal handling on windows...
                                671                 :                :  */
                                672                 :                : #ifndef WIN32
                                673                 :                : 
                                674                 :                : /*
                                675                 :                :  * When SIGINT/SIGTERM are caught, just tell the system to exit at the next
                                676                 :                :  * possible moment.
                                677                 :                :  */
                                678                 :                : static void
 1385 tgl@sss.pgh.pa.us         679                 :              3 : sigexit_handler(SIGNAL_ARGS)
                                680                 :                : {
 1076 michael@paquier.xyz       681                 :              3 :     stop_reason = STREAM_STOP_SIGNAL;
 4487 rhaas@postgresql.org      682                 :              3 :     time_to_abort = true;
                                683                 :              3 : }
                                684                 :                : 
                                685                 :                : /*
                                686                 :                :  * Trigger the output file to be reopened.
                                687                 :                :  */
                                688                 :                : static void
 1385 tgl@sss.pgh.pa.us         689                 :UBC           0 : sighup_handler(SIGNAL_ARGS)
                                690                 :                : {
 4487 rhaas@postgresql.org      691                 :              0 :     output_reopen = true;
                                692                 :              0 : }
                                693                 :                : #endif
                                694                 :                : 
                                695                 :                : 
                                696                 :                : int
 4487 rhaas@postgresql.org      697                 :CBC          68 : main(int argc, char **argv)
                                698                 :                : {
                                699                 :                :     static struct option long_options[] = {
                                700                 :                : /* general options */
                                701                 :                :         {"file", required_argument, NULL, 'f'},
                                702                 :                :         {"fsync-interval", required_argument, NULL, 'F'},
                                703                 :                :         {"no-loop", no_argument, NULL, 'n'},
                                704                 :                :         {"enable-failover", no_argument, NULL, 5},
                                705                 :                :         {"enable-two-phase", no_argument, NULL, 't'},
                                706                 :                :         {"two-phase", no_argument, NULL, 't'},    /* deprecated */
                                707                 :                :         {"verbose", no_argument, NULL, 'v'},
                                708                 :                :         {"version", no_argument, NULL, 'V'},
                                709                 :                :         {"help", no_argument, NULL, '?'},
                                710                 :                : /* connection options */
                                711                 :                :         {"dbname", required_argument, NULL, 'd'},
                                712                 :                :         {"host", required_argument, NULL, 'h'},
                                713                 :                :         {"port", required_argument, NULL, 'p'},
                                714                 :                :         {"username", required_argument, NULL, 'U'},
                                715                 :                :         {"no-password", no_argument, NULL, 'w'},
                                716                 :                :         {"password", no_argument, NULL, 'W'},
                                717                 :                : /* replication options */
                                718                 :                :         {"startpos", required_argument, NULL, 'I'},
                                719                 :                :         {"endpos", required_argument, NULL, 'E'},
                                720                 :                :         {"option", required_argument, NULL, 'o'},
                                721                 :                :         {"plugin", required_argument, NULL, 'P'},
                                722                 :                :         {"status-interval", required_argument, NULL, 's'},
                                723                 :                :         {"slot", required_argument, NULL, 'S'},
                                724                 :                : /* action */
                                725                 :                :         {"create-slot", no_argument, NULL, 1},
                                726                 :                :         {"start", no_argument, NULL, 2},
                                727                 :                :         {"drop-slot", no_argument, NULL, 3},
                                728                 :                :         {"if-not-exists", no_argument, NULL, 4},
                                729                 :                :         {NULL, 0, NULL, 0}
                                730                 :                :     };
                                731                 :                :     int         c;
                                732                 :                :     int         option_index;
                                733                 :                :     uint32      hi,
                                734                 :                :                 lo;
                                735                 :                :     char       *db_name;
                                736                 :                : 
 2647 peter@eisentraut.org      737                 :             68 :     pg_logging_init(argv[0]);
 4487 rhaas@postgresql.org      738                 :             68 :     progname = get_progname(argv[0]);
 3837 alvherre@alvh.no-ip.      739                 :             68 :     set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
                                740                 :                : 
 4487 rhaas@postgresql.org      741         [ +  + ]:             68 :     if (argc > 1)
                                742                 :                :     {
                                743   [ +  +  -  + ]:             67 :         if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
                                744                 :                :         {
                                745                 :              1 :             usage();
                                746                 :              1 :             exit(0);
                                747                 :                :         }
                                748         [ +  - ]:             66 :         else if (strcmp(argv[1], "-V") == 0 ||
                                749         [ +  + ]:             66 :                  strcmp(argv[1], "--version") == 0)
                                750                 :                :         {
                                751                 :              1 :             puts("pg_recvlogical (PostgreSQL) " PG_VERSION);
                                752                 :              1 :             exit(0);
                                753                 :                :         }
                                754                 :                :     }
                                755                 :                : 
 1296 peter@eisentraut.org      756                 :            389 :     while ((c = getopt_long(argc, argv, "E:f:F:ntvd:h:p:U:wWI:o:P:s:S:",
 4487 rhaas@postgresql.org      757         [ +  + ]:            389 :                             long_options, &option_index)) != -1)
                                758                 :                :     {
                                759   [ +  +  +  +  :            324 :         switch (c)
                                     +  +  +  -  -  
                                     -  -  -  -  +  
                                     +  +  +  +  +  
                                        +  +  -  + ]
                                760                 :                :         {
                                761                 :                : /* general options */
                                762                 :             27 :             case 'f':
                                763                 :             27 :                 outfile = pg_strdup(optarg);
                                764                 :             27 :                 break;
 4419 andres@anarazel.de        765                 :GBC           2 :             case 'F':
 1802 michael@paquier.xyz       766         [ -  + ]:              2 :                 if (!option_parse_int(optarg, "-F/--fsync-interval", 0,
                                767                 :                :                                       INT_MAX / 1000,
                                768                 :                :                                       &fsync_interval))
 4419 andres@anarazel.de        769                 :UBC           0 :                     exit(1);
 1802 michael@paquier.xyz       770                 :GBC           2 :                 fsync_interval *= 1000;
 4419 andres@anarazel.de        771                 :              2 :                 break;
 4487 rhaas@postgresql.org      772                 :CBC          24 :             case 'n':
                                773                 :             24 :                 noloop = 1;
                                774                 :             24 :                 break;
 1826 akapila@postgresql.o      775                 :              2 :             case 't':
                                776                 :              2 :                 two_phase = true;
                                777                 :              2 :                 break;
 1296 peter@eisentraut.org      778                 :GBC           1 :             case 'v':
                                779                 :              1 :                 verbose++;
                                780                 :              1 :                 break;
  452 msawada@postgresql.o      781                 :CBC           1 :             case 5:
                                782                 :              1 :                 failover = true;
                                783                 :              1 :                 break;
                                784                 :                : /* connection options */
 4487 rhaas@postgresql.org      785                 :             62 :             case 'd':
                                786                 :             62 :                 dbname = pg_strdup(optarg);
                                787                 :             62 :                 break;
 4487 rhaas@postgresql.org      788                 :UBC           0 :             case 'h':
                                789                 :              0 :                 dbhost = pg_strdup(optarg);
                                790                 :              0 :                 break;
                                791                 :              0 :             case 'p':
                                792                 :              0 :                 dbport = pg_strdup(optarg);
                                793                 :              0 :                 break;
                                794                 :              0 :             case 'U':
                                795                 :              0 :                 dbuser = pg_strdup(optarg);
                                796                 :              0 :                 break;
                                797                 :              0 :             case 'w':
                                798                 :              0 :                 dbgetpassword = -1;
                                799                 :              0 :                 break;
                                800                 :              0 :             case 'W':
                                801                 :              0 :                 dbgetpassword = 1;
                                802                 :              0 :                 break;
                                803                 :                : /* replication options */
 4419 andres@anarazel.de        804                 :              0 :             case 'I':
  358 alvherre@kurilemu.de      805         [ #  # ]:UNC           0 :                 if (sscanf(optarg, "%X/%08X", &hi, &lo) != 2)
 1544 tgl@sss.pgh.pa.us         806                 :UBC           0 :                     pg_fatal("could not parse start position \"%s\"", optarg);
 4419 andres@anarazel.de        807                 :              0 :                 startpos = ((uint64) hi) << 32 | lo;
                                808                 :              0 :                 break;
 3464 simon@2ndQuadrant.co      809                 :CBC           8 :             case 'E':
  358 alvherre@kurilemu.de      810         [ -  + ]:GNC           8 :                 if (sscanf(optarg, "%X/%08X", &hi, &lo) != 2)
 1544 tgl@sss.pgh.pa.us         811                 :UBC           0 :                     pg_fatal("could not parse end position \"%s\"", optarg);
 3464 simon@2ndQuadrant.co      812                 :CBC           8 :                 endpos = ((uint64) hi) << 32 | lo;
                                813                 :              8 :                 break;
 4487 rhaas@postgresql.org      814                 :             42 :             case 'o':
                                815                 :                :                 {
 4438 bruce@momjian.us          816                 :             42 :                     char       *data = pg_strdup(optarg);
                                817                 :             42 :                     char       *val = strchr(data, '=');
                                818                 :                : 
 4487 rhaas@postgresql.org      819         [ +  - ]:             42 :                     if (val != NULL)
                                820                 :                :                     {
                                821                 :                :                         /* remove =; separate data from val */
                                822                 :             42 :                         *val = '\0';
                                823                 :             42 :                         val++;
                                824                 :                :                     }
                                825                 :                : 
                                826                 :             42 :                     noptions += 1;
  123 michael@paquier.xyz       827                 :GNC          42 :                     options = pg_realloc_array(options, char *, noptions * 2);
                                828                 :                : 
 4487 rhaas@postgresql.org      829                 :CBC          42 :                     options[(noptions - 1) * 2] = data;
                                830                 :             42 :                     options[(noptions - 1) * 2 + 1] = val;
                                831                 :                :                 }
                                832                 :                : 
                                833                 :             42 :                 break;
                                834                 :             27 :             case 'P':
                                835                 :             27 :                 plugin = pg_strdup(optarg);
                                836                 :             27 :                 break;
 4487 rhaas@postgresql.org      837                 :GBC           1 :             case 's':
 1802 michael@paquier.xyz       838         [ -  + ]:              1 :                 if (!option_parse_int(optarg, "-s/--status-interval", 0,
                                839                 :                :                                       INT_MAX / 1000,
                                840                 :                :                                       &standby_message_timeout))
 4487 rhaas@postgresql.org      841                 :UBC           0 :                     exit(1);
 1802 michael@paquier.xyz       842                 :GBC           1 :                 standby_message_timeout *= 1000;
 4487 rhaas@postgresql.org      843                 :              1 :                 break;
 4487 rhaas@postgresql.org      844                 :CBC          64 :             case 'S':
                                845                 :             64 :                 replication_slot = pg_strdup(optarg);
                                846                 :             64 :                 break;
                                847                 :                : /* action */
                                848                 :             31 :             case 1:
                                849                 :             31 :                 do_create_slot = true;
                                850                 :             31 :                 break;
                                851                 :             28 :             case 2:
                                852                 :             28 :                 do_start_slot = true;
                                853                 :             28 :                 break;
                                854                 :              3 :             case 3:
                                855                 :              3 :                 do_drop_slot = true;
                                856                 :              3 :                 break;
 4006 andres@anarazel.de        857                 :UBC           0 :             case 4:
                                858                 :              0 :                 slot_exists_ok = true;
                                859                 :              0 :                 break;
                                860                 :                : 
 4487 rhaas@postgresql.org      861                 :CBC           1 :             default:
                                862                 :                :                 /* getopt_long already emitted a complaint */
 1544 tgl@sss.pgh.pa.us         863                 :              1 :                 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
 4487 rhaas@postgresql.org      864                 :              1 :                 exit(1);
                                865                 :                :         }
                                866                 :                :     }
                                867                 :                : 
                                868                 :                :     /*
                                869                 :                :      * Any non-option arguments?
                                870                 :                :      */
                                871         [ -  + ]:             65 :     if (optind < argc)
                                872                 :                :     {
 2647 peter@eisentraut.org      873                 :UBC           0 :         pg_log_error("too many command-line arguments (first is \"%s\")",
                                874                 :                :                      argv[optind]);
 1544 tgl@sss.pgh.pa.us         875                 :              0 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
 4487 rhaas@postgresql.org      876                 :              0 :         exit(1);
                                877                 :                :     }
                                878                 :                : 
                                879                 :                :     /*
                                880                 :                :      * Required arguments
                                881                 :                :      */
 4487 rhaas@postgresql.org      882         [ +  + ]:CBC          65 :     if (replication_slot == NULL)
                                883                 :                :     {
 2647 peter@eisentraut.org      884                 :              1 :         pg_log_error("no slot specified");
 1544 tgl@sss.pgh.pa.us         885                 :              1 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
 4487 rhaas@postgresql.org      886                 :              1 :         exit(1);
                                887                 :                :     }
                                888                 :                : 
                                889   [ +  +  +  + ]:             64 :     if (do_start_slot && outfile == NULL)
                                890                 :                :     {
 2647 peter@eisentraut.org      891                 :              1 :         pg_log_error("no target file specified");
 1544 tgl@sss.pgh.pa.us         892                 :              1 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
 4487 rhaas@postgresql.org      893                 :              1 :         exit(1);
                                894                 :                :     }
                                895                 :                : 
                                896   [ +  +  +  + ]:             63 :     if (!do_drop_slot && dbname == NULL)
                                897                 :                :     {
 2647 peter@eisentraut.org      898                 :              1 :         pg_log_error("no database specified");
 1544 tgl@sss.pgh.pa.us         899                 :              1 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
 4487 rhaas@postgresql.org      900                 :              1 :         exit(1);
                                901                 :                :     }
                                902                 :                : 
                                903   [ +  +  +  +  :             62 :     if (!do_drop_slot && !do_create_slot && !do_start_slot)
                                              +  + ]
                                904                 :                :     {
 2647 peter@eisentraut.org      905                 :              1 :         pg_log_error("at least one action needs to be specified");
 1544 tgl@sss.pgh.pa.us         906                 :              1 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
 4487 rhaas@postgresql.org      907                 :              1 :         exit(1);
                                908                 :                :     }
                                909                 :                : 
                                910   [ +  +  +  -  :             61 :     if (do_drop_slot && (do_create_slot || do_start_slot))
                                              -  + ]
                                911                 :                :     {
 2647 peter@eisentraut.org      912                 :UBC           0 :         pg_log_error("cannot use --create-slot or --start together with --drop-slot");
 1544 tgl@sss.pgh.pa.us         913                 :              0 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
 4487 rhaas@postgresql.org      914                 :              0 :         exit(1);
                                915                 :                :     }
                                916                 :                : 
  236 alvherre@kurilemu.de      917   [ -  +  -  -  :GNC          61 :     if (XLogRecPtrIsValid(startpos) && (do_create_slot || do_drop_slot))
                                              -  - ]
                                918                 :                :     {
 2647 peter@eisentraut.org      919                 :UBC           0 :         pg_log_error("cannot use --create-slot or --drop-slot together with --startpos");
 1544 tgl@sss.pgh.pa.us         920                 :              0 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
 4487 rhaas@postgresql.org      921                 :              0 :         exit(1);
                                922                 :                :     }
                                923                 :                : 
  236 alvherre@kurilemu.de      924   [ +  +  -  + ]:GNC          61 :     if (XLogRecPtrIsValid(endpos) && !do_start_slot)
                                925                 :                :     {
 2647 peter@eisentraut.org      926                 :UBC           0 :         pg_log_error("--endpos may only be specified with --start");
 1544 tgl@sss.pgh.pa.us         927                 :              0 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
 3464 simon@2ndQuadrant.co      928                 :              0 :         exit(1);
                                929                 :                :     }
                                930                 :                : 
  452 msawada@postgresql.o      931         [ +  + ]:CBC          61 :     if (!do_create_slot)
                                932                 :                :     {
                                933         [ +  + ]:             30 :         if (two_phase)
                                934                 :                :         {
  366 peter@eisentraut.org      935                 :              1 :             pg_log_error("%s may only be specified with --create-slot", "--enable-two-phase");
  452 msawada@postgresql.o      936                 :              1 :             pg_log_error_hint("Try \"%s --help\" for more information.", progname);
                                937                 :              1 :             exit(1);
                                938                 :                :         }
                                939                 :                : 
                                940         [ -  + ]:             29 :         if (failover)
                                941                 :                :         {
  366 peter@eisentraut.org      942                 :UBC           0 :             pg_log_error("%s may only be specified with --create-slot", "--enable-failover");
  452 msawada@postgresql.o      943                 :              0 :             pg_log_error_hint("Try \"%s --help\" for more information.", progname);
                                944                 :              0 :             exit(1);
                                945                 :                :         }
                                946                 :                :     }
                                947                 :                : 
                                948                 :                :     /*
                                949                 :                :      * Obtain a connection to server.  Notably, if we need a password, we want
                                950                 :                :      * to collect it from the user immediately.
                                951                 :                :      */
 4290 andres@anarazel.de        952                 :CBC          60 :     conn = GetConnection();
                                953         [ -  + ]:             60 :     if (!conn)
                                954                 :                :         /* Error message already written in GetConnection() */
 4290 andres@anarazel.de        955                 :UBC           0 :         exit(1);
 2740 peter@eisentraut.org      956                 :CBC          60 :     atexit(disconnect_atexit);
                                957                 :                : 
                                958                 :                :     /*
                                959                 :                :      * Trap signals.  (Don't do this until after the initial password prompt,
                                960                 :                :      * if one is needed, in GetConnection.)
                                961                 :                :      */
                                962                 :                : #ifndef WIN32
 1385 dgustafsson@postgres      963                 :             60 :     pqsignal(SIGINT, sigexit_handler);
                                964                 :             60 :     pqsignal(SIGTERM, sigexit_handler);
 1682 tgl@sss.pgh.pa.us         965                 :             60 :     pqsignal(SIGHUP, sighup_handler);
                                966                 :                : #endif
                                967                 :                : 
                                968                 :                :     /*
                                969                 :                :      * Run IDENTIFY_SYSTEM to check the connection type for each action.
                                970                 :                :      * --create-slot and --start actions require a database-specific
                                971                 :                :      * replication connection because they handle logical replication slots.
                                972                 :                :      * --drop-slot can remove replication slots from any replication
                                973                 :                :      * connection without this restriction.
                                974                 :                :      */
 4290 andres@anarazel.de        975         [ -  + ]:             60 :     if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name))
 2740 peter@eisentraut.org      976                 :UBC           0 :         exit(1);
                                977                 :                : 
  462 fujii@postgresql.org      978   [ +  +  -  + ]:CBC          60 :     if (!do_drop_slot && db_name == NULL)
 1544 tgl@sss.pgh.pa.us         979                 :UBC           0 :         pg_fatal("could not establish database-specific replication connection");
                                980                 :                : 
                                981                 :                :     /*
                                982                 :                :      * Set umask so that directories/files are created with the same
                                983                 :                :      * permissions as directories/files in the source data directory.
                                984                 :                :      *
                                985                 :                :      * pg_mode_mask is set to owner-only by default and then updated in
                                986                 :                :      * GetConnection() where we get the mode from the server-side with
                                987                 :                :      * RetrieveDataDirCreatePerm() and then call SetDataDirectoryCreatePerm().
                                988                 :                :      */
 3006 sfrost@snowman.net        989                 :CBC          60 :     umask(pg_mode_mask);
                                990                 :                : 
                                991                 :                :     /* Drop a replication slot. */
 4487 rhaas@postgresql.org      992         [ +  + ]:             60 :     if (do_drop_slot)
                                993                 :                :     {
                                994         [ -  + ]:              3 :         if (verbose)
 2647 peter@eisentraut.org      995                 :UBC           0 :             pg_log_info("dropping replication slot \"%s\"", replication_slot);
                                996                 :                : 
 4290 andres@anarazel.de        997         [ -  + ]:CBC           3 :         if (!DropReplicationSlot(conn, replication_slot))
 2740 peter@eisentraut.org      998                 :UBC           0 :             exit(1);
                                999                 :                :     }
                               1000                 :                : 
                               1001                 :                :     /* Create a replication slot. */
 4487 rhaas@postgresql.org     1002         [ +  + ]:CBC          60 :     if (do_create_slot)
                               1003                 :                :     {
                               1004         [ -  + ]:             31 :         if (verbose)
 2647 peter@eisentraut.org     1005                 :UBC           0 :             pg_log_info("creating replication slot \"%s\"", replication_slot);
                               1006                 :                : 
 3199 peter_e@gmx.net          1007         [ -  + ]:CBC          31 :         if (!CreateReplicationSlot(conn, replication_slot, plugin, false,
                               1008                 :                :                                    false, false, slot_exists_ok, two_phase,
                               1009                 :                :                                    failover))
 2740 peter@eisentraut.org     1010                 :UBC           0 :             exit(1);
 4006 andres@anarazel.de       1011                 :CBC          31 :         startpos = InvalidXLogRecPtr;
                               1012                 :                :     }
                               1013                 :                : 
 4487 rhaas@postgresql.org     1014         [ +  + ]:             60 :     if (!do_start_slot)
 2740 peter@eisentraut.org     1015                 :             34 :         exit(0);
                               1016                 :                : 
                               1017                 :                :     /* Stream loop */
                               1018                 :                :     while (true)
                               1019                 :                :     {
 4290 andres@anarazel.de       1020                 :             27 :         StreamLogicalLog();
 4487 rhaas@postgresql.org     1021         [ +  + ]:             27 :         if (time_to_abort)
                               1022                 :                :         {
                               1023                 :                :             /*
                               1024                 :                :              * We've been Ctrl-C'ed or reached an exit limit condition. That's
                               1025                 :                :              * not an error, so exit without an errorcode.
                               1026                 :                :              */
 2740 peter@eisentraut.org     1027                 :             10 :             exit(0);
                               1028                 :                :         }
                               1029                 :                : 
                               1030                 :                :         /*
                               1031                 :                :          * Ensure all written data is flushed to disk before exiting or
                               1032                 :                :          * starting a new replication.
                               1033                 :                :          */
  165 fujii@postgresql.org     1034         [ +  + ]:GNC          17 :         if (outfd != -1)
                               1035                 :             10 :             OutputFsync(feGetCurrentTimestamp());
                               1036                 :                : 
                               1037         [ +  + ]:             17 :         if (noloop)
                               1038                 :                :         {
 1544 tgl@sss.pgh.pa.us        1039                 :CBC          16 :             pg_fatal("disconnected");
                               1040                 :                :         }
                               1041                 :                :         else
                               1042                 :                :         {
                               1043                 :                :             /* translator: check source for value for %d */
 2647 peter@eisentraut.org     1044                 :GBC           1 :             pg_log_info("disconnected; waiting %d seconds to try again",
                               1045                 :                :                         RECONNECT_SLEEP_TIME);
 4487 rhaas@postgresql.org     1046                 :              1 :             pg_usleep(RECONNECT_SLEEP_TIME * 1000000);
                               1047                 :                :         }
                               1048                 :                :     }
                               1049                 :                : }
                               1050                 :                : 
                               1051                 :                : /*
                               1052                 :                :  * Fsync our output data, and send a feedback message to the server.  Returns
                               1053                 :                :  * true if successful, false otherwise.
                               1054                 :                :  *
                               1055                 :                :  * If successful, *now is updated to the current timestamp just before sending
                               1056                 :                :  * feedback.
                               1057                 :                :  */
                               1058                 :                : static bool
 3464 simon@2ndQuadrant.co     1059                 :CBC           8 : flushAndSendFeedback(PGconn *conn, TimestampTz *now)
                               1060                 :                : {
                               1061                 :                :     /* flush data to disk, so that we send a recent flush pointer */
  165 fujii@postgresql.org     1062                 :GNC           8 :     OutputFsync(*now);
 3464 simon@2ndQuadrant.co     1063                 :CBC           8 :     *now = feGetCurrentTimestamp();
                               1064         [ -  + ]:              8 :     if (!sendFeedback(conn, *now, true, false))
 3464 simon@2ndQuadrant.co     1065                 :UBC           0 :         return false;
                               1066                 :                : 
 3464 simon@2ndQuadrant.co     1067                 :CBC           8 :     return true;
                               1068                 :                : }
                               1069                 :                : 
                               1070                 :                : /*
                               1071                 :                :  * Try to inform the server about our upcoming demise, but don't wait around or
                               1072                 :                :  * retry on failure.
                               1073                 :                :  */
                               1074                 :                : static void
 1076 michael@paquier.xyz      1075                 :             10 : prepareToTerminate(PGconn *conn, XLogRecPtr endpos, StreamStopReason reason,
                               1076                 :                :                    XLogRecPtr lsn)
                               1077                 :                : {
 3464 simon@2ndQuadrant.co     1078                 :             10 :     (void) PQputCopyEnd(conn, NULL);
                               1079                 :             10 :     (void) PQflush(conn);
                               1080                 :                : 
                               1081         [ +  + ]:             10 :     if (verbose)
                               1082                 :                :     {
 1076 michael@paquier.xyz      1083   [ +  -  -  -  :GBC           1 :         switch (reason)
                                                 - ]
                               1084                 :                :         {
                               1085                 :              1 :             case STREAM_STOP_SIGNAL:
                               1086                 :              1 :                 pg_log_info("received interrupt signal, exiting");
                               1087                 :              1 :                 break;
 1076 michael@paquier.xyz      1088                 :UBC           0 :             case STREAM_STOP_KEEPALIVE:
  358 alvherre@kurilemu.de     1089                 :UNC           0 :                 pg_log_info("end position %X/%08X reached by keepalive",
                               1090                 :                :                             LSN_FORMAT_ARGS(endpos));
 1076 michael@paquier.xyz      1091                 :UBC           0 :                 break;
                               1092                 :              0 :             case STREAM_STOP_END_OF_WAL:
  236 alvherre@kurilemu.de     1093         [ #  # ]:UNC           0 :                 Assert(XLogRecPtrIsValid(lsn));
  358                          1094                 :              0 :                 pg_log_info("end position %X/%08X reached by WAL record at %X/%08X",
                               1095                 :                :                             LSN_FORMAT_ARGS(endpos), LSN_FORMAT_ARGS(lsn));
 1076 michael@paquier.xyz      1096                 :UBC           0 :                 break;
                               1097                 :              0 :             case STREAM_STOP_NONE:
                               1098                 :              0 :                 Assert(false);
                               1099                 :                :                 break;
                               1100                 :                :         }
                               1101                 :                :     }
 3464 simon@2ndQuadrant.co     1102                 :CBC          10 : }
        

Generated by: LCOV version 2.0-1