LCOV - code coverage report
Current view: top level - src/bin/pg_basebackup - pg_receivewal.c (source / functions) Hit Total Coverage
Test: PostgreSQL 12beta2 Lines: 170 287 59.2 %
Date: 2019-06-19 16:07:09 Functions: 8 9 88.9 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * pg_receivewal.c - receive streaming WAL data and write it
       4             :  *                    to a local file.
       5             :  *
       6             :  * Author: Magnus Hagander <magnus@hagander.net>
       7             :  *
       8             :  * Portions Copyright (c) 1996-2019, PostgreSQL Global Development Group
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *        src/bin/pg_basebackup/pg_receivewal.c
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres_fe.h"
      16             : 
      17             : #include <dirent.h>
      18             : #include <signal.h>
      19             : #include <sys/stat.h>
      20             : #include <unistd.h>
      21             : 
      22             : #include "common/file_perm.h"
      23             : #include "common/logging.h"
      24             : #include "libpq-fe.h"
      25             : #include "access/xlog_internal.h"
      26             : #include "getopt_long.h"
      27             : 
      28             : #include "receivelog.h"
      29             : #include "streamutil.h"
      30             : 
      31             : 
      32             : /* Time to sleep between reconnection attempts */
      33             : #define RECONNECT_SLEEP_TIME 5
      34             : 
      35             : /* Global options */
      36             : static char *basedir = NULL;
      37             : static int  verbose = 0;
      38             : static int  compresslevel = 0;
      39             : static int  noloop = 0;
      40             : static int  standby_message_timeout = 10 * 1000;    /* 10 sec = default */
      41             : static volatile bool time_to_stop = false;
      42             : static bool do_create_slot = false;
      43             : static bool slot_exists_ok = false;
      44             : static bool do_drop_slot = false;
      45             : static bool do_sync = true;
      46             : static bool synchronous = false;
      47             : static char *replication_slot = NULL;
      48             : static XLogRecPtr endpos = InvalidXLogRecPtr;
      49             : 
      50             : 
      51             : static void usage(void);
      52             : static DIR *get_destination_dir(char *dest_folder);
      53             : static void close_destination_dir(DIR *dest_dir, char *dest_folder);
      54             : static XLogRecPtr FindStreamingStart(uint32 *tli);
      55             : static void StreamLog(void);
      56             : static bool stop_streaming(XLogRecPtr segendpos, uint32 timeline,
      57             :                            bool segment_finished);
      58             : 
      59             : static void
      60           6 : disconnect_atexit(void)
      61             : {
      62           6 :     if (conn != NULL)
      63           4 :         PQfinish(conn);
      64           6 : }
      65             : 
      66             : /* Routines to evaluate segment file format */
      67             : #define IsCompressXLogFileName(fname)    \
      68             :     (strlen(fname) == XLOG_FNAME_LEN + strlen(".gz") && \
      69             :      strspn(fname, "0123456789ABCDEF") == XLOG_FNAME_LEN &&       \
      70             :      strcmp((fname) + XLOG_FNAME_LEN, ".gz") == 0)
      71             : #define IsPartialCompressXLogFileName(fname)    \
      72             :     (strlen(fname) == XLOG_FNAME_LEN + strlen(".gz.partial") && \
      73             :      strspn(fname, "0123456789ABCDEF") == XLOG_FNAME_LEN &&       \
      74             :      strcmp((fname) + XLOG_FNAME_LEN, ".gz.partial") == 0)
      75             : 
      76             : static void
      77           2 : usage(void)
      78             : {
      79           2 :     printf(_("%s receives PostgreSQL streaming write-ahead logs.\n\n"),
      80             :            progname);
      81           2 :     printf(_("Usage:\n"));
      82           2 :     printf(_("  %s [OPTION]...\n"), progname);
      83           2 :     printf(_("\nOptions:\n"));
      84           2 :     printf(_("  -D, --directory=DIR    receive write-ahead log files into this directory\n"));
      85           2 :     printf(_("  -E, --endpos=LSN       exit after receiving the specified LSN\n"));
      86           2 :     printf(_("      --if-not-exists    do not error if slot already exists when creating a slot\n"));
      87           2 :     printf(_("  -n, --no-loop          do not loop on connection lost\n"));
      88           2 :     printf(_("      --no-sync          do not wait for changes to be written safely to disk\n"));
      89           2 :     printf(_("  -s, --status-interval=SECS\n"
      90             :              "                         time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
      91           2 :     printf(_("  -S, --slot=SLOTNAME    replication slot to use\n"));
      92           2 :     printf(_("      --synchronous      flush write-ahead log immediately after writing\n"));
      93           2 :     printf(_("  -v, --verbose          output verbose messages\n"));
      94           2 :     printf(_("  -V, --version          output version information, then exit\n"));
      95           2 :     printf(_("  -Z, --compress=0-9     compress logs with given compression level\n"));
      96           2 :     printf(_("  -?, --help             show this help, then exit\n"));
      97           2 :     printf(_("\nConnection options:\n"));
      98           2 :     printf(_("  -d, --dbname=CONNSTR   connection string\n"));
      99           2 :     printf(_("  -h, --host=HOSTNAME    database server host or socket directory\n"));
     100           2 :     printf(_("  -p, --port=PORT        database server port number\n"));
     101           2 :     printf(_("  -U, --username=NAME    connect as specified database user\n"));
     102           2 :     printf(_("  -w, --no-password      never prompt for password\n"));
     103           2 :     printf(_("  -W, --password         force password prompt (should happen automatically)\n"));
     104           2 :     printf(_("\nOptional actions:\n"));
     105           2 :     printf(_("      --create-slot      create a new replication slot (for the slot's name see --slot)\n"));
     106           2 :     printf(_("      --drop-slot        drop the replication slot (for the slot's name see --slot)\n"));
     107           2 :     printf(_("\nReport bugs to <pgsql-bugs@lists.postgresql.org>.\n"));
     108           2 : }
     109             : 
     110             : static bool
     111           8 : stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished)
     112             : {
     113             :     static uint32 prevtimeline = 0;
     114             :     static XLogRecPtr prevpos = InvalidXLogRecPtr;
     115             : 
     116             :     /* we assume that we get called once at the end of each segment */
     117           8 :     if (verbose && segment_finished)
     118           0 :         pg_log_info("finished segment at %X/%X (timeline %u)",
     119             :                     (uint32) (xlogpos >> 32), (uint32) xlogpos,
     120             :                     timeline);
     121             : 
     122           8 :     if (!XLogRecPtrIsInvalid(endpos) && endpos < xlogpos)
     123             :     {
     124           4 :         if (verbose)
     125           4 :             pg_log_info("stopped log streaming at %X/%X (timeline %u)",
     126             :                         (uint32) (xlogpos >> 32), (uint32) xlogpos,
     127             :                         timeline);
     128           4 :         time_to_stop = true;
     129           4 :         return true;
     130             :     }
     131             : 
     132             :     /*
     133             :      * Note that we report the previous, not current, position here. After a
     134             :      * timeline switch, xlogpos points to the beginning of the segment because
     135             :      * that's where we always begin streaming. Reporting the end of previous
     136             :      * timeline isn't totally accurate, because the next timeline can begin
     137             :      * slightly before the end of the WAL that we received on the previous
     138             :      * timeline, but it's close enough for reporting purposes.
     139             :      */
     140           4 :     if (verbose && prevtimeline != 0 && prevtimeline != timeline)
     141           0 :         pg_log_info("switched to timeline %u at %X/%X",
     142             :                     timeline,
     143             :                     (uint32) (prevpos >> 32), (uint32) prevpos);
     144             : 
     145           4 :     prevtimeline = timeline;
     146           4 :     prevpos = xlogpos;
     147             : 
     148           4 :     if (time_to_stop)
     149             :     {
     150           0 :         if (verbose)
     151           0 :             pg_log_info("received interrupt signal, exiting");
     152           0 :         return true;
     153             :     }
     154           4 :     return false;
     155             : }
     156             : 
     157             : 
     158             : /*
     159             :  * Get destination directory.
     160             :  */
     161             : static DIR *
     162           4 : get_destination_dir(char *dest_folder)
     163             : {
     164             :     DIR        *dir;
     165             : 
     166             :     Assert(dest_folder != NULL);
     167           4 :     dir = opendir(dest_folder);
     168           4 :     if (dir == NULL)
     169             :     {
     170           0 :         pg_log_error("could not open directory \"%s\": %m", basedir);
     171           0 :         exit(1);
     172             :     }
     173             : 
     174           4 :     return dir;
     175             : }
     176             : 
     177             : 
     178             : /*
     179             :  * Close existing directory.
     180             :  */
     181             : static void
     182           4 : close_destination_dir(DIR *dest_dir, char *dest_folder)
     183             : {
     184             :     Assert(dest_dir != NULL && dest_folder != NULL);
     185           4 :     if (closedir(dest_dir))
     186             :     {
     187           0 :         pg_log_error("could not close directory \"%s\": %m", dest_folder);
     188           0 :         exit(1);
     189             :     }
     190           4 : }
     191             : 
     192             : 
     193             : /*
     194             :  * Determine starting location for streaming, based on any existing xlog
     195             :  * segments in the directory. We start at the end of the last one that is
     196             :  * complete (size matches wal segment size), on the timeline with highest ID.
     197             :  *
     198             :  * If there are no WAL files in the directory, returns InvalidXLogRecPtr.
     199             :  */
     200             : static XLogRecPtr
     201           2 : FindStreamingStart(uint32 *tli)
     202             : {
     203             :     DIR        *dir;
     204             :     struct dirent *dirent;
     205           2 :     XLogSegNo   high_segno = 0;
     206           2 :     uint32      high_tli = 0;
     207           2 :     bool        high_ispartial = false;
     208             : 
     209           2 :     dir = get_destination_dir(basedir);
     210             : 
     211           8 :     while (errno = 0, (dirent = readdir(dir)) != NULL)
     212             :     {
     213             :         uint32      tli;
     214             :         XLogSegNo   segno;
     215             :         bool        ispartial;
     216             :         bool        iscompress;
     217             : 
     218             :         /*
     219             :          * Check if the filename looks like an xlog file, or a .partial file.
     220             :          */
     221           4 :         if (IsXLogFileName(dirent->d_name))
     222             :         {
     223           0 :             ispartial = false;
     224           0 :             iscompress = false;
     225             :         }
     226           4 :         else if (IsPartialXLogFileName(dirent->d_name))
     227             :         {
     228           0 :             ispartial = true;
     229           0 :             iscompress = false;
     230             :         }
     231           4 :         else if (IsCompressXLogFileName(dirent->d_name))
     232             :         {
     233           0 :             ispartial = false;
     234           0 :             iscompress = true;
     235             :         }
     236           4 :         else if (IsPartialCompressXLogFileName(dirent->d_name))
     237             :         {
     238           0 :             ispartial = true;
     239           0 :             iscompress = true;
     240             :         }
     241             :         else
     242           8 :             continue;
     243             : 
     244             :         /*
     245             :          * Looks like an xlog file. Parse its position.
     246             :          */
     247           0 :         XLogFromFileName(dirent->d_name, &tli, &segno, WalSegSz);
     248             : 
     249             :         /*
     250             :          * Check that the segment has the right size, if it's supposed to be
     251             :          * completed.  For non-compressed segments just check the on-disk size
     252             :          * and see if it matches a completed segment. For compressed segments,
     253             :          * look at the last 4 bytes of the compressed file, which is where the
     254             :          * uncompressed size is located for gz files with a size lower than
     255             :          * 4GB, and then compare it to the size of a completed segment. The 4
     256             :          * last bytes correspond to the ISIZE member according to
     257             :          * http://www.zlib.org/rfc-gzip.html.
     258             :          */
     259           0 :         if (!ispartial && !iscompress)
     260           0 :         {
     261             :             struct stat statbuf;
     262             :             char        fullpath[MAXPGPATH * 2];
     263             : 
     264           0 :             snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name);
     265           0 :             if (stat(fullpath, &statbuf) != 0)
     266             :             {
     267           0 :                 pg_log_error("could not stat file \"%s\": %m", fullpath);
     268           0 :                 exit(1);
     269             :             }
     270             : 
     271           0 :             if (statbuf.st_size != WalSegSz)
     272             :             {
     273           0 :                 pg_log_warning("segment file \"%s\" has incorrect size %d, skipping",
     274             :                                dirent->d_name, (int) statbuf.st_size);
     275           0 :                 continue;
     276             :             }
     277             :         }
     278           0 :         else if (!ispartial && iscompress)
     279             :         {
     280             :             int         fd;
     281             :             char        buf[4];
     282             :             int         bytes_out;
     283             :             char        fullpath[MAXPGPATH * 2];
     284             :             int         r;
     285             : 
     286           0 :             snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name);
     287             : 
     288           0 :             fd = open(fullpath, O_RDONLY | PG_BINARY, 0);
     289           0 :             if (fd < 0)
     290             :             {
     291           0 :                 pg_log_error("could not open compressed file \"%s\": %m",
     292             :                              fullpath);
     293           0 :                 exit(1);
     294             :             }
     295           0 :             if (lseek(fd, (off_t) (-4), SEEK_END) < 0)
     296             :             {
     297           0 :                 pg_log_error("could not seek in compressed file \"%s\": %m",
     298             :                              fullpath);
     299           0 :                 exit(1);
     300             :             }
     301           0 :             r = read(fd, (char *) buf, sizeof(buf));
     302           0 :             if (r != sizeof(buf))
     303             :             {
     304           0 :                 if (r < 0)
     305           0 :                     pg_log_error("could not read compressed file \"%s\": %m",
     306             :                                  fullpath);
     307             :                 else
     308           0 :                     pg_log_error("could not read compressed file \"%s\": read %d of %zu",
     309             :                                  fullpath, r, sizeof(buf));
     310           0 :                 exit(1);
     311             :             }
     312             : 
     313           0 :             close(fd);
     314           0 :             bytes_out = (buf[3] << 24) | (buf[2] << 16) |
     315           0 :                 (buf[1] << 8) | buf[0];
     316             : 
     317           0 :             if (bytes_out != WalSegSz)
     318             :             {
     319           0 :                 pg_log_warning("compressed segment file \"%s\" has incorrect uncompressed size %d, skipping",
     320             :                                dirent->d_name, bytes_out);
     321           0 :                 continue;
     322             :             }
     323             :         }
     324             : 
     325             :         /* Looks like a valid segment. Remember that we saw it. */
     326           0 :         if ((segno > high_segno) ||
     327           0 :             (segno == high_segno && tli > high_tli) ||
     328           0 :             (segno == high_segno && tli == high_tli && high_ispartial && !ispartial))
     329             :         {
     330           0 :             high_segno = segno;
     331           0 :             high_tli = tli;
     332           0 :             high_ispartial = ispartial;
     333             :         }
     334             :     }
     335             : 
     336           2 :     if (errno)
     337             :     {
     338           0 :         pg_log_error("could not read directory \"%s\": %m", basedir);
     339           0 :         exit(1);
     340             :     }
     341             : 
     342           2 :     close_destination_dir(dir, basedir);
     343             : 
     344           2 :     if (high_segno > 0)
     345             :     {
     346             :         XLogRecPtr  high_ptr;
     347             : 
     348             :         /*
     349             :          * Move the starting pointer to the start of the next segment, if the
     350             :          * highest one we saw was completed. Otherwise start streaming from
     351             :          * the beginning of the .partial segment.
     352             :          */
     353           0 :         if (!high_ispartial)
     354           0 :             high_segno++;
     355             : 
     356           0 :         XLogSegNoOffsetToRecPtr(high_segno, 0, WalSegSz, high_ptr);
     357             : 
     358           0 :         *tli = high_tli;
     359           0 :         return high_ptr;
     360             :     }
     361             :     else
     362           2 :         return InvalidXLogRecPtr;
     363             : }
     364             : 
     365             : /*
     366             :  * Start the log streaming
     367             :  */
     368             : static void
     369           2 : StreamLog(void)
     370             : {
     371             :     XLogRecPtr  serverpos;
     372             :     TimeLineID  servertli;
     373             :     StreamCtl   stream;
     374             : 
     375           2 :     MemSet(&stream, 0, sizeof(stream));
     376             : 
     377             :     /*
     378             :      * Connect in replication mode to the server
     379             :      */
     380           2 :     if (conn == NULL)
     381           0 :         conn = GetConnection();
     382           2 :     if (!conn)
     383             :         /* Error message already written in GetConnection() */
     384           0 :         return;
     385             : 
     386           2 :     if (!CheckServerVersionForStreaming(conn))
     387             :     {
     388             :         /*
     389             :          * Error message already written in CheckServerVersionForStreaming().
     390             :          * There's no hope of recovering from a version mismatch, so don't
     391             :          * retry.
     392             :          */
     393           0 :         exit(1);
     394             :     }
     395             : 
     396             :     /*
     397             :      * Identify server, obtaining start LSN position and current timeline ID
     398             :      * at the same time, necessary if not valid data can be found in the
     399             :      * existing output directory.
     400             :      */
     401           2 :     if (!RunIdentifySystem(conn, NULL, &servertli, &serverpos, NULL))
     402           0 :         exit(1);
     403             : 
     404             :     /*
     405             :      * Figure out where to start streaming.
     406             :      */
     407           2 :     stream.startpos = FindStreamingStart(&stream.timeline);
     408           2 :     if (stream.startpos == InvalidXLogRecPtr)
     409             :     {
     410           2 :         stream.startpos = serverpos;
     411           2 :         stream.timeline = servertli;
     412             :     }
     413             : 
     414             :     /*
     415             :      * Always start streaming at the beginning of a segment
     416             :      */
     417           2 :     stream.startpos -= XLogSegmentOffset(stream.startpos, WalSegSz);
     418             : 
     419             :     /*
     420             :      * Start the replication
     421             :      */
     422           2 :     if (verbose)
     423           2 :         pg_log_info("starting log streaming at %X/%X (timeline %u)",
     424             :                     (uint32) (stream.startpos >> 32), (uint32) stream.startpos,
     425             :                     stream.timeline);
     426             : 
     427           2 :     stream.stream_stop = stop_streaming;
     428           2 :     stream.stop_socket = PGINVALID_SOCKET;
     429           2 :     stream.standby_message_timeout = standby_message_timeout;
     430           2 :     stream.synchronous = synchronous;
     431           2 :     stream.do_sync = do_sync;
     432           2 :     stream.mark_done = false;
     433           2 :     stream.walmethod = CreateWalDirectoryMethod(basedir, compresslevel,
     434           2 :                                                 stream.do_sync);
     435           2 :     stream.partial_suffix = ".partial";
     436           2 :     stream.replication_slot = replication_slot;
     437             : 
     438           2 :     ReceiveXlogStream(conn, &stream);
     439             : 
     440           2 :     if (!stream.walmethod->finish())
     441             :     {
     442           0 :         pg_log_info("could not finish writing WAL files: %m");
     443           0 :         return;
     444             :     }
     445             : 
     446           2 :     PQfinish(conn);
     447           2 :     conn = NULL;
     448             : 
     449           2 :     FreeWalDirectoryMethod();
     450           2 :     pg_free(stream.walmethod);
     451             : 
     452           2 :     conn = NULL;
     453             : }
     454             : 
     455             : /*
     456             :  * When sigint is called, just tell the system to exit at the next possible
     457             :  * moment.
     458             :  */
     459             : #ifndef WIN32
     460             : 
     461             : static void
     462           0 : sigint_handler(int signum)
     463             : {
     464           0 :     time_to_stop = true;
     465           0 : }
     466             : #endif
     467             : 
     468             : int
     469          20 : main(int argc, char **argv)
     470             : {
     471             :     static struct option long_options[] = {
     472             :         {"help", no_argument, NULL, '?'},
     473             :         {"version", no_argument, NULL, 'V'},
     474             :         {"directory", required_argument, NULL, 'D'},
     475             :         {"dbname", required_argument, NULL, 'd'},
     476             :         {"endpos", required_argument, NULL, 'E'},
     477             :         {"host", required_argument, NULL, 'h'},
     478             :         {"port", required_argument, NULL, 'p'},
     479             :         {"username", required_argument, NULL, 'U'},
     480             :         {"no-loop", no_argument, NULL, 'n'},
     481             :         {"no-password", no_argument, NULL, 'w'},
     482             :         {"password", no_argument, NULL, 'W'},
     483             :         {"status-interval", required_argument, NULL, 's'},
     484             :         {"slot", required_argument, NULL, 'S'},
     485             :         {"verbose", no_argument, NULL, 'v'},
     486             :         {"compress", required_argument, NULL, 'Z'},
     487             : /* action */
     488             :         {"create-slot", no_argument, NULL, 1},
     489             :         {"drop-slot", no_argument, NULL, 2},
     490             :         {"if-not-exists", no_argument, NULL, 3},
     491             :         {"synchronous", no_argument, NULL, 4},
     492             :         {"no-sync", no_argument, NULL, 5},
     493             :         {NULL, 0, NULL, 0}
     494             :     };
     495             : 
     496             :     int         c;
     497             :     int         option_index;
     498             :     char       *db_name;
     499             :     uint32      hi,
     500             :                 lo;
     501             : 
     502          20 :     pg_logging_init(argv[0]);
     503          20 :     progname = get_progname(argv[0]);
     504          20 :     set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
     505             : 
     506          20 :     if (argc > 1)
     507             :     {
     508          18 :         if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
     509             :         {
     510           2 :             usage();
     511           2 :             exit(0);
     512             :         }
     513          32 :         else if (strcmp(argv[1], "-V") == 0 ||
     514          16 :                  strcmp(argv[1], "--version") == 0)
     515             :         {
     516           2 :             puts("pg_receivewal (PostgreSQL) " PG_VERSION);
     517           2 :             exit(0);
     518             :         }
     519             :     }
     520             : 
     521          66 :     while ((c = getopt_long(argc, argv, "D:d:E:h:p:U:s:S:nwWvZ:",
     522             :                             long_options, &option_index)) != -1)
     523             :     {
     524          36 :         switch (c)
     525             :         {
     526             :             case 'D':
     527           8 :                 basedir = pg_strdup(optarg);
     528           8 :                 break;
     529             :             case 'd':
     530           0 :                 connection_string = pg_strdup(optarg);
     531           0 :                 break;
     532             :             case 'h':
     533           0 :                 dbhost = pg_strdup(optarg);
     534           0 :                 break;
     535             :             case 'p':
     536           0 :                 if (atoi(optarg) <= 0)
     537             :                 {
     538           0 :                     pg_log_error("invalid port number \"%s\"", optarg);
     539           0 :                     exit(1);
     540             :                 }
     541           0 :                 dbport = pg_strdup(optarg);
     542           0 :                 break;
     543             :             case 'U':
     544           0 :                 dbuser = pg_strdup(optarg);
     545           0 :                 break;
     546             :             case 'w':
     547           0 :                 dbgetpassword = -1;
     548           0 :                 break;
     549             :             case 'W':
     550           0 :                 dbgetpassword = 1;
     551           0 :                 break;
     552             :             case 's':
     553           0 :                 standby_message_timeout = atoi(optarg) * 1000;
     554           0 :                 if (standby_message_timeout < 0)
     555             :                 {
     556           0 :                     pg_log_error("invalid status interval \"%s\"", optarg);
     557           0 :                     exit(1);
     558             :                 }
     559           0 :                 break;
     560             :             case 'S':
     561           4 :                 replication_slot = pg_strdup(optarg);
     562           4 :                 break;
     563             :             case 'E':
     564           2 :                 if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
     565             :                 {
     566           0 :                     pg_log_error("could not parse end position \"%s\"", optarg);
     567           0 :                     exit(1);
     568             :                 }
     569           2 :                 endpos = ((uint64) hi) << 32 | lo;
     570           2 :                 break;
     571             :             case 'n':
     572           2 :                 noloop = 1;
     573           2 :                 break;
     574             :             case 'v':
     575           2 :                 verbose++;
     576           2 :                 break;
     577             :             case 'Z':
     578           0 :                 compresslevel = atoi(optarg);
     579           0 :                 if (compresslevel < 0 || compresslevel > 9)
     580             :                 {
     581           0 :                     pg_log_error("invalid compression level \"%s\"", optarg);
     582           0 :                     exit(1);
     583             :                 }
     584           0 :                 break;
     585             : /* action */
     586             :             case 1:
     587           6 :                 do_create_slot = true;
     588           6 :                 break;
     589             :             case 2:
     590           4 :                 do_drop_slot = true;
     591           4 :                 break;
     592             :             case 3:
     593           0 :                 slot_exists_ok = true;
     594           0 :                 break;
     595             :             case 4:
     596           4 :                 synchronous = true;
     597           4 :                 break;
     598             :             case 5:
     599           2 :                 do_sync = false;
     600           2 :                 break;
     601             :             default:
     602             : 
     603             :                 /*
     604             :                  * getopt_long already emitted a complaint
     605             :                  */
     606           2 :                 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
     607             :                         progname);
     608           2 :                 exit(1);
     609             :         }
     610             :     }
     611             : 
     612             :     /*
     613             :      * Any non-option arguments?
     614             :      */
     615          14 :     if (optind < argc)
     616             :     {
     617           0 :         pg_log_error("too many command-line arguments (first is \"%s\")",
     618             :                      argv[optind]);
     619           0 :         fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
     620             :                 progname);
     621           0 :         exit(1);
     622             :     }
     623             : 
     624          14 :     if (do_drop_slot && do_create_slot)
     625             :     {
     626           2 :         pg_log_error("cannot use --create-slot together with --drop-slot");
     627           2 :         fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
     628             :                 progname);
     629           2 :         exit(1);
     630             :     }
     631             : 
     632          12 :     if (replication_slot == NULL && (do_drop_slot || do_create_slot))
     633             :     {
     634             :         /* translator: second %s is an option name */
     635           2 :         pg_log_error("%s needs a slot to be specified using --slot",
     636             :                      do_drop_slot ? "--drop-slot" : "--create-slot");
     637           2 :         fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
     638             :                 progname);
     639           2 :         exit(1);
     640             :     }
     641             : 
     642          10 :     if (synchronous && !do_sync)
     643             :     {
     644           2 :         pg_log_error("cannot use --synchronous together with --no-sync");
     645           2 :         fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
     646             :                 progname);
     647           2 :         exit(1);
     648             :     }
     649             : 
     650             :     /*
     651             :      * Required arguments
     652             :      */
     653           8 :     if (basedir == NULL && !do_drop_slot && !do_create_slot)
     654             :     {
     655           2 :         pg_log_error("no target directory specified");
     656           2 :         fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
     657             :                 progname);
     658           2 :         exit(1);
     659             :     }
     660             : 
     661             : #ifndef HAVE_LIBZ
     662             :     if (compresslevel != 0)
     663             :     {
     664             :         pg_log_error("this build does not support compression");
     665             :         exit(1);
     666             :     }
     667             : #endif
     668             : 
     669             :     /*
     670             :      * Check existence of destination folder.
     671             :      */
     672           6 :     if (!do_drop_slot && !do_create_slot)
     673             :     {
     674           2 :         DIR        *dir = get_destination_dir(basedir);
     675             : 
     676           2 :         close_destination_dir(dir, basedir);
     677             :     }
     678             : 
     679             : #ifndef WIN32
     680           6 :     pqsignal(SIGINT, sigint_handler);
     681             : #endif
     682             : 
     683             :     /*
     684             :      * Obtain a connection before doing anything.
     685             :      */
     686           6 :     conn = GetConnection();
     687           6 :     if (!conn)
     688             :         /* error message already written in GetConnection() */
     689           0 :         exit(1);
     690           6 :     atexit(disconnect_atexit);
     691             : 
     692             :     /*
     693             :      * Run IDENTIFY_SYSTEM to make sure we've successfully have established a
     694             :      * replication connection and haven't connected using a database specific
     695             :      * connection.
     696             :      */
     697           6 :     if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name))
     698           0 :         exit(1);
     699             : 
     700             :     /*
     701             :      * Set umask so that directories/files are created with the same
     702             :      * permissions as directories/files in the source data directory.
     703             :      *
     704             :      * pg_mode_mask is set to owner-only by default and then updated in
     705             :      * GetConnection() where we get the mode from the server-side with
     706             :      * RetrieveDataDirCreatePerm() and then call SetDataDirectoryCreatePerm().
     707             :      */
     708           6 :     umask(pg_mode_mask);
     709             : 
     710             :     /* determine remote server's xlog segment size */
     711           6 :     if (!RetrieveWalSegSize(conn))
     712           0 :         exit(1);
     713             : 
     714             :     /*
     715             :      * Check that there is a database associated with connection, none should
     716             :      * be defined in this context.
     717             :      */
     718           6 :     if (db_name)
     719             :     {
     720           0 :         pg_log_error("replication connection using slot \"%s\" is unexpectedly database specific",
     721             :                      replication_slot);
     722           0 :         exit(1);
     723             :     }
     724             : 
     725             :     /*
     726             :      * Drop a replication slot.
     727             :      */
     728           6 :     if (do_drop_slot)
     729             :     {
     730           2 :         if (verbose)
     731           0 :             pg_log_info("dropping replication slot \"%s\"", replication_slot);
     732             : 
     733           2 :         if (!DropReplicationSlot(conn, replication_slot))
     734           0 :             exit(1);
     735           2 :         exit(0);
     736             :     }
     737             : 
     738             :     /* Create a replication slot */
     739           4 :     if (do_create_slot)
     740             :     {
     741           2 :         if (verbose)
     742           0 :             pg_log_info("creating replication slot \"%s\"", replication_slot);
     743             : 
     744           2 :         if (!CreateReplicationSlot(conn, replication_slot, NULL, false, true, false,
     745             :                                    slot_exists_ok))
     746           0 :             exit(1);
     747           2 :         exit(0);
     748             :     }
     749             : 
     750             :     /*
     751             :      * Don't close the connection here so that subsequent StreamLog() can
     752             :      * reuse it.
     753             :      */
     754             : 
     755             :     while (true)
     756             :     {
     757           0 :         StreamLog();
     758           2 :         if (time_to_stop)
     759             :         {
     760             :             /*
     761             :              * We've been Ctrl-C'ed or end of streaming position has been
     762             :              * willingly reached, so exit without an error code.
     763             :              */
     764           2 :             exit(0);
     765             :         }
     766           0 :         else if (noloop)
     767             :         {
     768           0 :             pg_log_error("disconnected");
     769           0 :             exit(1);
     770             :         }
     771             :         else
     772             :         {
     773             :             /* translator: check source for value for %d */
     774           0 :             pg_log_info("disconnected; waiting %d seconds to try again",
     775             :                         RECONNECT_SLEEP_TIME);
     776           0 :             pg_usleep(RECONNECT_SLEEP_TIME * 1000000);
     777             :         }
     778             :     }
     779             : }

Generated by: LCOV version 1.13