LCOV - code coverage report
Current view: top level - src/bin/pg_basebackup - streamutil.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18beta1 Lines: 230 323 71.2 %
Date: 2025-05-09 07:15:10 Functions: 15 15 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * streamutil.c - utility functions for pg_basebackup, pg_receivewal and
       4             :  *                  pg_recvlogical
       5             :  *
       6             :  * Author: Magnus Hagander <magnus@hagander.net>
       7             :  *
       8             :  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *        src/bin/pg_basebackup/streamutil.c
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres_fe.h"
      16             : 
      17             : #include <sys/time.h>
      18             : #include <unistd.h>
      19             : 
      20             : #include "access/xlog_internal.h"
      21             : #include "common/connect.h"
      22             : #include "common/file_perm.h"
      23             : #include "common/logging.h"
      24             : #include "common/string.h"
      25             : #include "datatype/timestamp.h"
      26             : #include "port/pg_bswap.h"
      27             : #include "pqexpbuffer.h"
      28             : #include "streamutil.h"
      29             : 
      30             : #define ERRCODE_DUPLICATE_OBJECT  "42710"
      31             : 
      32             : int         WalSegSz;
      33             : 
      34             : static bool RetrieveDataDirCreatePerm(PGconn *conn);
      35             : 
      36             : /* SHOW command for replication connection was introduced in version 10 */
      37             : #define MINIMUM_VERSION_FOR_SHOW_CMD 100000
      38             : 
      39             : /*
      40             :  * Group access is supported from version 11.
      41             :  */
      42             : #define MINIMUM_VERSION_FOR_GROUP_ACCESS 110000
      43             : 
      44             : const char *progname;
      45             : char       *connection_string = NULL;
      46             : char       *dbhost = NULL;
      47             : char       *dbuser = NULL;
      48             : char       *dbport = NULL;
      49             : char       *dbname = NULL;
      50             : int         dbgetpassword = 0;  /* 0=auto, -1=never, 1=always */
      51             : static char *password = NULL;
      52             : PGconn     *conn = NULL;
      53             : 
      54             : /*
      55             :  * Connect to the server. Returns a valid PGconn pointer if connected,
      56             :  * or NULL on non-permanent error. On permanent error, the function will
      57             :  * call exit(1) directly.
      58             :  */
      59             : PGconn *
      60         754 : GetConnection(void)
      61             : {
      62             :     PGconn     *tmpconn;
      63         754 :     int         argcount = 7;   /* dbname, replication, fallback_app_name,
      64             :                                  * host, user, port, password */
      65             :     int         i;
      66             :     const char **keywords;
      67             :     const char **values;
      68             :     const char *tmpparam;
      69             :     bool        need_password;
      70         754 :     PQconninfoOption *conn_opts = NULL;
      71             :     PQconninfoOption *conn_opt;
      72         754 :     char       *err_msg = NULL;
      73             : 
      74             :     /*
      75             :      * pg_recvlogical uses dbname only; others use connection_string only.
      76             :      * (Note: both variables will be NULL if there's no command line options.)
      77             :      */
      78             :     Assert(dbname == NULL || connection_string == NULL);
      79             : 
      80             :     /*
      81             :      * Merge the connection info inputs given in form of connection string,
      82             :      * options and default values (dbname=replication, replication=true, etc.)
      83             :      */
      84         754 :     i = 0;
      85         754 :     if (connection_string)
      86             :     {
      87           8 :         conn_opts = PQconninfoParse(connection_string, &err_msg);
      88           8 :         if (conn_opts == NULL)
      89           0 :             pg_fatal("%s", err_msg);
      90             : 
      91         408 :         for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
      92             :         {
      93         400 :             if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
      94          16 :                 argcount++;
      95             :         }
      96             : 
      97           8 :         keywords = pg_malloc0((argcount + 1) * sizeof(*keywords));
      98           8 :         values = pg_malloc0((argcount + 1) * sizeof(*values));
      99             : 
     100             :         /*
     101             :          * Set dbname here already, so it can be overridden by a dbname in the
     102             :          * connection string.
     103             :          */
     104           8 :         keywords[i] = "dbname";
     105           8 :         values[i] = "replication";
     106           8 :         i++;
     107             : 
     108         408 :         for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
     109             :         {
     110         400 :             if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
     111             :             {
     112          16 :                 keywords[i] = conn_opt->keyword;
     113          16 :                 values[i] = conn_opt->val;
     114          16 :                 i++;
     115             :             }
     116             :         }
     117             :     }
     118             :     else
     119             :     {
     120         746 :         keywords = pg_malloc0((argcount + 1) * sizeof(*keywords));
     121         746 :         values = pg_malloc0((argcount + 1) * sizeof(*values));
     122         746 :         keywords[i] = "dbname";
     123         746 :         values[i] = (dbname == NULL) ? "replication" : dbname;
     124         746 :         i++;
     125             :     }
     126             : 
     127         754 :     keywords[i] = "replication";
     128         754 :     values[i] = (dbname == NULL) ? "true" : "database";
     129         754 :     i++;
     130         754 :     keywords[i] = "fallback_application_name";
     131         754 :     values[i] = progname;
     132         754 :     i++;
     133             : 
     134         754 :     if (dbhost)
     135             :     {
     136         266 :         keywords[i] = "host";
     137         266 :         values[i] = dbhost;
     138         266 :         i++;
     139             :     }
     140         754 :     if (dbuser)
     141             :     {
     142          14 :         keywords[i] = "user";
     143          14 :         values[i] = dbuser;
     144          14 :         i++;
     145             :     }
     146         754 :     if (dbport)
     147             :     {
     148         266 :         keywords[i] = "port";
     149         266 :         values[i] = dbport;
     150         266 :         i++;
     151             :     }
     152             : 
     153             :     /* If -W was given, force prompt for password, but only the first time */
     154         754 :     need_password = (dbgetpassword == 1 && !password);
     155             : 
     156             :     do
     157             :     {
     158             :         /* Get a new password if appropriate */
     159         754 :         if (need_password)
     160             :         {
     161           0 :             free(password);
     162           0 :             password = simple_prompt("Password: ", false);
     163           0 :             need_password = false;
     164             :         }
     165             : 
     166             :         /* Use (or reuse, on a subsequent connection) password if we have it */
     167         754 :         if (password)
     168             :         {
     169           0 :             keywords[i] = "password";
     170           0 :             values[i] = password;
     171             :         }
     172             :         else
     173             :         {
     174         754 :             keywords[i] = NULL;
     175         754 :             values[i] = NULL;
     176             :         }
     177             : 
     178             :         /*
     179             :          * Only expand dbname when we did not already parse the argument as a
     180             :          * connection string ourselves.
     181             :          */
     182         754 :         tmpconn = PQconnectdbParams(keywords, values, !connection_string);
     183             : 
     184             :         /*
     185             :          * If there is too little memory even to allocate the PGconn object
     186             :          * and PQconnectdbParams returns NULL, we call exit(1) directly.
     187             :          */
     188         754 :         if (!tmpconn)
     189           0 :             pg_fatal("could not connect to server");
     190             : 
     191             :         /* If we need a password and -w wasn't given, loop back and get one */
     192         758 :         if (PQstatus(tmpconn) == CONNECTION_BAD &&
     193           4 :             PQconnectionNeedsPassword(tmpconn) &&
     194           0 :             dbgetpassword != -1)
     195             :         {
     196           0 :             PQfinish(tmpconn);
     197           0 :             need_password = true;
     198             :         }
     199             :     }
     200         754 :     while (need_password);
     201             : 
     202         754 :     if (PQstatus(tmpconn) != CONNECTION_OK)
     203             :     {
     204           4 :         pg_log_error("%s", PQerrorMessage(tmpconn));
     205           4 :         PQfinish(tmpconn);
     206           4 :         free(values);
     207           4 :         free(keywords);
     208           4 :         PQconninfoFree(conn_opts);
     209           4 :         return NULL;
     210             :     }
     211             : 
     212             :     /* Connection ok! */
     213         750 :     free(values);
     214         750 :     free(keywords);
     215         750 :     PQconninfoFree(conn_opts);
     216             : 
     217             :     /*
     218             :      * Set always-secure search path, so malicious users can't get control.
     219             :      * The capacity to run normal SQL queries was added in PostgreSQL 10, so
     220             :      * the search path cannot be changed (by us or attackers) on earlier
     221             :      * versions.
     222             :      */
     223         750 :     if (dbname != NULL && PQserverVersion(tmpconn) >= 100000)
     224             :     {
     225             :         PGresult   *res;
     226             : 
     227          98 :         res = PQexec(tmpconn, ALWAYS_SECURE_SEARCH_PATH_SQL);
     228          98 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
     229             :         {
     230           0 :             pg_log_error("could not clear \"search_path\": %s",
     231             :                          PQerrorMessage(tmpconn));
     232           0 :             PQclear(res);
     233           0 :             PQfinish(tmpconn);
     234           0 :             exit(1);
     235             :         }
     236          98 :         PQclear(res);
     237             :     }
     238             : 
     239             :     /*
     240             :      * Ensure we have the same value of integer_datetimes (now always "on") as
     241             :      * the server we are connecting to.
     242             :      */
     243         750 :     tmpparam = PQparameterStatus(tmpconn, "integer_datetimes");
     244         750 :     if (!tmpparam)
     245             :     {
     246           0 :         pg_log_error("could not determine server setting for \"integer_datetimes\"");
     247           0 :         PQfinish(tmpconn);
     248           0 :         exit(1);
     249             :     }
     250             : 
     251         750 :     if (strcmp(tmpparam, "on") != 0)
     252             :     {
     253           0 :         pg_log_error("\"integer_datetimes\" compile flag does not match server");
     254           0 :         PQfinish(tmpconn);
     255           0 :         exit(1);
     256             :     }
     257             : 
     258             :     /*
     259             :      * Retrieve the source data directory mode and use it to construct a umask
     260             :      * for creating directories and files.
     261             :      */
     262         750 :     if (!RetrieveDataDirCreatePerm(tmpconn))
     263             :     {
     264           0 :         PQfinish(tmpconn);
     265           0 :         exit(1);
     266             :     }
     267             : 
     268         750 :     return tmpconn;
     269             : }
     270             : 
     271             : /*
     272             :  * From version 10, explicitly set wal segment size using SHOW wal_segment_size
     273             :  * since ControlFile is not accessible here.
     274             :  */
     275             : bool
     276         372 : RetrieveWalSegSize(PGconn *conn)
     277             : {
     278             :     PGresult   *res;
     279             :     char        xlog_unit[3];
     280             :     int         xlog_val,
     281         372 :                 multiplier = 1;
     282             : 
     283             :     /* check connection existence */
     284             :     Assert(conn != NULL);
     285             : 
     286             :     /* for previous versions set the default xlog seg size */
     287         372 :     if (PQserverVersion(conn) < MINIMUM_VERSION_FOR_SHOW_CMD)
     288             :     {
     289           0 :         WalSegSz = DEFAULT_XLOG_SEG_SIZE;
     290           0 :         return true;
     291             :     }
     292             : 
     293         372 :     res = PQexec(conn, "SHOW wal_segment_size");
     294         372 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     295             :     {
     296           0 :         pg_log_error("could not send replication command \"%s\": %s",
     297             :                      "SHOW wal_segment_size", PQerrorMessage(conn));
     298             : 
     299           0 :         PQclear(res);
     300           0 :         return false;
     301             :     }
     302         372 :     if (PQntuples(res) != 1 || PQnfields(res) < 1)
     303             :     {
     304           0 :         pg_log_error("could not fetch WAL segment size: got %d rows and %d fields, expected %d rows and %d or more fields",
     305             :                      PQntuples(res), PQnfields(res), 1, 1);
     306             : 
     307           0 :         PQclear(res);
     308           0 :         return false;
     309             :     }
     310             : 
     311             :     /* fetch xlog value and unit from the result */
     312         372 :     if (sscanf(PQgetvalue(res, 0, 0), "%d%2s", &xlog_val, xlog_unit) != 2)
     313             :     {
     314           0 :         pg_log_error("WAL segment size could not be parsed");
     315           0 :         PQclear(res);
     316           0 :         return false;
     317             :     }
     318             : 
     319         372 :     PQclear(res);
     320             : 
     321             :     /* set the multiplier based on unit to convert xlog_val to bytes */
     322         372 :     if (strcmp(xlog_unit, "MB") == 0)
     323         372 :         multiplier = 1024 * 1024;
     324           0 :     else if (strcmp(xlog_unit, "GB") == 0)
     325           0 :         multiplier = 1024 * 1024 * 1024;
     326             : 
     327             :     /* convert and set WalSegSz */
     328         372 :     WalSegSz = xlog_val * multiplier;
     329             : 
     330         372 :     if (!IsValidWalSegSize(WalSegSz))
     331             :     {
     332           0 :         pg_log_error(ngettext("remote server reported invalid WAL segment size (%d byte)",
     333             :                               "remote server reported invalid WAL segment size (%d bytes)",
     334             :                               WalSegSz),
     335             :                      WalSegSz);
     336           0 :         pg_log_error_detail("The WAL segment size must be a power of two between 1 MB and 1 GB.");
     337           0 :         return false;
     338             :     }
     339             : 
     340         372 :     return true;
     341             : }
     342             : 
     343             : /*
     344             :  * RetrieveDataDirCreatePerm
     345             :  *
     346             :  * This function is used to determine the privileges on the server's PG data
     347             :  * directory and, based on that, set what the permissions will be for
     348             :  * directories and files we create.
     349             :  *
     350             :  * PG11 added support for (optionally) group read/execute rights to be set on
     351             :  * the data directory.  Prior to PG11, only the owner was allowed to have rights
     352             :  * on the data directory.
     353             :  */
     354             : static bool
     355         750 : RetrieveDataDirCreatePerm(PGconn *conn)
     356             : {
     357             :     PGresult   *res;
     358             :     int         data_directory_mode;
     359             : 
     360             :     /* check connection existence */
     361             :     Assert(conn != NULL);
     362             : 
     363             :     /* for previous versions leave the default group access */
     364         750 :     if (PQserverVersion(conn) < MINIMUM_VERSION_FOR_GROUP_ACCESS)
     365           0 :         return true;
     366             : 
     367         750 :     res = PQexec(conn, "SHOW data_directory_mode");
     368         750 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     369             :     {
     370           0 :         pg_log_error("could not send replication command \"%s\": %s",
     371             :                      "SHOW data_directory_mode", PQerrorMessage(conn));
     372             : 
     373           0 :         PQclear(res);
     374           0 :         return false;
     375             :     }
     376         750 :     if (PQntuples(res) != 1 || PQnfields(res) < 1)
     377             :     {
     378           0 :         pg_log_error("could not fetch group access flag: got %d rows and %d fields, expected %d rows and %d or more fields",
     379             :                      PQntuples(res), PQnfields(res), 1, 1);
     380             : 
     381           0 :         PQclear(res);
     382           0 :         return false;
     383             :     }
     384             : 
     385         750 :     if (sscanf(PQgetvalue(res, 0, 0), "%o", &data_directory_mode) != 1)
     386             :     {
     387           0 :         pg_log_error("group access flag could not be parsed: %s",
     388             :                      PQgetvalue(res, 0, 0));
     389             : 
     390           0 :         PQclear(res);
     391           0 :         return false;
     392             :     }
     393             : 
     394         750 :     SetDataDirectoryCreatePerm(data_directory_mode);
     395             : 
     396         750 :     PQclear(res);
     397         750 :     return true;
     398             : }
     399             : 
     400             : /*
     401             :  * Run IDENTIFY_SYSTEM through a given connection and give back to caller
     402             :  * some result information if requested:
     403             :  * - System identifier
     404             :  * - Current timeline ID
     405             :  * - Start LSN position
     406             :  * - Database name (NULL in servers prior to 9.4)
     407             :  */
     408             : bool
     409         766 : RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli,
     410             :                   XLogRecPtr *startpos, char **db_name)
     411             : {
     412             :     PGresult   *res;
     413             :     uint32      hi,
     414             :                 lo;
     415             : 
     416             :     /* Check connection existence */
     417             :     Assert(conn != NULL);
     418             : 
     419         766 :     res = PQexec(conn, "IDENTIFY_SYSTEM");
     420         766 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     421             :     {
     422           0 :         pg_log_error("could not send replication command \"%s\": %s",
     423             :                      "IDENTIFY_SYSTEM", PQerrorMessage(conn));
     424             : 
     425           0 :         PQclear(res);
     426           0 :         return false;
     427             :     }
     428         766 :     if (PQntuples(res) != 1 || PQnfields(res) < 3)
     429             :     {
     430           0 :         pg_log_error("could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields",
     431             :                      PQntuples(res), PQnfields(res), 1, 3);
     432             : 
     433           0 :         PQclear(res);
     434           0 :         return false;
     435             :     }
     436             : 
     437             :     /* Get system identifier */
     438         766 :     if (sysid != NULL)
     439         648 :         *sysid = pg_strdup(PQgetvalue(res, 0, 0));
     440             : 
     441             :     /* Get timeline ID to start streaming from */
     442         766 :     if (starttli != NULL)
     443         648 :         *starttli = atoi(PQgetvalue(res, 0, 1));
     444             : 
     445             :     /* Get LSN start position if necessary */
     446         766 :     if (startpos != NULL)
     447             :     {
     448          14 :         if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &hi, &lo) != 2)
     449             :         {
     450           0 :             pg_log_error("could not parse write-ahead log location \"%s\"",
     451             :                          PQgetvalue(res, 0, 2));
     452             : 
     453           0 :             PQclear(res);
     454           0 :             return false;
     455             :         }
     456          14 :         *startpos = ((uint64) hi) << 32 | lo;
     457             :     }
     458             : 
     459             :     /* Get database name, only available in 9.4 and newer versions */
     460         766 :     if (db_name != NULL)
     461             :     {
     462         118 :         *db_name = NULL;
     463         118 :         if (PQserverVersion(conn) >= 90400)
     464             :         {
     465         118 :             if (PQnfields(res) < 4)
     466             :             {
     467           0 :                 pg_log_error("could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields",
     468             :                              PQntuples(res), PQnfields(res), 1, 4);
     469             : 
     470           0 :                 PQclear(res);
     471           0 :                 return false;
     472             :             }
     473         118 :             if (!PQgetisnull(res, 0, 3))
     474          98 :                 *db_name = pg_strdup(PQgetvalue(res, 0, 3));
     475             :         }
     476             :     }
     477             : 
     478         766 :     PQclear(res);
     479         766 :     return true;
     480             : }
     481             : 
     482             : /*
     483             :  * Run READ_REPLICATION_SLOT through a given connection and give back to
     484             :  * caller some result information if requested for this slot:
     485             :  * - Start LSN position, InvalidXLogRecPtr if unknown.
     486             :  * - Current timeline ID, 0 if unknown.
     487             :  * Returns false on failure, and true otherwise.
     488             :  */
     489             : bool
     490           6 : GetSlotInformation(PGconn *conn, const char *slot_name,
     491             :                    XLogRecPtr *restart_lsn, TimeLineID *restart_tli)
     492             : {
     493             :     PGresult   *res;
     494             :     PQExpBuffer query;
     495           6 :     XLogRecPtr  lsn_loc = InvalidXLogRecPtr;
     496           6 :     TimeLineID  tli_loc = 0;
     497             : 
     498           6 :     if (restart_lsn)
     499           6 :         *restart_lsn = lsn_loc;
     500           6 :     if (restart_tli)
     501           6 :         *restart_tli = tli_loc;
     502             : 
     503           6 :     query = createPQExpBuffer();
     504           6 :     appendPQExpBuffer(query, "READ_REPLICATION_SLOT %s", slot_name);
     505           6 :     res = PQexec(conn, query->data);
     506           6 :     destroyPQExpBuffer(query);
     507             : 
     508           6 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     509             :     {
     510           0 :         pg_log_error("could not send replication command \"%s\": %s",
     511             :                      "READ_REPLICATION_SLOT", PQerrorMessage(conn));
     512           0 :         PQclear(res);
     513           0 :         return false;
     514             :     }
     515             : 
     516             :     /* The command should always return precisely one tuple and three fields */
     517           6 :     if (PQntuples(res) != 1 || PQnfields(res) != 3)
     518             :     {
     519           0 :         pg_log_error("could not read replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields",
     520             :                      slot_name, PQntuples(res), PQnfields(res), 1, 3);
     521           0 :         PQclear(res);
     522           0 :         return false;
     523             :     }
     524             : 
     525             :     /*
     526             :      * When the slot doesn't exist, the command returns a tuple with NULL
     527             :      * values.  This checks only the slot type field.
     528             :      */
     529           6 :     if (PQgetisnull(res, 0, 0))
     530             :     {
     531           2 :         pg_log_error("replication slot \"%s\" does not exist", slot_name);
     532           2 :         PQclear(res);
     533           2 :         return false;
     534             :     }
     535             : 
     536             :     /*
     537             :      * Note that this cannot happen as READ_REPLICATION_SLOT supports only
     538             :      * physical slots, but play it safe.
     539             :      */
     540           4 :     if (strcmp(PQgetvalue(res, 0, 0), "physical") != 0)
     541             :     {
     542           0 :         pg_log_error("expected a physical replication slot, got type \"%s\" instead",
     543             :                      PQgetvalue(res, 0, 0));
     544           0 :         PQclear(res);
     545           0 :         return false;
     546             :     }
     547             : 
     548             :     /* restart LSN */
     549           4 :     if (!PQgetisnull(res, 0, 1))
     550             :     {
     551             :         uint32      hi,
     552             :                     lo;
     553             : 
     554           4 :         if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &hi, &lo) != 2)
     555             :         {
     556           0 :             pg_log_error("could not parse restart_lsn \"%s\" for replication slot \"%s\"",
     557             :                          PQgetvalue(res, 0, 1), slot_name);
     558           0 :             PQclear(res);
     559           0 :             return false;
     560             :         }
     561           4 :         lsn_loc = ((uint64) hi) << 32 | lo;
     562             :     }
     563             : 
     564             :     /* current TLI */
     565           4 :     if (!PQgetisnull(res, 0, 2))
     566           4 :         tli_loc = (TimeLineID) atoll(PQgetvalue(res, 0, 2));
     567             : 
     568           4 :     PQclear(res);
     569             : 
     570             :     /* Assign results if requested */
     571           4 :     if (restart_lsn)
     572           4 :         *restart_lsn = lsn_loc;
     573           4 :     if (restart_tli)
     574           4 :         *restart_tli = tli_loc;
     575             : 
     576           4 :     return true;
     577             : }
     578             : 
     579             : /*
     580             :  * Create a replication slot for the given connection. This function
     581             :  * returns true in case of success.
     582             :  */
     583             : bool
     584         318 : CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
     585             :                       bool is_temporary, bool is_physical, bool reserve_wal,
     586             :                       bool slot_exists_ok, bool two_phase, bool failover)
     587             : {
     588             :     PQExpBuffer query;
     589             :     PGresult   *res;
     590         318 :     bool        use_new_option_syntax = (PQserverVersion(conn) >= 150000);
     591             : 
     592         318 :     query = createPQExpBuffer();
     593             : 
     594             :     Assert((is_physical && plugin == NULL) ||
     595             :            (!is_physical && plugin != NULL));
     596             :     Assert(!(two_phase && is_physical));
     597             :     Assert(!(failover && is_physical));
     598             :     Assert(slot_name != NULL);
     599             : 
     600             :     /* Build base portion of query */
     601         318 :     appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\"", slot_name);
     602         318 :     if (is_temporary)
     603         262 :         appendPQExpBufferStr(query, " TEMPORARY");
     604         318 :     if (is_physical)
     605         268 :         appendPQExpBufferStr(query, " PHYSICAL");
     606             :     else
     607          50 :         appendPQExpBuffer(query, " LOGICAL \"%s\"", plugin);
     608             : 
     609             :     /* Add any requested options */
     610         318 :     if (use_new_option_syntax)
     611         318 :         appendPQExpBufferStr(query, " (");
     612         318 :     if (is_physical)
     613             :     {
     614         268 :         if (reserve_wal)
     615         266 :             AppendPlainCommandOption(query, use_new_option_syntax,
     616             :                                      "RESERVE_WAL");
     617             :     }
     618             :     else
     619             :     {
     620          50 :         if (failover && PQserverVersion(conn) >= 170000)
     621           2 :             AppendPlainCommandOption(query, use_new_option_syntax,
     622             :                                      "FAILOVER");
     623             : 
     624          50 :         if (two_phase && PQserverVersion(conn) >= 150000)
     625           2 :             AppendPlainCommandOption(query, use_new_option_syntax,
     626             :                                      "TWO_PHASE");
     627             : 
     628          50 :         if (PQserverVersion(conn) >= 100000)
     629             :         {
     630             :             /* pg_recvlogical doesn't use an exported snapshot, so suppress */
     631          50 :             if (use_new_option_syntax)
     632          50 :                 AppendStringCommandOption(query, use_new_option_syntax,
     633             :                                           "SNAPSHOT", "nothing");
     634             :             else
     635           0 :                 AppendPlainCommandOption(query, use_new_option_syntax,
     636             :                                          "NOEXPORT_SNAPSHOT");
     637             :         }
     638             :     }
     639         318 :     if (use_new_option_syntax)
     640             :     {
     641             :         /* Suppress option list if it would be empty, otherwise terminate */
     642         318 :         if (query->data[query->len - 1] == '(')
     643             :         {
     644           2 :             query->len -= 2;
     645           2 :             query->data[query->len] = '\0';
     646             :         }
     647             :         else
     648         316 :             appendPQExpBufferChar(query, ')');
     649             :     }
     650             : 
     651             :     /* Now run the query */
     652         318 :     res = PQexec(conn, query->data);
     653         318 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     654             :     {
     655           2 :         const char *sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
     656             : 
     657           2 :         if (slot_exists_ok &&
     658           0 :             sqlstate &&
     659           0 :             strcmp(sqlstate, ERRCODE_DUPLICATE_OBJECT) == 0)
     660             :         {
     661           0 :             destroyPQExpBuffer(query);
     662           0 :             PQclear(res);
     663           0 :             return true;
     664             :         }
     665             :         else
     666             :         {
     667           2 :             pg_log_error("could not send replication command \"%s\": %s",
     668             :                          query->data, PQerrorMessage(conn));
     669             : 
     670           2 :             destroyPQExpBuffer(query);
     671           2 :             PQclear(res);
     672           2 :             return false;
     673             :         }
     674             :     }
     675             : 
     676         316 :     if (PQntuples(res) != 1 || PQnfields(res) != 4)
     677             :     {
     678           0 :         pg_log_error("could not create replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields",
     679             :                      slot_name,
     680             :                      PQntuples(res), PQnfields(res), 1, 4);
     681             : 
     682           0 :         destroyPQExpBuffer(query);
     683           0 :         PQclear(res);
     684           0 :         return false;
     685             :     }
     686             : 
     687         316 :     destroyPQExpBuffer(query);
     688         316 :     PQclear(res);
     689         316 :     return true;
     690             : }
     691             : 
     692             : /*
     693             :  * Drop a replication slot for the given connection. This function
     694             :  * returns true in case of success.
     695             :  */
     696             : bool
     697           6 : DropReplicationSlot(PGconn *conn, const char *slot_name)
     698             : {
     699             :     PQExpBuffer query;
     700             :     PGresult   *res;
     701             : 
     702             :     Assert(slot_name != NULL);
     703             : 
     704           6 :     query = createPQExpBuffer();
     705             : 
     706             :     /* Build query */
     707           6 :     appendPQExpBuffer(query, "DROP_REPLICATION_SLOT \"%s\"",
     708             :                       slot_name);
     709           6 :     res = PQexec(conn, query->data);
     710           6 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
     711             :     {
     712           0 :         pg_log_error("could not send replication command \"%s\": %s",
     713             :                      query->data, PQerrorMessage(conn));
     714             : 
     715           0 :         destroyPQExpBuffer(query);
     716           0 :         PQclear(res);
     717           0 :         return false;
     718             :     }
     719             : 
     720           6 :     if (PQntuples(res) != 0 || PQnfields(res) != 0)
     721             :     {
     722           0 :         pg_log_error("could not drop replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields",
     723             :                      slot_name,
     724             :                      PQntuples(res), PQnfields(res), 0, 0);
     725             : 
     726           0 :         destroyPQExpBuffer(query);
     727           0 :         PQclear(res);
     728           0 :         return false;
     729             :     }
     730             : 
     731           6 :     destroyPQExpBuffer(query);
     732           6 :     PQclear(res);
     733           6 :     return true;
     734             : }
     735             : 
     736             : /*
     737             :  * Append a "plain" option - one with no value - to a server command that
     738             :  * is being constructed.
     739             :  *
     740             :  * In the old syntax, all options were parser keywords, so you could just
     741             :  * write things like SOME_COMMAND OPTION1 OPTION2 'opt2value' OPTION3 42. The
     742             :  * new syntax uses a comma-separated list surrounded by parentheses, so the
     743             :  * equivalent is SOME_COMMAND (OPTION1, OPTION2 'optvalue', OPTION3 42).
     744             :  */
     745             : void
     746        2652 : AppendPlainCommandOption(PQExpBuffer buf, bool use_new_option_syntax,
     747             :                          char *option_name)
     748             : {
     749        2652 :     if (buf->len > 0 && buf->data[buf->len - 1] != '(')
     750             :     {
     751        1980 :         if (use_new_option_syntax)
     752        1980 :             appendPQExpBufferStr(buf, ", ");
     753             :         else
     754           0 :             appendPQExpBufferChar(buf, ' ');
     755             :     }
     756             : 
     757        2652 :     appendPQExpBuffer(buf, " %s", option_name);
     758        2652 : }
     759             : 
     760             : /*
     761             :  * Append an option with an associated string value to a server command that
     762             :  * is being constructed.
     763             :  *
     764             :  * See comments for AppendPlainCommandOption, above.
     765             :  */
     766             : void
     767        1566 : AppendStringCommandOption(PQExpBuffer buf, bool use_new_option_syntax,
     768             :                           char *option_name, char *option_value)
     769             : {
     770        1566 :     AppendPlainCommandOption(buf, use_new_option_syntax, option_name);
     771             : 
     772        1566 :     if (option_value != NULL)
     773             :     {
     774        1566 :         size_t      length = strlen(option_value);
     775        1566 :         char       *escaped_value = palloc(1 + 2 * length);
     776             : 
     777        1566 :         PQescapeStringConn(conn, escaped_value, option_value, length, NULL);
     778        1566 :         appendPQExpBuffer(buf, " '%s'", escaped_value);
     779        1566 :         pfree(escaped_value);
     780             :     }
     781        1566 : }
     782             : 
     783             : /*
     784             :  * Append an option with an associated integer value to a server command
     785             :  * is being constructed.
     786             :  *
     787             :  * See comments for AppendPlainCommandOption, above.
     788             :  */
     789             : void
     790         340 : AppendIntegerCommandOption(PQExpBuffer buf, bool use_new_option_syntax,
     791             :                            char *option_name, int32 option_value)
     792             : {
     793         340 :     AppendPlainCommandOption(buf, use_new_option_syntax, option_name);
     794             : 
     795         340 :     appendPQExpBuffer(buf, " %d", option_value);
     796         340 : }
     797             : 
     798             : /*
     799             :  * Frontend version of GetCurrentTimestamp(), since we are not linked with
     800             :  * backend code.
     801             :  */
     802             : TimestampTz
     803        5232 : feGetCurrentTimestamp(void)
     804             : {
     805             :     TimestampTz result;
     806             :     struct timeval tp;
     807             : 
     808        5232 :     gettimeofday(&tp, NULL);
     809             : 
     810        5232 :     result = (TimestampTz) tp.tv_sec -
     811             :         ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);
     812        5232 :     result = (result * USECS_PER_SEC) + tp.tv_usec;
     813             : 
     814        5232 :     return result;
     815             : }
     816             : 
     817             : /*
     818             :  * Frontend version of TimestampDifference(), since we are not linked with
     819             :  * backend code.
     820             :  */
     821             : void
     822        4248 : feTimestampDifference(TimestampTz start_time, TimestampTz stop_time,
     823             :                       long *secs, int *microsecs)
     824             : {
     825        4248 :     TimestampTz diff = stop_time - start_time;
     826             : 
     827        4248 :     if (diff <= 0)
     828             :     {
     829           0 :         *secs = 0;
     830           0 :         *microsecs = 0;
     831             :     }
     832             :     else
     833             :     {
     834        4248 :         *secs = (long) (diff / USECS_PER_SEC);
     835        4248 :         *microsecs = (int) (diff % USECS_PER_SEC);
     836             :     }
     837        4248 : }
     838             : 
     839             : /*
     840             :  * Frontend version of TimestampDifferenceExceeds(), since we are not
     841             :  * linked with backend code.
     842             :  */
     843             : bool
     844        6414 : feTimestampDifferenceExceeds(TimestampTz start_time,
     845             :                              TimestampTz stop_time,
     846             :                              int msec)
     847             : {
     848        6414 :     TimestampTz diff = stop_time - start_time;
     849             : 
     850        6414 :     return (diff >= msec * INT64CONST(1000));
     851             : }
     852             : 
     853             : /*
     854             :  * Converts an int64 to network byte order.
     855             :  */
     856             : void
     857        1304 : fe_sendint64(int64 i, char *buf)
     858             : {
     859        1304 :     uint64      n64 = pg_hton64(i);
     860             : 
     861        1304 :     memcpy(buf, &n64, sizeof(n64));
     862        1304 : }
     863             : 
     864             : /*
     865             :  * Converts an int64 from network byte order to native format.
     866             :  */
     867             : int64
     868        7936 : fe_recvint64(char *buf)
     869             : {
     870             :     uint64      n64;
     871             : 
     872        7936 :     memcpy(&n64, buf, sizeof(n64));
     873             : 
     874        7936 :     return pg_ntoh64(n64);
     875             : }

Generated by: LCOV version 1.14