LCOV - code coverage report
Current view: top level - src/bin/pg_basebackup - pg_receivewal.c (source / functions) Hit Total Coverage
Test: PostgreSQL 13beta1 Lines: 182 307 59.3 %
Date: 2020-06-03 11:07:14 Functions: 8 9 88.9 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * pg_receivewal.c - receive streaming WAL data and write it
       4             :  *                    to a local file.
       5             :  *
       6             :  * Author: Magnus Hagander <magnus@hagander.net>
       7             :  *
       8             :  * Portions Copyright (c) 1996-2020, PostgreSQL Global Development Group
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *        src/bin/pg_basebackup/pg_receivewal.c
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres_fe.h"
      16             : 
      17             : #include <dirent.h>
      18             : #include <signal.h>
      19             : #include <sys/stat.h>
      20             : #include <unistd.h>
      21             : 
      22             : #include "access/xlog_internal.h"
      23             : #include "common/file_perm.h"
      24             : #include "common/logging.h"
      25             : #include "getopt_long.h"
      26             : #include "libpq-fe.h"
      27             : #include "receivelog.h"
      28             : #include "streamutil.h"
      29             : 
      30             : /* Time to sleep between reconnection attempts */
      31             : #define RECONNECT_SLEEP_TIME 5
      32             : 
      33             : /* Global options */
      34             : static char *basedir = NULL;
      35             : static int  verbose = 0;
      36             : static int  compresslevel = 0;
      37             : static int  noloop = 0;
      38             : static int  standby_message_timeout = 10 * 1000;    /* 10 sec = default */
      39             : static volatile bool time_to_stop = false;
      40             : static bool do_create_slot = false;
      41             : static bool slot_exists_ok = false;
      42             : static bool do_drop_slot = false;
      43             : static bool do_sync = true;
      44             : static bool synchronous = false;
      45             : static char *replication_slot = NULL;
      46             : static XLogRecPtr endpos = InvalidXLogRecPtr;
      47             : 
      48             : 
      49             : static void usage(void);
      50             : static DIR *get_destination_dir(char *dest_folder);
      51             : static void close_destination_dir(DIR *dest_dir, char *dest_folder);
      52             : static XLogRecPtr FindStreamingStart(uint32 *tli);
      53             : static void StreamLog(void);
      54             : static bool stop_streaming(XLogRecPtr segendpos, uint32 timeline,
      55             :                            bool segment_finished);
      56             : 
      57             : static void
      58           6 : disconnect_atexit(void)
      59             : {
      60           6 :     if (conn != NULL)
      61           4 :         PQfinish(conn);
      62           6 : }
      63             : 
      64             : /* Routines to evaluate segment file format */
      65             : #define IsCompressXLogFileName(fname)    \
      66             :     (strlen(fname) == XLOG_FNAME_LEN + strlen(".gz") && \
      67             :      strspn(fname, "0123456789ABCDEF") == XLOG_FNAME_LEN &&       \
      68             :      strcmp((fname) + XLOG_FNAME_LEN, ".gz") == 0)
      69             : #define IsPartialCompressXLogFileName(fname)    \
      70             :     (strlen(fname) == XLOG_FNAME_LEN + strlen(".gz.partial") && \
      71             :      strspn(fname, "0123456789ABCDEF") == XLOG_FNAME_LEN &&       \
      72             :      strcmp((fname) + XLOG_FNAME_LEN, ".gz.partial") == 0)
      73             : 
      74             : static void
      75           2 : usage(void)
      76             : {
      77           2 :     printf(_("%s receives PostgreSQL streaming write-ahead logs.\n\n"),
      78             :            progname);
      79           2 :     printf(_("Usage:\n"));
      80           2 :     printf(_("  %s [OPTION]...\n"), progname);
      81           2 :     printf(_("\nOptions:\n"));
      82           2 :     printf(_("  -D, --directory=DIR    receive write-ahead log files into this directory\n"));
      83           2 :     printf(_("  -E, --endpos=LSN       exit after receiving the specified LSN\n"));
      84           2 :     printf(_("      --if-not-exists    do not error if slot already exists when creating a slot\n"));
      85           2 :     printf(_("  -n, --no-loop          do not loop on connection lost\n"));
      86           2 :     printf(_("      --no-sync          do not wait for changes to be written safely to disk\n"));
      87           2 :     printf(_("  -s, --status-interval=SECS\n"
      88             :              "                         time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
      89           2 :     printf(_("  -S, --slot=SLOTNAME    replication slot to use\n"));
      90           2 :     printf(_("      --synchronous      flush write-ahead log immediately after writing\n"));
      91           2 :     printf(_("  -v, --verbose          output verbose messages\n"));
      92           2 :     printf(_("  -V, --version          output version information, then exit\n"));
      93           2 :     printf(_("  -Z, --compress=0-9     compress logs with given compression level\n"));
      94           2 :     printf(_("  -?, --help             show this help, then exit\n"));
      95           2 :     printf(_("\nConnection options:\n"));
      96           2 :     printf(_("  -d, --dbname=CONNSTR   connection string\n"));
      97           2 :     printf(_("  -h, --host=HOSTNAME    database server host or socket directory\n"));
      98           2 :     printf(_("  -p, --port=PORT        database server port number\n"));
      99           2 :     printf(_("  -U, --username=NAME    connect as specified database user\n"));
     100           2 :     printf(_("  -w, --no-password      never prompt for password\n"));
     101           2 :     printf(_("  -W, --password         force password prompt (should happen automatically)\n"));
     102           2 :     printf(_("\nOptional actions:\n"));
     103           2 :     printf(_("      --create-slot      create a new replication slot (for the slot's name see --slot)\n"));
     104           2 :     printf(_("      --drop-slot        drop the replication slot (for the slot's name see --slot)\n"));
     105           2 :     printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
     106           2 :     printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
     107           2 : }
     108             : 
     109             : static bool
     110           8 : stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished)
     111             : {
     112             :     static uint32 prevtimeline = 0;
     113             :     static XLogRecPtr prevpos = InvalidXLogRecPtr;
     114             : 
     115             :     /* we assume that we get called once at the end of each segment */
     116           8 :     if (verbose && segment_finished)
     117           0 :         pg_log_info("finished segment at %X/%X (timeline %u)",
     118             :                     (uint32) (xlogpos >> 32), (uint32) xlogpos,
     119             :                     timeline);
     120             : 
     121           8 :     if (!XLogRecPtrIsInvalid(endpos) && endpos < xlogpos)
     122             :     {
     123           4 :         if (verbose)
     124           4 :             pg_log_info("stopped log streaming at %X/%X (timeline %u)",
     125             :                         (uint32) (xlogpos >> 32), (uint32) xlogpos,
     126             :                         timeline);
     127           4 :         time_to_stop = true;
     128           4 :         return true;
     129             :     }
     130             : 
     131             :     /*
     132             :      * Note that we report the previous, not current, position here. After a
     133             :      * timeline switch, xlogpos points to the beginning of the segment because
     134             :      * that's where we always begin streaming. Reporting the end of previous
     135             :      * timeline isn't totally accurate, because the next timeline can begin
     136             :      * slightly before the end of the WAL that we received on the previous
     137             :      * timeline, but it's close enough for reporting purposes.
     138             :      */
     139           4 :     if (verbose && prevtimeline != 0 && prevtimeline != timeline)
     140           0 :         pg_log_info("switched to timeline %u at %X/%X",
     141             :                     timeline,
     142             :                     (uint32) (prevpos >> 32), (uint32) prevpos);
     143             : 
     144           4 :     prevtimeline = timeline;
     145           4 :     prevpos = xlogpos;
     146             : 
     147           4 :     if (time_to_stop)
     148             :     {
     149           0 :         if (verbose)
     150           0 :             pg_log_info("received interrupt signal, exiting");
     151           0 :         return true;
     152             :     }
     153           4 :     return false;
     154             : }
     155             : 
     156             : 
     157             : /*
     158             :  * Get destination directory.
     159             :  */
     160             : static DIR *
     161           4 : get_destination_dir(char *dest_folder)
     162             : {
     163             :     DIR        *dir;
     164             : 
     165             :     Assert(dest_folder != NULL);
     166           4 :     dir = opendir(dest_folder);
     167           4 :     if (dir == NULL)
     168             :     {
     169           0 :         pg_log_error("could not open directory \"%s\": %m", basedir);
     170           0 :         exit(1);
     171             :     }
     172             : 
     173           4 :     return dir;
     174             : }
     175             : 
     176             : 
     177             : /*
     178             :  * Close existing directory.
     179             :  */
     180             : static void
     181           4 : close_destination_dir(DIR *dest_dir, char *dest_folder)
     182             : {
     183             :     Assert(dest_dir != NULL && dest_folder != NULL);
     184           4 :     if (closedir(dest_dir))
     185             :     {
     186           0 :         pg_log_error("could not close directory \"%s\": %m", dest_folder);
     187           0 :         exit(1);
     188             :     }
     189           4 : }
     190             : 
     191             : 
     192             : /*
     193             :  * Determine starting location for streaming, based on any existing xlog
     194             :  * segments in the directory. We start at the end of the last one that is
     195             :  * complete (size matches wal segment size), on the timeline with highest ID.
     196             :  *
     197             :  * If there are no WAL files in the directory, returns InvalidXLogRecPtr.
     198             :  */
     199             : static XLogRecPtr
     200           2 : FindStreamingStart(uint32 *tli)
     201             : {
     202             :     DIR        *dir;
     203             :     struct dirent *dirent;
     204           2 :     XLogSegNo   high_segno = 0;
     205           2 :     uint32      high_tli = 0;
     206           2 :     bool        high_ispartial = false;
     207             : 
     208           2 :     dir = get_destination_dir(basedir);
     209             : 
     210           6 :     while (errno = 0, (dirent = readdir(dir)) != NULL)
     211             :     {
     212             :         uint32      tli;
     213             :         XLogSegNo   segno;
     214             :         bool        ispartial;
     215             :         bool        iscompress;
     216             : 
     217             :         /*
     218             :          * Check if the filename looks like an xlog file, or a .partial file.
     219             :          */
     220           4 :         if (IsXLogFileName(dirent->d_name))
     221             :         {
     222           0 :             ispartial = false;
     223           0 :             iscompress = false;
     224             :         }
     225           4 :         else if (IsPartialXLogFileName(dirent->d_name))
     226             :         {
     227           0 :             ispartial = true;
     228           0 :             iscompress = false;
     229             :         }
     230           4 :         else if (IsCompressXLogFileName(dirent->d_name))
     231             :         {
     232           0 :             ispartial = false;
     233           0 :             iscompress = true;
     234             :         }
     235           4 :         else if (IsPartialCompressXLogFileName(dirent->d_name))
     236             :         {
     237           0 :             ispartial = true;
     238           0 :             iscompress = true;
     239             :         }
     240             :         else
     241           4 :             continue;
     242             : 
     243             :         /*
     244             :          * Looks like an xlog file. Parse its position.
     245             :          */
     246           0 :         XLogFromFileName(dirent->d_name, &tli, &segno, WalSegSz);
     247             : 
     248             :         /*
     249             :          * Check that the segment has the right size, if it's supposed to be
     250             :          * completed.  For non-compressed segments just check the on-disk size
     251             :          * and see if it matches a completed segment. For compressed segments,
     252             :          * look at the last 4 bytes of the compressed file, which is where the
     253             :          * uncompressed size is located for gz files with a size lower than
     254             :          * 4GB, and then compare it to the size of a completed segment. The 4
     255             :          * last bytes correspond to the ISIZE member according to
     256             :          * http://www.zlib.org/rfc-gzip.html.
     257             :          */
     258           0 :         if (!ispartial && !iscompress)
     259           0 :         {
     260             :             struct stat statbuf;
     261             :             char        fullpath[MAXPGPATH * 2];
     262             : 
     263           0 :             snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name);
     264           0 :             if (stat(fullpath, &statbuf) != 0)
     265             :             {
     266           0 :                 pg_log_error("could not stat file \"%s\": %m", fullpath);
     267           0 :                 exit(1);
     268             :             }
     269             : 
     270           0 :             if (statbuf.st_size != WalSegSz)
     271             :             {
     272           0 :                 pg_log_warning("segment file \"%s\" has incorrect size %d, skipping",
     273             :                                dirent->d_name, (int) statbuf.st_size);
     274           0 :                 continue;
     275             :             }
     276             :         }
     277           0 :         else if (!ispartial && iscompress)
     278             :         {
     279             :             int         fd;
     280             :             char        buf[4];
     281             :             int         bytes_out;
     282             :             char        fullpath[MAXPGPATH * 2];
     283             :             int         r;
     284             : 
     285           0 :             snprintf(fullpath, sizeof(fullpath), "%s/%s", basedir, dirent->d_name);
     286             : 
     287           0 :             fd = open(fullpath, O_RDONLY | PG_BINARY, 0);
     288           0 :             if (fd < 0)
     289             :             {
     290           0 :                 pg_log_error("could not open compressed file \"%s\": %m",
     291             :                              fullpath);
     292           0 :                 exit(1);
     293             :             }
     294           0 :             if (lseek(fd, (off_t) (-4), SEEK_END) < 0)
     295             :             {
     296           0 :                 pg_log_error("could not seek in compressed file \"%s\": %m",
     297             :                              fullpath);
     298           0 :                 exit(1);
     299             :             }
     300           0 :             r = read(fd, (char *) buf, sizeof(buf));
     301           0 :             if (r != sizeof(buf))
     302             :             {
     303           0 :                 if (r < 0)
     304           0 :                     pg_log_error("could not read compressed file \"%s\": %m",
     305             :                                  fullpath);
     306             :                 else
     307           0 :                     pg_log_error("could not read compressed file \"%s\": read %d of %zu",
     308             :                                  fullpath, r, sizeof(buf));
     309           0 :                 exit(1);
     310             :             }
     311             : 
     312           0 :             close(fd);
     313           0 :             bytes_out = (buf[3] << 24) | (buf[2] << 16) |
     314           0 :                 (buf[1] << 8) | buf[0];
     315             : 
     316           0 :             if (bytes_out != WalSegSz)
     317             :             {
     318           0 :                 pg_log_warning("compressed segment file \"%s\" has incorrect uncompressed size %d, skipping",
     319             :                                dirent->d_name, bytes_out);
     320           0 :                 continue;
     321             :             }
     322             :         }
     323             : 
     324             :         /* Looks like a valid segment. Remember that we saw it. */
     325           0 :         if ((segno > high_segno) ||
     326           0 :             (segno == high_segno && tli > high_tli) ||
     327           0 :             (segno == high_segno && tli == high_tli && high_ispartial && !ispartial))
     328             :         {
     329           0 :             high_segno = segno;
     330           0 :             high_tli = tli;
     331           0 :             high_ispartial = ispartial;
     332             :         }
     333             :     }
     334             : 
     335           2 :     if (errno)
     336             :     {
     337           0 :         pg_log_error("could not read directory \"%s\": %m", basedir);
     338           0 :         exit(1);
     339             :     }
     340             : 
     341           2 :     close_destination_dir(dir, basedir);
     342             : 
     343           2 :     if (high_segno > 0)
     344             :     {
     345             :         XLogRecPtr  high_ptr;
     346             : 
     347             :         /*
     348             :          * Move the starting pointer to the start of the next segment, if the
     349             :          * highest one we saw was completed. Otherwise start streaming from
     350             :          * the beginning of the .partial segment.
     351             :          */
     352           0 :         if (!high_ispartial)
     353           0 :             high_segno++;
     354             : 
     355           0 :         XLogSegNoOffsetToRecPtr(high_segno, 0, WalSegSz, high_ptr);
     356             : 
     357           0 :         *tli = high_tli;
     358           0 :         return high_ptr;
     359             :     }
     360             :     else
     361           2 :         return InvalidXLogRecPtr;
     362             : }
     363             : 
     364             : /*
     365             :  * Start the log streaming
     366             :  */
     367             : static void
     368           2 : StreamLog(void)
     369             : {
     370             :     XLogRecPtr  serverpos;
     371             :     TimeLineID  servertli;
     372             :     StreamCtl   stream;
     373             : 
     374          20 :     MemSet(&stream, 0, sizeof(stream));
     375             : 
     376             :     /*
     377             :      * Connect in replication mode to the server
     378             :      */
     379           2 :     if (conn == NULL)
     380           0 :         conn = GetConnection();
     381           2 :     if (!conn)
     382             :         /* Error message already written in GetConnection() */
     383           0 :         return;
     384             : 
     385           2 :     if (!CheckServerVersionForStreaming(conn))
     386             :     {
     387             :         /*
     388             :          * Error message already written in CheckServerVersionForStreaming().
     389             :          * There's no hope of recovering from a version mismatch, so don't
     390             :          * retry.
     391             :          */
     392           0 :         exit(1);
     393             :     }
     394             : 
     395             :     /*
     396             :      * Identify server, obtaining start LSN position and current timeline ID
     397             :      * at the same time, necessary if not valid data can be found in the
     398             :      * existing output directory.
     399             :      */
     400           2 :     if (!RunIdentifySystem(conn, NULL, &servertli, &serverpos, NULL))
     401           0 :         exit(1);
     402             : 
     403             :     /*
     404             :      * Figure out where to start streaming.
     405             :      */
     406           2 :     stream.startpos = FindStreamingStart(&stream.timeline);
     407           2 :     if (stream.startpos == InvalidXLogRecPtr)
     408             :     {
     409           2 :         stream.startpos = serverpos;
     410           2 :         stream.timeline = servertli;
     411             :     }
     412             : 
     413             :     /*
     414             :      * Always start streaming at the beginning of a segment
     415             :      */
     416           2 :     stream.startpos -= XLogSegmentOffset(stream.startpos, WalSegSz);
     417             : 
     418             :     /*
     419             :      * Start the replication
     420             :      */
     421           2 :     if (verbose)
     422           2 :         pg_log_info("starting log streaming at %X/%X (timeline %u)",
     423             :                     (uint32) (stream.startpos >> 32), (uint32) stream.startpos,
     424             :                     stream.timeline);
     425             : 
     426           2 :     stream.stream_stop = stop_streaming;
     427           2 :     stream.stop_socket = PGINVALID_SOCKET;
     428           2 :     stream.standby_message_timeout = standby_message_timeout;
     429           2 :     stream.synchronous = synchronous;
     430           2 :     stream.do_sync = do_sync;
     431           2 :     stream.mark_done = false;
     432           4 :     stream.walmethod = CreateWalDirectoryMethod(basedir, compresslevel,
     433           2 :                                                 stream.do_sync);
     434           2 :     stream.partial_suffix = ".partial";
     435           2 :     stream.replication_slot = replication_slot;
     436             : 
     437           2 :     ReceiveXlogStream(conn, &stream);
     438             : 
     439           2 :     if (!stream.walmethod->finish())
     440             :     {
     441           0 :         pg_log_info("could not finish writing WAL files: %m");
     442           0 :         return;
     443             :     }
     444             : 
     445           2 :     PQfinish(conn);
     446           2 :     conn = NULL;
     447             : 
     448           2 :     FreeWalDirectoryMethod();
     449           2 :     pg_free(stream.walmethod);
     450             : 
     451           2 :     conn = NULL;
     452             : }
     453             : 
     454             : /*
     455             :  * When sigint is called, just tell the system to exit at the next possible
     456             :  * moment.
     457             :  */
     458             : #ifndef WIN32
     459             : 
     460             : static void
     461           0 : sigint_handler(int signum)
     462             : {
     463           0 :     time_to_stop = true;
     464           0 : }
     465             : #endif
     466             : 
     467             : int
     468          20 : main(int argc, char **argv)
     469             : {
     470             :     static struct option long_options[] = {
     471             :         {"help", no_argument, NULL, '?'},
     472             :         {"version", no_argument, NULL, 'V'},
     473             :         {"directory", required_argument, NULL, 'D'},
     474             :         {"dbname", required_argument, NULL, 'd'},
     475             :         {"endpos", required_argument, NULL, 'E'},
     476             :         {"host", required_argument, NULL, 'h'},
     477             :         {"port", required_argument, NULL, 'p'},
     478             :         {"username", required_argument, NULL, 'U'},
     479             :         {"no-loop", no_argument, NULL, 'n'},
     480             :         {"no-password", no_argument, NULL, 'w'},
     481             :         {"password", no_argument, NULL, 'W'},
     482             :         {"status-interval", required_argument, NULL, 's'},
     483             :         {"slot", required_argument, NULL, 'S'},
     484             :         {"verbose", no_argument, NULL, 'v'},
     485             :         {"compress", required_argument, NULL, 'Z'},
     486             : /* action */
     487             :         {"create-slot", no_argument, NULL, 1},
     488             :         {"drop-slot", no_argument, NULL, 2},
     489             :         {"if-not-exists", no_argument, NULL, 3},
     490             :         {"synchronous", no_argument, NULL, 4},
     491             :         {"no-sync", no_argument, NULL, 5},
     492             :         {NULL, 0, NULL, 0}
     493             :     };
     494             : 
     495             :     int         c;
     496             :     int         option_index;
     497             :     char       *db_name;
     498             :     uint32      hi,
     499             :                 lo;
     500             : 
     501          20 :     pg_logging_init(argv[0]);
     502          20 :     progname = get_progname(argv[0]);
     503          20 :     set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
     504             : 
     505          20 :     if (argc > 1)
     506             :     {
     507          18 :         if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
     508             :         {
     509           2 :             usage();
     510           2 :             exit(0);
     511             :         }
     512          16 :         else if (strcmp(argv[1], "-V") == 0 ||
     513          16 :                  strcmp(argv[1], "--version") == 0)
     514             :         {
     515           2 :             puts("pg_receivewal (PostgreSQL) " PG_VERSION);
     516           2 :             exit(0);
     517             :         }
     518             :     }
     519             : 
     520          50 :     while ((c = getopt_long(argc, argv, "D:d:E:h:p:U:s:S:nwWvZ:",
     521             :                             long_options, &option_index)) != -1)
     522             :     {
     523          36 :         switch (c)
     524             :         {
     525           8 :             case 'D':
     526           8 :                 basedir = pg_strdup(optarg);
     527           8 :                 break;
     528           0 :             case 'd':
     529           0 :                 connection_string = pg_strdup(optarg);
     530           0 :                 break;
     531           0 :             case 'h':
     532           0 :                 dbhost = pg_strdup(optarg);
     533           0 :                 break;
     534           0 :             case 'p':
     535           0 :                 if (atoi(optarg) <= 0)
     536             :                 {
     537           0 :                     pg_log_error("invalid port number \"%s\"", optarg);
     538           0 :                     exit(1);
     539             :                 }
     540           0 :                 dbport = pg_strdup(optarg);
     541           0 :                 break;
     542           0 :             case 'U':
     543           0 :                 dbuser = pg_strdup(optarg);
     544           0 :                 break;
     545           0 :             case 'w':
     546           0 :                 dbgetpassword = -1;
     547           0 :                 break;
     548           0 :             case 'W':
     549           0 :                 dbgetpassword = 1;
     550           0 :                 break;
     551           0 :             case 's':
     552           0 :                 standby_message_timeout = atoi(optarg) * 1000;
     553           0 :                 if (standby_message_timeout < 0)
     554             :                 {
     555           0 :                     pg_log_error("invalid status interval \"%s\"", optarg);
     556           0 :                     exit(1);
     557             :                 }
     558           0 :                 break;
     559           4 :             case 'S':
     560           4 :                 replication_slot = pg_strdup(optarg);
     561           4 :                 break;
     562           2 :             case 'E':
     563           2 :                 if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
     564             :                 {
     565           0 :                     pg_log_error("could not parse end position \"%s\"", optarg);
     566           0 :                     exit(1);
     567             :                 }
     568           2 :                 endpos = ((uint64) hi) << 32 | lo;
     569           2 :                 break;
     570           2 :             case 'n':
     571           2 :                 noloop = 1;
     572           2 :                 break;
     573           2 :             case 'v':
     574           2 :                 verbose++;
     575           2 :                 break;
     576           0 :             case 'Z':
     577           0 :                 compresslevel = atoi(optarg);
     578           0 :                 if (compresslevel < 0 || compresslevel > 9)
     579             :                 {
     580           0 :                     pg_log_error("invalid compression level \"%s\"", optarg);
     581           0 :                     exit(1);
     582             :                 }
     583           0 :                 break;
     584             : /* action */
     585           6 :             case 1:
     586           6 :                 do_create_slot = true;
     587           6 :                 break;
     588           4 :             case 2:
     589           4 :                 do_drop_slot = true;
     590           4 :                 break;
     591           0 :             case 3:
     592           0 :                 slot_exists_ok = true;
     593           0 :                 break;
     594           4 :             case 4:
     595           4 :                 synchronous = true;
     596           4 :                 break;
     597           2 :             case 5:
     598           2 :                 do_sync = false;
     599           2 :                 break;
     600           2 :             default:
     601             : 
     602             :                 /*
     603             :                  * getopt_long already emitted a complaint
     604             :                  */
     605           2 :                 fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
     606             :                         progname);
     607           2 :                 exit(1);
     608             :         }
     609             :     }
     610             : 
     611             :     /*
     612             :      * Any non-option arguments?
     613             :      */
     614          14 :     if (optind < argc)
     615             :     {
     616           0 :         pg_log_error("too many command-line arguments (first is \"%s\")",
     617             :                      argv[optind]);
     618           0 :         fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
     619             :                 progname);
     620           0 :         exit(1);
     621             :     }
     622             : 
     623          14 :     if (do_drop_slot && do_create_slot)
     624             :     {
     625           2 :         pg_log_error("cannot use --create-slot together with --drop-slot");
     626           2 :         fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
     627             :                 progname);
     628           2 :         exit(1);
     629             :     }
     630             : 
     631          12 :     if (replication_slot == NULL && (do_drop_slot || do_create_slot))
     632             :     {
     633             :         /* translator: second %s is an option name */
     634           2 :         pg_log_error("%s needs a slot to be specified using --slot",
     635             :                      do_drop_slot ? "--drop-slot" : "--create-slot");
     636           2 :         fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
     637             :                 progname);
     638           2 :         exit(1);
     639             :     }
     640             : 
     641          10 :     if (synchronous && !do_sync)
     642             :     {
     643           2 :         pg_log_error("cannot use --synchronous together with --no-sync");
     644           2 :         fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
     645             :                 progname);
     646           2 :         exit(1);
     647             :     }
     648             : 
     649             :     /*
     650             :      * Required arguments
     651             :      */
     652           8 :     if (basedir == NULL && !do_drop_slot && !do_create_slot)
     653             :     {
     654           2 :         pg_log_error("no target directory specified");
     655           2 :         fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
     656             :                 progname);
     657           2 :         exit(1);
     658             :     }
     659             : 
     660             : #ifndef HAVE_LIBZ
     661             :     if (compresslevel != 0)
     662             :     {
     663             :         pg_log_error("this build does not support compression");
     664             :         exit(1);
     665             :     }
     666             : #endif
     667             : 
     668             :     /*
     669             :      * Check existence of destination folder.
     670             :      */
     671           6 :     if (!do_drop_slot && !do_create_slot)
     672             :     {
     673           2 :         DIR        *dir = get_destination_dir(basedir);
     674             : 
     675           2 :         close_destination_dir(dir, basedir);
     676             :     }
     677             : 
     678             : #ifndef WIN32
     679           6 :     pqsignal(SIGINT, sigint_handler);
     680             : #endif
     681             : 
     682             :     /*
     683             :      * Obtain a connection before doing anything.
     684             :      */
     685           6 :     conn = GetConnection();
     686           6 :     if (!conn)
     687             :         /* error message already written in GetConnection() */
     688           0 :         exit(1);
     689           6 :     atexit(disconnect_atexit);
     690             : 
     691             :     /*
     692             :      * Run IDENTIFY_SYSTEM to make sure we've successfully have established a
     693             :      * replication connection and haven't connected using a database specific
     694             :      * connection.
     695             :      */
     696           6 :     if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name))
     697           0 :         exit(1);
     698             : 
     699             :     /*
     700             :      * Set umask so that directories/files are created with the same
     701             :      * permissions as directories/files in the source data directory.
     702             :      *
     703             :      * pg_mode_mask is set to owner-only by default and then updated in
     704             :      * GetConnection() where we get the mode from the server-side with
     705             :      * RetrieveDataDirCreatePerm() and then call SetDataDirectoryCreatePerm().
     706             :      */
     707           6 :     umask(pg_mode_mask);
     708             : 
     709             :     /* determine remote server's xlog segment size */
     710           6 :     if (!RetrieveWalSegSize(conn))
     711           0 :         exit(1);
     712             : 
     713             :     /*
     714             :      * Check that there is a database associated with connection, none should
     715             :      * be defined in this context.
     716             :      */
     717           6 :     if (db_name)
     718             :     {
     719           0 :         pg_log_error("replication connection using slot \"%s\" is unexpectedly database specific",
     720             :                      replication_slot);
     721           0 :         exit(1);
     722             :     }
     723             : 
     724             :     /*
     725             :      * Drop a replication slot.
     726             :      */
     727           6 :     if (do_drop_slot)
     728             :     {
     729           2 :         if (verbose)
     730           0 :             pg_log_info("dropping replication slot \"%s\"", replication_slot);
     731             : 
     732           2 :         if (!DropReplicationSlot(conn, replication_slot))
     733           0 :             exit(1);
     734           2 :         exit(0);
     735             :     }
     736             : 
     737             :     /* Create a replication slot */
     738           4 :     if (do_create_slot)
     739             :     {
     740           2 :         if (verbose)
     741           0 :             pg_log_info("creating replication slot \"%s\"", replication_slot);
     742             : 
     743           2 :         if (!CreateReplicationSlot(conn, replication_slot, NULL, false, true, false,
     744             :                                    slot_exists_ok))
     745           0 :             exit(1);
     746           2 :         exit(0);
     747             :     }
     748             : 
     749             :     /*
     750             :      * Don't close the connection here so that subsequent StreamLog() can
     751             :      * reuse it.
     752             :      */
     753             : 
     754             :     while (true)
     755             :     {
     756           2 :         StreamLog();
     757           2 :         if (time_to_stop)
     758             :         {
     759             :             /*
     760             :              * We've been Ctrl-C'ed or end of streaming position has been
     761             :              * willingly reached, so exit without an error code.
     762             :              */
     763           2 :             exit(0);
     764             :         }
     765           0 :         else if (noloop)
     766             :         {
     767           0 :             pg_log_error("disconnected");
     768           0 :             exit(1);
     769             :         }
     770             :         else
     771             :         {
     772             :             /* translator: check source for value for %d */
     773           0 :             pg_log_info("disconnected; waiting %d seconds to try again",
     774             :                         RECONNECT_SLEEP_TIME);
     775           0 :             pg_usleep(RECONNECT_SLEEP_TIME * 1000000);
     776             :         }
     777             :     }
     778             : }

Generated by: LCOV version 1.13