LCOV - code coverage report
Current view: top level - src/bin/pg_basebackup - pg_recvlogical.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19beta1 Lines: 78.6 % 485 381
Test Date: 2026-06-18 03:16:44 Functions: 90.0 % 10 9
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * pg_recvlogical.c - receive data from a logical decoding slot in a streaming
       4              :  *                    fashion and write it to a local file.
       5              :  *
       6              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       7              :  *
       8              :  * IDENTIFICATION
       9              :  *        src/bin/pg_basebackup/pg_recvlogical.c
      10              :  *-------------------------------------------------------------------------
      11              :  */
      12              : 
      13              : #include "postgres_fe.h"
      14              : 
      15              : #include <dirent.h>
      16              : #include <limits.h>
      17              : #include <sys/select.h>
      18              : #include <sys/stat.h>
      19              : #include <unistd.h>
      20              : 
      21              : #include "common/file_perm.h"
      22              : #include "common/logging.h"
      23              : #include "fe_utils/option_utils.h"
      24              : #include "getopt_long.h"
      25              : #include "libpq-fe.h"
      26              : #include "libpq/pqsignal.h"
      27              : #include "libpq/protocol.h"
      28              : #include "pqexpbuffer.h"
      29              : #include "streamutil.h"
      30              : 
      31              : /* Time to sleep between reconnection attempts */
      32              : #define RECONNECT_SLEEP_TIME 5
      33              : 
      34              : typedef enum
      35              : {
      36              :     STREAM_STOP_NONE,
      37              :     STREAM_STOP_END_OF_WAL,
      38              :     STREAM_STOP_KEEPALIVE,
      39              :     STREAM_STOP_SIGNAL
      40              : } StreamStopReason;
      41              : 
      42              : /* Global Options */
      43              : static char *outfile = NULL;
      44              : static int  verbose = 0;
      45              : static bool two_phase = false;  /* enable-two-phase option */
      46              : static bool failover = false;   /* enable-failover option */
      47              : static int  noloop = 0;
      48              : static int  standby_message_timeout = 10 * 1000;    /* 10 sec = default */
      49              : static int  fsync_interval = 10 * 1000; /* 10 sec = default */
      50              : static XLogRecPtr startpos = InvalidXLogRecPtr;
      51              : static XLogRecPtr endpos = InvalidXLogRecPtr;
      52              : static bool do_create_slot = false;
      53              : static bool slot_exists_ok = false;
      54              : static bool do_start_slot = false;
      55              : static bool do_drop_slot = false;
      56              : static char *replication_slot = NULL;
      57              : 
      58              : /* filled pairwise with option, value. value may be NULL */
      59              : static char **options;
      60              : static size_t noptions = 0;
      61              : static const char *plugin = "test_decoding";
      62              : 
      63              : /* Global State */
      64              : static int  outfd = -1;
      65              : static volatile sig_atomic_t time_to_abort = false;
      66              : static volatile sig_atomic_t stop_reason = STREAM_STOP_NONE;
      67              : static volatile sig_atomic_t output_reopen = false;
      68              : static bool output_isfile;
      69              : static TimestampTz output_last_fsync = -1;
      70              : static bool output_needs_fsync = false;
      71              : static XLogRecPtr output_written_lsn = InvalidXLogRecPtr;
      72              : static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr;
      73              : 
      74              : static void usage(void);
      75              : static void StreamLogicalLog(void);
      76              : static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now);
      77              : static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos,
      78              :                                StreamStopReason reason,
      79              :                                XLogRecPtr lsn);
      80              : 
      81              : static void
      82            1 : usage(void)
      83              : {
      84            1 :     printf(_("%s controls PostgreSQL logical decoding streams.\n\n"),
      85              :            progname);
      86            1 :     printf(_("Usage:\n"));
      87            1 :     printf(_("  %s [OPTION]...\n"), progname);
      88            1 :     printf(_("\nAction to be performed:\n"));
      89            1 :     printf(_("      --create-slot      create a new replication slot (for the slot's name see --slot)\n"));
      90            1 :     printf(_("      --drop-slot        drop the replication slot (for the slot's name see --slot)\n"));
      91            1 :     printf(_("      --start            start streaming in a replication slot (for the slot's name see --slot)\n"));
      92            1 :     printf(_("\nOptions:\n"));
      93            1 :     printf(_("      --enable-failover  enable replication slot synchronization to standby servers when\n"
      94              :              "                         creating a replication slot\n"));
      95            1 :     printf(_("  -E, --endpos=LSN       exit after receiving the specified LSN\n"));
      96            1 :     printf(_("  -f, --file=FILE        receive log into this file, - for stdout\n"));
      97            1 :     printf(_("  -F  --fsync-interval=SECS\n"
      98              :              "                         time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000));
      99            1 :     printf(_("      --if-not-exists    do not error if slot already exists when creating a slot\n"));
     100            1 :     printf(_("  -I, --startpos=LSN     where in an existing slot should the streaming start\n"));
     101            1 :     printf(_("  -n, --no-loop          do not loop on connection lost\n"));
     102            1 :     printf(_("  -o, --option=NAME[=VALUE]\n"
     103              :              "                         pass option NAME with optional value VALUE to the\n"
     104              :              "                         output plugin\n"));
     105            1 :     printf(_("  -P, --plugin=PLUGIN    use output plugin PLUGIN (default: %s)\n"), plugin);
     106            1 :     printf(_("  -s, --status-interval=SECS\n"
     107              :              "                         time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
     108            1 :     printf(_("  -S, --slot=SLOTNAME    name of the logical replication slot\n"));
     109            1 :     printf(_("  -t, --enable-two-phase enable decoding of prepared transactions when creating a slot\n"));
     110            1 :     printf(_("      --two-phase        (same as --enable-two-phase, deprecated)\n"));
     111            1 :     printf(_("  -v, --verbose          output verbose messages\n"));
     112            1 :     printf(_("  -V, --version          output version information, then exit\n"));
     113            1 :     printf(_("  -?, --help             show this help, then exit\n"));
     114            1 :     printf(_("\nConnection options:\n"));
     115            1 :     printf(_("  -d, --dbname=DBNAME    database to connect to\n"));
     116            1 :     printf(_("  -h, --host=HOSTNAME    database server host or socket directory\n"));
     117            1 :     printf(_("  -p, --port=PORT        database server port number\n"));
     118            1 :     printf(_("  -U, --username=NAME    connect as specified database user\n"));
     119            1 :     printf(_("  -w, --no-password      never prompt for password\n"));
     120            1 :     printf(_("  -W, --password         force password prompt (should happen automatically)\n"));
     121            1 :     printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
     122            1 :     printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
     123            1 : }
     124              : 
     125              : /*
     126              :  * Send a Standby Status Update message to server.
     127              :  */
     128              : static bool
     129           29 : sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested)
     130              : {
     131              :     static XLogRecPtr last_written_lsn = InvalidXLogRecPtr;
     132              :     static XLogRecPtr last_fsync_lsn = InvalidXLogRecPtr;
     133              : 
     134              :     char        replybuf[1 + 8 + 8 + 8 + 8 + 1];
     135           29 :     int         len = 0;
     136              : 
     137              :     /*
     138              :      * we normally don't want to send superfluous feedback, but if it's
     139              :      * because of a timeout we need to, otherwise wal_sender_timeout will kill
     140              :      * us.
     141              :      */
     142           29 :     if (!force &&
     143            0 :         last_written_lsn == output_written_lsn &&
     144            0 :         last_fsync_lsn == output_fsync_lsn)
     145            0 :         return true;
     146              : 
     147           29 :     if (verbose)
     148            2 :         pg_log_info("confirming write up to %X/%08X, flush to %X/%08X (slot %s)",
     149              :                     LSN_FORMAT_ARGS(output_written_lsn),
     150              :                     LSN_FORMAT_ARGS(output_fsync_lsn),
     151              :                     replication_slot);
     152              : 
     153           29 :     replybuf[len] = PqReplMsg_StandbyStatusUpdate;
     154           29 :     len += 1;
     155           29 :     fe_sendint64(output_written_lsn, &replybuf[len]);   /* write */
     156           29 :     len += 8;
     157           29 :     fe_sendint64(output_fsync_lsn, &replybuf[len]); /* flush */
     158           29 :     len += 8;
     159           29 :     fe_sendint64(InvalidXLogRecPtr, &replybuf[len]);    /* apply */
     160           29 :     len += 8;
     161           29 :     fe_sendint64(now, &replybuf[len]);  /* sendTime */
     162           29 :     len += 8;
     163           29 :     replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
     164           29 :     len += 1;
     165              : 
     166           29 :     startpos = output_written_lsn;
     167           29 :     last_written_lsn = output_written_lsn;
     168           29 :     last_fsync_lsn = output_fsync_lsn;
     169              : 
     170           29 :     if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
     171              :     {
     172            0 :         pg_log_error("could not send feedback packet: %s",
     173              :                      PQerrorMessage(conn));
     174            0 :         return false;
     175              :     }
     176              : 
     177           29 :     return true;
     178              : }
     179              : 
     180              : static void
     181           60 : disconnect_atexit(void)
     182              : {
     183           60 :     if (conn != NULL)
     184           34 :         PQfinish(conn);
     185           60 : }
     186              : 
     187              : static void
     188           40 : OutputFsync(TimestampTz now)
     189              : {
     190           40 :     output_last_fsync = now;
     191              : 
     192           40 :     output_fsync_lsn = output_written_lsn;
     193              : 
     194              :     /*
     195              :      * Save the last flushed position as the replication start point. On
     196              :      * reconnect, replication resumes from there to avoid re-sending flushed
     197              :      * data.
     198              :      */
     199           40 :     startpos = output_fsync_lsn;
     200              : 
     201           40 :     if (fsync_interval <= 0)
     202            0 :         return;
     203              : 
     204           40 :     if (!output_needs_fsync)
     205           26 :         return;
     206              : 
     207           14 :     output_needs_fsync = false;
     208              : 
     209              :     /* can only fsync if it's a regular file */
     210           14 :     if (!output_isfile)
     211           11 :         return;
     212              : 
     213            3 :     if (fsync(outfd) != 0)
     214            0 :         pg_fatal("could not fsync file \"%s\": %m", outfile);
     215              : }
     216              : 
     217              : /*
     218              :  * Start the log streaming
     219              :  */
     220              : static void
     221           27 : StreamLogicalLog(void)
     222              : {
     223              :     PGresult   *res;
     224           27 :     char       *copybuf = NULL;
     225           27 :     TimestampTz last_status = -1;
     226              :     int         i;
     227              :     PQExpBuffer query;
     228              :     XLogRecPtr  cur_record_lsn;
     229              : 
     230           27 :     cur_record_lsn = InvalidXLogRecPtr;
     231              : 
     232              :     /*
     233              :      * Connect in replication mode to the server
     234              :      */
     235           27 :     if (!conn)
     236            1 :         conn = GetConnection();
     237           27 :     if (!conn)
     238              :         /* Error message already written in GetConnection() */
     239            0 :         return;
     240              : 
     241              :     /*
     242              :      * Start the replication
     243              :      */
     244           27 :     if (verbose)
     245            2 :         pg_log_info("starting log streaming at %X/%08X (slot %s)",
     246              :                     LSN_FORMAT_ARGS(startpos),
     247              :                     replication_slot);
     248              : 
     249              :     /* Initiate the replication stream at specified location */
     250           27 :     query = createPQExpBuffer();
     251           27 :     appendPQExpBufferStr(query, "START_REPLICATION SLOT ");
     252           27 :     AppendQuotedIdentifier(query, replication_slot);
     253           27 :     appendPQExpBuffer(query, " LOGICAL %X/%08X", LSN_FORMAT_ARGS(startpos));
     254              : 
     255              :     /* print options if there are any */
     256           27 :     if (noptions)
     257           21 :         appendPQExpBufferStr(query, " (");
     258              : 
     259           69 :     for (i = 0; i < noptions; i++)
     260              :     {
     261              :         /* separator */
     262           42 :         if (i > 0)
     263           21 :             appendPQExpBufferStr(query, ", ");
     264              : 
     265              :         /* write option name */
     266           42 :         AppendQuotedIdentifier(query, options[i * 2]);
     267              : 
     268              :         /* write option value if specified */
     269           42 :         if (options[i * 2 + 1] != NULL)
     270              :         {
     271           42 :             appendPQExpBufferChar(query, ' ');
     272           42 :             AppendQuotedLiteral(query, options[i * 2 + 1]);
     273              :         }
     274              :     }
     275              : 
     276           27 :     if (noptions)
     277           21 :         appendPQExpBufferChar(query, ')');
     278              : 
     279           27 :     res = PQexec(conn, query->data);
     280           27 :     if (PQresultStatus(res) != PGRES_COPY_BOTH)
     281              :     {
     282            6 :         pg_log_error("could not send replication command \"%s\": %s",
     283              :                      query->data, PQresultErrorMessage(res));
     284            6 :         PQclear(res);
     285            6 :         goto error;
     286              :     }
     287           21 :     PQclear(res);
     288           21 :     resetPQExpBuffer(query);
     289              : 
     290           21 :     if (verbose)
     291            2 :         pg_log_info("streaming initiated");
     292              : 
     293          546 :     while (!time_to_abort)
     294              :     {
     295              :         int         r;
     296              :         int         bytes_left;
     297              :         int         bytes_written;
     298              :         TimestampTz now;
     299              :         int         hdr_len;
     300              : 
     301          543 :         cur_record_lsn = InvalidXLogRecPtr;
     302              : 
     303          543 :         if (copybuf != NULL)
     304              :         {
     305          421 :             PQfreemem(copybuf);
     306          421 :             copybuf = NULL;
     307              :         }
     308              : 
     309              :         /*
     310              :          * Potentially send a status message to the primary.
     311              :          */
     312          543 :         now = feGetCurrentTimestamp();
     313              : 
     314         1066 :         if (outfd != -1 &&
     315          523 :             feTimestampDifferenceExceeds(output_last_fsync, now,
     316              :                                          fsync_interval))
     317           20 :             OutputFsync(now);
     318              : 
     319         1086 :         if (standby_message_timeout > 0 &&
     320          543 :             feTimestampDifferenceExceeds(last_status, now,
     321              :                                          standby_message_timeout))
     322              :         {
     323              :             /* Time to send feedback! */
     324           21 :             if (!sendFeedback(conn, now, true, false))
     325            3 :                 goto error;
     326              : 
     327           21 :             last_status = now;
     328              :         }
     329              : 
     330              :         /* got SIGHUP, close output file */
     331          543 :         if (outfd != -1 && output_reopen && strcmp(outfile, "-") != 0)
     332              :         {
     333            0 :             now = feGetCurrentTimestamp();
     334            0 :             OutputFsync(now);
     335            0 :             close(outfd);
     336            0 :             outfd = -1;
     337              :         }
     338          543 :         output_reopen = false;
     339              : 
     340              :         /* open the output file, if not open yet */
     341          543 :         if (outfd == -1)
     342              :         {
     343              :             struct stat statbuf;
     344              : 
     345           20 :             if (strcmp(outfile, "-") == 0)
     346           18 :                 outfd = fileno(stdout);
     347              :             else
     348            2 :                 outfd = open(outfile, O_CREAT | O_APPEND | O_WRONLY | PG_BINARY,
     349              :                              pg_file_create_mode);
     350           20 :             if (outfd == -1)
     351              :             {
     352            0 :                 pg_log_error("could not open log file \"%s\": %m", outfile);
     353            0 :                 goto error;
     354              :             }
     355              : 
     356           20 :             if (fstat(outfd, &statbuf) != 0)
     357              :             {
     358            0 :                 pg_log_error("could not stat file \"%s\": %m", outfile);
     359            0 :                 goto error;
     360              :             }
     361              : 
     362           20 :             output_isfile = S_ISREG(statbuf.st_mode) && !isatty(outfd);
     363              :         }
     364              : 
     365          543 :         r = PQgetCopyData(conn, &copybuf, 1);
     366          543 :         if (r == 0)
     367          101 :         {
     368              :             /*
     369              :              * In async mode, and no data available. We block on reading but
     370              :              * not more than the specified timeout, so that we can send a
     371              :              * response back to the client.
     372              :              */
     373              :             fd_set      input_mask;
     374          107 :             TimestampTz message_target = 0;
     375          107 :             TimestampTz fsync_target = 0;
     376              :             struct timeval timeout;
     377          107 :             struct timeval *timeoutptr = NULL;
     378              : 
     379          107 :             if (PQsocket(conn) < 0)
     380              :             {
     381            0 :                 pg_log_error("invalid socket: %s", PQerrorMessage(conn));
     382            3 :                 goto error;
     383              :             }
     384              : 
     385         1819 :             FD_ZERO(&input_mask);
     386          107 :             FD_SET(PQsocket(conn), &input_mask);
     387              : 
     388              :             /* Compute when we need to wakeup to send a keepalive message. */
     389          107 :             if (standby_message_timeout)
     390          107 :                 message_target = last_status + (standby_message_timeout - 1) *
     391              :                     ((int64) 1000);
     392              : 
     393              :             /* Compute when we need to wakeup to fsync the output file. */
     394          107 :             if (fsync_interval > 0 && output_needs_fsync)
     395           61 :                 fsync_target = output_last_fsync + (fsync_interval - 1) *
     396              :                     ((int64) 1000);
     397              : 
     398              :             /* Now compute when to wakeup. */
     399          107 :             if (message_target > 0 || fsync_target > 0)
     400              :             {
     401              :                 TimestampTz targettime;
     402              :                 long        secs;
     403              :                 int         usecs;
     404              : 
     405          107 :                 targettime = message_target;
     406              : 
     407          107 :                 if (fsync_target > 0 && fsync_target < targettime)
     408            5 :                     targettime = fsync_target;
     409              : 
     410          107 :                 feTimestampDifference(now,
     411              :                                       targettime,
     412              :                                       &secs,
     413              :                                       &usecs);
     414          107 :                 if (secs <= 0)
     415            5 :                     timeout.tv_sec = 1; /* Always sleep at least 1 sec */
     416              :                 else
     417          102 :                     timeout.tv_sec = secs;
     418          107 :                 timeout.tv_usec = usecs;
     419          107 :                 timeoutptr = &timeout;
     420              :             }
     421              : 
     422          107 :             r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
     423          107 :             if (r == 0 || (r < 0 && errno == EINTR))
     424              :             {
     425              :                 /*
     426              :                  * Got a timeout or signal. Continue the loop and either
     427              :                  * deliver a status packet to the server or just go back into
     428              :                  * blocking.
     429              :                  */
     430          104 :                 continue;
     431              :             }
     432          104 :             else if (r < 0)
     433              :             {
     434            0 :                 pg_log_error("%s() failed: %m", "select");
     435            0 :                 goto error;
     436              :             }
     437              : 
     438              :             /* Else there is actually data on the socket */
     439          104 :             if (PQconsumeInput(conn) == 0)
     440              :             {
     441            3 :                 pg_log_error("could not receive data from WAL stream: %s",
     442              :                              PQerrorMessage(conn));
     443            3 :                 goto error;
     444              :             }
     445          101 :             continue;
     446              :         }
     447              : 
     448              :         /* End of copy stream */
     449          436 :         if (r == -1)
     450           15 :             break;
     451              : 
     452              :         /* Failure while reading the copy stream */
     453          428 :         if (r == -2)
     454              :         {
     455            0 :             pg_log_error("could not read COPY data: %s",
     456              :                          PQerrorMessage(conn));
     457            0 :             goto error;
     458              :         }
     459              : 
     460              :         /* Check the message type. */
     461          428 :         if (copybuf[0] == PqReplMsg_Keepalive)
     462          305 :         {
     463              :             int         pos;
     464              :             bool        replyRequested;
     465              :             XLogRecPtr  walEnd;
     466          307 :             bool        endposReached = false;
     467              : 
     468              :             /*
     469              :              * Parse the keepalive message, enclosed in the CopyData message.
     470              :              * We just check if the server requested a reply, and ignore the
     471              :              * rest.
     472              :              */
     473          307 :             pos = 1;            /* skip msgtype PqReplMsg_Keepalive */
     474          307 :             walEnd = fe_recvint64(&copybuf[pos]);
     475          307 :             output_written_lsn = Max(walEnd, output_written_lsn);
     476              : 
     477          307 :             pos += 8;           /* read walEnd */
     478              : 
     479          307 :             pos += 8;           /* skip sendTime */
     480              : 
     481          307 :             if (r < pos + 1)
     482              :             {
     483            0 :                 pg_log_error("streaming header too small: %d", r);
     484            0 :                 goto error;
     485              :             }
     486          307 :             replyRequested = copybuf[pos];
     487              : 
     488          307 :             if (XLogRecPtrIsValid(endpos) && walEnd >= endpos)
     489              :             {
     490              :                 /*
     491              :                  * If there's nothing to read on the socket until a keepalive
     492              :                  * we know that the server has nothing to send us; and if
     493              :                  * walEnd has passed endpos, we know nothing else can have
     494              :                  * committed before endpos.  So we can bail out now.
     495              :                  */
     496            2 :                 endposReached = true;
     497              :             }
     498              : 
     499              :             /* Send a reply, if necessary */
     500          307 :             if (replyRequested || endposReached)
     501              :             {
     502            3 :                 if (!flushAndSendFeedback(conn, &now))
     503            0 :                     goto error;
     504            3 :                 last_status = now;
     505              :             }
     506              : 
     507          307 :             if (endposReached)
     508              :             {
     509            2 :                 stop_reason = STREAM_STOP_KEEPALIVE;
     510            2 :                 time_to_abort = true;
     511            2 :                 break;
     512              :             }
     513              : 
     514          305 :             continue;
     515              :         }
     516          121 :         else if (copybuf[0] != PqReplMsg_WALData)
     517              :         {
     518            0 :             pg_log_error("unrecognized streaming header: \"%c\"",
     519              :                          copybuf[0]);
     520            0 :             goto error;
     521              :         }
     522              : 
     523              :         /*
     524              :          * Read the header of the WALData message, enclosed in the CopyData
     525              :          * message. We only need the WAL location field (dataStart), the rest
     526              :          * of the header is ignored.
     527              :          */
     528          121 :         hdr_len = 1;            /* msgtype PqReplMsg_WALData */
     529          121 :         hdr_len += 8;           /* dataStart */
     530          121 :         hdr_len += 8;           /* walEnd */
     531          121 :         hdr_len += 8;           /* sendTime */
     532          121 :         if (r < hdr_len + 1)
     533              :         {
     534            0 :             pg_log_error("streaming header too small: %d", r);
     535            0 :             goto error;
     536              :         }
     537              : 
     538              :         /* Extract WAL location for this block */
     539          121 :         cur_record_lsn = fe_recvint64(&copybuf[1]);
     540              : 
     541          121 :         if (XLogRecPtrIsValid(endpos) && cur_record_lsn > endpos)
     542              :         {
     543              :             /*
     544              :              * We've read past our endpoint, so prepare to go away being
     545              :              * cautious about what happens to our output data.
     546              :              */
     547            0 :             if (!flushAndSendFeedback(conn, &now))
     548            0 :                 goto error;
     549            0 :             stop_reason = STREAM_STOP_END_OF_WAL;
     550            0 :             time_to_abort = true;
     551            0 :             break;
     552              :         }
     553              : 
     554          121 :         output_written_lsn = Max(cur_record_lsn, output_written_lsn);
     555              : 
     556          121 :         bytes_left = r - hdr_len;
     557          121 :         bytes_written = 0;
     558              : 
     559              :         /* signal that a fsync is needed */
     560          121 :         output_needs_fsync = true;
     561              : 
     562          242 :         while (bytes_left)
     563              :         {
     564              :             int         ret;
     565              : 
     566          242 :             ret = write(outfd,
     567          121 :                         copybuf + hdr_len + bytes_written,
     568              :                         bytes_left);
     569              : 
     570          121 :             if (ret < 0)
     571              :             {
     572            0 :                 pg_log_error("could not write %d bytes to log file \"%s\": %m",
     573              :                              bytes_left, outfile);
     574            0 :                 goto error;
     575              :             }
     576              : 
     577              :             /* Write was successful, advance our position */
     578          121 :             bytes_written += ret;
     579          121 :             bytes_left -= ret;
     580              :         }
     581              : 
     582          121 :         if (write(outfd, "\n", 1) != 1)
     583              :         {
     584            0 :             pg_log_error("could not write %d bytes to log file \"%s\": %m",
     585              :                          1, outfile);
     586            0 :             goto error;
     587              :         }
     588              : 
     589          121 :         if (XLogRecPtrIsValid(endpos) && cur_record_lsn == endpos)
     590              :         {
     591              :             /* endpos was exactly the record we just processed, we're done */
     592            5 :             if (!flushAndSendFeedback(conn, &now))
     593            0 :                 goto error;
     594            5 :             stop_reason = STREAM_STOP_END_OF_WAL;
     595            5 :             time_to_abort = true;
     596            5 :             break;
     597              :         }
     598              :     }
     599              : 
     600              :     /* Clean up connection state if stream has been aborted */
     601           18 :     if (time_to_abort)
     602           10 :         prepareToTerminate(conn, endpos, stop_reason, cur_record_lsn);
     603              : 
     604           18 :     res = PQgetResult(conn);
     605           18 :     if (PQresultStatus(res) == PGRES_COPY_OUT)
     606              :     {
     607           10 :         PQclear(res);
     608              : 
     609              :         /*
     610              :          * We're doing a client-initiated clean exit and have sent CopyDone to
     611              :          * the server. Drain any messages, so we don't miss a last-minute
     612              :          * ErrorResponse. The walsender stops generating WALData records once
     613              :          * it sees CopyDone, so expect this to finish quickly. After CopyDone,
     614              :          * it's too late for sendFeedback(), even if this were to take a long
     615              :          * time. Hence, use synchronous-mode PQgetCopyData().
     616              :          */
     617              :         while (1)
     618          153 :         {
     619              :             int         r;
     620              : 
     621          163 :             if (copybuf != NULL)
     622              :             {
     623          160 :                 PQfreemem(copybuf);
     624          160 :                 copybuf = NULL;
     625              :             }
     626          163 :             r = PQgetCopyData(conn, &copybuf, 0);
     627          163 :             if (r == -1)
     628           10 :                 break;
     629          153 :             if (r == -2)
     630              :             {
     631            0 :                 pg_log_error("could not read COPY data: %s",
     632              :                              PQerrorMessage(conn));
     633            0 :                 time_to_abort = false;  /* unclean exit */
     634            0 :                 goto error;
     635              :             }
     636              :         }
     637              : 
     638           10 :         res = PQgetResult(conn);
     639              :     }
     640           18 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
     641              :     {
     642            7 :         pg_log_error("unexpected termination of replication stream: %s",
     643              :                      PQresultErrorMessage(res));
     644            7 :         PQclear(res);
     645            7 :         goto error;
     646              :     }
     647           11 :     PQclear(res);
     648              : 
     649           11 :     if (outfd != -1 && strcmp(outfile, "-") != 0)
     650              :     {
     651            2 :         TimestampTz t = feGetCurrentTimestamp();
     652              : 
     653            2 :         OutputFsync(t);
     654            2 :         if (close(outfd) != 0)
     655            0 :             pg_log_error("could not close file \"%s\": %m", outfile);
     656              :     }
     657           11 :     outfd = -1;
     658           27 : error:
     659           27 :     if (copybuf != NULL)
     660              :     {
     661            0 :         PQfreemem(copybuf);
     662            0 :         copybuf = NULL;
     663              :     }
     664           27 :     destroyPQExpBuffer(query);
     665           27 :     PQfinish(conn);
     666           27 :     conn = NULL;
     667              : }
     668              : 
     669              : /*
     670              :  * Unfortunately we can't do sensible signal handling on windows...
     671              :  */
     672              : #ifndef WIN32
     673              : 
     674              : /*
     675              :  * When SIGINT/SIGTERM are caught, just tell the system to exit at the next
     676              :  * possible moment.
     677              :  */
     678              : static void
     679            3 : sigexit_handler(SIGNAL_ARGS)
     680              : {
     681            3 :     stop_reason = STREAM_STOP_SIGNAL;
     682            3 :     time_to_abort = true;
     683            3 : }
     684              : 
     685              : /*
     686              :  * Trigger the output file to be reopened.
     687              :  */
     688              : static void
     689            0 : sighup_handler(SIGNAL_ARGS)
     690              : {
     691            0 :     output_reopen = true;
     692            0 : }
     693              : #endif
     694              : 
     695              : 
     696              : int
     697           68 : main(int argc, char **argv)
     698              : {
     699              :     static struct option long_options[] = {
     700              : /* general options */
     701              :         {"file", required_argument, NULL, 'f'},
     702              :         {"fsync-interval", required_argument, NULL, 'F'},
     703              :         {"no-loop", no_argument, NULL, 'n'},
     704              :         {"enable-failover", no_argument, NULL, 5},
     705              :         {"enable-two-phase", no_argument, NULL, 't'},
     706              :         {"two-phase", no_argument, NULL, 't'},    /* deprecated */
     707              :         {"verbose", no_argument, NULL, 'v'},
     708              :         {"version", no_argument, NULL, 'V'},
     709              :         {"help", no_argument, NULL, '?'},
     710              : /* connection options */
     711              :         {"dbname", required_argument, NULL, 'd'},
     712              :         {"host", required_argument, NULL, 'h'},
     713              :         {"port", required_argument, NULL, 'p'},
     714              :         {"username", required_argument, NULL, 'U'},
     715              :         {"no-password", no_argument, NULL, 'w'},
     716              :         {"password", no_argument, NULL, 'W'},
     717              : /* replication options */
     718              :         {"startpos", required_argument, NULL, 'I'},
     719              :         {"endpos", required_argument, NULL, 'E'},
     720              :         {"option", required_argument, NULL, 'o'},
     721              :         {"plugin", required_argument, NULL, 'P'},
     722              :         {"status-interval", required_argument, NULL, 's'},
     723              :         {"slot", required_argument, NULL, 'S'},
     724              : /* action */
     725              :         {"create-slot", no_argument, NULL, 1},
     726              :         {"start", no_argument, NULL, 2},
     727              :         {"drop-slot", no_argument, NULL, 3},
     728              :         {"if-not-exists", no_argument, NULL, 4},
     729              :         {NULL, 0, NULL, 0}
     730              :     };
     731              :     int         c;
     732              :     int         option_index;
     733              :     uint32      hi,
     734              :                 lo;
     735              :     char       *db_name;
     736              : 
     737           68 :     pg_logging_init(argv[0]);
     738           68 :     progname = get_progname(argv[0]);
     739           68 :     set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
     740              : 
     741           68 :     if (argc > 1)
     742              :     {
     743           67 :         if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
     744              :         {
     745            1 :             usage();
     746            1 :             exit(0);
     747              :         }
     748           66 :         else if (strcmp(argv[1], "-V") == 0 ||
     749           66 :                  strcmp(argv[1], "--version") == 0)
     750              :         {
     751            1 :             puts("pg_recvlogical (PostgreSQL) " PG_VERSION);
     752            1 :             exit(0);
     753              :         }
     754              :     }
     755              : 
     756          389 :     while ((c = getopt_long(argc, argv, "E:f:F:ntvd:h:p:U:wWI:o:P:s:S:",
     757          389 :                             long_options, &option_index)) != -1)
     758              :     {
     759          324 :         switch (c)
     760              :         {
     761              : /* general options */
     762           27 :             case 'f':
     763           27 :                 outfile = pg_strdup(optarg);
     764           27 :                 break;
     765            2 :             case 'F':
     766            2 :                 if (!option_parse_int(optarg, "-F/--fsync-interval", 0,
     767              :                                       INT_MAX / 1000,
     768              :                                       &fsync_interval))
     769            0 :                     exit(1);
     770            2 :                 fsync_interval *= 1000;
     771            2 :                 break;
     772           24 :             case 'n':
     773           24 :                 noloop = 1;
     774           24 :                 break;
     775            2 :             case 't':
     776            2 :                 two_phase = true;
     777            2 :                 break;
     778            1 :             case 'v':
     779            1 :                 verbose++;
     780            1 :                 break;
     781            1 :             case 5:
     782            1 :                 failover = true;
     783            1 :                 break;
     784              : /* connection options */
     785           62 :             case 'd':
     786           62 :                 dbname = pg_strdup(optarg);
     787           62 :                 break;
     788            0 :             case 'h':
     789            0 :                 dbhost = pg_strdup(optarg);
     790            0 :                 break;
     791            0 :             case 'p':
     792            0 :                 dbport = pg_strdup(optarg);
     793            0 :                 break;
     794            0 :             case 'U':
     795            0 :                 dbuser = pg_strdup(optarg);
     796            0 :                 break;
     797            0 :             case 'w':
     798            0 :                 dbgetpassword = -1;
     799            0 :                 break;
     800            0 :             case 'W':
     801            0 :                 dbgetpassword = 1;
     802            0 :                 break;
     803              : /* replication options */
     804            0 :             case 'I':
     805            0 :                 if (sscanf(optarg, "%X/%08X", &hi, &lo) != 2)
     806            0 :                     pg_fatal("could not parse start position \"%s\"", optarg);
     807            0 :                 startpos = ((uint64) hi) << 32 | lo;
     808            0 :                 break;
     809            8 :             case 'E':
     810            8 :                 if (sscanf(optarg, "%X/%08X", &hi, &lo) != 2)
     811            0 :                     pg_fatal("could not parse end position \"%s\"", optarg);
     812            8 :                 endpos = ((uint64) hi) << 32 | lo;
     813            8 :                 break;
     814           42 :             case 'o':
     815              :                 {
     816           42 :                     char       *data = pg_strdup(optarg);
     817           42 :                     char       *val = strchr(data, '=');
     818              : 
     819           42 :                     if (val != NULL)
     820              :                     {
     821              :                         /* remove =; separate data from val */
     822           42 :                         *val = '\0';
     823           42 :                         val++;
     824              :                     }
     825              : 
     826           42 :                     noptions += 1;
     827           42 :                     options = pg_realloc_array(options, char *, noptions * 2);
     828              : 
     829           42 :                     options[(noptions - 1) * 2] = data;
     830           42 :                     options[(noptions - 1) * 2 + 1] = val;
     831              :                 }
     832              : 
     833           42 :                 break;
     834           27 :             case 'P':
     835           27 :                 plugin = pg_strdup(optarg);
     836           27 :                 break;
     837            1 :             case 's':
     838            1 :                 if (!option_parse_int(optarg, "-s/--status-interval", 0,
     839              :                                       INT_MAX / 1000,
     840              :                                       &standby_message_timeout))
     841            0 :                     exit(1);
     842            1 :                 standby_message_timeout *= 1000;
     843            1 :                 break;
     844           64 :             case 'S':
     845           64 :                 replication_slot = pg_strdup(optarg);
     846           64 :                 break;
     847              : /* action */
     848           31 :             case 1:
     849           31 :                 do_create_slot = true;
     850           31 :                 break;
     851           28 :             case 2:
     852           28 :                 do_start_slot = true;
     853           28 :                 break;
     854            3 :             case 3:
     855            3 :                 do_drop_slot = true;
     856            3 :                 break;
     857            0 :             case 4:
     858            0 :                 slot_exists_ok = true;
     859            0 :                 break;
     860              : 
     861            1 :             default:
     862              :                 /* getopt_long already emitted a complaint */
     863            1 :                 pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     864            1 :                 exit(1);
     865              :         }
     866              :     }
     867              : 
     868              :     /*
     869              :      * Any non-option arguments?
     870              :      */
     871           65 :     if (optind < argc)
     872              :     {
     873            0 :         pg_log_error("too many command-line arguments (first is \"%s\")",
     874              :                      argv[optind]);
     875            0 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     876            0 :         exit(1);
     877              :     }
     878              : 
     879              :     /*
     880              :      * Required arguments
     881              :      */
     882           65 :     if (replication_slot == NULL)
     883              :     {
     884            1 :         pg_log_error("no slot specified");
     885            1 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     886            1 :         exit(1);
     887              :     }
     888              : 
     889           64 :     if (do_start_slot && outfile == NULL)
     890              :     {
     891            1 :         pg_log_error("no target file specified");
     892            1 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     893            1 :         exit(1);
     894              :     }
     895              : 
     896           63 :     if (!do_drop_slot && dbname == NULL)
     897              :     {
     898            1 :         pg_log_error("no database specified");
     899            1 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     900            1 :         exit(1);
     901              :     }
     902              : 
     903           62 :     if (!do_drop_slot && !do_create_slot && !do_start_slot)
     904              :     {
     905            1 :         pg_log_error("at least one action needs to be specified");
     906            1 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     907            1 :         exit(1);
     908              :     }
     909              : 
     910           61 :     if (do_drop_slot && (do_create_slot || do_start_slot))
     911              :     {
     912            0 :         pg_log_error("cannot use --create-slot or --start together with --drop-slot");
     913            0 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     914            0 :         exit(1);
     915              :     }
     916              : 
     917           61 :     if (XLogRecPtrIsValid(startpos) && (do_create_slot || do_drop_slot))
     918              :     {
     919            0 :         pg_log_error("cannot use --create-slot or --drop-slot together with --startpos");
     920            0 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     921            0 :         exit(1);
     922              :     }
     923              : 
     924           61 :     if (XLogRecPtrIsValid(endpos) && !do_start_slot)
     925              :     {
     926            0 :         pg_log_error("--endpos may only be specified with --start");
     927            0 :         pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     928            0 :         exit(1);
     929              :     }
     930              : 
     931           61 :     if (!do_create_slot)
     932              :     {
     933           30 :         if (two_phase)
     934              :         {
     935            1 :             pg_log_error("%s may only be specified with --create-slot", "--enable-two-phase");
     936            1 :             pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     937            1 :             exit(1);
     938              :         }
     939              : 
     940           29 :         if (failover)
     941              :         {
     942            0 :             pg_log_error("%s may only be specified with --create-slot", "--enable-failover");
     943            0 :             pg_log_error_hint("Try \"%s --help\" for more information.", progname);
     944            0 :             exit(1);
     945              :         }
     946              :     }
     947              : 
     948              :     /*
     949              :      * Obtain a connection to server.  Notably, if we need a password, we want
     950              :      * to collect it from the user immediately.
     951              :      */
     952           60 :     conn = GetConnection();
     953           60 :     if (!conn)
     954              :         /* Error message already written in GetConnection() */
     955            0 :         exit(1);
     956           60 :     atexit(disconnect_atexit);
     957              : 
     958              :     /*
     959              :      * Trap signals.  (Don't do this until after the initial password prompt,
     960              :      * if one is needed, in GetConnection.)
     961              :      */
     962              : #ifndef WIN32
     963           60 :     pqsignal(SIGINT, sigexit_handler);
     964           60 :     pqsignal(SIGTERM, sigexit_handler);
     965           60 :     pqsignal(SIGHUP, sighup_handler);
     966              : #endif
     967              : 
     968              :     /*
     969              :      * Run IDENTIFY_SYSTEM to check the connection type for each action.
     970              :      * --create-slot and --start actions require a database-specific
     971              :      * replication connection because they handle logical replication slots.
     972              :      * --drop-slot can remove replication slots from any replication
     973              :      * connection without this restriction.
     974              :      */
     975           60 :     if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name))
     976            0 :         exit(1);
     977              : 
     978           60 :     if (!do_drop_slot && db_name == NULL)
     979            0 :         pg_fatal("could not establish database-specific replication connection");
     980              : 
     981              :     /*
     982              :      * Set umask so that directories/files are created with the same
     983              :      * permissions as directories/files in the source data directory.
     984              :      *
     985              :      * pg_mode_mask is set to owner-only by default and then updated in
     986              :      * GetConnection() where we get the mode from the server-side with
     987              :      * RetrieveDataDirCreatePerm() and then call SetDataDirectoryCreatePerm().
     988              :      */
     989           60 :     umask(pg_mode_mask);
     990              : 
     991              :     /* Drop a replication slot. */
     992           60 :     if (do_drop_slot)
     993              :     {
     994            3 :         if (verbose)
     995            0 :             pg_log_info("dropping replication slot \"%s\"", replication_slot);
     996              : 
     997            3 :         if (!DropReplicationSlot(conn, replication_slot))
     998            0 :             exit(1);
     999              :     }
    1000              : 
    1001              :     /* Create a replication slot. */
    1002           60 :     if (do_create_slot)
    1003              :     {
    1004           31 :         if (verbose)
    1005            0 :             pg_log_info("creating replication slot \"%s\"", replication_slot);
    1006              : 
    1007           31 :         if (!CreateReplicationSlot(conn, replication_slot, plugin, false,
    1008              :                                    false, false, slot_exists_ok, two_phase,
    1009              :                                    failover))
    1010            0 :             exit(1);
    1011           31 :         startpos = InvalidXLogRecPtr;
    1012              :     }
    1013              : 
    1014           60 :     if (!do_start_slot)
    1015           34 :         exit(0);
    1016              : 
    1017              :     /* Stream loop */
    1018              :     while (true)
    1019              :     {
    1020           27 :         StreamLogicalLog();
    1021           27 :         if (time_to_abort)
    1022              :         {
    1023              :             /*
    1024              :              * We've been Ctrl-C'ed or reached an exit limit condition. That's
    1025              :              * not an error, so exit without an errorcode.
    1026              :              */
    1027           10 :             exit(0);
    1028              :         }
    1029              : 
    1030              :         /*
    1031              :          * Ensure all written data is flushed to disk before exiting or
    1032              :          * starting a new replication.
    1033              :          */
    1034           17 :         if (outfd != -1)
    1035           10 :             OutputFsync(feGetCurrentTimestamp());
    1036              : 
    1037           17 :         if (noloop)
    1038              :         {
    1039           16 :             pg_fatal("disconnected");
    1040              :         }
    1041              :         else
    1042              :         {
    1043              :             /* translator: check source for value for %d */
    1044            1 :             pg_log_info("disconnected; waiting %d seconds to try again",
    1045              :                         RECONNECT_SLEEP_TIME);
    1046            1 :             pg_usleep(RECONNECT_SLEEP_TIME * 1000000);
    1047              :         }
    1048              :     }
    1049              : }
    1050              : 
    1051              : /*
    1052              :  * Fsync our output data, and send a feedback message to the server.  Returns
    1053              :  * true if successful, false otherwise.
    1054              :  *
    1055              :  * If successful, *now is updated to the current timestamp just before sending
    1056              :  * feedback.
    1057              :  */
    1058              : static bool
    1059            8 : flushAndSendFeedback(PGconn *conn, TimestampTz *now)
    1060              : {
    1061              :     /* flush data to disk, so that we send a recent flush pointer */
    1062            8 :     OutputFsync(*now);
    1063            8 :     *now = feGetCurrentTimestamp();
    1064            8 :     if (!sendFeedback(conn, *now, true, false))
    1065            0 :         return false;
    1066              : 
    1067            8 :     return true;
    1068              : }
    1069              : 
    1070              : /*
    1071              :  * Try to inform the server about our upcoming demise, but don't wait around or
    1072              :  * retry on failure.
    1073              :  */
    1074              : static void
    1075           10 : prepareToTerminate(PGconn *conn, XLogRecPtr endpos, StreamStopReason reason,
    1076              :                    XLogRecPtr lsn)
    1077              : {
    1078           10 :     (void) PQputCopyEnd(conn, NULL);
    1079           10 :     (void) PQflush(conn);
    1080              : 
    1081           10 :     if (verbose)
    1082              :     {
    1083            1 :         switch (reason)
    1084              :         {
    1085            1 :             case STREAM_STOP_SIGNAL:
    1086            1 :                 pg_log_info("received interrupt signal, exiting");
    1087            1 :                 break;
    1088            0 :             case STREAM_STOP_KEEPALIVE:
    1089            0 :                 pg_log_info("end position %X/%08X reached by keepalive",
    1090              :                             LSN_FORMAT_ARGS(endpos));
    1091            0 :                 break;
    1092            0 :             case STREAM_STOP_END_OF_WAL:
    1093              :                 Assert(XLogRecPtrIsValid(lsn));
    1094            0 :                 pg_log_info("end position %X/%08X reached by WAL record at %X/%08X",
    1095              :                             LSN_FORMAT_ARGS(endpos), LSN_FORMAT_ARGS(lsn));
    1096            0 :                 break;
    1097            0 :             case STREAM_STOP_NONE:
    1098              :                 Assert(false);
    1099            0 :                 break;
    1100              :         }
    1101              :     }
    1102           10 : }
        

Generated by: LCOV version 2.0-1