LCOV - code coverage report
Current view: top level - src/bin/pg_basebackup - pg_recvlogical.c (source / functions) Hit Total Coverage
Test: PostgreSQL 17beta1 Lines: 342 473 72.3 %
Date: 2024-05-29 11:10:47 Functions: 9 10 90.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * pg_recvlogical.c - receive data from a logical decoding slot in a streaming
       4             :  *                    fashion and write it to a local file.
       5             :  *
       6             :  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *        src/bin/pg_basebackup/pg_recvlogical.c
      10             :  *-------------------------------------------------------------------------
      11             :  */
      12             : 
      13             : #include "postgres_fe.h"
      14             : 
      15             : #include <dirent.h>
      16             : #include <limits.h>
      17             : #include <sys/select.h>
      18             : #include <sys/stat.h>
      19             : #include <unistd.h>
      20             : 
      21             : #include "access/xlog_internal.h"
      22             : #include "common/fe_memutils.h"
      23             : #include "common/file_perm.h"
      24             : #include "common/logging.h"
      25             : #include "fe_utils/option_utils.h"
      26             : #include "getopt_long.h"
      27             : #include "libpq-fe.h"
      28             : #include "libpq/pqsignal.h"
      29             : #include "pqexpbuffer.h"
      30             : #include "streamutil.h"
      31             : 
      32             : /* Time to sleep between reconnection attempts */
      33             : #define RECONNECT_SLEEP_TIME 5
      34             : 
      35             : typedef enum
      36             : {
      37             :     STREAM_STOP_NONE,
      38             :     STREAM_STOP_END_OF_WAL,
      39             :     STREAM_STOP_KEEPALIVE,
      40             :     STREAM_STOP_SIGNAL
      41             : } StreamStopReason;
      42             : 
      43             : /* Global Options */
      44             : static char *outfile = NULL;
      45             : static int  verbose = 0;
      46             : static bool two_phase = false;
      47             : static int  noloop = 0;
      48             : static int  standby_message_timeout = 10 * 1000;    /* 10 sec = default */
      49             : static int  fsync_interval = 10 * 1000; /* 10 sec = default */
      50             : static XLogRecPtr startpos = InvalidXLogRecPtr;
      51             : static XLogRecPtr endpos = InvalidXLogRecPtr;
      52             : static bool do_create_slot = false;
      53             : static bool slot_exists_ok = false;
      54             : static bool do_start_slot = false;
      55             : static bool do_drop_slot = false;
      56             : static char *replication_slot = NULL;
      57             : 
      58             : /* filled pairwise with option, value. value may be NULL */
      59             : static char **options;
      60             : static size_t noptions = 0;
      61             : static const char *plugin = "test_decoding";
      62             : 
      63             : /* Global State */
      64             : static int  outfd = -1;
      65             : static volatile sig_atomic_t time_to_abort = false;
      66             : static volatile sig_atomic_t stop_reason = STREAM_STOP_NONE;
      67             : static volatile sig_atomic_t output_reopen = false;
      68             : static bool output_isfile;
      69             : static TimestampTz output_last_fsync = -1;
      70             : static bool output_needs_fsync = false;
      71             : static XLogRecPtr output_written_lsn = InvalidXLogRecPtr;
      72             : static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr;
      73             : 
      74             : static void usage(void);
      75             : static void StreamLogicalLog(void);
      76             : static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now);
      77             : static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos,
      78             :                                StreamStopReason reason,
      79             :                                XLogRecPtr lsn);
      80             : 
      81             : static void
      82           2 : usage(void)
      83             : {
      84           2 :     printf(_("%s controls PostgreSQL logical decoding streams.\n\n"),
      85             :            progname);
      86           2 :     printf(_("Usage:\n"));
      87           2 :     printf(_("  %s [OPTION]...\n"), progname);
      88           2 :     printf(_("\nAction to be performed:\n"));
      89           2 :     printf(_("      --create-slot      create a new replication slot (for the slot's name see --slot)\n"));
      90           2 :     printf(_("      --drop-slot        drop the replication slot (for the slot's name see --slot)\n"));
      91           2 :     printf(_("      --start            start streaming in a replication slot (for the slot's name see --slot)\n"));
      92           2 :     printf(_("\nOptions:\n"));
      93           2 :     printf(_("  -E, --endpos=LSN       exit after receiving the specified LSN\n"));
      94           2 :     printf(_("  -f, --file=FILE        receive log into this file, - for stdout\n"));
      95           2 :     printf(_("  -F  --fsync-interval=SECS\n"
      96             :              "                         time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000));
      97           2 :     printf(_("      --if-not-exists    do not error if slot already exists when creating a slot\n"));
      98           2 :     printf(_("  -I, --startpos=LSN     where in an existing slot should the streaming start\n"));
      99           2 :     printf(_("  -n, --no-loop          do not loop on connection lost\n"));
     100           2 :     printf(_("  -o, --option=NAME[=VALUE]\n"
     101             :              "                         pass option NAME with optional value VALUE to the\n"
     102             :              "                         output plugin\n"));
     103           2 :     printf(_("  -P, --plugin=PLUGIN    use output plugin PLUGIN (default: %s)\n"), plugin);
     104           2 :     printf(_("  -s, --status-interval=SECS\n"
     105             :              "                         time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
     106           2 :     printf(_("  -S, --slot=SLOTNAME    name of the logical replication slot\n"));
     107           2 :     printf(_("  -t, --two-phase        enable decoding of prepared transactions when creating a slot\n"));
     108           2 :     printf(_("  -v, --verbose          output verbose messages\n"));
     109           2 :     printf(_("  -V, --version          output version information, then exit\n"));
     110           2 :     printf(_("  -?, --help             show this help, then exit\n"));
     111           2 :     printf(_("\nConnection options:\n"));
     112           2 :     printf(_("  -d, --dbname=DBNAME    database to connect to\n"));
     113           2 :     printf(_("  -h, --host=HOSTNAME    database server host or socket directory\n"));
     114           2 :     printf(_("  -p, --port=PORT        database server port number\n"));
     115           2 :     printf(_("  -U, --username=NAME    connect as specified database user\n"));
     116           2 :     printf(_("  -w, --no-password      never prompt for password\n"));
     117           2 :     printf(_("  -W, --password         force password prompt (should happen automatically)\n"));
     118           2 :     printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
     119           2 :     printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
     120           2 : }
     121             : 
     122             : /*
     123             :  * Send a Standby Status Update message to server.
     124             :  */
     125             : static bool
     126          50 : sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested)
     127             : {
     128             :     static XLogRecPtr last_written_lsn = InvalidXLogRecPtr;
     129             :     static XLogRecPtr last_fsync_lsn = InvalidXLogRecPtr;
     130             : 
     131             :     char        replybuf[1 + 8 + 8 + 8 + 8 + 1];
     132          50 :     int         len = 0;
     133             : 
     134             :     /*
     135             :      * we normally don't want to send superfluous feedback, but if it's
     136             :      * because of a timeout we need to, otherwise wal_sender_timeout will kill
     137             :      * us.
     138             :      */
     139          50 :     if (!force &&
     140           0 :         last_written_lsn == output_written_lsn &&
     141           0 :         last_fsync_lsn == output_fsync_lsn)
     142           0 :         return true;
     143             : 
     144          50 :     if (verbose)
     145           0 :         pg_log_info("confirming write up to %X/%X, flush to %X/%X (slot %s)",
     146             :                     LSN_FORMAT_ARGS(output_written_lsn),
     147             :                     LSN_FORMAT_ARGS(output_fsync_lsn),
     148             :                     replication_slot);
     149             : 
     150          50 :     replybuf[len] = 'r';
     151          50 :     len += 1;
     152          50 :     fe_sendint64(output_written_lsn, &replybuf[len]);   /* write */
     153          50 :     len += 8;
     154          50 :     fe_sendint64(output_fsync_lsn, &replybuf[len]); /* flush */
     155          50 :     len += 8;
     156          50 :     fe_sendint64(InvalidXLogRecPtr, &replybuf[len]);    /* apply */
     157          50 :     len += 8;
     158          50 :     fe_sendint64(now, &replybuf[len]);  /* sendTime */
     159          50 :     len += 8;
     160          50 :     replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
     161          50 :     len += 1;
     162             : 
     163          50 :     startpos = output_written_lsn;
     164          50 :     last_written_lsn = output_written_lsn;
     165          50 :     last_fsync_lsn = output_fsync_lsn;
     166             : 
     167          50 :     if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
     168             :     {
     169           0 :         pg_log_error("could not send feedback packet: %s",
     170             :                      PQerrorMessage(conn));
     171           0 :         return false;
     172             :     }
     173             : 
     174          50 :     return true;
     175             : }
     176             : 
     177             : static void
     178          94 : disconnect_atexit(void)
     179             : {
     180          94 :     if (conn != NULL)
     181          48 :         PQfinish(conn);
     182          94 : }
     183             : 
     184             : static bool
     185          50 : OutputFsync(TimestampTz now)
     186             : {
     187          50 :     output_last_fsync = now;
     188             : 
     189          50 :     output_fsync_lsn = output_written_lsn;
     190             : 
     191          50 :     if (fsync_interval <= 0)
     192           0 :         return true;
     193             : 
     194          50 :     if (!output_needs_fsync)
     195          38 :         return true;
     196             : 
     197          12 :     output_needs_fsync = false;
     198             : 
     199             :     /* can only fsync if it's a regular file */
     200          12 :     if (!output_isfile)
     201           8 :         return true;
     202             : 
     203           4 :     if (fsync(outfd) != 0)
     204           0 :         pg_fatal("could not fsync file \"%s\": %m", outfile);
     205             : 
     206           4 :     return true;
     207             : }
     208             : 
     209             : /*
     210             :  * Start the log streaming
     211             :  */
     212             : static void
     213          46 : StreamLogicalLog(void)
     214             : {
     215             :     PGresult   *res;
     216          46 :     char       *copybuf = NULL;
     217          46 :     TimestampTz last_status = -1;
     218             :     int         i;
     219             :     PQExpBuffer query;
     220             :     XLogRecPtr  cur_record_lsn;
     221             : 
     222          46 :     output_written_lsn = InvalidXLogRecPtr;
     223          46 :     output_fsync_lsn = InvalidXLogRecPtr;
     224          46 :     cur_record_lsn = InvalidXLogRecPtr;
     225             : 
     226             :     /*
     227             :      * Connect in replication mode to the server
     228             :      */
     229          46 :     if (!conn)
     230           0 :         conn = GetConnection();
     231          46 :     if (!conn)
     232             :         /* Error message already written in GetConnection() */
     233           0 :         return;
     234             : 
     235             :     /*
     236             :      * Start the replication
     237             :      */
     238          46 :     if (verbose)
     239           0 :         pg_log_info("starting log streaming at %X/%X (slot %s)",
     240             :                     LSN_FORMAT_ARGS(startpos),
     241             :                     replication_slot);
     242             : 
     243             :     /* Initiate the replication stream at specified location */
     244          46 :     query = createPQExpBuffer();
     245          46 :     appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%X",
     246          46 :                       replication_slot, LSN_FORMAT_ARGS(startpos));
     247             : 
     248             :     /* print options if there are any */
     249          46 :     if (noptions)
     250          40 :         appendPQExpBufferStr(query, " (");
     251             : 
     252         126 :     for (i = 0; i < noptions; i++)
     253             :     {
     254             :         /* separator */
     255          80 :         if (i > 0)
     256          40 :             appendPQExpBufferStr(query, ", ");
     257             : 
     258             :         /* write option name */
     259          80 :         appendPQExpBuffer(query, "\"%s\"", options[(i * 2)]);
     260             : 
     261             :         /* write option value if specified */
     262          80 :         if (options[(i * 2) + 1] != NULL)
     263          80 :             appendPQExpBuffer(query, " '%s'", options[(i * 2) + 1]);
     264             :     }
     265             : 
     266          46 :     if (noptions)
     267          40 :         appendPQExpBufferChar(query, ')');
     268             : 
     269          46 :     res = PQexec(conn, query->data);
     270          46 :     if (PQresultStatus(res) != PGRES_COPY_BOTH)
     271             :     {
     272          12 :         pg_log_error("could not send replication command \"%s\": %s",
     273             :                      query->data, PQresultErrorMessage(res));
     274          12 :         PQclear(res);
     275          12 :         goto error;
     276             :     }
     277          34 :     PQclear(res);
     278          34 :     resetPQExpBuffer(query);
     279             : 
     280          34 :     if (verbose)
     281           0 :         pg_log_info("streaming initiated");
     282             : 
     283        1494 :     while (!time_to_abort)
     284             :     {
     285             :         int         r;
     286             :         int         bytes_left;
     287             :         int         bytes_written;
     288             :         TimestampTz now;
     289             :         int         hdr_len;
     290             : 
     291        1492 :         cur_record_lsn = InvalidXLogRecPtr;
     292             : 
     293        1492 :         if (copybuf != NULL)
     294             :         {
     295         792 :             PQfreemem(copybuf);
     296         792 :             copybuf = NULL;
     297             :         }
     298             : 
     299             :         /*
     300             :          * Potentially send a status message to the primary.
     301             :          */
     302        1492 :         now = feGetCurrentTimestamp();
     303             : 
     304        2950 :         if (outfd != -1 &&
     305        1458 :             feTimestampDifferenceExceeds(output_last_fsync, now,
     306             :                                          fsync_interval))
     307             :         {
     308          34 :             if (!OutputFsync(now))
     309           4 :                 goto error;
     310             :         }
     311             : 
     312        2984 :         if (standby_message_timeout > 0 &&
     313        1492 :             feTimestampDifferenceExceeds(last_status, now,
     314             :                                          standby_message_timeout))
     315             :         {
     316             :             /* Time to send feedback! */
     317          34 :             if (!sendFeedback(conn, now, true, false))
     318           0 :                 goto error;
     319             : 
     320          34 :             last_status = now;
     321             :         }
     322             : 
     323             :         /* got SIGHUP, close output file */
     324        1492 :         if (outfd != -1 && output_reopen && strcmp(outfile, "-") != 0)
     325             :         {
     326           0 :             now = feGetCurrentTimestamp();
     327           0 :             if (!OutputFsync(now))
     328           0 :                 goto error;
     329           0 :             close(outfd);
     330           0 :             outfd = -1;
     331             :         }
     332        1492 :         output_reopen = false;
     333             : 
     334             :         /* open the output file, if not open yet */
     335        1492 :         if (outfd == -1)
     336             :         {
     337             :             struct stat statbuf;
     338             : 
     339          34 :             if (strcmp(outfile, "-") == 0)
     340          34 :                 outfd = fileno(stdout);
     341             :             else
     342           0 :                 outfd = open(outfile, O_CREAT | O_APPEND | O_WRONLY | PG_BINARY,
     343             :                              S_IRUSR | S_IWUSR);
     344          34 :             if (outfd == -1)
     345             :             {
     346           0 :                 pg_log_error("could not open log file \"%s\": %m", outfile);
     347           0 :                 goto error;
     348             :             }
     349             : 
     350          34 :             if (fstat(outfd, &statbuf) != 0)
     351             :             {
     352           0 :                 pg_log_error("could not stat file \"%s\": %m", outfile);
     353           0 :                 goto error;
     354             :             }
     355             : 
     356          34 :             output_isfile = S_ISREG(statbuf.st_mode) && !isatty(outfd);
     357             :         }
     358             : 
     359        1492 :         r = PQgetCopyData(conn, &copybuf, 1);
     360        1492 :         if (r == 0)
     361             :         {
     362             :             /*
     363             :              * In async mode, and no data available. We block on reading but
     364             :              * not more than the specified timeout, so that we can send a
     365             :              * response back to the client.
     366             :              */
     367             :             fd_set      input_mask;
     368         672 :             TimestampTz message_target = 0;
     369         672 :             TimestampTz fsync_target = 0;
     370             :             struct timeval timeout;
     371         672 :             struct timeval *timeoutptr = NULL;
     372             : 
     373         672 :             if (PQsocket(conn) < 0)
     374             :             {
     375           0 :                 pg_log_error("invalid socket: %s", PQerrorMessage(conn));
     376           4 :                 goto error;
     377             :             }
     378             : 
     379         672 :             FD_ZERO(&input_mask);
     380         672 :             FD_SET(PQsocket(conn), &input_mask);
     381             : 
     382             :             /* Compute when we need to wakeup to send a keepalive message. */
     383         672 :             if (standby_message_timeout)
     384         672 :                 message_target = last_status + (standby_message_timeout - 1) *
     385             :                     ((int64) 1000);
     386             : 
     387             :             /* Compute when we need to wakeup to fsync the output file. */
     388         672 :             if (fsync_interval > 0 && output_needs_fsync)
     389         290 :                 fsync_target = output_last_fsync + (fsync_interval - 1) *
     390             :                     ((int64) 1000);
     391             : 
     392             :             /* Now compute when to wakeup. */
     393         672 :             if (message_target > 0 || fsync_target > 0)
     394             :             {
     395             :                 TimestampTz targettime;
     396             :                 long        secs;
     397             :                 int         usecs;
     398             : 
     399         672 :                 targettime = message_target;
     400             : 
     401         672 :                 if (fsync_target > 0 && fsync_target < targettime)
     402           0 :                     targettime = fsync_target;
     403             : 
     404         672 :                 feTimestampDifference(now,
     405             :                                       targettime,
     406             :                                       &secs,
     407             :                                       &usecs);
     408         672 :                 if (secs <= 0)
     409           0 :                     timeout.tv_sec = 1; /* Always sleep at least 1 sec */
     410             :                 else
     411         672 :                     timeout.tv_sec = secs;
     412         672 :                 timeout.tv_usec = usecs;
     413         672 :                 timeoutptr = &timeout;
     414             :             }
     415             : 
     416         672 :             r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
     417         672 :             if (r == 0 || (r < 0 && errno == EINTR))
     418             :             {
     419             :                 /*
     420             :                  * Got a timeout or signal. Continue the loop and either
     421             :                  * deliver a status packet to the server or just go back into
     422             :                  * blocking.
     423             :                  */
     424         668 :                 continue;
     425             :             }
     426         670 :             else if (r < 0)
     427             :             {
     428           0 :                 pg_log_error("%s() failed: %m", "select");
     429           0 :                 goto error;
     430             :             }
     431             : 
     432             :             /* Else there is actually data on the socket */
     433         670 :             if (PQconsumeInput(conn) == 0)
     434             :             {
     435           4 :                 pg_log_error("could not receive data from WAL stream: %s",
     436             :                              PQerrorMessage(conn));
     437           4 :                 goto error;
     438             :             }
     439         666 :             continue;
     440             :         }
     441             : 
     442             :         /* End of copy stream */
     443         820 :         if (r == -1)
     444          28 :             break;
     445             : 
     446             :         /* Failure while reading the copy stream */
     447         806 :         if (r == -2)
     448             :         {
     449           0 :             pg_log_error("could not read COPY data: %s",
     450             :                          PQerrorMessage(conn));
     451           0 :             goto error;
     452             :         }
     453             : 
     454             :         /* Check the message type. */
     455         806 :         if (copybuf[0] == 'k')
     456             :         {
     457             :             int         pos;
     458             :             bool        replyRequested;
     459             :             XLogRecPtr  walEnd;
     460         624 :             bool        endposReached = false;
     461             : 
     462             :             /*
     463             :              * Parse the keepalive message, enclosed in the CopyData message.
     464             :              * We just check if the server requested a reply, and ignore the
     465             :              * rest.
     466             :              */
     467         624 :             pos = 1;            /* skip msgtype 'k' */
     468         624 :             walEnd = fe_recvint64(&copybuf[pos]);
     469         624 :             output_written_lsn = Max(walEnd, output_written_lsn);
     470             : 
     471         624 :             pos += 8;           /* read walEnd */
     472             : 
     473         624 :             pos += 8;           /* skip sendTime */
     474             : 
     475         624 :             if (r < pos + 1)
     476             :             {
     477           0 :                 pg_log_error("streaming header too small: %d", r);
     478           0 :                 goto error;
     479             :             }
     480         624 :             replyRequested = copybuf[pos];
     481             : 
     482         624 :             if (endpos != InvalidXLogRecPtr && walEnd >= endpos)
     483             :             {
     484             :                 /*
     485             :                  * If there's nothing to read on the socket until a keepalive
     486             :                  * we know that the server has nothing to send us; and if
     487             :                  * walEnd has passed endpos, we know nothing else can have
     488             :                  * committed before endpos.  So we can bail out now.
     489             :                  */
     490           4 :                 endposReached = true;
     491             :             }
     492             : 
     493             :             /* Send a reply, if necessary */
     494         624 :             if (replyRequested || endposReached)
     495             :             {
     496           6 :                 if (!flushAndSendFeedback(conn, &now))
     497           0 :                     goto error;
     498           6 :                 last_status = now;
     499             :             }
     500             : 
     501         624 :             if (endposReached)
     502             :             {
     503           4 :                 stop_reason = STREAM_STOP_KEEPALIVE;
     504           4 :                 time_to_abort = true;
     505           4 :                 break;
     506             :             }
     507             : 
     508         620 :             continue;
     509             :         }
     510         182 :         else if (copybuf[0] != 'w')
     511             :         {
     512           0 :             pg_log_error("unrecognized streaming header: \"%c\"",
     513             :                          copybuf[0]);
     514           0 :             goto error;
     515             :         }
     516             : 
     517             :         /*
     518             :          * Read the header of the XLogData message, enclosed in the CopyData
     519             :          * message. We only need the WAL location field (dataStart), the rest
     520             :          * of the header is ignored.
     521             :          */
     522         182 :         hdr_len = 1;            /* msgtype 'w' */
     523         182 :         hdr_len += 8;           /* dataStart */
     524         182 :         hdr_len += 8;           /* walEnd */
     525         182 :         hdr_len += 8;           /* sendTime */
     526         182 :         if (r < hdr_len + 1)
     527             :         {
     528           0 :             pg_log_error("streaming header too small: %d", r);
     529           0 :             goto error;
     530             :         }
     531             : 
     532             :         /* Extract WAL location for this block */
     533         182 :         cur_record_lsn = fe_recvint64(&copybuf[1]);
     534             : 
     535         182 :         if (endpos != InvalidXLogRecPtr && cur_record_lsn > endpos)
     536             :         {
     537             :             /*
     538             :              * We've read past our endpoint, so prepare to go away being
     539             :              * cautious about what happens to our output data.
     540             :              */
     541           0 :             if (!flushAndSendFeedback(conn, &now))
     542           0 :                 goto error;
     543           0 :             stop_reason = STREAM_STOP_END_OF_WAL;
     544           0 :             time_to_abort = true;
     545           0 :             break;
     546             :         }
     547             : 
     548         182 :         output_written_lsn = Max(cur_record_lsn, output_written_lsn);
     549             : 
     550         182 :         bytes_left = r - hdr_len;
     551         182 :         bytes_written = 0;
     552             : 
     553             :         /* signal that a fsync is needed */
     554         182 :         output_needs_fsync = true;
     555             : 
     556         364 :         while (bytes_left)
     557             :         {
     558             :             int         ret;
     559             : 
     560         364 :             ret = write(outfd,
     561         182 :                         copybuf + hdr_len + bytes_written,
     562             :                         bytes_left);
     563             : 
     564         182 :             if (ret < 0)
     565             :             {
     566           0 :                 pg_log_error("could not write %d bytes to log file \"%s\": %m",
     567             :                              bytes_left, outfile);
     568           0 :                 goto error;
     569             :             }
     570             : 
     571             :             /* Write was successful, advance our position */
     572         182 :             bytes_written += ret;
     573         182 :             bytes_left -= ret;
     574             :         }
     575             : 
     576         182 :         if (write(outfd, "\n", 1) != 1)
     577             :         {
     578           0 :             pg_log_error("could not write %d bytes to log file \"%s\": %m",
     579             :                          1, outfile);
     580           0 :             goto error;
     581             :         }
     582             : 
     583         182 :         if (endpos != InvalidXLogRecPtr && cur_record_lsn == endpos)
     584             :         {
     585             :             /* endpos was exactly the record we just processed, we're done */
     586          10 :             if (!flushAndSendFeedback(conn, &now))
     587           0 :                 goto error;
     588          10 :             stop_reason = STREAM_STOP_END_OF_WAL;
     589          10 :             time_to_abort = true;
     590          10 :             break;
     591             :         }
     592             :     }
     593             : 
     594             :     /* Clean up connection state if stream has been aborted */
     595          30 :     if (time_to_abort)
     596          16 :         prepareToTerminate(conn, endpos, stop_reason, cur_record_lsn);
     597             : 
     598          30 :     res = PQgetResult(conn);
     599          30 :     if (PQresultStatus(res) == PGRES_COPY_OUT)
     600             :     {
     601          16 :         PQclear(res);
     602             : 
     603             :         /*
     604             :          * We're doing a client-initiated clean exit and have sent CopyDone to
     605             :          * the server. Drain any messages, so we don't miss a last-minute
     606             :          * ErrorResponse. The walsender stops generating XLogData records once
     607             :          * it sees CopyDone, so expect this to finish quickly. After CopyDone,
     608             :          * it's too late for sendFeedback(), even if this were to take a long
     609             :          * time. Hence, use synchronous-mode PQgetCopyData().
     610             :          */
     611             :         while (1)
     612           4 :         {
     613             :             int         r;
     614             : 
     615          20 :             if (copybuf != NULL)
     616             :             {
     617          18 :                 PQfreemem(copybuf);
     618          18 :                 copybuf = NULL;
     619             :             }
     620          20 :             r = PQgetCopyData(conn, &copybuf, 0);
     621          20 :             if (r == -1)
     622          16 :                 break;
     623           4 :             if (r == -2)
     624             :             {
     625           0 :                 pg_log_error("could not read COPY data: %s",
     626             :                              PQerrorMessage(conn));
     627           0 :                 time_to_abort = false;  /* unclean exit */
     628           0 :                 goto error;
     629             :             }
     630             :         }
     631             : 
     632          16 :         res = PQgetResult(conn);
     633             :     }
     634          30 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
     635             :     {
     636          12 :         pg_log_error("unexpected termination of replication stream: %s",
     637             :                      PQresultErrorMessage(res));
     638          12 :         goto error;
     639             :     }
     640          18 :     PQclear(res);
     641             : 
     642          18 :     if (outfd != -1 && strcmp(outfile, "-") != 0)
     643             :     {
     644           0 :         TimestampTz t = feGetCurrentTimestamp();
     645             : 
     646             :         /* no need to jump to error on failure here, we're finishing anyway */
     647           0 :         OutputFsync(t);
     648             : 
     649           0 :         if (close(outfd) != 0)
     650           0 :             pg_log_error("could not close file \"%s\": %m", outfile);
     651             :     }
     652          18 :     outfd = -1;
     653          46 : error:
     654          46 :     if (copybuf != NULL)
     655             :     {
     656           0 :         PQfreemem(copybuf);
     657           0 :         copybuf = NULL;
     658             :     }
     659          46 :     destroyPQExpBuffer(query);
     660          46 :     PQfinish(conn);
     661          46 :     conn = NULL;
     662             : }
     663             : 
     664             : /*
     665             :  * Unfortunately we can't do sensible signal handling on windows...
     666             :  */
     667             : #ifndef WIN32
     668             : 
     669             : /*
     670             :  * When SIGINT/SIGTERM are caught, just tell the system to exit at the next
     671             :  * possible moment.
     672             :  */
     673             : static void
     674           2 : sigexit_handler(SIGNAL_ARGS)
     675             : {
     676           2 :     stop_reason = STREAM_STOP_SIGNAL;
     677           2 :     time_to_abort = true;
     678           2 : }
     679             : 
     680             : /*
     681             :  * Trigger the output file to be reopened.
     682             :  */
     683             : static void
     684           0 : sighup_handler(SIGNAL_ARGS)
     685             : {
     686           0 :     output_reopen = true;
     687           0 : }
     688             : #endif
     689             : 
     690             : 
     691             : int
     692         110 : main(int argc, char **argv)
     693             : {
     694             :     static struct option long_options[] = {
     695             : /* general options */
     696             :         {"file", required_argument, NULL, 'f'},
     697             :         {"fsync-interval", required_argument, NULL, 'F'},
     698             :         {"no-loop", no_argument, NULL, 'n'},
     699             :         {"verbose", no_argument, NULL, 'v'},
     700             :         {"two-phase", no_argument, NULL, 't'},
     701             :         {"version", no_argument, NULL, 'V'},
     702             :         {"help", no_argument, NULL, '?'},
     703             : /* connection options */
     704             :         {"dbname", required_argument, NULL, 'd'},
     705             :         {"host", required_argument, NULL, 'h'},
     706             :         {"port", required_argument, NULL, 'p'},
     707             :         {"username", required_argument, NULL, 'U'},
     708             :         {"no-password", no_argument, NULL, 'w'},
     709             :         {"password", no_argument, NULL, 'W'},
     710             : /* replication options */
     711             :         {"startpos", required_argument, NULL, 'I'},
     712             :         {"endpos", required_argument, NULL, 'E'},
     713             :         {"option", required_argument, NULL, 'o'},
     714             :         {"plugin", required_argument, NULL, 'P'},
     715             :         {"status-interval", required_argument, NULL, 's'},
     716             :         {"slot", required_argument, NULL, 'S'},
     717             : /* action */
     718             :         {"create-slot", no_argument, NULL, 1},
     719             :         {"start", no_argument, NULL, 2},
     720             :         {"drop-slot", no_argument, NULL, 3},
     721             :         {"if-not-exists", no_argument, NULL, 4},
     722             :         {NULL, 0, NULL, 0}
     723             :     };
     724             :     int         c;
     725             :     int         option_index;
     726             :     uint32      hi,
     727             :                 lo;
     728             :     char       *db_name;
     729             : 
     730         110 :     pg_logging_init(argv[0]);
     731         110 :     progname = get_progname(argv[0]);
     732         110 :     set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
     733             : 
     734         110 :     if (argc > 1)
     735             :     {
     736         108 :         if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
     737             :         {
     738           2 :             usage();
     739           2 :             exit(0);
     740             :         }
     741         106 :         else if (strcmp(argv[1], "-V") == 0 ||
     742         106 :                  strcmp(argv[1], "--version") == 0)
     743             :         {
     744           2 :             puts("pg_recvlogical (PostgreSQL) " PG_VERSION);
     745           2 :             exit(0);
     746             :         }
     747             :     }
     748             : 
     749         642 :     while ((c = getopt_long(argc, argv, "E:f:F:ntvd:h:p:U:wWI:o:P:s:S:",
     750             :                             long_options, &option_index)) != -1)
     751             :     {
     752         538 :         switch (c)
     753             :         {
     754             : /* general options */
     755          48 :             case 'f':
     756          48 :                 outfile = pg_strdup(optarg);
     757          48 :                 break;
     758           0 :             case 'F':
     759           0 :                 if (!option_parse_int(optarg, "-F/--fsync-interval", 0,
     760             :                                       INT_MAX / 1000,
     761             :                                       &fsync_interval))
     762           0 :                     exit(1);
     763           0 :                 fsync_interval *= 1000;
     764           0 :                 break;
     765          46 :             case 'n':
     766          46 :                 noloop = 1;
     767          46 :                 break;
     768           4 :             case 't':
     769           4 :                 two_phase = true;
     770           4 :                 break;
     771           0 :             case 'v':
     772           0 :                 verbose++;
     773           0 :                 break;
     774             : /* connection options */
     775         100 :             case 'd':
     776         100 :                 dbname = pg_strdup(optarg);
     777         100 :                 break;
     778           0 :             case 'h':
     779           0 :                 dbhost = pg_strdup(optarg);
     780           0 :                 break;
     781           0 :             case 'p':
     782           0 :                 dbport = pg_strdup(optarg);
     783           0 :                 break;
     784           0 :             case 'U':
     785           0 :                 dbuser = pg_strdup(optarg);
     786           0 :                 break;
     787           0 :             case 'w':
     788           0 :                 dbgetpassword = -1;
     789           0 :                 break;
     790           0 :             case 'W':
     791           0 :                 dbgetpassword = 1;
     792           0 :                 break;
     793             : /* replication options */
     794           0 :             case 'I':
     795           0 :                 if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
     796           0 :                     pg_fatal("could not parse start position \"%s\"", optarg);
     797           0 :                 startpos = ((uint64) hi) << 32 | lo;
     798           0 :                 break;
     799          16 :             case 'E':
     800          16 :                 if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
     801           0 :                     pg_fatal("could not parse end position \"%s\"", optarg);
     802          16 :                 endpos = ((uint64) hi) << 32 | lo;
     803          16 :                 break;
     804          80 :             case 'o':
     805             :                 {
     806          80 :                     char       *data = pg_strdup(optarg);
     807          80 :                     char       *val = strchr(data, '=');
     808             : 
     809          80 :                     if (val != NULL)
     810             :                     {
     811             :                         /* remove =; separate data from val */
     812          80 :                         *val = '\0';
     813          80 :                         val++;
     814             :                     }
     815             : 
     816          80 :                     noptions += 1;
     817          80 :                     options = pg_realloc(options, sizeof(char *) * noptions * 2);
     818             : 
     819          80 :                     options[(noptions - 1) * 2] = data;
     820          80 :                     options[(noptions - 1) * 2 + 1] = val;
     821             :                 }
     822             : 
     823          80 :                 break;
     824          42 :             case 'P':
     825          42 :                 plugin = pg_strdup(optarg);
     826          42 :                 break;
     827           0 :             case 's':
     828           0 :                 if (!option_parse_int(optarg, "-s/--status-interval", 0,
     829             :                                       INT_MAX / 1000,
     830             :                                       &standby_message_timeout))
     831           0 :                     exit(1);
     832           0 :                 standby_message_timeout *= 1000;
     833           0 :                 break;
     834         102 :             case 'S':
     835         102 :                 replication_slot = pg_strdup(optarg);
     836         102 :                 break;
     837             : /* action */
     838          46 :             case 1:
     839          46 :                 do_create_slot = true;
     840          46 :                 break;
     841          50 :             case 2:
     842          50 :                 do_start_slot = true;
     843          50 :                 break;
     844           2 :             case 3:
     845           2 :                 do_drop_slot = true;
     846           2 :                 break;
     847           0 :             case 4:
     848           0 :                 slot_exists_ok = true;
     849           0 :                 break;
     850             : 
     851           2 :             default:
     852             :                 /* getopt_long already emitted a complaint */
     853           2 :                 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     854           2 :                 exit(1);
     855             :         }
     856             :     }
     857             : 
     858             :     /*
     859             :      * Any non-option arguments?
     860             :      */
     861         104 :     if (optind < argc)
     862             :     {
     863           0 :         pg_log_error("too many command-line arguments (first is \"%s\")",
     864             :                      argv[optind]);
     865           0 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     866           0 :         exit(1);
     867             :     }
     868             : 
     869             :     /*
     870             :      * Required arguments
     871             :      */
     872         104 :     if (replication_slot == NULL)
     873             :     {
     874           2 :         pg_log_error("no slot specified");
     875           2 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     876           2 :         exit(1);
     877             :     }
     878             : 
     879         102 :     if (do_start_slot && outfile == NULL)
     880             :     {
     881           2 :         pg_log_error("no target file specified");
     882           2 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     883           2 :         exit(1);
     884             :     }
     885             : 
     886         100 :     if (!do_drop_slot && dbname == NULL)
     887             :     {
     888           2 :         pg_log_error("no database specified");
     889           2 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     890           2 :         exit(1);
     891             :     }
     892             : 
     893          98 :     if (!do_drop_slot && !do_create_slot && !do_start_slot)
     894             :     {
     895           2 :         pg_log_error("at least one action needs to be specified");
     896           2 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     897           2 :         exit(1);
     898             :     }
     899             : 
     900          96 :     if (do_drop_slot && (do_create_slot || do_start_slot))
     901             :     {
     902           0 :         pg_log_error("cannot use --create-slot or --start together with --drop-slot");
     903           0 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     904           0 :         exit(1);
     905             :     }
     906             : 
     907          96 :     if (startpos != InvalidXLogRecPtr && (do_create_slot || do_drop_slot))
     908             :     {
     909           0 :         pg_log_error("cannot use --create-slot or --drop-slot together with --startpos");
     910           0 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     911           0 :         exit(1);
     912             :     }
     913             : 
     914          96 :     if (endpos != InvalidXLogRecPtr && !do_start_slot)
     915             :     {
     916           0 :         pg_log_error("--endpos may only be specified with --start");
     917           0 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     918           0 :         exit(1);
     919             :     }
     920             : 
     921          96 :     if (two_phase && !do_create_slot)
     922             :     {
     923           2 :         pg_log_error("--two-phase may only be specified with --create-slot");
     924           2 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     925           2 :         exit(1);
     926             :     }
     927             : 
     928             :     /*
     929             :      * Obtain a connection to server.  Notably, if we need a password, we want
     930             :      * to collect it from the user immediately.
     931             :      */
     932          94 :     conn = GetConnection();
     933          94 :     if (!conn)
     934             :         /* Error message already written in GetConnection() */
     935           0 :         exit(1);
     936          94 :     atexit(disconnect_atexit);
     937             : 
     938             :     /*
     939             :      * Trap signals.  (Don't do this until after the initial password prompt,
     940             :      * if one is needed, in GetConnection.)
     941             :      */
     942             : #ifndef WIN32
     943          94 :     pqsignal(SIGINT, sigexit_handler);
     944          94 :     pqsignal(SIGTERM, sigexit_handler);
     945          94 :     pqsignal(SIGHUP, sighup_handler);
     946             : #endif
     947             : 
     948             :     /*
     949             :      * Run IDENTIFY_SYSTEM to make sure we connected using a database specific
     950             :      * replication connection.
     951             :      */
     952          94 :     if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name))
     953           0 :         exit(1);
     954             : 
     955          94 :     if (db_name == NULL)
     956           0 :         pg_fatal("could not establish database-specific replication connection");
     957             : 
     958             :     /*
     959             :      * Set umask so that directories/files are created with the same
     960             :      * permissions as directories/files in the source data directory.
     961             :      *
     962             :      * pg_mode_mask is set to owner-only by default and then updated in
     963             :      * GetConnection() where we get the mode from the server-side with
     964             :      * RetrieveDataDirCreatePerm() and then call SetDataDirectoryCreatePerm().
     965             :      */
     966          94 :     umask(pg_mode_mask);
     967             : 
     968             :     /* Drop a replication slot. */
     969          94 :     if (do_drop_slot)
     970             :     {
     971           2 :         if (verbose)
     972           0 :             pg_log_info("dropping replication slot \"%s\"", replication_slot);
     973             : 
     974           2 :         if (!DropReplicationSlot(conn, replication_slot))
     975           0 :             exit(1);
     976             :     }
     977             : 
     978             :     /* Create a replication slot. */
     979          94 :     if (do_create_slot)
     980             :     {
     981          46 :         if (verbose)
     982           0 :             pg_log_info("creating replication slot \"%s\"", replication_slot);
     983             : 
     984          46 :         if (!CreateReplicationSlot(conn, replication_slot, plugin, false,
     985             :                                    false, false, slot_exists_ok, two_phase))
     986           0 :             exit(1);
     987          46 :         startpos = InvalidXLogRecPtr;
     988             :     }
     989             : 
     990          94 :     if (!do_start_slot)
     991          48 :         exit(0);
     992             : 
     993             :     /* Stream loop */
     994             :     while (true)
     995             :     {
     996          46 :         StreamLogicalLog();
     997          46 :         if (time_to_abort)
     998             :         {
     999             :             /*
    1000             :              * We've been Ctrl-C'ed or reached an exit limit condition. That's
    1001             :              * not an error, so exit without an errorcode.
    1002             :              */
    1003          16 :             exit(0);
    1004             :         }
    1005          30 :         else if (noloop)
    1006          30 :             pg_fatal("disconnected");
    1007             :         else
    1008             :         {
    1009             :             /* translator: check source for value for %d */
    1010           0 :             pg_log_info("disconnected; waiting %d seconds to try again",
    1011             :                         RECONNECT_SLEEP_TIME);
    1012           0 :             pg_usleep(RECONNECT_SLEEP_TIME * 1000000);
    1013             :         }
    1014             :     }
    1015             : }
    1016             : 
    1017             : /*
    1018             :  * Fsync our output data, and send a feedback message to the server.  Returns
    1019             :  * true if successful, false otherwise.
    1020             :  *
    1021             :  * If successful, *now is updated to the current timestamp just before sending
    1022             :  * feedback.
    1023             :  */
    1024             : static bool
    1025          16 : flushAndSendFeedback(PGconn *conn, TimestampTz *now)
    1026             : {
    1027             :     /* flush data to disk, so that we send a recent flush pointer */
    1028          16 :     if (!OutputFsync(*now))
    1029           0 :         return false;
    1030          16 :     *now = feGetCurrentTimestamp();
    1031          16 :     if (!sendFeedback(conn, *now, true, false))
    1032           0 :         return false;
    1033             : 
    1034          16 :     return true;
    1035             : }
    1036             : 
    1037             : /*
    1038             :  * Try to inform the server about our upcoming demise, but don't wait around or
    1039             :  * retry on failure.
    1040             :  */
    1041             : static void
    1042          16 : prepareToTerminate(PGconn *conn, XLogRecPtr endpos, StreamStopReason reason,
    1043             :                    XLogRecPtr lsn)
    1044             : {
    1045          16 :     (void) PQputCopyEnd(conn, NULL);
    1046          16 :     (void) PQflush(conn);
    1047             : 
    1048          16 :     if (verbose)
    1049             :     {
    1050           0 :         switch (reason)
    1051             :         {
    1052           0 :             case STREAM_STOP_SIGNAL:
    1053           0 :                 pg_log_info("received interrupt signal, exiting");
    1054           0 :                 break;
    1055           0 :             case STREAM_STOP_KEEPALIVE:
    1056           0 :                 pg_log_info("end position %X/%X reached by keepalive",
    1057             :                             LSN_FORMAT_ARGS(endpos));
    1058           0 :                 break;
    1059           0 :             case STREAM_STOP_END_OF_WAL:
    1060             :                 Assert(!XLogRecPtrIsInvalid(lsn));
    1061           0 :                 pg_log_info("end position %X/%X reached by WAL record at %X/%X",
    1062             :                             LSN_FORMAT_ARGS(endpos), LSN_FORMAT_ARGS(lsn));
    1063           0 :                 break;
    1064           0 :             case STREAM_STOP_NONE:
    1065             :                 Assert(false);
    1066           0 :                 break;
    1067             :         }
    1068          16 :     }
    1069          16 : }

Generated by: LCOV version 1.14