LCOV - code coverage report
Current view: top level - src/bin/pg_basebackup - pg_recvlogical.c (source / functions) Hit Total Coverage
Test: PostgreSQL 16beta1 Lines: 337 459 73.4 %
Date: 2023-05-31 00:12:04 Functions: 9 10 90.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * pg_recvlogical.c - receive data from a logical decoding slot in a streaming
       4             :  *                    fashion and write it to a local file.
       5             :  *
       6             :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *        src/bin/pg_basebackup/pg_recvlogical.c
      10             :  *-------------------------------------------------------------------------
      11             :  */
      12             : 
      13             : #include "postgres_fe.h"
      14             : 
      15             : #include <dirent.h>
      16             : #include <limits.h>
      17             : #include <sys/select.h>
      18             : #include <sys/stat.h>
      19             : #include <unistd.h>
      20             : 
      21             : #include "access/xlog_internal.h"
      22             : #include "common/fe_memutils.h"
      23             : #include "common/file_perm.h"
      24             : #include "common/logging.h"
      25             : #include "fe_utils/option_utils.h"
      26             : #include "getopt_long.h"
      27             : #include "libpq-fe.h"
      28             : #include "libpq/pqsignal.h"
      29             : #include "pqexpbuffer.h"
      30             : #include "streamutil.h"
      31             : 
      32             : /* Time to sleep between reconnection attempts */
      33             : #define RECONNECT_SLEEP_TIME 5
      34             : 
      35             : /* Global Options */
      36             : static char *outfile = NULL;    /* output path; "-" selects stdout */
      37             : static int  verbose = 0;        /* nonzero enables the pg_log_info() progress messages */
      38             : static bool two_phase = false;
      39             : static int  noloop = 0;
      40             : static int  standby_message_timeout = 10 * 1000;    /* in milliseconds; 10 sec = default */
      41             : static int  fsync_interval = 10 * 1000; /* in milliseconds; 10 sec = default; <= 0 skips fsync */
      42             : static XLogRecPtr startpos = InvalidXLogRecPtr; /* sent in START_REPLICATION; advanced by sendFeedback() */
      43             : static XLogRecPtr endpos = InvalidXLogRecPtr;   /* InvalidXLogRecPtr means no end position was requested */
      44             : static bool do_create_slot = false;
      45             : static bool slot_exists_ok = false;
      46             : static bool do_start_slot = false;
      47             : static bool do_drop_slot = false;
      48             : static char *replication_slot = NULL;
      49             : 
      50             : /* filled pairwise with option, value. value may be NULL */
      51             : static char **options;          /* options[2*i] is the name, options[2*i + 1] the value */
      52             : static size_t noptions = 0;     /* number of name/value pairs, not array entries */
      53             : static const char *plugin = "test_decoding";
      54             : 
      55             : /* Global State */
      56             : static int  outfd = -1;         /* -1 while no output file is open */
      57             : static volatile sig_atomic_t time_to_abort = false; /* ends the streaming loop when set */
      58             : static volatile sig_atomic_t output_reopen = false; /* asks the streaming loop to close and reopen the output file */
      59             : static bool output_isfile;      /* outfd is a regular file and not a tty, so fsync applies */
      60             : static TimestampTz output_last_fsync = -1;  /* time of the last OutputFsync() call */
      61             : static bool output_needs_fsync = false; /* data was written since the last fsync */
      62             : static XLogRecPtr output_written_lsn = InvalidXLogRecPtr;   /* reported to the server as write position */
      63             : static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr; /* reported to the server as flush position */
      64             : 
      65             : static void usage(void);
      66             : static void StreamLogicalLog(void);
      67             : static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now);
      68             : static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos,
      69             :                                bool keepalive, XLogRecPtr lsn);
      70             : 
      71             : static void
      72           2 : usage(void)
      73             : {
                     :     /*
                     :      * Print the command-line help text to stdout.  The defaults shown for
                     :      * --fsync-interval and --status-interval are the current values of
                     :      * fsync_interval and standby_message_timeout, converted from
                     :      * milliseconds to seconds.
                     :      */
      74           2 :     printf(_("%s controls PostgreSQL logical decoding streams.\n\n"),
      75             :            progname);
      76           2 :     printf(_("Usage:\n"));
      77           2 :     printf(_("  %s [OPTION]...\n"), progname);
      78           2 :     printf(_("\nAction to be performed:\n"));
      79           2 :     printf(_("      --create-slot      create a new replication slot (for the slot's name see --slot)\n"));
      80           2 :     printf(_("      --drop-slot        drop the replication slot (for the slot's name see --slot)\n"));
      81           2 :     printf(_("      --start            start streaming in a replication slot (for the slot's name see --slot)\n"));
      82           2 :     printf(_("\nOptions:\n"));
      83           2 :     printf(_("  -E, --endpos=LSN       exit after receiving the specified LSN\n"));
      84           2 :     printf(_("  -f, --file=FILE        receive log into this file, - for stdout\n"));
                     :     /* short and long option are separated by a comma, like every other entry */
      85           2 :     printf(_("  -F, --fsync-interval=SECS\n"
      86             :              "                         time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000));
      87           2 :     printf(_("      --if-not-exists    do not error if slot already exists when creating a slot\n"));
      88           2 :     printf(_("  -I, --startpos=LSN     where in an existing slot should the streaming start\n"));
      89           2 :     printf(_("  -n, --no-loop          do not loop on connection lost\n"));
      90           2 :     printf(_("  -o, --option=NAME[=VALUE]\n"
      91             :              "                         pass option NAME with optional value VALUE to the\n"
      92             :              "                         output plugin\n"));
      93           2 :     printf(_("  -P, --plugin=PLUGIN    use output plugin PLUGIN (default: %s)\n"), plugin);
      94           2 :     printf(_("  -s, --status-interval=SECS\n"
      95             :              "                         time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
      96           2 :     printf(_("  -S, --slot=SLOTNAME    name of the logical replication slot\n"));
      97           2 :     printf(_("  -t, --two-phase        enable decoding of prepared transactions when creating a slot\n"));
      98           2 :     printf(_("  -v, --verbose          output verbose messages\n"));
      99           2 :     printf(_("  -V, --version          output version information, then exit\n"));
     100           2 :     printf(_("  -?, --help             show this help, then exit\n"));
     101           2 :     printf(_("\nConnection options:\n"));
     102           2 :     printf(_("  -d, --dbname=DBNAME    database to connect to\n"));
     103           2 :     printf(_("  -h, --host=HOSTNAME    database server host or socket directory\n"));
     104           2 :     printf(_("  -p, --port=PORT        database server port number\n"));
     105           2 :     printf(_("  -U, --username=NAME    connect as specified database user\n"));
     106           2 :     printf(_("  -w, --no-password      never prompt for password\n"));
     107           2 :     printf(_("  -W, --password         force password prompt (should happen automatically)\n"));
     108           2 :     printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
     109           2 :     printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
     110           2 : }
     111             : 
     112             : /*
     113             :  * Send a Standby Status Update message to server.
                     :  *
                     :  * The message carries output_written_lsn as the write position and
                     :  * output_fsync_lsn as the flush position; the apply position is always
                     :  * sent as InvalidXLogRecPtr.  'now' is sent as the send time and
                     :  * 'replyRequested' as the trailing flag byte.
                     :  *
                     :  * Unless 'force' is set, nothing is sent when both positions are unchanged
                     :  * since the previous message.
                     :  *
                     :  * Returns true if the message was sent or deliberately skipped, false if
                     :  * PQputCopyData() or PQflush() reported a failure (an error is logged).
                     :  *
                     :  * Note that startpos and the remembered "last sent" positions are updated
                     :  * before the send is attempted, so they change even when the send fails.
     114             :  */
     115             : static bool
     116          50 : sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested)
     117             : {
     118             :     static XLogRecPtr last_written_lsn = InvalidXLogRecPtr;    /* write position in the previous message */
     119             :     static XLogRecPtr last_fsync_lsn = InvalidXLogRecPtr;  /* flush position in the previous message */
     120             : 
     121             :     char        replybuf[1 + 8 + 8 + 8 + 8 + 1];   /* type byte, four 8-byte fields, reply flag */
     122          50 :     int         len = 0;
     123             : 
     124             :     /*
     125             :      * we normally don't want to send superfluous feedback, but if it's
     126             :      * because of a timeout we need to, otherwise wal_sender_timeout will kill
     127             :      * us.
     128             :      */
     129          50 :     if (!force &&
     130           0 :         last_written_lsn == output_written_lsn &&
     131           0 :         last_fsync_lsn == output_fsync_lsn)
     132           0 :         return true;
     133             : 
     134          50 :     if (verbose)
     135           0 :         pg_log_info("confirming write up to %X/%X, flush to %X/%X (slot %s)",
     136             :                     LSN_FORMAT_ARGS(output_written_lsn),
     137             :                     LSN_FORMAT_ARGS(output_fsync_lsn),
     138             :                     replication_slot);
     139             : 
     140          50 :     replybuf[len] = 'r';    /* message type byte */
     141          50 :     len += 1;
     142          50 :     fe_sendint64(output_written_lsn, &replybuf[len]);   /* write */
     143          50 :     len += 8;
     144          50 :     fe_sendint64(output_fsync_lsn, &replybuf[len]); /* flush */
     145          50 :     len += 8;
     146          50 :     fe_sendint64(InvalidXLogRecPtr, &replybuf[len]);    /* apply */
     147          50 :     len += 8;
     148          50 :     fe_sendint64(now, &replybuf[len]);  /* sendTime */
     149          50 :     len += 8;
     150          50 :     replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
     151          50 :     len += 1;
     152             : 
     153          50 :     startpos = output_written_lsn;  /* StreamLogicalLog() puts startpos into START_REPLICATION */
     154          50 :     last_written_lsn = output_written_lsn;
     155          50 :     last_fsync_lsn = output_fsync_lsn;
     156             : 
     157          50 :     if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
     158             :     {
     159           0 :         pg_log_error("could not send feedback packet: %s",
     160             :                      PQerrorMessage(conn));
     161           0 :         return false;
     162             :     }
     163             : 
     164          50 :     return true;
     165             : }
     166             : 
     167             : static void
     168          94 : disconnect_atexit(void)
     169             : {   /*
                     :      * Close the server connection 'conn' if one is open; otherwise do
                     :      * nothing.  'conn' is declared outside the code shown here.
                     :      * NOTE(review): the name suggests this is registered with atexit();
                     :      * the registration is not visible in this part of the file -- confirm.
                     :      */
     170          94 :     if (conn != NULL)
     171          48 :         PQfinish(conn);
     172          94 : }
     173             : 
     174             : static bool
     175          50 : OutputFsync(TimestampTz now)
     176             : {   /*
                     :      * Flush the output file to disk when that is both needed and possible.
                     :      *
                     :      * 'now' is recorded as the time of the last fsync, and
                     :      * output_fsync_lsn is advanced to output_written_lsn, on every call --
                     :      * including calls that return early without calling fsync().
                     :      *
                     :      * Every return statement here returns true; an fsync() failure is
                     :      * reported through pg_fatal() (presumably that does not return --
                     :      * confirm against its definition).
                     :      */
     177          50 :     output_last_fsync = now;
     178             : 
     179          50 :     output_fsync_lsn = output_written_lsn;
     180             : 
     181          50 :     if (fsync_interval <= 0)    /* a non-positive interval skips the fsync entirely */
     182           0 :         return true;
     183             : 
     184          50 :     if (!output_needs_fsync)    /* nothing written since the last fsync */
     185          38 :         return true;
     186             : 
     187          12 :     output_needs_fsync = false;
     188             : 
     189             :     /* can only fsync if it's a regular file */
     190          12 :     if (!output_isfile)
     191           8 :         return true;
     192             : 
     193           4 :     if (fsync(outfd) != 0)
     194           0 :         pg_fatal("could not fsync file \"%s\": %m", outfile);
     195             : 
     196           4 :     return true;
     197             : }
     198             : 
     199             : /*
     200             :  * Start the log streaming
     201             :  */
     202             : static void
     203          46 : StreamLogicalLog(void)
     204             : {
     205             :     PGresult   *res;
     206          46 :     char       *copybuf = NULL;
     207          46 :     TimestampTz last_status = -1;
     208             :     int         i;
     209             :     PQExpBuffer query;
     210             : 
     211          46 :     output_written_lsn = InvalidXLogRecPtr;
     212          46 :     output_fsync_lsn = InvalidXLogRecPtr;
     213             : 
     214             :     /*
     215             :      * Connect in replication mode to the server
     216             :      */
     217          46 :     if (!conn)
     218           0 :         conn = GetConnection();
     219          46 :     if (!conn)
     220             :         /* Error message already written in GetConnection() */
     221           0 :         return;
     222             : 
     223             :     /*
     224             :      * Start the replication
     225             :      */
     226          46 :     if (verbose)
     227           0 :         pg_log_info("starting log streaming at %X/%X (slot %s)",
     228             :                     LSN_FORMAT_ARGS(startpos),
     229             :                     replication_slot);
     230             : 
     231             :     /* Initiate the replication stream at specified location */
     232          46 :     query = createPQExpBuffer();
     233          46 :     appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%X",
     234          46 :                       replication_slot, LSN_FORMAT_ARGS(startpos));
     235             : 
     236             :     /* print options if there are any */
     237          46 :     if (noptions)
     238          40 :         appendPQExpBufferStr(query, " (");
     239             : 
     240         126 :     for (i = 0; i < noptions; i++)
     241             :     {
     242             :         /* separator */
     243          80 :         if (i > 0)
     244          40 :             appendPQExpBufferStr(query, ", ");
     245             : 
     246             :         /* write option name */
     247          80 :         appendPQExpBuffer(query, "\"%s\"", options[(i * 2)]);
     248             : 
     249             :         /* write option value if specified */
     250          80 :         if (options[(i * 2) + 1] != NULL)
     251          80 :             appendPQExpBuffer(query, " '%s'", options[(i * 2) + 1]);
     252             :     }
     253             : 
     254          46 :     if (noptions)
     255          40 :         appendPQExpBufferChar(query, ')');
     256             : 
     257          46 :     res = PQexec(conn, query->data);
     258          46 :     if (PQresultStatus(res) != PGRES_COPY_BOTH)
     259             :     {
     260          12 :         pg_log_error("could not send replication command \"%s\": %s",
     261             :                      query->data, PQresultErrorMessage(res));
     262          12 :         PQclear(res);
     263          12 :         goto error;
     264             :     }
     265          34 :     PQclear(res);
     266          34 :     resetPQExpBuffer(query);
     267             : 
     268          34 :     if (verbose)
     269           0 :         pg_log_info("streaming initiated");
     270             : 
     271        1718 :     while (!time_to_abort)
     272             :     {
     273             :         int         r;
     274             :         int         bytes_left;
     275             :         int         bytes_written;
     276             :         TimestampTz now;
     277             :         int         hdr_len;
     278        1716 :         XLogRecPtr  cur_record_lsn = InvalidXLogRecPtr;
     279             : 
     280        1716 :         if (copybuf != NULL)
     281             :         {
     282         892 :             PQfreemem(copybuf);
     283         892 :             copybuf = NULL;
     284             :         }
     285             : 
     286             :         /*
     287             :          * Potentially send a status message to the primary.
     288             :          */
     289        1716 :         now = feGetCurrentTimestamp();
     290             : 
     291        3398 :         if (outfd != -1 &&
     292        1682 :             feTimestampDifferenceExceeds(output_last_fsync, now,
     293             :                                          fsync_interval))
     294             :         {
     295          34 :             if (!OutputFsync(now))
     296           4 :                 goto error;
     297             :         }
     298             : 
     299        3432 :         if (standby_message_timeout > 0 &&
     300        1716 :             feTimestampDifferenceExceeds(last_status, now,
     301             :                                          standby_message_timeout))
     302             :         {
     303             :             /* Time to send feedback! */
     304          34 :             if (!sendFeedback(conn, now, true, false))
     305           0 :                 goto error;
     306             : 
     307          34 :             last_status = now;
     308             :         }
     309             : 
     310             :         /* got SIGHUP, close output file */
     311        1716 :         if (outfd != -1 && output_reopen && strcmp(outfile, "-") != 0)
     312             :         {
     313           0 :             now = feGetCurrentTimestamp();
     314           0 :             if (!OutputFsync(now))
     315           0 :                 goto error;
     316           0 :             close(outfd);
     317           0 :             outfd = -1;
     318             :         }
     319        1716 :         output_reopen = false;
     320             : 
     321             :         /* open the output file, if not open yet */
     322        1716 :         if (outfd == -1)
     323             :         {
     324             :             struct stat statbuf;
     325             : 
     326          34 :             if (strcmp(outfile, "-") == 0)
     327          34 :                 outfd = fileno(stdout);
     328             :             else
     329           0 :                 outfd = open(outfile, O_CREAT | O_APPEND | O_WRONLY | PG_BINARY,
     330             :                              S_IRUSR | S_IWUSR);
     331          34 :             if (outfd == -1)
     332             :             {
     333           0 :                 pg_log_error("could not open log file \"%s\": %m", outfile);
     334           0 :                 goto error;
     335             :             }
     336             : 
     337          34 :             if (fstat(outfd, &statbuf) != 0)
     338             :             {
     339           0 :                 pg_log_error("could not stat file \"%s\": %m", outfile);
     340           0 :                 goto error;
     341             :             }
     342             : 
     343          34 :             output_isfile = S_ISREG(statbuf.st_mode) && !isatty(outfd);
     344             :         }
     345             : 
     346        1716 :         r = PQgetCopyData(conn, &copybuf, 1);
     347        1716 :         if (r == 0)
     348             :         {
     349             :             /*
     350             :              * In async mode, and no data available. We block on reading but
     351             :              * not more than the specified timeout, so that we can send a
     352             :              * response back to the client.
     353             :              */
     354             :             fd_set      input_mask;
     355         796 :             TimestampTz message_target = 0;
     356         796 :             TimestampTz fsync_target = 0;
     357             :             struct timeval timeout;
     358         796 :             struct timeval *timeoutptr = NULL;
     359             : 
     360         796 :             if (PQsocket(conn) < 0)
     361             :             {
     362           0 :                 pg_log_error("invalid socket: %s", PQerrorMessage(conn));
     363           4 :                 goto error;
     364             :             }
     365             : 
     366         796 :             FD_ZERO(&input_mask);
     367         796 :             FD_SET(PQsocket(conn), &input_mask);
     368             : 
     369             :             /* Compute when we need to wakeup to send a keepalive message. */
     370         796 :             if (standby_message_timeout)
     371         796 :                 message_target = last_status + (standby_message_timeout - 1) *
     372             :                     ((int64) 1000);
     373             : 
     374             :             /* Compute when we need to wakeup to fsync the output file. */
     375         796 :             if (fsync_interval > 0 && output_needs_fsync)
     376         182 :                 fsync_target = output_last_fsync + (fsync_interval - 1) *
     377             :                     ((int64) 1000);
     378             : 
     379             :             /* Now compute when to wakeup. */
     380         796 :             if (message_target > 0 || fsync_target > 0)
     381             :             {
     382             :                 TimestampTz targettime;
     383             :                 long        secs;
     384             :                 int         usecs;
     385             : 
     386         796 :                 targettime = message_target;
     387             : 
     388         796 :                 if (fsync_target > 0 && fsync_target < targettime)
     389           0 :                     targettime = fsync_target;
     390             : 
     391         796 :                 feTimestampDifference(now,
     392             :                                       targettime,
     393             :                                       &secs,
     394             :                                       &usecs);
     395         796 :                 if (secs <= 0)
     396           0 :                     timeout.tv_sec = 1; /* Always sleep at least 1 sec */
     397             :                 else
     398         796 :                     timeout.tv_sec = secs;
     399         796 :                 timeout.tv_usec = usecs;
     400         796 :                 timeoutptr = &timeout;
     401             :             }
     402             : 
     403         796 :             r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
     404         796 :             if (r == 0 || (r < 0 && errno == EINTR))
     405             :             {
     406             :                 /*
     407             :                  * Got a timeout or signal. Continue the loop and either
     408             :                  * deliver a status packet to the server or just go back into
     409             :                  * blocking.
     410             :                  */
     411         792 :                 continue;
     412             :             }
     413         794 :             else if (r < 0)
     414             :             {
     415           0 :                 pg_log_error("%s() failed: %m", "select");
     416           0 :                 goto error;
     417             :             }
     418             : 
     419             :             /* Else there is actually data on the socket */
     420         794 :             if (PQconsumeInput(conn) == 0)
     421             :             {
     422           4 :                 pg_log_error("could not receive data from WAL stream: %s",
     423             :                              PQerrorMessage(conn));
     424           4 :                 goto error;
     425             :             }
     426         790 :             continue;
     427             :         }
     428             : 
     429             :         /* End of copy stream */
     430         920 :         if (r == -1)
     431          28 :             break;
     432             : 
     433             :         /* Failure while reading the copy stream */
     434         906 :         if (r == -2)
     435             :         {
     436           0 :             pg_log_error("could not read COPY data: %s",
     437             :                          PQerrorMessage(conn));
     438           0 :             goto error;
     439             :         }
     440             : 
     441             :         /* Check the message type. */
     442         906 :         if (copybuf[0] == 'k')
     443             :         {
     444             :             int         pos;
     445             :             bool        replyRequested;
     446             :             XLogRecPtr  walEnd;
     447         730 :             bool        endposReached = false;
     448             : 
     449             :             /*
     450             :              * Parse the keepalive message, enclosed in the CopyData message.
     451             :              * We just check if the server requested a reply, and ignore the
     452             :              * rest.
     453             :              */
     454         730 :             pos = 1;            /* skip msgtype 'k' */
     455         730 :             walEnd = fe_recvint64(&copybuf[pos]);
     456         730 :             output_written_lsn = Max(walEnd, output_written_lsn);
     457             : 
     458         730 :             pos += 8;           /* read walEnd */
     459             : 
     460         730 :             pos += 8;           /* skip sendTime */
     461             : 
     462         730 :             if (r < pos + 1)
     463             :             {
     464           0 :                 pg_log_error("streaming header too small: %d", r);
     465           0 :                 goto error;
     466             :             }
     467         730 :             replyRequested = copybuf[pos];
     468             : 
     469         730 :             if (endpos != InvalidXLogRecPtr && walEnd >= endpos)
     470             :             {
     471             :                 /*
     472             :                  * If there's nothing to read on the socket until a keepalive
     473             :                  * we know that the server has nothing to send us; and if
     474             :                  * walEnd has passed endpos, we know nothing else can have
     475             :                  * committed before endpos.  So we can bail out now.
     476             :                  */
     477           4 :                 endposReached = true;
     478             :             }
     479             : 
     480             :             /* Send a reply, if necessary */
     481         730 :             if (replyRequested || endposReached)
     482             :             {
     483           6 :                 if (!flushAndSendFeedback(conn, &now))
     484           0 :                     goto error;
     485           6 :                 last_status = now;
     486             :             }
     487             : 
     488         730 :             if (endposReached)
     489             :             {
     490           4 :                 prepareToTerminate(conn, endpos, true, InvalidXLogRecPtr);
     491           4 :                 time_to_abort = true;
     492           4 :                 break;
     493             :             }
     494             : 
     495         726 :             continue;
     496             :         }
     497         176 :         else if (copybuf[0] != 'w')
     498             :         {
     499           0 :             pg_log_error("unrecognized streaming header: \"%c\"",
     500             :                          copybuf[0]);
     501           0 :             goto error;
     502             :         }
     503             : 
     504             :         /*
     505             :          * Read the header of the XLogData message, enclosed in the CopyData
     506             :          * message. We only need the WAL location field (dataStart), the rest
     507             :          * of the header is ignored.
     508             :          */
     509         176 :         hdr_len = 1;            /* msgtype 'w' */
     510         176 :         hdr_len += 8;           /* dataStart */
     511         176 :         hdr_len += 8;           /* walEnd */
     512         176 :         hdr_len += 8;           /* sendTime */
     513         176 :         if (r < hdr_len + 1)
     514             :         {
     515           0 :             pg_log_error("streaming header too small: %d", r);
     516           0 :             goto error;
     517             :         }
     518             : 
     519             :         /* Extract WAL location for this block */
     520         176 :         cur_record_lsn = fe_recvint64(&copybuf[1]);
     521             : 
     522         176 :         if (endpos != InvalidXLogRecPtr && cur_record_lsn > endpos)
     523             :         {
     524             :             /*
     525             :              * We've read past our endpoint, so prepare to go away being
     526             :              * cautious about what happens to our output data.
     527             :              */
     528           0 :             if (!flushAndSendFeedback(conn, &now))
     529           0 :                 goto error;
     530           0 :             prepareToTerminate(conn, endpos, false, cur_record_lsn);
     531           0 :             time_to_abort = true;
     532           0 :             break;
     533             :         }
     534             : 
     535         176 :         output_written_lsn = Max(cur_record_lsn, output_written_lsn);
     536             : 
     537         176 :         bytes_left = r - hdr_len;
     538         176 :         bytes_written = 0;
     539             : 
     540             :         /* signal that a fsync is needed */
     541         176 :         output_needs_fsync = true;
     542             : 
     543         352 :         while (bytes_left)
     544             :         {
     545             :             int         ret;
     546             : 
     547         352 :             ret = write(outfd,
     548         176 :                         copybuf + hdr_len + bytes_written,
     549             :                         bytes_left);
     550             : 
     551         176 :             if (ret < 0)
     552             :             {
     553           0 :                 pg_log_error("could not write %d bytes to log file \"%s\": %m",
     554             :                              bytes_left, outfile);
     555           0 :                 goto error;
     556             :             }
     557             : 
     558             :             /* Write was successful, advance our position */
     559         176 :             bytes_written += ret;
     560         176 :             bytes_left -= ret;
     561             :         }
     562             : 
     563         176 :         if (write(outfd, "\n", 1) != 1)
     564             :         {
     565           0 :             pg_log_error("could not write %d bytes to log file \"%s\": %m",
     566             :                          1, outfile);
     567           0 :             goto error;
     568             :         }
     569             : 
     570         176 :         if (endpos != InvalidXLogRecPtr && cur_record_lsn == endpos)
     571             :         {
     572             :             /* endpos was exactly the record we just processed, we're done */
     573          10 :             if (!flushAndSendFeedback(conn, &now))
     574           0 :                 goto error;
     575          10 :             prepareToTerminate(conn, endpos, false, cur_record_lsn);
     576          10 :             time_to_abort = true;
     577          10 :             break;
     578             :         }
     579             :     }
     580             : 
     581          30 :     res = PQgetResult(conn);
     582          30 :     if (PQresultStatus(res) == PGRES_COPY_OUT)
     583             :     {
     584          14 :         PQclear(res);
     585             : 
     586             :         /*
     587             :          * We're doing a client-initiated clean exit and have sent CopyDone to
     588             :          * the server. Drain any messages, so we don't miss a last-minute
     589             :          * ErrorResponse. The walsender stops generating XLogData records once
     590             :          * it sees CopyDone, so expect this to finish quickly. After CopyDone,
     591             :          * it's too late for sendFeedback(), even if this were to take a long
     592             :          * time. Hence, use synchronous-mode PQgetCopyData().
     593             :          */
     594             :         while (1)
     595         102 :         {
     596             :             int         r;
     597             : 
     598         116 :             if (copybuf != NULL)
     599             :             {
     600         116 :                 PQfreemem(copybuf);
     601         116 :                 copybuf = NULL;
     602             :             }
     603         116 :             r = PQgetCopyData(conn, &copybuf, 0);
     604         116 :             if (r == -1)
     605          14 :                 break;
     606         102 :             if (r == -2)
     607             :             {
     608           0 :                 pg_log_error("could not read COPY data: %s",
     609             :                              PQerrorMessage(conn));
     610           0 :                 time_to_abort = false;  /* unclean exit */
     611           0 :                 goto error;
     612             :             }
     613             :         }
     614             : 
     615          14 :         res = PQgetResult(conn);
     616             :     }
     617          30 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
     618             :     {
     619          14 :         pg_log_error("unexpected termination of replication stream: %s",
     620             :                      PQresultErrorMessage(res));
     621          14 :         goto error;
     622             :     }
     623          16 :     PQclear(res);
     624             : 
     625          16 :     if (outfd != -1 && strcmp(outfile, "-") != 0)
     626             :     {
     627           0 :         TimestampTz t = feGetCurrentTimestamp();
     628             : 
     629             :         /* no need to jump to error on failure here, we're finishing anyway */
     630           0 :         OutputFsync(t);
     631             : 
     632           0 :         if (close(outfd) != 0)
     633           0 :             pg_log_error("could not close file \"%s\": %m", outfile);
     634             :     }
     635          16 :     outfd = -1;
     636          46 : error:
     637          46 :     if (copybuf != NULL)
     638             :     {
     639           0 :         PQfreemem(copybuf);
     640           0 :         copybuf = NULL;
     641             :     }
     642          46 :     destroyPQExpBuffer(query);
     643          46 :     PQfinish(conn);
     644          46 :     conn = NULL;
     645             : }
     646             : 
     647             : /*
     648             :  * Unfortunately we can't do sensible signal handling on Windows...
     649             :  */
     650             : #ifndef WIN32
     651             : 
     652             : /*
     653             :  * Handler that main() installs for SIGINT and SIGTERM.  It does no work of its
     654             :  * own: it only raises time_to_abort, which main() checks after StreamLogicalLog() returns.
     655             :  */
     656             : static void
     657           2 : sigexit_handler(SIGNAL_ARGS)
     658             : {
     659           2 :     time_to_abort = true; /* just set the flag; the exit itself happens later, outside the handler */
     660           2 : }
     661             : 
     662             : /*
     663             :  * Handler that main() installs for SIGHUP: sets output_reopen to request that the output file be reopened.
     664             :  */
     665             : static void
     666           0 : sighup_handler(SIGNAL_ARGS)
     667             : {
     668           0 :     output_reopen = true; /* flag is consumed outside this chunk -- presumably by the streaming loop; confirm */
     669           0 : }
     670             : #endif
     671             : 
     672             : /* Entry point: parse and validate options, connect, then drop/create the slot and/or stream from it. */
     673             : int
     674         110 : main(int argc, char **argv)
     675             : {
     676             :     static struct option long_options[] = {
     677             : /* general options */
     678             :         {"file", required_argument, NULL, 'f'},
     679             :         {"fsync-interval", required_argument, NULL, 'F'},
     680             :         {"no-loop", no_argument, NULL, 'n'},
     681             :         {"verbose", no_argument, NULL, 'v'},
     682             :         {"two-phase", no_argument, NULL, 't'},
     683             :         {"version", no_argument, NULL, 'V'},
     684             :         {"help", no_argument, NULL, '?'},
     685             : /* connection options */
     686             :         {"dbname", required_argument, NULL, 'd'},
     687             :         {"host", required_argument, NULL, 'h'},
     688             :         {"port", required_argument, NULL, 'p'},
     689             :         {"username", required_argument, NULL, 'U'},
     690             :         {"no-password", no_argument, NULL, 'w'},
     691             :         {"password", no_argument, NULL, 'W'},
     692             : /* replication options */
     693             :         {"startpos", required_argument, NULL, 'I'},
     694             :         {"endpos", required_argument, NULL, 'E'},
     695             :         {"option", required_argument, NULL, 'o'},
     696             :         {"plugin", required_argument, NULL, 'P'},
     697             :         {"status-interval", required_argument, NULL, 's'},
     698             :         {"slot", required_argument, NULL, 'S'},
     699             : /* action */
     700             :         {"create-slot", no_argument, NULL, 1}, /* actions have no short letter; codes 1-4 are matched in the switch below */
     701             :         {"start", no_argument, NULL, 2},
     702             :         {"drop-slot", no_argument, NULL, 3},
     703             :         {"if-not-exists", no_argument, NULL, 4},
     704             :         {NULL, 0, NULL, 0}
     705             :     };
     706             :     int         c;
     707             :     int         option_index;
     708             :     uint32      hi,
     709             :                 lo;
     710             :     char       *db_name;
     711             : 
     712         110 :     pg_logging_init(argv[0]);
     713         110 :     progname = get_progname(argv[0]);
     714         110 :     set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
     715             : 
     716         110 :     if (argc > 1) /* help/version are recognized here only as the very first argument, before getopt_long runs */
     717             :     {
     718         108 :         if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
     719             :         {
     720           2 :             usage();
     721           2 :             exit(0);
     722             :         }
     723         106 :         else if (strcmp(argv[1], "-V") == 0 ||
     724         106 :                  strcmp(argv[1], "--version") == 0)
     725             :         {
     726           2 :             puts("pg_recvlogical (PostgreSQL) " PG_VERSION);
     727           2 :             exit(0);
     728             :         }
     729             :     }
     730             : 
     731         642 :     while ((c = getopt_long(argc, argv, "E:f:F:ntvd:h:p:U:wWI:o:P:s:S:",
     732             :                             long_options, &option_index)) != -1)
     733             :     {
     734         538 :         switch (c)
     735             :         {
     736             : /* general options */
     737          48 :             case 'f':
     738          48 :                 outfile = pg_strdup(optarg);
     739          48 :                 break;
     740           0 :             case 'F':
     741           0 :                 if (!option_parse_int(optarg, "-F/--fsync-interval", 0,
     742             :                                       INT_MAX / 1000,
     743             :                                       &fsync_interval))
     744           0 :                     exit(1);
     745           0 :                 fsync_interval *= 1000; /* scaled by 1000 (presumably seconds to milliseconds -- confirm); the INT_MAX / 1000 bound passed above appears chosen so this cannot overflow */
     746           0 :                 break;
     747          46 :             case 'n':
     748          46 :                 noloop = 1;
     749          46 :                 break;
     750           4 :             case 't':
     751           4 :                 two_phase = true;
     752           4 :                 break;
     753           0 :             case 'v':
     754           0 :                 verbose++;
     755           0 :                 break;
     756             : /* connection options */
     757         100 :             case 'd':
     758         100 :                 dbname = pg_strdup(optarg);
     759         100 :                 break;
     760           0 :             case 'h':
     761           0 :                 dbhost = pg_strdup(optarg);
     762           0 :                 break;
     763           0 :             case 'p':
     764           0 :                 dbport = pg_strdup(optarg);
     765           0 :                 break;
     766           0 :             case 'U':
     767           0 :                 dbuser = pg_strdup(optarg);
     768           0 :                 break;
     769           0 :             case 'w':
     770           0 :                 dbgetpassword = -1;
     771           0 :                 break;
     772           0 :             case 'W':
     773           0 :                 dbgetpassword = 1;
     774           0 :                 break;
     775             : /* replication options */
     776           0 :             case 'I':
     777           0 :                 if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
     778           0 :                     pg_fatal("could not parse start position \"%s\"", optarg);
     779           0 :                 startpos = ((uint64) hi) << 32 | lo; /* join the two hex halves of "hi/lo" into one 64-bit position */
     780           0 :                 break;
     781          16 :             case 'E':
     782          16 :                 if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
     783           0 :                     pg_fatal("could not parse end position \"%s\"", optarg);
     784          16 :                 endpos = ((uint64) hi) << 32 | lo; /* same hi/lo combination as for the start position */
     785          16 :                 break;
     786          80 :             case 'o':
     787             :                 {
     788          80 :                     char       *data = pg_strdup(optarg);
     789          80 :                     char       *val = strchr(data, '='); /* val stays NULL when the argument contains no '=' */
     790             : 
     791          80 :                     if (val != NULL)
     792             :                     {
     793             :                         /* remove =; separate data from val */
     794          80 :                         *val = '\0';
     795          80 :                         val++;
     796             :                     }
     797             : 
     798          80 :                     noptions += 1;
     799          80 :                     options = pg_realloc(options, sizeof(char *) * noptions * 2); /* two entries per option: name at the even index, value (or NULL) at the odd index */
     800             : 
     801          80 :                     options[(noptions - 1) * 2] = data;
     802          80 :                     options[(noptions - 1) * 2 + 1] = val;
     803             :                 }
     804             : 
     805          80 :                 break;
     806          42 :             case 'P':
     807          42 :                 plugin = pg_strdup(optarg);
     808          42 :                 break;
     809           0 :             case 's':
     810           0 :                 if (!option_parse_int(optarg, "-s/--status-interval", 0,
     811             :                                       INT_MAX / 1000,
     812             :                                       &standby_message_timeout))
     813           0 :                     exit(1);
     814           0 :                 standby_message_timeout *= 1000; /* same bound-then-scale pattern as for -F above */
     815           0 :                 break;
     816         102 :             case 'S':
     817         102 :                 replication_slot = pg_strdup(optarg);
     818         102 :                 break;
     819             : /* action */
     820          46 :             case 1:
     821          46 :                 do_create_slot = true;
     822          46 :                 break;
     823          50 :             case 2:
     824          50 :                 do_start_slot = true;
     825          50 :                 break;
     826           2 :             case 3:
     827           2 :                 do_drop_slot = true;
     828           2 :                 break;
     829           0 :             case 4:
     830           0 :                 slot_exists_ok = true;
     831           0 :                 break;
     832             : 
     833           2 :             default:
     834             :                 /* getopt_long already emitted a complaint */
     835           2 :                 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     836           2 :                 exit(1);
     837             :         }
     838             :     }
     839             : 
     840             :     /*
     841             :      * Any non-option arguments?
     842             :      */
     843         104 :     if (optind < argc)
     844             :     {
     845           0 :         pg_log_error("too many command-line arguments (first is \"%s\")",
     846             :                      argv[optind]);
     847           0 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     848           0 :         exit(1);
     849             :     }
     850             : 
     851             :     /*
     852             :      * Required arguments
     853             :      */
     854         104 :     if (replication_slot == NULL)
     855             :     {
     856           2 :         pg_log_error("no slot specified");
     857           2 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     858           2 :         exit(1);
     859             :     }
     860             : 
     861         102 :     if (do_start_slot && outfile == NULL)
     862             :     {
     863           2 :         pg_log_error("no target file specified");
     864           2 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     865           2 :         exit(1);
     866             :     }
     867             : 
     868         100 :     if (!do_drop_slot && dbname == NULL)
     869             :     {
     870           2 :         pg_log_error("no database specified");
     871           2 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     872           2 :         exit(1);
     873             :     }
     874             : 
     875          98 :     if (!do_drop_slot && !do_create_slot && !do_start_slot)
     876             :     {
     877           2 :         pg_log_error("at least one action needs to be specified");
     878           2 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     879           2 :         exit(1);
     880             :     }
     881             : 
     882          96 :     if (do_drop_slot && (do_create_slot || do_start_slot))
     883             :     {
     884           0 :         pg_log_error("cannot use --create-slot or --start together with --drop-slot");
     885           0 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     886           0 :         exit(1);
     887             :     }
     888             : 
     889          96 :     if (startpos != InvalidXLogRecPtr && (do_create_slot || do_drop_slot))
     890             :     {
     891           0 :         pg_log_error("cannot use --create-slot or --drop-slot together with --startpos");
     892           0 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     893           0 :         exit(1);
     894             :     }
     895             : 
     896          96 :     if (endpos != InvalidXLogRecPtr && !do_start_slot)
     897             :     {
     898           0 :         pg_log_error("--endpos may only be specified with --start");
     899           0 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     900           0 :         exit(1);
     901             :     }
     902             : 
     903          96 :     if (two_phase && !do_create_slot)
     904             :     {
     905           2 :         pg_log_error("--two-phase may only be specified with --create-slot");
     906           2 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     907           2 :         exit(1);
     908             :     }
     909             : 
     910             :     /*
     911             :      * Obtain a connection to server.  Notably, if we need a password, we want
     912             :      * to collect it from the user immediately.
     913             :      */
     914          94 :     conn = GetConnection();
     915          94 :     if (!conn)
     916             :         /* Error message already written in GetConnection() */
     917           0 :         exit(1);
     918          94 :     atexit(disconnect_atexit); /* registered only once a connection exists, so the exit() calls below run disconnect_atexit */
     919             : 
     920             :     /*
     921             :      * Trap signals.  (Don't do this until after the initial password prompt,
     922             :      * if one is needed, in GetConnection.)
     923             :      */
     924             : #ifndef WIN32
     925          94 :     pqsignal(SIGINT, sigexit_handler);
     926          94 :     pqsignal(SIGTERM, sigexit_handler);
     927          94 :     pqsignal(SIGHUP, sighup_handler);
     928             : #endif
     929             : 
     930             :     /*
     931             :      * Run IDENTIFY_SYSTEM to make sure we connected using a database specific
     932             :      * replication connection.
     933             :      */
     934          94 :     if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name))
     935           0 :         exit(1);
     936             : 
     937          94 :     if (db_name == NULL)
     938           0 :         pg_fatal("could not establish database-specific replication connection");
     939             : 
     940             :     /*
     941             :      * Set umask so that directories/files are created with the same
     942             :      * permissions as directories/files in the source data directory.
     943             :      *
     944             :      * pg_mode_mask is set to owner-only by default and then updated in
     945             :      * GetConnection() where we get the mode from the server-side with
     946             :      * RetrieveDataDirCreatePerm() and then call SetDataDirectoryCreatePerm().
     947             :      */
     948          94 :     umask(pg_mode_mask);
     949             : 
     950             :     /* Drop a replication slot. */
     951          94 :     if (do_drop_slot)
     952             :     {
     953           2 :         if (verbose)
     954           0 :             pg_log_info("dropping replication slot \"%s\"", replication_slot);
     955             : 
     956           2 :         if (!DropReplicationSlot(conn, replication_slot))
     957           0 :             exit(1);
     958             :     }
     959             : 
     960             :     /* Create a replication slot. */
     961          94 :     if (do_create_slot)
     962             :     {
     963          46 :         if (verbose)
     964           0 :             pg_log_info("creating replication slot \"%s\"", replication_slot);
     965             : 
     966          46 :         if (!CreateReplicationSlot(conn, replication_slot, plugin, false,
     967             :                                    false, false, slot_exists_ok, two_phase))
     968           0 :             exit(1);
     969          46 :         startpos = InvalidXLogRecPtr; /* after creating the slot, the start position is reset to "invalid" */
     970             :     }
     971             : 
     972          94 :     if (!do_start_slot) /* no --start requested: the slot action(s) above were all there was to do */
     973          48 :         exit(0);
     974             : 
     975             :     /* Stream loop */
     976             :     while (true)
     977             :     {
     978          46 :         StreamLogicalLog(); /* returns when streaming stops; time_to_abort then tells a requested stop from a disconnect */
     979          46 :         if (time_to_abort)
     980             :         {
     981             :             /*
     982             :              * We've been Ctrl-C'ed or reached an exit limit condition. That's
     983             :              * not an error, so exit without an errorcode.
     984             :              */
     985          16 :             exit(0);
     986             :         }
     987          30 :         else if (noloop)
     988          30 :             pg_fatal("disconnected");
     989             :         else
     990             :         {
     991             :             /* translator: check source for value for %d */
     992           0 :             pg_log_info("disconnected; waiting %d seconds to try again",
     993             :                         RECONNECT_SLEEP_TIME);
     994           0 :             pg_usleep(RECONNECT_SLEEP_TIME * 1000000); /* seconds scaled by 1000000, i.e. microseconds, for pg_usleep */
     995             :         }
     996             :     }
     997             : }
     998             : 
     999             : /*
    1000             :  * Fsync our output data, and send a feedback message to the server over
    1001             :  * conn.  Returns true if both steps succeed, false as soon as either one fails.
    1002             :  *
    1003             :  * *now is passed to OutputFsync() as given; once the fsync has succeeded it is
    1004             :  * overwritten with the current timestamp just before sending feedback.
    1005             :  */
    1006             : static bool
    1007          16 : flushAndSendFeedback(PGconn *conn, TimestampTz *now)
    1008             : {
    1009             :     /* flush data to disk, so that we send a recent flush pointer */
    1010          16 :     if (!OutputFsync(*now))
    1011           0 :         return false;
    1012          16 :     *now = feGetCurrentTimestamp(); /* refreshed here, so the caller sees the new value even if the send below fails */
    1013          16 :     if (!sendFeedback(conn, *now, true, false))
    1014           0 :         return false;
    1015             : 
    1016          16 :     return true;
    1017             : }
    1018             : 
    1019             : /*
    1020             :  * Try to inform the server about our upcoming demise (PQputCopyEnd, then PQflush), but don't wait around or
    1021             :  * retry on failure.  When verbose, also log whether endpos was reached by a keepalive or by the WAL record at lsn.
    1022             :  */
    1023             : static void
    1024          14 : prepareToTerminate(PGconn *conn, XLogRecPtr endpos, bool keepalive, XLogRecPtr lsn)
    1025             : {
    1026          14 :     (void) PQputCopyEnd(conn, NULL); /* result deliberately discarded: best effort only */
    1027          14 :     (void) PQflush(conn);
    1028             : 
    1029          14 :     if (verbose)
    1030             :     {
    1031           0 :         if (keepalive)
    1032           0 :             pg_log_info("end position %X/%X reached by keepalive",
    1033             :                         LSN_FORMAT_ARGS(endpos));
    1034             :         else
    1035           0 :             pg_log_info("end position %X/%X reached by WAL record at %X/%X",
    1036             :                         LSN_FORMAT_ARGS(endpos), LSN_FORMAT_ARGS(lsn));
    1037             :     }
    1038          14 : }

Generated by: LCOV version 1.14