LCOV - code coverage report
Current view: top level - src/bin/pg_basebackup - pg_recvlogical.c (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 379 483 78.5 %
Date: 2026-02-02 14:17:46 Functions: 9 10 90.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * pg_recvlogical.c - receive data from a logical decoding slot in a streaming
       4             :  *                    fashion and write it to a local file.
       5             :  *
       6             :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *        src/bin/pg_basebackup/pg_recvlogical.c
      10             :  *-------------------------------------------------------------------------
      11             :  */
      12             : 
      13             : #include "postgres_fe.h"
      14             : 
      15             : #include <dirent.h>
      16             : #include <limits.h>
      17             : #include <sys/select.h>
      18             : #include <sys/stat.h>
      19             : #include <unistd.h>
      20             : 
      21             : #include "common/file_perm.h"
      22             : #include "common/logging.h"
      23             : #include "fe_utils/option_utils.h"
      24             : #include "getopt_long.h"
      25             : #include "libpq-fe.h"
      26             : #include "libpq/pqsignal.h"
      27             : #include "libpq/protocol.h"
      28             : #include "pqexpbuffer.h"
      29             : #include "streamutil.h"
      30             : 
      31             : /* Time to sleep between reconnection attempts */
      32             : #define RECONNECT_SLEEP_TIME 5
      33             : /* Why streaming was told to stop; the value is kept in stop_reason. */
      34             : typedef enum
      35             : {
      36             :     STREAM_STOP_NONE,           /* initial value of stop_reason */
      37             :     STREAM_STOP_END_OF_WAL,     /* a WAL data message reached or passed endpos */
      38             :     STREAM_STOP_KEEPALIVE,      /* a keepalive reported walEnd >= endpos */
      39             :     STREAM_STOP_SIGNAL          /* NOTE(review): presumably set on a signal; setter not in view */
      40             : } StreamStopReason;
      41             : 
      42             : /* Global Options */
      43             : static char *outfile = NULL;    /* output path; "-" means stdout */
      44             : static int  verbose = 0;        /* nonzero enables pg_log_info() progress messages */
      45             : static bool two_phase = false;  /* enable-two-phase option */
      46             : static bool failover = false;   /* enable-failover option */
      47             : static int  noloop = 0;
      48             : static int  standby_message_timeout = 10 * 1000;    /* in milliseconds; 10 sec = default */
      49             : static int  fsync_interval = 10 * 1000; /* in milliseconds; 10 sec = default */
      50             : static XLogRecPtr startpos = InvalidXLogRecPtr; /* LSN sent in START_REPLICATION */
      51             : static XLogRecPtr endpos = InvalidXLogRecPtr;   /* if valid, stop once this LSN is reached */
      52             : static bool do_create_slot = false;
      53             : static bool slot_exists_ok = false;
      54             : static bool do_start_slot = false;
      55             : static bool do_drop_slot = false;
      56             : static char *replication_slot = NULL;   /* slot name sent in START_REPLICATION */
      57             : 
      58             : /* filled pairwise with option, value. value may be NULL */
      59             : static char **options;
      60             : static size_t noptions = 0;     /* number of option/value pairs in options[] */
      61             : static const char *plugin = "test_decoding";    /* shown by usage() as the default plugin */
      62             : 
      63             : /* Global State */
      64             : static int  outfd = -1;         /* output file descriptor, -1 while not open */
      65             : static volatile sig_atomic_t time_to_abort = false; /* ends the streaming loop when set */
      66             : static volatile sig_atomic_t stop_reason = STREAM_STOP_NONE;    /* a StreamStopReason value */
      67             : static volatile sig_atomic_t output_reopen = false; /* request to close and reopen the output file */
      68             : static bool output_isfile;      /* outfd is a regular file and not a tty */
      69             : static TimestampTz output_last_fsync = -1;  /* "now" passed to the last OutputFsync() call */
      70             : static bool output_needs_fsync = false; /* set when data is written to outfd */
      71             : static XLogRecPtr output_written_lsn = InvalidXLogRecPtr;   /* highest LSN seen so far */
      72             : static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr; /* output_written_lsn as of the last OutputFsync() */
      73             : 
      74             : static void usage(void);
      75             : static void StreamLogicalLog(void);
      76             : static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now);
      77             : static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos,
      78             :                                StreamStopReason reason,
      79             :                                XLogRecPtr lsn);
      80             : /* Print the command-line help text for this program to stdout. */
      81             : static void
      82           2 : usage(void)
      83             : {
      84           2 :     printf(_("%s controls PostgreSQL logical decoding streams.\n\n"),
      85             :            progname);
      86           2 :     printf(_("Usage:\n"));
      87           2 :     printf(_("  %s [OPTION]...\n"), progname);
      88           2 :     printf(_("\nAction to be performed:\n"));
      89           2 :     printf(_("      --create-slot      create a new replication slot (for the slot's name see --slot)\n"));
      90           2 :     printf(_("      --drop-slot        drop the replication slot (for the slot's name see --slot)\n"));
      91           2 :     printf(_("      --start            start streaming in a replication slot (for the slot's name see --slot)\n"));
      92           2 :     printf(_("\nOptions:\n"));
      93           2 :     printf(_("      --enable-failover  enable replication slot synchronization to standby servers when\n"
      94             :              "                         creating a replication slot\n"));
      95           2 :     printf(_("  -E, --endpos=LSN       exit after receiving the specified LSN\n"));
      96           2 :     printf(_("  -f, --file=FILE        receive log into this file, - for stdout\n"));
      97           2 :     printf(_("  -F, --fsync-interval=SECS\n"
      98             :              "                         time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000));
      99           2 :     printf(_("      --if-not-exists    do not error if slot already exists when creating a slot\n"));
     100           2 :     printf(_("  -I, --startpos=LSN     where in an existing slot should the streaming start\n"));
     101           2 :     printf(_("  -n, --no-loop          do not loop on connection lost\n"));
     102           2 :     printf(_("  -o, --option=NAME[=VALUE]\n"
     103             :              "                         pass option NAME with optional value VALUE to the\n"
     104             :              "                         output plugin\n"));
     105           2 :     printf(_("  -P, --plugin=PLUGIN    use output plugin PLUGIN (default: %s)\n"), plugin);
     106           2 :     printf(_("  -s, --status-interval=SECS\n"
     107             :              "                         time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
     108           2 :     printf(_("  -S, --slot=SLOTNAME    name of the logical replication slot\n"));
     109           2 :     printf(_("  -t, --enable-two-phase enable decoding of prepared transactions when creating a slot\n"));
     110           2 :     printf(_("      --two-phase        (same as --enable-two-phase, deprecated)\n"));
     111           2 :     printf(_("  -v, --verbose          output verbose messages\n"));
     112           2 :     printf(_("  -V, --version          output version information, then exit\n"));
     113           2 :     printf(_("  -?, --help             show this help, then exit\n"));
     114           2 :     printf(_("\nConnection options:\n"));
     115           2 :     printf(_("  -d, --dbname=DBNAME    database to connect to\n"));
     116           2 :     printf(_("  -h, --host=HOSTNAME    database server host or socket directory\n"));
     117           2 :     printf(_("  -p, --port=PORT        database server port number\n"));
     118           2 :     printf(_("  -U, --username=NAME    connect as specified database user\n"));
     119           2 :     printf(_("  -w, --no-password      never prompt for password\n"));
     120           2 :     printf(_("  -W, --password         force password prompt (should happen automatically)\n"));
     121           2 :     printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
     122           2 :     printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
     123           2 : }
     124             : 
     125             : /*
     126             :  * Send a Standby Status Update message to server.
                     :  *
                     :  * The message consists of one type byte, the write LSN
                     :  * (output_written_lsn), the flush LSN (output_fsync_lsn), an invalid
                     :  * apply LSN, the send time "now", and a one-byte replyRequested flag.
                     :  *
                     :  * Unless "force" is set, nothing is sent when neither output_written_lsn
                     :  * nor output_fsync_lsn has changed since the previous message.
                     :  *
                     :  * Returns true if the message was sent or skipped, and false if it could
                     :  * not be queued or flushed on "conn" (an error is logged in that case).
     127             :  */
     128             : static bool
     129          54 : sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested)
     130             : {
     131             :     static XLogRecPtr last_written_lsn = InvalidXLogRecPtr;
     132             :     static XLogRecPtr last_fsync_lsn = InvalidXLogRecPtr;
     133             : 
     134             :     char        replybuf[1 + 8 + 8 + 8 + 8 + 1];
     135          54 :     int         len = 0;
     136             : 
     137             :     /*
     138             :      * we normally don't want to send superfluous feedback, but if it's
     139             :      * because of a timeout we need to, otherwise wal_sender_timeout will kill
     140             :      * us.
     141             :      */
     142          54 :     if (!force &&
     143           0 :         last_written_lsn == output_written_lsn &&
     144           0 :         last_fsync_lsn == output_fsync_lsn)
     145           0 :         return true;
     146             : 
     147          54 :     if (verbose)
     148           4 :         pg_log_info("confirming write up to %X/%08X, flush to %X/%08X (slot %s)",
     149             :                     LSN_FORMAT_ARGS(output_written_lsn),
     150             :                     LSN_FORMAT_ARGS(output_fsync_lsn),
     151             :                     replication_slot);
     152             :     /* Assemble the message; "len" tracks the next free offset in replybuf. */
     153          54 :     replybuf[len] = PqReplMsg_StandbyStatusUpdate;
     154          54 :     len += 1;
     155          54 :     fe_sendint64(output_written_lsn, &replybuf[len]);   /* write */
     156          54 :     len += 8;
     157          54 :     fe_sendint64(output_fsync_lsn, &replybuf[len]); /* flush */
     158          54 :     len += 8;
     159          54 :     fe_sendint64(InvalidXLogRecPtr, &replybuf[len]);    /* apply */
     160          54 :     len += 8;
     161          54 :     fe_sendint64(now, &replybuf[len]);  /* sendTime */
     162          54 :     len += 8;
     163          54 :     replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
     164          54 :     len += 1;
     165             :     /*
                     :      * Remember the positions being reported.  Note that this is done
                     :      * before the send below is attempted, so these are updated even if
                     :      * sending fails.  startpos is the LSN that StreamLogicalLog() places
                     :      * in its START_REPLICATION command.
                     :      */
     166          54 :     startpos = output_written_lsn;
     167          54 :     last_written_lsn = output_written_lsn;
     168          54 :     last_fsync_lsn = output_fsync_lsn;
     169             : 
     170          54 :     if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
     171             :     {
     172           0 :         pg_log_error("could not send feedback packet: %s",
     173             :                      PQerrorMessage(conn));
     174           0 :         return false;
     175             :     }
     176             : 
     177          54 :     return true;
     178             : }
     179             : /*
                     :  * Close the connection "conn", if there is one.
                     :  *
                     :  * NOTE(review): the name suggests this is registered with atexit(); the
                     :  * registration is not visible in this part of the file.
                     :  */
     180             : static void
     181         112 : disconnect_atexit(void)
     182             : {
     183         112 :     if (conn != NULL)
     184          64 :         PQfinish(conn);
     185         112 : }
     186             : /*
                     :  * Record a flush of the output as of "now", and fsync it when applicable.
                     :  *
                     :  * output_last_fsync, output_fsync_lsn and startpos are updated
                     :  * unconditionally, before any fsync() is attempted.  The fsync() itself
                     :  * only happens when fsync_interval is positive, output_needs_fsync is
                     :  * set, and the output is a regular file.  A failing fsync() is reported
                     :  * through pg_fatal().
                     :  */
     187             : static void
     188          74 : OutputFsync(TimestampTz now)
     189             : {
     190          74 :     output_last_fsync = now;
     191             : 
     192          74 :     output_fsync_lsn = output_written_lsn;
     193             : 
     194             :     /*
     195             :      * Save the last flushed position as the replication start point. On
     196             :      * reconnect, replication resumes from there to avoid re-sending flushed
     197             :      * data.
     198             :      */
     199          74 :     startpos = output_fsync_lsn;
     200             :     /* a non-positive interval means the output is never fsync'ed here */
     201          74 :     if (fsync_interval <= 0)
     202           0 :         return;
     203             :     /* nothing was written since the flag was last cleared */
     204          74 :     if (!output_needs_fsync)
     205          50 :         return;
     206             : 
     207          24 :     output_needs_fsync = false;
     208             : 
     209             :     /* can only fsync if it's a regular file */
     210          24 :     if (!output_isfile)
     211          16 :         return;
     212             : 
     213           8 :     if (fsync(outfd) != 0)
     214           0 :         pg_fatal("could not fsync file \"%s\": %m", outfile);
     215             : }
     216             : 
     217             : /*
     218             :  * Start the log streaming
     219             :  */
     220             : static void
     221          50 : StreamLogicalLog(void)
     222             : {
     223             :     PGresult   *res;
     224          50 :     char       *copybuf = NULL;
     225          50 :     TimestampTz last_status = -1;
     226             :     int         i;
     227             :     PQExpBuffer query;
     228             :     XLogRecPtr  cur_record_lsn;
     229             : 
     230          50 :     cur_record_lsn = InvalidXLogRecPtr;
     231             : 
     232             :     /*
     233             :      * Connect in replication mode to the server
     234             :      */
     235          50 :     if (!conn)
     236           2 :         conn = GetConnection();
     237          50 :     if (!conn)
     238             :         /* Error message already written in GetConnection() */
     239           0 :         return;
     240             : 
     241             :     /*
     242             :      * Start the replication
     243             :      */
     244          50 :     if (verbose)
     245           4 :         pg_log_info("starting log streaming at %X/%08X (slot %s)",
     246             :                     LSN_FORMAT_ARGS(startpos),
     247             :                     replication_slot);
     248             : 
     249             :     /* Initiate the replication stream at specified location */
     250          50 :     query = createPQExpBuffer();
     251          50 :     appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%08X",
     252          50 :                       replication_slot, LSN_FORMAT_ARGS(startpos));
     253             : 
     254             :     /* print options if there are any */
     255          50 :     if (noptions)
     256          40 :         appendPQExpBufferStr(query, " (");
     257             : 
     258         130 :     for (i = 0; i < noptions; i++)
     259             :     {
     260             :         /* separator */
     261          80 :         if (i > 0)
     262          40 :             appendPQExpBufferStr(query, ", ");
     263             : 
     264             :         /* write option name */
     265          80 :         appendPQExpBuffer(query, "\"%s\"", options[(i * 2)]);
     266             : 
     267             :         /* write option value if specified */
     268          80 :         if (options[(i * 2) + 1] != NULL)
     269          80 :             appendPQExpBuffer(query, " '%s'", options[(i * 2) + 1]);
     270             :     }
     271             : 
     272          50 :     if (noptions)
     273          40 :         appendPQExpBufferChar(query, ')');
     274             : 
     275          50 :     res = PQexec(conn, query->data);
     276          50 :     if (PQresultStatus(res) != PGRES_COPY_BOTH)
     277             :     {
     278          12 :         pg_log_error("could not send replication command \"%s\": %s",
     279             :                      query->data, PQresultErrorMessage(res));
     280          12 :         PQclear(res);
     281          12 :         goto error;
     282             :     }
     283          38 :     PQclear(res);
     284          38 :     resetPQExpBuffer(query);
     285             : 
     286          38 :     if (verbose)
     287           4 :         pg_log_info("streaming initiated");
     288             : 
     289         968 :     while (!time_to_abort)
     290             :     {
     291             :         int         r;
     292             :         int         bytes_left;
     293             :         int         bytes_written;
     294             :         TimestampTz now;
     295             :         int         hdr_len;
     296             : 
     297         964 :         cur_record_lsn = InvalidXLogRecPtr;
     298             : 
     299         964 :         if (copybuf != NULL)
     300             :         {
     301         602 :             PQfreemem(copybuf);
     302         602 :             copybuf = NULL;
     303             :         }
     304             : 
     305             :         /*
     306             :          * Potentially send a status message to the primary.
     307             :          */
     308         964 :         now = feGetCurrentTimestamp();
     309             : 
     310        1892 :         if (outfd != -1 &&
     311         928 :             feTimestampDifferenceExceeds(output_last_fsync, now,
     312             :                                          fsync_interval))
     313          38 :             OutputFsync(now);
     314             : 
     315        1928 :         if (standby_message_timeout > 0 &&
     316         964 :             feTimestampDifferenceExceeds(last_status, now,
     317             :                                          standby_message_timeout))
     318             :         {
     319             :             /* Time to send feedback! */
     320          38 :             if (!sendFeedback(conn, now, true, false))
     321           4 :                 goto error;
     322             : 
     323          38 :             last_status = now;
     324             :         }
     325             : 
     326             :         /* got SIGHUP, close output file */
     327         964 :         if (outfd != -1 && output_reopen && strcmp(outfile, "-") != 0)
     328             :         {
     329           0 :             now = feGetCurrentTimestamp();
     330           0 :             OutputFsync(now);
     331           0 :             close(outfd);
     332           0 :             outfd = -1;
     333             :         }
     334         964 :         output_reopen = false;
     335             : 
     336             :         /* open the output file, if not open yet */
     337         964 :         if (outfd == -1)
     338             :         {
     339             :             struct stat statbuf;
     340             : 
     341          36 :             if (strcmp(outfile, "-") == 0)
     342          34 :                 outfd = fileno(stdout);
     343             :             else
     344           2 :                 outfd = open(outfile, O_CREAT | O_APPEND | O_WRONLY | PG_BINARY,
     345             :                              S_IRUSR | S_IWUSR);
     346          36 :             if (outfd == -1)
     347             :             {
     348           0 :                 pg_log_error("could not open log file \"%s\": %m", outfile);
     349           0 :                 goto error;
     350             :             }
     351             : 
     352          36 :             if (fstat(outfd, &statbuf) != 0)
     353             :             {
     354           0 :                 pg_log_error("could not stat file \"%s\": %m", outfile);
     355           0 :                 goto error;
     356             :             }
     357             : 
     358          36 :             output_isfile = S_ISREG(statbuf.st_mode) && !isatty(outfd);
     359             :         }
     360             : 
     361         964 :         r = PQgetCopyData(conn, &copybuf, 1);
     362         964 :         if (r == 0)
     363         324 :         {
     364             :             /*
     365             :              * In async mode, and no data available. We block on reading but
     366             :              * not more than the specified timeout, so that we can send a
     367             :              * response back to the client.
     368             :              */
     369             :             fd_set      input_mask;
     370         332 :             TimestampTz message_target = 0;
     371         332 :             TimestampTz fsync_target = 0;
     372             :             struct timeval timeout;
     373         332 :             struct timeval *timeoutptr = NULL;
     374             : 
     375         332 :             if (PQsocket(conn) < 0)
     376             :             {
     377           0 :                 pg_log_error("invalid socket: %s", PQerrorMessage(conn));
     378           4 :                 goto error;
     379             :             }
     380             : 
     381        5644 :             FD_ZERO(&input_mask);
     382         332 :             FD_SET(PQsocket(conn), &input_mask);
     383             : 
     384             :             /* Compute when we need to wakeup to send a keepalive message. */
     385         332 :             if (standby_message_timeout)
     386         332 :                 message_target = last_status + (standby_message_timeout - 1) *
     387             :                     ((int64) 1000);
     388             : 
     389             :             /* Compute when we need to wakeup to fsync the output file. */
     390         332 :             if (fsync_interval > 0 && output_needs_fsync)
     391         154 :                 fsync_target = output_last_fsync + (fsync_interval - 1) *
     392             :                     ((int64) 1000);
     393             : 
     394             :             /* Now compute when to wakeup. */
     395         332 :             if (message_target > 0 || fsync_target > 0)
     396             :             {
     397             :                 TimestampTz targettime;
     398             :                 long        secs;
     399             :                 int         usecs;
     400             : 
     401         332 :                 targettime = message_target;
     402             : 
     403         332 :                 if (fsync_target > 0 && fsync_target < targettime)
     404          10 :                     targettime = fsync_target;
     405             : 
     406         332 :                 feTimestampDifference(now,
     407             :                                       targettime,
     408             :                                       &secs,
     409             :                                       &usecs);
     410         332 :                 if (secs <= 0)
     411          10 :                     timeout.tv_sec = 1; /* Always sleep at least 1 sec */
     412             :                 else
     413         322 :                     timeout.tv_sec = secs;
     414         332 :                 timeout.tv_usec = usecs;
     415         332 :                 timeoutptr = &timeout;
     416             :             }
     417             : 
     418         332 :             r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
     419         332 :             if (r == 0 || (r < 0 && errno == EINTR))
     420             :             {
     421             :                 /*
     422             :                  * Got a timeout or signal. Continue the loop and either
     423             :                  * deliver a status packet to the server or just go back into
     424             :                  * blocking.
     425             :                  */
     426         328 :                 continue;
     427             :             }
     428         328 :             else if (r < 0)
     429             :             {
     430           0 :                 pg_log_error("%s() failed: %m", "select");
     431           0 :                 goto error;
     432             :             }
     433             : 
     434             :             /* Else there is actually data on the socket */
     435         328 :             if (PQconsumeInput(conn) == 0)
     436             :             {
     437           4 :                 pg_log_error("could not receive data from WAL stream: %s",
     438             :                              PQerrorMessage(conn));
     439           4 :                 goto error;
     440             :             }
     441         324 :             continue;
     442             :         }
     443             : 
     444             :         /* End of copy stream */
     445         632 :         if (r == -1)
     446          30 :             break;
     447             : 
     448             :         /* Failure while reading the copy stream */
     449         616 :         if (r == -2)
     450             :         {
     451           0 :             pg_log_error("could not read COPY data: %s",
     452             :                          PQerrorMessage(conn));
     453           0 :             goto error;
     454             :         }
     455             : 
     456             :         /* Check the message type. */
     457         616 :         if (copybuf[0] == PqReplMsg_Keepalive)
     458         418 :         {
     459             :             int         pos;
     460             :             bool        replyRequested;
     461             :             XLogRecPtr  walEnd;
     462         422 :             bool        endposReached = false;
     463             : 
     464             :             /*
     465             :              * Parse the keepalive message, enclosed in the CopyData message.
     466             :              * We just check if the server requested a reply, and ignore the
     467             :              * rest.
     468             :              */
     469         422 :             pos = 1;            /* skip msgtype PqReplMsg_Keepalive */
     470         422 :             walEnd = fe_recvint64(&copybuf[pos]);
     471         422 :             output_written_lsn = Max(walEnd, output_written_lsn);
     472             : 
     473         422 :             pos += 8;           /* read walEnd */
     474             : 
     475         422 :             pos += 8;           /* skip sendTime */
     476             : 
     477         422 :             if (r < pos + 1)
     478             :             {
     479           0 :                 pg_log_error("streaming header too small: %d", r);
     480           0 :                 goto error;
     481             :             }
     482         422 :             replyRequested = copybuf[pos];
     483             : 
     484         422 :             if (XLogRecPtrIsValid(endpos) && walEnd >= endpos)
     485             :             {
     486             :                 /*
     487             :                  * If there's nothing to read on the socket until a keepalive
     488             :                  * we know that the server has nothing to send us; and if
     489             :                  * walEnd has passed endpos, we know nothing else can have
     490             :                  * committed before endpos.  So we can bail out now.
     491             :                  */
     492           4 :                 endposReached = true;
     493             :             }
     494             : 
     495             :             /* Send a reply, if necessary */
     496         422 :             if (replyRequested || endposReached)
     497             :             {
     498           6 :                 if (!flushAndSendFeedback(conn, &now))
     499           0 :                     goto error;
     500           6 :                 last_status = now;
     501             :             }
     502             : 
     503         422 :             if (endposReached)
     504             :             {
     505           4 :                 stop_reason = STREAM_STOP_KEEPALIVE;
     506           4 :                 time_to_abort = true;
     507           4 :                 break;
     508             :             }
     509             : 
     510         418 :             continue;
     511             :         }
     512         194 :         else if (copybuf[0] != PqReplMsg_WALData)
     513             :         {
     514           0 :             pg_log_error("unrecognized streaming header: \"%c\"",
     515             :                          copybuf[0]);
     516           0 :             goto error;
     517             :         }
     518             : 
     519             :         /*
     520             :          * Read the header of the WALData message, enclosed in the CopyData
     521             :          * message. We only need the WAL location field (dataStart), the rest
     522             :          * of the header is ignored.
     523             :          */
     524         194 :         hdr_len = 1;            /* msgtype PqReplMsg_WALData */
     525         194 :         hdr_len += 8;           /* dataStart */
     526         194 :         hdr_len += 8;           /* walEnd */
     527         194 :         hdr_len += 8;           /* sendTime */
     528         194 :         if (r < hdr_len + 1)
     529             :         {
     530           0 :             pg_log_error("streaming header too small: %d", r);
     531           0 :             goto error;
     532             :         }
     533             : 
     534             :         /* Extract WAL location for this block */
     535         194 :         cur_record_lsn = fe_recvint64(&copybuf[1]);
     536             : 
     537         194 :         if (XLogRecPtrIsValid(endpos) && cur_record_lsn > endpos)
     538             :         {
     539             :             /*
     540             :              * We've read past our endpoint, so prepare to go away being
     541             :              * cautious about what happens to our output data.
     542             :              */
     543           0 :             if (!flushAndSendFeedback(conn, &now))
     544           0 :                 goto error;
     545           0 :             stop_reason = STREAM_STOP_END_OF_WAL;
     546           0 :             time_to_abort = true;
     547           0 :             break;
     548             :         }
     549             : 
     550         194 :         output_written_lsn = Max(cur_record_lsn, output_written_lsn);
     551             : 
     552         194 :         bytes_left = r - hdr_len;
     553         194 :         bytes_written = 0;
     554             : 
     555             :         /* signal that a fsync is needed */
     556         194 :         output_needs_fsync = true;
     557             : 
     558         388 :         while (bytes_left)
     559             :         {
     560             :             int         ret;
     561             : 
     562         388 :             ret = write(outfd,
     563         194 :                         copybuf + hdr_len + bytes_written,
     564             :                         bytes_left);
     565             : 
     566         194 :             if (ret < 0)
     567             :             {
     568           0 :                 pg_log_error("could not write %d bytes to log file \"%s\": %m",
     569             :                              bytes_left, outfile);
     570           0 :                 goto error;
     571             :             }
     572             : 
     573             :             /* Write was successful, advance our position */
     574         194 :             bytes_written += ret;
     575         194 :             bytes_left -= ret;
     576             :         }
     577             : 
     578         194 :         if (write(outfd, "\n", 1) != 1)
     579             :         {
     580           0 :             pg_log_error("could not write %d bytes to log file \"%s\": %m",
     581             :                          1, outfile);
     582           0 :             goto error;
     583             :         }
     584             : 
     585         194 :         if (XLogRecPtrIsValid(endpos) && cur_record_lsn == endpos)
     586             :         {
     587             :             /* endpos was exactly the record we just processed, we're done */
     588          10 :             if (!flushAndSendFeedback(conn, &now))
     589           0 :                 goto error;
     590          10 :             stop_reason = STREAM_STOP_END_OF_WAL;
     591          10 :             time_to_abort = true;
     592          10 :             break;
     593             :         }
     594             :     }
     595             : 
     596             :     /* Clean up connection state if stream has been aborted */
     597          34 :     if (time_to_abort)
     598          18 :         prepareToTerminate(conn, endpos, stop_reason, cur_record_lsn);
     599             : 
     600          34 :     res = PQgetResult(conn);
     601          34 :     if (PQresultStatus(res) == PGRES_COPY_OUT)
     602             :     {
     603          18 :         PQclear(res);
     604             : 
     605             :         /*
     606             :          * We're doing a client-initiated clean exit and have sent CopyDone to
     607             :          * the server. Drain any messages, so we don't miss a last-minute
     608             :          * ErrorResponse. The walsender stops generating WALData records once
     609             :          * it sees CopyDone, so expect this to finish quickly. After CopyDone,
     610             :          * it's too late for sendFeedback(), even if this were to take a long
     611             :          * time. Hence, use synchronous-mode PQgetCopyData().
     612             :          */
     613             :         while (1)
     614           4 :         {
     615             :             int         r;
     616             : 
     617          22 :             if (copybuf != NULL)
     618             :             {
     619          18 :                 PQfreemem(copybuf);
     620          18 :                 copybuf = NULL;
     621             :             }
     622          22 :             r = PQgetCopyData(conn, &copybuf, 0);
     623          22 :             if (r == -1)
     624          18 :                 break;
     625           4 :             if (r == -2)
     626             :             {
     627           0 :                 pg_log_error("could not read COPY data: %s",
     628             :                              PQerrorMessage(conn));
     629           0 :                 time_to_abort = false;  /* unclean exit */
     630           0 :                 goto error;
     631             :             }
     632             :         }
     633             : 
     634          18 :         res = PQgetResult(conn);
     635             :     }
     636          34 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
     637             :     {
     638          14 :         pg_log_error("unexpected termination of replication stream: %s",
     639             :                      PQresultErrorMessage(res));
     640          14 :         PQclear(res);
     641          14 :         goto error;
     642             :     }
     643          20 :     PQclear(res);
     644             : 
     645          20 :     if (outfd != -1 && strcmp(outfile, "-") != 0)
     646             :     {
     647           2 :         TimestampTz t = feGetCurrentTimestamp();
     648             : 
     649           2 :         OutputFsync(t);
     650           2 :         if (close(outfd) != 0)
     651           0 :             pg_log_error("could not close file \"%s\": %m", outfile);
     652             :     }
     653          20 :     outfd = -1;
     654          50 : error:
     655          50 :     if (copybuf != NULL)
     656             :     {
     657           0 :         PQfreemem(copybuf);
     658           0 :         copybuf = NULL;
     659             :     }
     660          50 :     destroyPQExpBuffer(query);
     661          50 :     PQfinish(conn);
     662          50 :     conn = NULL;
     663             : }
     664             : 
     665             : /*
     666             :  * Unfortunately we can't do sensible signal handling on Windows...
     667             :  */
     668             : #ifndef WIN32
     669             : 
     670             : /*
     671             :  * Handler installed for SIGINT/SIGTERM: it only records the stop reason and
     672             :  * raises time_to_abort, so that the program exits at the next possible moment.
     673             :  */
     674             : static void
     675           4 : sigexit_handler(SIGNAL_ARGS)
     676             : {
     677           4 :     stop_reason = STREAM_STOP_SIGNAL;   /* remember why we are stopping */
     678           4 :     time_to_abort = true;       /* tested in main() once StreamLogicalLog() returns */
     679           4 : }
     680             : 
     681             : /*
     682             :  * SIGHUP handler: set output_reopen to request that the output file be reopened.
     683             :  */
     684             : static void
     685           0 : sighup_handler(SIGNAL_ARGS)
     686             : {
     687           0 :     output_reopen = true;       /* only sets the flag; nothing is reopened here */
     688           0 : }
     689             : #endif
     690             : 
     691             : /* Parse and validate options, connect, then run the requested slot actions. */
     692             : int
     693         128 : main(int argc, char **argv)
     694             : {
     695             :     static struct option long_options[] = {
     696             : /* general options */
     697             :         {"file", required_argument, NULL, 'f'},
     698             :         {"fsync-interval", required_argument, NULL, 'F'},
     699             :         {"no-loop", no_argument, NULL, 'n'},
     700             :         {"enable-failover", no_argument, NULL, 5},  /* long-only, code 5 */
     701             :         {"enable-two-phase", no_argument, NULL, 't'},
     702             :         {"two-phase", no_argument, NULL, 't'},    /* deprecated */
     703             :         {"verbose", no_argument, NULL, 'v'},
     704             :         {"version", no_argument, NULL, 'V'},
     705             :         {"help", no_argument, NULL, '?'},
     706             : /* connection options */
     707             :         {"dbname", required_argument, NULL, 'd'},
     708             :         {"host", required_argument, NULL, 'h'},
     709             :         {"port", required_argument, NULL, 'p'},
     710             :         {"username", required_argument, NULL, 'U'},
     711             :         {"no-password", no_argument, NULL, 'w'},
     712             :         {"password", no_argument, NULL, 'W'},
     713             : /* replication options */
     714             :         {"startpos", required_argument, NULL, 'I'},
     715             :         {"endpos", required_argument, NULL, 'E'},
     716             :         {"option", required_argument, NULL, 'o'},
     717             :         {"plugin", required_argument, NULL, 'P'},
     718             :         {"status-interval", required_argument, NULL, 's'},
     719             :         {"slot", required_argument, NULL, 'S'},
     720             : /* action (long-only options, identified by the integer codes 1-4) */
     721             :         {"create-slot", no_argument, NULL, 1},
     722             :         {"start", no_argument, NULL, 2},
     723             :         {"drop-slot", no_argument, NULL, 3},
     724             :         {"if-not-exists", no_argument, NULL, 4},
     725             :         {NULL, 0, NULL, 0}
     726             :     };
     727             :     int         c;
     728             :     int         option_index;
     729             :     uint32      hi,
     730             :                 lo;
     731             :     char       *db_name;
     732             : 
     733         128 :     pg_logging_init(argv[0]);
     734         128 :     progname = get_progname(argv[0]);
     735         128 :     set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
     736             : /* --help/--version are honored here only when given as the first argument */
     737         128 :     if (argc > 1)
     738             :     {
     739         126 :         if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
     740             :         {
     741           2 :             usage();
     742           2 :             exit(0);
     743             :         }
     744         124 :         else if (strcmp(argv[1], "-V") == 0 ||
     745         124 :                  strcmp(argv[1], "--version") == 0)
     746             :         {
     747           2 :             puts("pg_recvlogical (PostgreSQL) " PG_VERSION);
     748           2 :             exit(0);
     749             :         }
     750             :     }
     751             : 
     752         730 :     while ((c = getopt_long(argc, argv, "E:f:F:ntvd:h:p:U:wWI:o:P:s:S:",
     753         730 :                             long_options, &option_index)) != -1)
     754             :     {
     755         608 :         switch (c)
     756             :         {
     757             : /* general options */
     758          50 :             case 'f':
     759          50 :                 outfile = pg_strdup(optarg);
     760          50 :                 break;
     761           2 :             case 'F':
     762           2 :                 if (!option_parse_int(optarg, "-F/--fsync-interval", 0,
     763             :                                       INT_MAX / 1000,
     764             :                                       &fsync_interval))
     765           0 :                     exit(1);
     766           2 :                 fsync_interval *= 1000; /* cannot overflow: capped at INT_MAX / 1000 above */
     767           2 :                 break;
     768          46 :             case 'n':
     769          46 :                 noloop = 1;
     770          46 :                 break;
     771           4 :             case 't':
     772           4 :                 two_phase = true;
     773           4 :                 break;
     774           2 :             case 'v':
     775           2 :                 verbose++;
     776           2 :                 break;
     777           2 :             case 5:             /* --enable-failover */
     778           2 :                 failover = true;
     779           2 :                 break;
     780             : /* connection options */
     781         116 :             case 'd':
     782         116 :                 dbname = pg_strdup(optarg);
     783         116 :                 break;
     784           0 :             case 'h':
     785           0 :                 dbhost = pg_strdup(optarg);
     786           0 :                 break;
     787           0 :             case 'p':
     788           0 :                 dbport = pg_strdup(optarg);
     789           0 :                 break;
     790           0 :             case 'U':
     791           0 :                 dbuser = pg_strdup(optarg);
     792           0 :                 break;
     793           0 :             case 'w':
     794           0 :                 dbgetpassword = -1;
     795           0 :                 break;
     796           0 :             case 'W':
     797           0 :                 dbgetpassword = 1;
     798           0 :                 break;
     799             : /* replication options */
     800           0 :             case 'I':
     801           0 :                 if (sscanf(optarg, "%X/%08X", &hi, &lo) != 2)
     802           0 :                     pg_fatal("could not parse start position \"%s\"", optarg);
     803           0 :                 startpos = ((uint64) hi) << 32 | lo;    /* join the two hex halves into 64 bits */
     804           0 :                 break;
     805          16 :             case 'E':
     806          16 :                 if (sscanf(optarg, "%X/%08X", &hi, &lo) != 2)
     807           0 :                     pg_fatal("could not parse end position \"%s\"", optarg);
     808          16 :                 endpos = ((uint64) hi) << 32 | lo;      /* same hi/lo encoding as --startpos */
     809          16 :                 break;
     810          80 :             case 'o':
     811             :                 {
     812          80 :                     char       *data = pg_strdup(optarg);
     813          80 :                     char       *val = strchr(data, '=');
     814             : /* val stays NULL when the argument contains no '=' */
     815          80 :                     if (val != NULL)
     816             :                     {
     817             :                         /* remove =; separate data from val */
     818          80 :                         *val = '\0';
     819          80 :                         val++;
     820             :                     }
     821             : 
     822          80 :                     noptions += 1;
     823          80 :                     options = pg_realloc(options, sizeof(char *) * noptions * 2);
     824             : /* options holds name/value pairs: even slots are names, odd slots are values */
     825          80 :                     options[(noptions - 1) * 2] = data;
     826          80 :                     options[(noptions - 1) * 2 + 1] = val;
     827             :                 }
     828             : 
     829          80 :                 break;
     830          50 :             case 'P':
     831          50 :                 plugin = pg_strdup(optarg);
     832          50 :                 break;
     833           2 :             case 's':
     834           2 :                 if (!option_parse_int(optarg, "-s/--status-interval", 0,
     835             :                                       INT_MAX / 1000,
     836             :                                       &standby_message_timeout))
     837           0 :                     exit(1);
     838           2 :                 standby_message_timeout *= 1000;        /* cannot overflow, see the cap above */
     839           2 :                 break;
     840         120 :             case 'S':
     841         120 :                 replication_slot = pg_strdup(optarg);
     842         120 :                 break;
     843             : /* action */
     844          58 :             case 1:             /* --create-slot */
     845          58 :                 do_create_slot = true;
     846          58 :                 break;
     847          52 :             case 2:             /* --start */
     848          52 :                 do_start_slot = true;
     849          52 :                 break;
     850           6 :             case 3:             /* --drop-slot */
     851           6 :                 do_drop_slot = true;
     852           6 :                 break;
     853           0 :             case 4:             /* --if-not-exists */
     854           0 :                 slot_exists_ok = true;
     855           0 :                 break;
     856             : 
     857           2 :             default:
     858             :                 /* getopt_long already emitted a complaint */
     859           2 :                 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     860           2 :                 exit(1);
     861             :         }
     862             :     }
     863             : 
     864             :     /*
     865             :      * Any non-option arguments?
     866             :      */
     867         122 :     if (optind < argc)
     868             :     {
     869           0 :         pg_log_error("too many command-line arguments (first is \"%s\")",
     870             :                      argv[optind]);
     871           0 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     872           0 :         exit(1);
     873             :     }
     874             : 
     875             :     /*
     876             :      * Required arguments
     877             :      */
     878         122 :     if (replication_slot == NULL)
     879             :     {
     880           2 :         pg_log_error("no slot specified");
     881           2 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     882           2 :         exit(1);
     883             :     }
     884             : 
     885         120 :     if (do_start_slot && outfile == NULL)
     886             :     {
     887           2 :         pg_log_error("no target file specified");
     888           2 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     889           2 :         exit(1);
     890             :     }
     891             : 
     892         118 :     if (!do_drop_slot && dbname == NULL)
     893             :     {
     894           2 :         pg_log_error("no database specified");
     895           2 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     896           2 :         exit(1);
     897             :     }
     898             : 
     899         116 :     if (!do_drop_slot && !do_create_slot && !do_start_slot)
     900             :     {
     901           2 :         pg_log_error("at least one action needs to be specified");
     902           2 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     903           2 :         exit(1);
     904             :     }
     905             : 
     906         114 :     if (do_drop_slot && (do_create_slot || do_start_slot))
     907             :     {
     908           0 :         pg_log_error("cannot use --create-slot or --start together with --drop-slot");
     909           0 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     910           0 :         exit(1);
     911             :     }
     912             : 
     913         114 :     if (XLogRecPtrIsValid(startpos) && (do_create_slot || do_drop_slot))
     914             :     {
     915           0 :         pg_log_error("cannot use --create-slot or --drop-slot together with --startpos");
     916           0 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     917           0 :         exit(1);
     918             :     }
     919             : 
     920         114 :     if (XLogRecPtrIsValid(endpos) && !do_start_slot)
     921             :     {
     922           0 :         pg_log_error("--endpos may only be specified with --start");
     923           0 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     924           0 :         exit(1);
     925             :     }
     926             : /* the two options checked below are only accepted together with --create-slot */
     927         114 :     if (!do_create_slot)
     928             :     {
     929          56 :         if (two_phase)
     930             :         {
     931           2 :             pg_log_error("%s may only be specified with --create-slot", "--enable-two-phase");
     932           2 :             pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     933           2 :             exit(1);
     934             :         }
     935             : 
     936          54 :         if (failover)
     937             :         {
     938           0 :             pg_log_error("%s may only be specified with --create-slot", "--enable-failover");
     939           0 :             pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     940           0 :             exit(1);
     941             :         }
     942             :     }
     943             : 
     944             :     /*
     945             :      * Obtain a connection to server.  Notably, if we need a password, we want
     946             :      * to collect it from the user immediately.
     947             :      */
     948         112 :     conn = GetConnection();
     949         112 :     if (!conn)
     950             :         /* Error message already written in GetConnection() */
     951           0 :         exit(1);
     952         112 :     atexit(disconnect_atexit);
     953             : 
     954             :     /*
     955             :      * Trap signals.  (Don't do this until after the initial password prompt,
     956             :      * if one is needed, in GetConnection.)
     957             :      */
     958             : #ifndef WIN32
     959         112 :     pqsignal(SIGINT, sigexit_handler);
     960         112 :     pqsignal(SIGTERM, sigexit_handler);
     961         112 :     pqsignal(SIGHUP, sighup_handler);
     962             : #endif
     963             : 
     964             :     /*
     965             :      * Run IDENTIFY_SYSTEM to check the connection type for each action.
     966             :      * --create-slot and --start actions require a database-specific
     967             :      * replication connection because they handle logical replication slots.
     968             :      * --drop-slot can remove replication slots from any replication
     969             :      * connection without this restriction.
     970             :      */
     971         112 :     if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name))
     972           0 :         exit(1);
     973             : 
     974         112 :     if (!do_drop_slot && db_name == NULL)
     975           0 :         pg_fatal("could not establish database-specific replication connection");
     976             : 
     977             :     /*
     978             :      * Set umask so that directories/files are created with the same
     979             :      * permissions as directories/files in the source data directory.
     980             :      *
     981             :      * pg_mode_mask is set to owner-only by default and then updated in
     982             :      * GetConnection() where we get the mode from the server-side with
     983             :      * RetrieveDataDirCreatePerm() and then call SetDataDirectoryCreatePerm().
     984             :      */
     985         112 :     umask(pg_mode_mask);
     986             : 
     987             :     /* Drop a replication slot. */
     988         112 :     if (do_drop_slot)
     989             :     {
     990           6 :         if (verbose)
     991           0 :             pg_log_info("dropping replication slot \"%s\"", replication_slot);
     992             : 
     993           6 :         if (!DropReplicationSlot(conn, replication_slot))
     994           0 :             exit(1);
     995             :     }
     996             : 
     997             :     /* Create a replication slot. */
     998         112 :     if (do_create_slot)
     999             :     {
    1000          58 :         if (verbose)
    1001           0 :             pg_log_info("creating replication slot \"%s\"", replication_slot);
    1002             : 
    1003          58 :         if (!CreateReplicationSlot(conn, replication_slot, plugin, false,
    1004             :                                    false, false, slot_exists_ok, two_phase,
    1005             :                                    failover))
    1006           0 :             exit(1);
    1007          58 :         startpos = InvalidXLogRecPtr;   /* NOTE(review): looks redundant, --startpos was rejected above */
    1008             :     }
    1009             : 
    1010         112 :     if (!do_start_slot)
    1011          64 :         exit(0);
    1012             : 
    1013             :     /* Stream loop: no break; each pass calls StreamLogicalLog() once */
    1014             :     while (true)
    1015             :     {
    1016          50 :         StreamLogicalLog();
    1017          50 :         if (time_to_abort)
    1018             :         {
    1019             :             /*
    1020             :              * We've been Ctrl-C'ed or reached an exit limit condition. That's
    1021             :              * not an error, so exit without an errorcode.
    1022             :              */
    1023          18 :             exit(0);
    1024             :         }
    1025             : 
    1026             :         /*
    1027             :          * Ensure all written data is flushed to disk before exiting or
    1028             :          * starting a new replication.
    1029             :          */
    1030          32 :         if (outfd != -1)
    1031          18 :             OutputFsync(feGetCurrentTimestamp());
    1032             : 
    1033          32 :         if (noloop)
    1034             :         {
    1035          30 :             pg_fatal("disconnected");
    1036             :         }
    1037             :         else
    1038             :         {
    1039             :             /* translator: check source for value for %d */
    1040           2 :             pg_log_info("disconnected; waiting %d seconds to try again",
    1041             :                         RECONNECT_SLEEP_TIME);
    1042           2 :             pg_usleep(RECONNECT_SLEEP_TIME * 1000000);  /* seconds -> microseconds */
    1043             :         }
    1044             :     }
    1045             : }
    1046             : 
    1047             : /*
    1048             :  * Fsync our output data, and send a feedback message to the server.  Returns
    1049             :  * true if sendFeedback() reported success, false otherwise.
    1050             :  *
    1051             :  * *now is first handed to OutputFsync() and is then overwritten with the current
    1052             :  * timestamp just before feedback is sent, whether or not sending succeeds.
    1053             :  */
    1054             : static bool
    1055          16 : flushAndSendFeedback(PGconn *conn, TimestampTz *now)
    1056             : {
    1057             :     /* flush data to disk, so that we send a recent flush pointer */
    1058          16 :     OutputFsync(*now);
    1059          16 :     *now = feGetCurrentTimestamp();     /* refreshed before the send, so also on failure */
    1060          16 :     if (!sendFeedback(conn, *now, true, false))
    1061           0 :         return false;
    1062             : 
    1063          16 :     return true;
    1064             : }
    1065             : 
    1066             : /*
    1067             :  * Try to inform the server about our upcoming demise, but don't wait around or
    1068             :  * retry on failure.  In verbose mode, also log why streaming is stopping.
    1069             :  */
    1070             : static void
    1071          18 : prepareToTerminate(PGconn *conn, XLogRecPtr endpos, StreamStopReason reason,
    1072             :                    XLogRecPtr lsn)
    1073             : {
    1074          18 :     (void) PQputCopyEnd(conn, NULL);    /* result deliberately ignored */
    1075          18 :     (void) PQflush(conn);       /* likewise: a single attempt, no retry */
    1076             : /* endpos and lsn are used only by the verbose log messages below */
    1077          18 :     if (verbose)
    1078             :     {
    1079           2 :         switch (reason)
    1080             :         {
    1081           2 :             case STREAM_STOP_SIGNAL:
    1082           2 :                 pg_log_info("received interrupt signal, exiting");
    1083           2 :                 break;
    1084           0 :             case STREAM_STOP_KEEPALIVE:
    1085           0 :                 pg_log_info("end position %X/%08X reached by keepalive",
    1086             :                             LSN_FORMAT_ARGS(endpos));
    1087           0 :                 break;
    1088           0 :             case STREAM_STOP_END_OF_WAL:
    1089             :                 Assert(XLogRecPtrIsValid(lsn));
    1090           0 :                 pg_log_info("end position %X/%08X reached by WAL record at %X/%08X",
    1091             :                             LSN_FORMAT_ARGS(endpos), LSN_FORMAT_ARGS(lsn));
    1092           0 :                 break;
    1093           0 :             case STREAM_STOP_NONE:      /* asserted not to occur here */
    1094             :                 Assert(false);
    1095           0 :                 break;
    1096             :         }
    1097             :     }
    1098          18 : }

Generated by: LCOV version 1.16