LCOV - code coverage report
Current view: top level - src/bin/pg_basebackup - pg_recvlogical.c (source / functions) Hit Total Coverage
Test: PostgreSQL 13beta1 Lines: 304 459 66.2 %
Date: 2020-05-29 01:06:25 Functions: 8 10 80.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * pg_recvlogical.c - receive data from a logical decoding slot in a streaming
       4             :  *                    fashion and write it to a local file.
       5             :  *
       6             :  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *        src/bin/pg_basebackup/pg_recvlogical.c
      10             :  *-------------------------------------------------------------------------
      11             :  */
      12             : 
      13             : #include "postgres_fe.h"
      14             : 
      15             : #include <dirent.h>
      16             : #include <sys/stat.h>
      17             : #include <unistd.h>
      18             : #ifdef HAVE_SYS_SELECT_H
      19             : #include <sys/select.h>
      20             : #endif
      21             : 
      22             : #include "access/xlog_internal.h"
      23             : #include "common/fe_memutils.h"
      24             : #include "common/file_perm.h"
      25             : #include "common/logging.h"
      26             : #include "getopt_long.h"
      27             : #include "libpq-fe.h"
      28             : #include "libpq/pqsignal.h"
      29             : #include "pqexpbuffer.h"
      30             : #include "streamutil.h"
      31             : 
      32             : /* Time to sleep between reconnection attempts */
      33             : #define RECONNECT_SLEEP_TIME 5
      34             : 
      35             : /* Global Options */
      36             : static char *outfile = NULL;
      37             : static int  verbose = 0;
      38             : static int  noloop = 0;
      39             : static int  standby_message_timeout = 10 * 1000;    /* 10 sec = default */
      40             : static int  fsync_interval = 10 * 1000; /* 10 sec = default */
      41             : static XLogRecPtr startpos = InvalidXLogRecPtr;
      42             : static XLogRecPtr endpos = InvalidXLogRecPtr;
      43             : static bool do_create_slot = false;
      44             : static bool slot_exists_ok = false;
      45             : static bool do_start_slot = false;
      46             : static bool do_drop_slot = false;
      47             : static char *replication_slot = NULL;
      48             : 
      49             : /* filled pairwise with option, value. value may be NULL */
      50             : static char **options;
      51             : static size_t noptions = 0;
      52             : static const char *plugin = "test_decoding";
      53             : 
      54             : /* Global State */
      55             : static int  outfd = -1;
      56             : static volatile sig_atomic_t time_to_abort = false;
      57             : static volatile sig_atomic_t output_reopen = false;
      58             : static bool output_isfile;
      59             : static TimestampTz output_last_fsync = -1;
      60             : static bool output_needs_fsync = false;
      61             : static XLogRecPtr output_written_lsn = InvalidXLogRecPtr;
      62             : static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr;
      63             : 
      64             : static void usage(void);
      65             : static void StreamLogicalLog(void);
      66             : static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now);
      67             : static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos,
      68             :                                bool keepalive, XLogRecPtr lsn);
      69             : 
      70             : static void
      71           2 : usage(void)
      72             : {
      73           2 :     printf(_("%s controls PostgreSQL logical decoding streams.\n\n"),
      74             :            progname);
      75           2 :     printf(_("Usage:\n"));
      76           2 :     printf(_("  %s [OPTION]...\n"), progname);
      77           2 :     printf(_("\nAction to be performed:\n"));
      78           2 :     printf(_("      --create-slot      create a new replication slot (for the slot's name see --slot)\n"));
      79           2 :     printf(_("      --drop-slot        drop the replication slot (for the slot's name see --slot)\n"));
      80           2 :     printf(_("      --start            start streaming in a replication slot (for the slot's name see --slot)\n"));
      81           2 :     printf(_("\nOptions:\n"));
      82           2 :     printf(_("  -E, --endpos=LSN       exit after receiving the specified LSN\n"));
      83           2 :     printf(_("  -f, --file=FILE        receive log into this file, - for stdout\n"));
      84           2 :     printf(_("  -F  --fsync-interval=SECS\n"
      85             :              "                         time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000));
      86           2 :     printf(_("      --if-not-exists    do not error if slot already exists when creating a slot\n"));
      87           2 :     printf(_("  -I, --startpos=LSN     where in an existing slot should the streaming start\n"));
      88           2 :     printf(_("  -n, --no-loop          do not loop on connection lost\n"));
      89           2 :     printf(_("  -o, --option=NAME[=VALUE]\n"
      90             :              "                         pass option NAME with optional value VALUE to the\n"
      91             :              "                         output plugin\n"));
      92           2 :     printf(_("  -P, --plugin=PLUGIN    use output plugin PLUGIN (default: %s)\n"), plugin);
      93           2 :     printf(_("  -s, --status-interval=SECS\n"
      94             :              "                         time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
      95           2 :     printf(_("  -S, --slot=SLOTNAME    name of the logical replication slot\n"));
      96           2 :     printf(_("  -v, --verbose          output verbose messages\n"));
      97           2 :     printf(_("  -V, --version          output version information, then exit\n"));
      98           2 :     printf(_("  -?, --help             show this help, then exit\n"));
      99           2 :     printf(_("\nConnection options:\n"));
     100           2 :     printf(_("  -d, --dbname=DBNAME    database to connect to\n"));
     101           2 :     printf(_("  -h, --host=HOSTNAME    database server host or socket directory\n"));
     102           2 :     printf(_("  -p, --port=PORT        database server port number\n"));
     103           2 :     printf(_("  -U, --username=NAME    connect as specified database user\n"));
     104           2 :     printf(_("  -w, --no-password      never prompt for password\n"));
     105           2 :     printf(_("  -W, --password         force password prompt (should happen automatically)\n"));
     106           2 :     printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
     107           2 :     printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
     108           2 : }
     109             : 
     110             : /*
     111             :  * Send a Standby Status Update message to server.
     112             :  */
     113             : static bool
     114          16 : sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested)
     115             : {
     116             :     static XLogRecPtr last_written_lsn = InvalidXLogRecPtr;
     117             :     static XLogRecPtr last_fsync_lsn = InvalidXLogRecPtr;
     118             : 
     119             :     char        replybuf[1 + 8 + 8 + 8 + 8 + 1];
     120          16 :     int         len = 0;
     121             : 
     122             :     /*
     123             :      * We normally don't want to send superfluous feedback, but if we are
     124             :      * forced to (e.g. because of a timeout) we must send it anyway;
     125             :      * otherwise wal_sender_timeout will kill us.
     126             :      */
     127          16 :     if (!force &&
     128           0 :         last_written_lsn == output_written_lsn &&
     129           0 :         last_fsync_lsn == output_fsync_lsn)
     130           0 :         return true;
     131             : 
     132          16 :     if (verbose)
     133           0 :         pg_log_info("confirming write up to %X/%X, flush to %X/%X (slot %s)",
     134             :                     (uint32) (output_written_lsn >> 32), (uint32) output_written_lsn,
     135             :                     (uint32) (output_fsync_lsn >> 32), (uint32) output_fsync_lsn,
     136             :                     replication_slot);
     137             : 
     138          16 :     replybuf[len] = 'r';
     139          16 :     len += 1;
     140          16 :     fe_sendint64(output_written_lsn, &replybuf[len]);   /* write */
     141          16 :     len += 8;
     142          16 :     fe_sendint64(output_fsync_lsn, &replybuf[len]); /* flush */
     143          16 :     len += 8;
     144          16 :     fe_sendint64(InvalidXLogRecPtr, &replybuf[len]);    /* apply */
     145          16 :     len += 8;
     146          16 :     fe_sendint64(now, &replybuf[len]);  /* sendTime */
     147          16 :     len += 8;
     148          16 :     replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
     149          16 :     len += 1;
     150             : 
     151          16 :     startpos = output_written_lsn;
     152          16 :     last_written_lsn = output_written_lsn;
     153          16 :     last_fsync_lsn = output_fsync_lsn;
     154             : 
     155          16 :     if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
     156             :     {
     157           0 :         pg_log_error("could not send feedback packet: %s",
     158             :                      PQerrorMessage(conn));
     159           0 :         return false;
     160             :     }
     161             : 
     162          16 :     return true;
     163             : }
     164             : 
     165             : static void
     166          10 : disconnect_atexit(void)
     167             : {
     168          10 :     if (conn != NULL)
     169           2 :         PQfinish(conn);
     170          10 : }
     171             : 
     172             : static bool
     173          16 : OutputFsync(TimestampTz now)
     174             : {
     175          16 :     output_last_fsync = now;
     176             : 
     177          16 :     output_fsync_lsn = output_written_lsn;
     178             : 
     179          16 :     if (fsync_interval <= 0)
     180           0 :         return true;
     181             : 
     182          16 :     if (!output_needs_fsync)
     183          10 :         return true;
     184             : 
     185           6 :     output_needs_fsync = false;
     186             : 
     187             :     /* can only fsync if it's a regular file */
     188           6 :     if (!output_isfile)
     189           4 :         return true;
     190             : 
     191           2 :     if (fsync(outfd) != 0)
     192             :     {
     193           0 :         pg_log_fatal("could not fsync file \"%s\": %m", outfile);
     194           0 :         exit(1);
     195             :     }
     196             : 
     197           2 :     return true;
     198             : }
     199             : 
     200             : /*
     201             :  * Start the log streaming
     202             :  */
     203             : static void
     204           8 : StreamLogicalLog(void)
     205             : {
     206             :     PGresult   *res;
     207           8 :     char       *copybuf = NULL;
     208           8 :     TimestampTz last_status = -1;
     209             :     int         i;
     210             :     PQExpBuffer query;
     211             : 
     212           8 :     output_written_lsn = InvalidXLogRecPtr;
     213           8 :     output_fsync_lsn = InvalidXLogRecPtr;
     214             : 
     215           8 :     query = createPQExpBuffer();
     216             : 
     217             :     /*
     218             :      * Connect in replication mode to the server
     219             :      */
     220           8 :     if (!conn)
     221           0 :         conn = GetConnection();
     222           8 :     if (!conn)
     223             :         /* Error message already written in GetConnection() */
     224           0 :         return;
     225             : 
     226             :     /*
     227             :      * Start the replication
     228             :      */
     229           8 :     if (verbose)
     230           0 :         pg_log_info("starting log streaming at %X/%X (slot %s)",
     231             :                     (uint32) (startpos >> 32), (uint32) startpos,
     232             :                     replication_slot);
     233             : 
     234             :     /* Initiate the replication stream at specified location */
     235          16 :     appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%X",
     236           8 :                       replication_slot, (uint32) (startpos >> 32), (uint32) startpos);
     237             : 
     238             :     /* print options if there are any */
     239           8 :     if (noptions)
     240           6 :         appendPQExpBufferStr(query, " (");
     241             : 
     242          20 :     for (i = 0; i < noptions; i++)
     243             :     {
     244             :         /* separator */
     245          12 :         if (i > 0)
     246           6 :             appendPQExpBufferStr(query, ", ");
     247             : 
     248             :         /* write option name */
     249          12 :         appendPQExpBuffer(query, "\"%s\"", options[(i * 2)]);
     250             : 
     251             :         /* write option value if specified */
     252          12 :         if (options[(i * 2) + 1] != NULL)
     253          12 :             appendPQExpBuffer(query, " '%s'", options[(i * 2) + 1]);
     254             :     }
     255             : 
     256           8 :     if (noptions)
     257           6 :         appendPQExpBufferChar(query, ')');
     258             : 
     259           8 :     res = PQexec(conn, query->data);
     260           8 :     if (PQresultStatus(res) != PGRES_COPY_BOTH)
     261             :     {
     262           0 :         pg_log_error("could not send replication command \"%s\": %s",
     263             :                      query->data, PQresultErrorMessage(res));
     264           0 :         PQclear(res);
     265           0 :         goto error;
     266             :     }
     267           8 :     PQclear(res);
     268           8 :     resetPQExpBuffer(query);
     269             : 
     270           8 :     if (verbose)
     271           0 :         pg_log_info("streaming initiated");
     272             : 
     273         124 :     while (!time_to_abort)
     274             :     {
     275             :         int         r;
     276             :         int         bytes_left;
     277             :         int         bytes_written;
     278             :         TimestampTz now;
     279             :         int         hdr_len;
     280         124 :         XLogRecPtr  cur_record_lsn = InvalidXLogRecPtr;
     281             : 
     282         124 :         if (copybuf != NULL)
     283             :         {
     284          58 :             PQfreemem(copybuf);
     285          58 :             copybuf = NULL;
     286             :         }
     287             : 
     288             :         /*
     289             :          * Potentially send a status message to the master
     290             :          */
     291         124 :         now = feGetCurrentTimestamp();
     292             : 
     293         240 :         if (outfd != -1 &&
     294         116 :             feTimestampDifferenceExceeds(output_last_fsync, now,
     295             :                                          fsync_interval))
     296             :         {
     297           8 :             if (!OutputFsync(now))
     298           0 :                 goto error;
     299             :         }
     300             : 
     301         248 :         if (standby_message_timeout > 0 &&
     302         124 :             feTimestampDifferenceExceeds(last_status, now,
     303             :                                          standby_message_timeout))
     304             :         {
     305             :             /* Time to send feedback! */
     306           8 :             if (!sendFeedback(conn, now, true, false))
     307           0 :                 goto error;
     308             : 
     309           8 :             last_status = now;
     310             :         }
     311             : 
     312             :         /* got SIGHUP, close output file */
     313         124 :         if (outfd != -1 && output_reopen && strcmp(outfile, "-") != 0)
     314             :         {
     315           0 :             now = feGetCurrentTimestamp();
     316           0 :             if (!OutputFsync(now))
     317           0 :                 goto error;
     318           0 :             close(outfd);
     319           0 :             outfd = -1;
     320             :         }
     321         124 :         output_reopen = false;
     322             : 
     323             :         /* open the output file, if not open yet */
     324         124 :         if (outfd == -1)
     325             :         {
     326             :             struct stat statbuf;
     327             : 
     328           8 :             if (strcmp(outfile, "-") == 0)
     329           8 :                 outfd = fileno(stdout);
     330             :             else
     331           0 :                 outfd = open(outfile, O_CREAT | O_APPEND | O_WRONLY | PG_BINARY,
     332             :                              S_IRUSR | S_IWUSR);
     333           8 :             if (outfd == -1)
     334             :             {
     335           0 :                 pg_log_error("could not open log file \"%s\": %m", outfile);
     336           0 :                 goto error;
     337             :             }
     338             : 
     339           8 :             if (fstat(outfd, &statbuf) != 0)
     340           0 :                 pg_log_error("could not stat file \"%s\": %m", outfile);
     341             : 
     342           8 :             output_isfile = S_ISREG(statbuf.st_mode) && !isatty(outfd);
     343             :         }
     344             : 
     345         124 :         r = PQgetCopyData(conn, &copybuf, 1);
     346         124 :         if (r == 0)
     347             :         {
     348             :             /*
     349             :              * In async mode, and no data available. We block on reading but
     350             :              * not more than the specified timeout, so that we can send a
     351             :              * response back to the server.
     352             :              */
     353             :             fd_set      input_mask;
     354          58 :             TimestampTz message_target = 0;
     355          58 :             TimestampTz fsync_target = 0;
     356             :             struct timeval timeout;
     357          58 :             struct timeval *timeoutptr = NULL;
     358             : 
     359          58 :             if (PQsocket(conn) < 0)
     360             :             {
     361           0 :                 pg_log_error("invalid socket: %s", PQerrorMessage(conn));
     362           0 :                 goto error;
     363             :             }
     364             : 
     365          58 :             FD_ZERO(&input_mask);
     366          58 :             FD_SET(PQsocket(conn), &input_mask);
     367             : 
     368             :             /* Compute when we need to wake up to send a keepalive message. */
     369          58 :             if (standby_message_timeout)
     370          58 :                 message_target = last_status + (standby_message_timeout - 1) *
     371             :                     ((int64) 1000);
     372             : 
     373             :             /* Compute when we need to wake up to fsync the output file. */
     374          58 :             if (fsync_interval > 0 && output_needs_fsync)
     375          44 :                 fsync_target = output_last_fsync + (fsync_interval - 1) *
     376             :                     ((int64) 1000);
     377             : 
     378             :             /* Now compute when to wake up. */
     379          58 :             if (message_target > 0 || fsync_target > 0)
     380             :             {
     381             :                 TimestampTz targettime;
     382             :                 long        secs;
     383             :                 int         usecs;
     384             : 
     385          58 :                 targettime = message_target;
     386             : 
     387          58 :                 if (fsync_target > 0 && fsync_target < targettime)
     388           0 :                     targettime = fsync_target;
     389             : 
     390          58 :                 feTimestampDifference(now,
     391             :                                       targettime,
     392             :                                       &secs,
     393             :                                       &usecs);
     394          58 :                 if (secs <= 0)
     395           0 :                     timeout.tv_sec = 1; /* Always sleep at least 1 sec */
     396             :                 else
     397          58 :                     timeout.tv_sec = secs;
     398          58 :                 timeout.tv_usec = usecs;
     399          58 :                 timeoutptr = &timeout;
     400             :             }
     401             : 
     402          58 :             r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
     403          58 :             if (r == 0 || (r < 0 && errno == EINTR))
     404             :             {
     405             :                 /*
     406             :                  * Got a timeout or signal. Continue the loop and either
     407             :                  * deliver a status packet to the server or just go back into
     408             :                  * blocking.
     409             :                  */
     410          58 :                 continue;
     411             :             }
     412          58 :             else if (r < 0)
     413             :             {
     414           0 :                 pg_log_error("select() failed: %m");
     415           0 :                 goto error;
     416             :             }
     417             : 
     418             :             /* Else there is actually data on the socket */
     419          58 :             if (PQconsumeInput(conn) == 0)
     420             :             {
     421           0 :                 pg_log_error("could not receive data from WAL stream: %s",
     422             :                              PQerrorMessage(conn));
     423           0 :                 goto error;
     424             :             }
     425          58 :             continue;
     426             :         }
     427             : 
     428             :         /* End of copy stream */
     429          66 :         if (r == -1)
     430           8 :             break;
     431             : 
     432             :         /* Failure while reading the copy stream */
     433          66 :         if (r == -2)
     434             :         {
     435           0 :             pg_log_error("could not read COPY data: %s",
     436             :                          PQerrorMessage(conn));
     437           0 :             goto error;
     438             :         }
     439             : 
     440             :         /* Check the message type. */
     441          66 :         if (copybuf[0] == 'k')
     442             :         {
     443             :             int         pos;
     444             :             bool        replyRequested;
     445             :             XLogRecPtr  walEnd;
     446           8 :             bool        endposReached = false;
     447             : 
     448             :             /*
     449             :              * Parse the keepalive message, enclosed in the CopyData message.
     450             :              * We read walEnd and check if the server requested a reply; the
     451             :              * sendTime field is skipped.
     452             :              */
     453           8 :             pos = 1;            /* skip msgtype 'k' */
     454           8 :             walEnd = fe_recvint64(&copybuf[pos]);
     455           8 :             output_written_lsn = Max(walEnd, output_written_lsn);
     456             : 
     457           8 :             pos += 8;           /* read walEnd */
     458             : 
     459           8 :             pos += 8;           /* skip sendTime */
     460             : 
     461           8 :             if (r < pos + 1)
     462             :             {
     463           0 :                 pg_log_error("streaming header too small: %d", r);
     464           0 :                 goto error;
     465             :             }
     466           8 :             replyRequested = copybuf[pos];
     467             : 
     468           8 :             if (endpos != InvalidXLogRecPtr && walEnd >= endpos)
     469             :             {
     470             :                 /*
     471             :                  * If there's nothing to read on the socket until a keepalive
     472             :                  * we know that the server has nothing to send us; and if
     473             :                  * walEnd has passed endpos, we know nothing else can have
     474             :                  * committed before endpos.  So we can bail out now.
     475             :                  */
     476           2 :                 endposReached = true;
     477             :             }
     478             : 
     479             :             /* Send a reply, if necessary */
     480           8 :             if (replyRequested || endposReached)
     481             :             {
     482           2 :                 if (!flushAndSendFeedback(conn, &now))
     483           0 :                     goto error;
     484           2 :                 last_status = now;
     485             :             }
     486             : 
     487           8 :             if (endposReached)
     488             :             {
     489           2 :                 prepareToTerminate(conn, endpos, true, InvalidXLogRecPtr);
     490           2 :                 time_to_abort = true;
     491           2 :                 break;
     492             :             }
     493             : 
     494           6 :             continue;
     495             :         }
     496          58 :         else if (copybuf[0] != 'w')
     497             :         {
     498           0 :             pg_log_error("unrecognized streaming header: \"%c\"",
     499             :                          copybuf[0]);
     500           0 :             goto error;
     501             :         }
     502             : 
     503             :         /*
     504             :          * Read the header of the XLogData message, enclosed in the CopyData
     505             :          * message. We only need the WAL location field (dataStart), the rest
     506             :          * of the header is ignored.
     507             :          */
     508          58 :         hdr_len = 1;            /* msgtype 'w' */
     509          58 :         hdr_len += 8;           /* dataStart */
     510          58 :         hdr_len += 8;           /* walEnd */
     511          58 :         hdr_len += 8;           /* sendTime */
     512          58 :         if (r < hdr_len + 1)
     513             :         {
     514           0 :             pg_log_error("streaming header too small: %d", r);
     515           0 :             goto error;
     516             :         }
     517             : 
     518             :         /* Extract WAL location for this block */
     519          58 :         cur_record_lsn = fe_recvint64(&copybuf[1]);
     520             : 
     521          58 :         if (endpos != InvalidXLogRecPtr && cur_record_lsn > endpos)
     522             :         {
     523             :             /*
     524             :              * We've read past our endpoint, so prepare to go away being
     525             :              * cautious about what happens to our output data.
     526             :              */
     527           0 :             if (!flushAndSendFeedback(conn, &now))
     528           0 :                 goto error;
     529           0 :             prepareToTerminate(conn, endpos, false, cur_record_lsn);
     530           0 :             time_to_abort = true;
     531           0 :             break;
     532             :         }
     533             : 
     534          58 :         output_written_lsn = Max(cur_record_lsn, output_written_lsn);
     535             : 
     536          58 :         bytes_left = r - hdr_len;
     537          58 :         bytes_written = 0;
     538             : 
     539             :         /* signal that a fsync is needed */
     540          58 :         output_needs_fsync = true;
     541             : 
     542         116 :         while (bytes_left)
     543             :         {
     544             :             int         ret;
     545             : 
     546         174 :             ret = write(outfd,
     547          58 :                         copybuf + hdr_len + bytes_written,
     548             :                         bytes_left);
     549             : 
     550          58 :             if (ret < 0)
     551             :             {
     552           0 :                 pg_log_error("could not write %u bytes to log file \"%s\": %m",
     553             :                              bytes_left, outfile);
     554           0 :                 goto error;
     555             :             }
     556             : 
     557             :             /* Write was successful, advance our position */
     558          58 :             bytes_written += ret;
     559          58 :             bytes_left -= ret;
     560             :         }
     561             : 
     562          58 :         if (write(outfd, "\n", 1) != 1)
     563             :         {
     564           0 :             pg_log_error("could not write %u bytes to log file \"%s\": %m",
     565             :                          1, outfile);
     566           0 :             goto error;
     567             :         }
     568             : 
     569          58 :         if (endpos != InvalidXLogRecPtr && cur_record_lsn == endpos)
     570             :         {
     571             :             /* endpos was exactly the record we just processed, we're done */
     572           6 :             if (!flushAndSendFeedback(conn, &now))
     573           0 :                 goto error;
     574           6 :             prepareToTerminate(conn, endpos, false, cur_record_lsn);
     575           6 :             time_to_abort = true;
     576           6 :             break;
     577             :         }
     578             :     }
     579             : 
     580           8 :     res = PQgetResult(conn);
     581           8 :     if (PQresultStatus(res) == PGRES_COPY_OUT)
     582             :     {
     583           8 :         PQclear(res);
     584             : 
     585             :         /*
     586             :          * We're doing a client-initiated clean exit and have sent CopyDone to
     587             :          * the server. Drain any messages, so we don't miss a last-minute
     588             :          * ErrorResponse. The walsender stops generating XLogData records once
     589             :          * it sees CopyDone, so expect this to finish quickly. After CopyDone,
     590             :          * it's too late for sendFeedback(), even if this were to take a long
     591             :          * time. Hence, use synchronous-mode PQgetCopyData().
     592             :          */
     593             :         while (1)
     594           0 :         {
     595             :             int         r;
     596             : 
     597           8 :             if (copybuf != NULL)
     598             :             {
     599           8 :                 PQfreemem(copybuf);
     600           8 :                 copybuf = NULL;
     601             :             }
     602           8 :             r = PQgetCopyData(conn, &copybuf, 0);
     603           8 :             if (r == -1)
     604           8 :                 break;
     605           0 :             if (r == -2)
     606             :             {
     607           0 :                 pg_log_error("could not read COPY data: %s",
     608             :                              PQerrorMessage(conn));
     609           0 :                 time_to_abort = false;  /* unclean exit */
     610           0 :                 goto error;
     611             :             }
     612             :         }
     613             : 
     614           8 :         res = PQgetResult(conn);
     615             :     }
     616           8 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
     617             :     {
     618           0 :         pg_log_error("unexpected termination of replication stream: %s",
     619             :                      PQresultErrorMessage(res));
     620           0 :         goto error;
     621             :     }
     622           8 :     PQclear(res);
     623             : 
     624           8 :     if (outfd != -1 && strcmp(outfile, "-") != 0)
     625             :     {
     626           0 :         TimestampTz t = feGetCurrentTimestamp();
     627             : 
     628             :         /* no need to jump to error on failure here, we're finishing anyway */
     629           0 :         OutputFsync(t);
     630             : 
     631           0 :         if (close(outfd) != 0)
     632           0 :             pg_log_error("could not close file \"%s\": %m", outfile);
     633             :     }
     634           8 :     outfd = -1;
     635           8 : error:
     636           8 :     if (copybuf != NULL)
     637             :     {
     638           0 :         PQfreemem(copybuf);
     639           0 :         copybuf = NULL;
     640             :     }
     641           8 :     destroyPQExpBuffer(query);
     642           8 :     PQfinish(conn);
     643           8 :     conn = NULL;
     644             : }
     645             : 
     646             : /*
     647             :  * Unfortunately we can't do sensible signal handling on windows...
     648             :  */
     649             : #ifndef WIN32
     650             : 
     651             : /*
     652             :  * When sigint is called, just tell the system to exit at the next possible
     653             :  * moment.
     654             :  */
     655             : static void
     656           0 : sigint_handler(int signum)
     657             : {
     658           0 :     time_to_abort = true;
     659           0 : }
     660             : 
     661             : /*
     662             :  * Trigger the output file to be reopened.
     663             :  */
     664             : static void
     665           0 : sighup_handler(int signum)
     666             : {
     667           0 :     output_reopen = true;
     668           0 : }
     669             : #endif
     670             : 
     671             : 
     672             : int
     673          24 : main(int argc, char **argv)
     674             : {
     675             :     static struct option long_options[] = {
     676             : /* general options */
     677             :         {"file", required_argument, NULL, 'f'},
     678             :         {"fsync-interval", required_argument, NULL, 'F'},
     679             :         {"no-loop", no_argument, NULL, 'n'},
     680             :         {"verbose", no_argument, NULL, 'v'},
     681             :         {"version", no_argument, NULL, 'V'},
     682             :         {"help", no_argument, NULL, '?'},
     683             : /* connection options */
     684             :         {"dbname", required_argument, NULL, 'd'},
     685             :         {"host", required_argument, NULL, 'h'},
     686             :         {"port", required_argument, NULL, 'p'},
     687             :         {"username", required_argument, NULL, 'U'},
     688             :         {"no-password", no_argument, NULL, 'w'},
     689             :         {"password", no_argument, NULL, 'W'},
     690             : /* replication options */
     691             :         {"startpos", required_argument, NULL, 'I'},
     692             :         {"endpos", required_argument, NULL, 'E'},
     693             :         {"option", required_argument, NULL, 'o'},
     694             :         {"plugin", required_argument, NULL, 'P'},
     695             :         {"status-interval", required_argument, NULL, 's'},
     696             :         {"slot", required_argument, NULL, 'S'},
     697             : /* action */
     698             :         {"create-slot", no_argument, NULL, 1},
     699             :         {"start", no_argument, NULL, 2},
     700             :         {"drop-slot", no_argument, NULL, 3},
     701             :         {"if-not-exists", no_argument, NULL, 4},
     702             :         {NULL, 0, NULL, 0}
     703             :     };
     704             :     int         c;
     705             :     int         option_index;
     706             :     uint32      hi,
     707             :                 lo;
     708             :     char       *db_name;
     709             : 
     710          24 :     pg_logging_init(argv[0]);
     711          24 :     progname = get_progname(argv[0]);
     712          24 :     set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
     713             : 
     714          24 :     if (argc > 1)
     715             :     {
     716          22 :         if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
     717             :         {
     718           2 :             usage();
     719           2 :             exit(0);
     720             :         }
     721          20 :         else if (strcmp(argv[1], "-V") == 0 ||
     722          20 :                  strcmp(argv[1], "--version") == 0)
     723             :         {
     724           2 :             puts("pg_recvlogical (PostgreSQL) " PG_VERSION);
     725           2 :             exit(0);
     726             :         }
     727             :     }
     728             : 
     729          98 :     while ((c = getopt_long(argc, argv, "E:f:F:nvd:h:p:U:wWI:o:P:s:S:",
     730             :                             long_options, &option_index)) != -1)
     731             :     {
     732          80 :         switch (c)
     733             :         {
     734             : /* general options */
     735           8 :             case 'f':
     736           8 :                 outfile = pg_strdup(optarg);
     737           8 :                 break;
     738           0 :             case 'F':
     739           0 :                 fsync_interval = atoi(optarg) * 1000;
     740           0 :                 if (fsync_interval < 0)
     741             :                 {
     742           0 :                     pg_log_error("invalid fsync interval \"%s\"", optarg);
     743           0 :                     exit(1);
     744             :                 }
     745           0 :                 break;
     746           8 :             case 'n':
     747           8 :                 noloop = 1;
     748           8 :                 break;
     749           0 :             case 'v':
     750           0 :                 verbose++;
     751           0 :                 break;
     752             : /* connection options */
     753          14 :             case 'd':
     754          14 :                 dbname = pg_strdup(optarg);
     755          14 :                 break;
     756           0 :             case 'h':
     757           0 :                 dbhost = pg_strdup(optarg);
     758           0 :                 break;
     759           0 :             case 'p':
     760           0 :                 if (atoi(optarg) <= 0)
     761             :                 {
     762           0 :                     pg_log_error("invalid port number \"%s\"", optarg);
     763           0 :                     exit(1);
     764             :                 }
     765           0 :                 dbport = pg_strdup(optarg);
     766           0 :                 break;
     767           0 :             case 'U':
     768           0 :                 dbuser = pg_strdup(optarg);
     769           0 :                 break;
     770           0 :             case 'w':
     771           0 :                 dbgetpassword = -1;
     772           0 :                 break;
     773           0 :             case 'W':
     774           0 :                 dbgetpassword = 1;
     775           0 :                 break;
     776             : /* replication options */
     777           0 :             case 'I':
     778           0 :                 if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
     779             :                 {
     780           0 :                     pg_log_error("could not parse start position \"%s\"", optarg);
     781           0 :                     exit(1);
     782             :                 }
     783           0 :                 startpos = ((uint64) hi) << 32 | lo;
     784           0 :                 break;
     785           8 :             case 'E':
     786           8 :                 if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
     787             :                 {
     788           0 :                     pg_log_error("could not parse end position \"%s\"", optarg);
     789           0 :                     exit(1);
     790             :                 }
     791           8 :                 endpos = ((uint64) hi) << 32 | lo;
     792           8 :                 break;
     793          12 :             case 'o':
     794             :                 {
     795          12 :                     char       *data = pg_strdup(optarg);
     796          12 :                     char       *val = strchr(data, '=');
     797             : 
     798          12 :                     if (val != NULL)
     799             :                     {
     800             :                         /* remove =; separate data from val */
     801          12 :                         *val = '\0';
     802          12 :                         val++;
     803             :                     }
     804             : 
     805          12 :                     noptions += 1;
     806          12 :                     options = pg_realloc(options, sizeof(char *) * noptions * 2);
     807             : 
     808          12 :                     options[(noptions - 1) * 2] = data;
     809          12 :                     options[(noptions - 1) * 2 + 1] = val;
     810             :                 }
     811             : 
     812          12 :                 break;
     813           0 :             case 'P':
     814           0 :                 plugin = pg_strdup(optarg);
     815           0 :                 break;
     816           0 :             case 's':
     817           0 :                 standby_message_timeout = atoi(optarg) * 1000;
     818           0 :                 if (standby_message_timeout < 0)
     819             :                 {
     820           0 :                     pg_log_error("invalid status interval \"%s\"", optarg);
     821           0 :                     exit(1);
     822             :                 }
     823           0 :                 break;
     824          16 :             case 'S':
     825          16 :                 replication_slot = pg_strdup(optarg);
     826          16 :                 break;
     827             : /* action */
     828           2 :             case 1:
     829           2 :                 do_create_slot = true;
     830           2 :                 break;
     831          10 :             case 2:
     832          10 :                 do_start_slot = true;
     833          10 :                 break;
     834           0 :             case 3:
     835           0 :                 do_drop_slot = true;
     836           0 :                 break;
     837           0 :             case 4:
     838           0 :                 slot_exists_ok = true;
     839           0 :                 break;
     840             : 
     841           2 :             default:
     842             : 
     843             :                 /*
     844             :                  * getopt_long already emitted a complaint
     845             :                  */
     846           2 :                 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
     847             :                         progname);
     848           2 :                 exit(1);
     849             :         }
     850             :     }
     851             : 
     852             :     /*
     853             :      * Any non-option arguments?
     854             :      */
     855          18 :     if (optind < argc)
     856             :     {
     857           0 :         pg_log_error("too many command-line arguments (first is \"%s\")",
     858             :                      argv[optind]);
     859           0 :         fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
     860             :                 progname);
     861           0 :         exit(1);
     862             :     }
     863             : 
     864             :     /*
     865             :      * Required arguments
     866             :      */
     867          18 :     if (replication_slot == NULL)
     868             :     {
     869           2 :         pg_log_error("no slot specified");
     870           2 :         fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
     871             :                 progname);
     872           2 :         exit(1);
     873             :     }
     874             : 
     875          16 :     if (do_start_slot && outfile == NULL)
     876             :     {
     877           2 :         pg_log_error("no target file specified");
     878           2 :         fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
     879             :                 progname);
     880           2 :         exit(1);
     881             :     }
     882             : 
     883          14 :     if (!do_drop_slot && dbname == NULL)
     884             :     {
     885           2 :         pg_log_error("no database specified");
     886           2 :         fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
     887             :                 progname);
     888           2 :         exit(1);
     889             :     }
     890             : 
     891          12 :     if (!do_drop_slot && !do_create_slot && !do_start_slot)
     892             :     {
     893           2 :         pg_log_error("at least one action needs to be specified");
     894           2 :         fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
     895             :                 progname);
     896           2 :         exit(1);
     897             :     }
     898             : 
     899          10 :     if (do_drop_slot && (do_create_slot || do_start_slot))
     900             :     {
     901           0 :         pg_log_error("cannot use --create-slot or --start together with --drop-slot");
     902           0 :         fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
     903             :                 progname);
     904           0 :         exit(1);
     905             :     }
     906             : 
     907          10 :     if (startpos != InvalidXLogRecPtr && (do_create_slot || do_drop_slot))
     908             :     {
     909           0 :         pg_log_error("cannot use --create-slot or --drop-slot together with --startpos");
     910           0 :         fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
     911             :                 progname);
     912           0 :         exit(1);
     913             :     }
     914             : 
     915          10 :     if (endpos != InvalidXLogRecPtr && !do_start_slot)
     916             :     {
     917           0 :         pg_log_error("--endpos may only be specified with --start");
     918           0 :         fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
     919             :                 progname);
     920           0 :         exit(1);
     921             :     }
     922             : 
     923             : #ifndef WIN32
     924          10 :     pqsignal(SIGINT, sigint_handler);
     925          10 :     pqsignal(SIGHUP, sighup_handler);
     926             : #endif
     927             : 
     928             :     /*
     929             :      * Obtain a connection to server. This is not really necessary but it
     930             :      * helps to get more precise error messages about authentication, required
     931             :      * GUC parameters and such.
     932             :      */
     933          10 :     conn = GetConnection();
     934          10 :     if (!conn)
     935             :         /* Error message already written in GetConnection() */
     936           0 :         exit(1);
     937          10 :     atexit(disconnect_atexit);
     938             : 
     939             :     /*
     940             :      * Run IDENTIFY_SYSTEM to make sure we connected using a database specific
     941             :      * replication connection.
     942             :      */
     943          10 :     if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name))
     944           0 :         exit(1);
     945             : 
     946          10 :     if (db_name == NULL)
     947             :     {
     948           0 :         pg_log_error("could not establish database-specific replication connection");
     949           0 :         exit(1);
     950             :     }
     951             : 
     952             :     /*
     953             :      * Set umask so that directories/files are created with the same
     954             :      * permissions as directories/files in the source data directory.
     955             :      *
     956             :      * pg_mode_mask is set to owner-only by default and then updated in
     957             :      * GetConnection() where we get the mode from the server-side with
     958             :      * RetrieveDataDirCreatePerm() and then call SetDataDirectoryCreatePerm().
     959             :      */
     960          10 :     umask(pg_mode_mask);
     961             : 
     962             :     /* Drop a replication slot. */
     963          10 :     if (do_drop_slot)
     964             :     {
     965           0 :         if (verbose)
     966           0 :             pg_log_info("dropping replication slot \"%s\"", replication_slot);
     967             : 
     968           0 :         if (!DropReplicationSlot(conn, replication_slot))
     969           0 :             exit(1);
     970             :     }
     971             : 
     972             :     /* Create a replication slot. */
     973          10 :     if (do_create_slot)
     974             :     {
     975           2 :         if (verbose)
     976           0 :             pg_log_info("creating replication slot \"%s\"", replication_slot);
     977             : 
     978           2 :         if (!CreateReplicationSlot(conn, replication_slot, plugin, false,
     979             :                                    false, false, slot_exists_ok))
     980           0 :             exit(1);
     981           2 :         startpos = InvalidXLogRecPtr;
     982             :     }
     983             : 
     984          10 :     if (!do_start_slot)
     985           2 :         exit(0);
     986             : 
     987             :     /* Stream loop */
     988             :     while (true)
     989             :     {
     990           8 :         StreamLogicalLog();
     991           8 :         if (time_to_abort)
     992             :         {
     993             :             /*
     994             :              * We've been Ctrl-C'ed or reached an exit limit condition. That's
     995             :              * not an error, so exit without an errorcode.
     996             :              */
     997           8 :             exit(0);
     998             :         }
     999           0 :         else if (noloop)
    1000             :         {
    1001           0 :             pg_log_error("disconnected");
    1002           0 :             exit(1);
    1003             :         }
    1004             :         else
    1005             :         {
    1006             :             /* translator: check source for value for %d */
    1007           0 :             pg_log_info("disconnected; waiting %d seconds to try again",
    1008             :                         RECONNECT_SLEEP_TIME);
    1009           0 :             pg_usleep(RECONNECT_SLEEP_TIME * 1000000);
    1010             :         }
    1011             :     }
    1012             : }
    1013             : 
    1014             : /*
    1015             :  * Fsync our output data, and send a feedback message to the server.  Returns
    1016             :  * true if successful, false otherwise.
    1017             :  *
    1018             :  * If successful, *now is updated to the current timestamp just before sending
    1019             :  * feedback.
    1020             :  */
    1021             : static bool
    1022           8 : flushAndSendFeedback(PGconn *conn, TimestampTz *now)
    1023             : {
    1024             :     /* flush data to disk, so that we send a recent flush pointer */
    1025           8 :     if (!OutputFsync(*now))
    1026           0 :         return false;
    1027           8 :     *now = feGetCurrentTimestamp();
    1028           8 :     if (!sendFeedback(conn, *now, true, false))
    1029           0 :         return false;
    1030             : 
    1031           8 :     return true;
    1032             : }
    1033             : 
    1034             : /*
    1035             :  * Try to inform the server about our upcoming demise, but don't wait around or
    1036             :  * retry on failure.
    1037             :  */
    1038             : static void
    1039           8 : prepareToTerminate(PGconn *conn, XLogRecPtr endpos, bool keepalive, XLogRecPtr lsn)
    1040             : {
    1041           8 :     (void) PQputCopyEnd(conn, NULL);
    1042           8 :     (void) PQflush(conn);
    1043             : 
    1044           8 :     if (verbose)
    1045             :     {
    1046           0 :         if (keepalive)
    1047           0 :             pg_log_info("end position %X/%X reached by keepalive",
    1048             :                         (uint32) (endpos >> 32), (uint32) endpos);
    1049             :         else
    1050           0 :             pg_log_info("end position %X/%X reached by WAL record at %X/%X",
    1051             :                         (uint32) (endpos >> 32), (uint32) (endpos),
    1052             :                         (uint32) (lsn >> 32), (uint32) lsn);
    1053             :     }
    1054           8 : }

Generated by: LCOV version 1.13