LCOV - code coverage report
Current view: top level - src/bin/pg_basebackup - streamutil.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19beta1 Lines: 71.8 % 333 239
Test Date: 2026-06-18 03:16:44 Functions: 100.0 % 16 16
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * streamutil.c - utility functions for pg_basebackup, pg_receivewal and
       4              :  *                  pg_recvlogical
       5              :  *
       6              :  * Author: Magnus Hagander <magnus@hagander.net>
       7              :  *
       8              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       9              :  *
      10              :  * IDENTIFICATION
      11              :  *        src/bin/pg_basebackup/streamutil.c
      12              :  *-------------------------------------------------------------------------
      13              :  */
      14              : 
      15              : #include "postgres_fe.h"
      16              : 
      17              : #include <sys/time.h>
      18              : #include <unistd.h>
      19              : 
      20              : #include "access/xlog_internal.h"
      21              : #include "common/connect.h"
      22              : #include "common/file_perm.h"
      23              : #include "common/logging.h"
      24              : #include "common/string.h"
      25              : #include "datatype/timestamp.h"
      26              : #include "port/pg_bswap.h"
      27              : #include "pqexpbuffer.h"
      28              : #include "streamutil.h"
      29              : 
      30              : #define ERRCODE_DUPLICATE_OBJECT  "42710"    /* presumably the SQLSTATE reported for an already-existing object -- confirm */
      31              : 
      32              : int         WalSegSz;       /* WAL segment size in bytes; assigned by RetrieveWalSegSize() */
      33              : 
      34              : static bool RetrieveDataDirCreatePerm(PGconn *conn);
      35              : 
      36              : /* SHOW command for replication connection was introduced in version 10 */
      37              : #define MINIMUM_VERSION_FOR_SHOW_CMD 100000
      38              : 
      39              : /*
      40              :  * Group access is supported from version 11.
                      :  * RetrieveDataDirCreatePerm() skips its query for older servers.
      41              :  */
      42              : #define MINIMUM_VERSION_FOR_GROUP_ACCESS 110000
      43              : 
      44              : const char *progname;       /* sent as fallback_application_name */
      45              : char       *connection_string = NULL;   /* parsed by GetConnection() when set */
      46              : char       *dbhost = NULL;  /* "host" connection parameter, if set */
      47              : char       *dbuser = NULL;  /* "user" connection parameter, if set */
      48              : char       *dbport = NULL;  /* "port" connection parameter, if set */
      49              : char       *dbname = NULL;  /* when set, GetConnection() uses replication=database */
      50              : int         dbgetpassword = 0;  /* 0=auto, -1=never, 1=always */
      51              : static char *password = NULL;   /* last prompted password, reused by later GetConnection() calls */
      52              : PGconn     *conn = NULL;
      53              : 
      54              : /*
      55              :  * Connect to the server. Returns a valid PGconn pointer if connected,
      56              :  * or NULL on non-permanent error. On permanent error, the function will
      57              :  * call exit(1) directly.
                      :  *
                      :  * The parameter list is assembled from connection_string when it is set
                      :  * (parsed here with PQconninfoParse), otherwise from dbname, followed by
                      :  * "replication", "fallback_application_name" and, when the corresponding
                      :  * globals are non-NULL, "host", "user" and "port".  "replication" is "true"
                      :  * when dbname is NULL and "database" otherwise.
                      :  *
                      :  * A password is prompted for up front when dbgetpassword is 1 and none has
                      :  * been obtained yet, and again whenever the connection attempt reports that
                      :  * one is needed, unless dbgetpassword is -1.  The password is kept in a
                      :  * file-level static so later calls reuse it.
                      :  *
                      :  * NULL is returned (after logging the libpq error) when the final attempt
                      :  * does not reach CONNECTION_OK.  The process exits, through pg_fatal() or
                      :  * exit(1), when the connection string cannot be parsed, PQconnectdbParams()
                      :  * returns NULL, clearing search_path fails, "integer_datetimes" is missing
                      :  * or not "on", or RetrieveDataDirCreatePerm() fails.
                      :  *
                      :  * NOTE(review): the literal 100000 in the search_path version check has the
                      :  * same value as MINIMUM_VERSION_FOR_SHOW_CMD; consider whether the macro
                      :  * should be used there.
      58              :  */
      59              : PGconn *
      60          426 : GetConnection(void)
      61              : {
      62              :     PGconn     *tmpconn;
      63          426 :     int         argcount = 7;   /* dbname, replication, fallback_app_name,
      64              :                                  * host, user, port, password */
      65              :     int         i;
      66              :     const char **keywords;
      67              :     const char **values;
      68              :     const char *tmpparam;
      69              :     bool        need_password;
      70          426 :     PQconninfoOption *conn_opts = NULL;
      71              :     PQconninfoOption *conn_opt;
      72          426 :     char       *err_msg = NULL;
      73              : 
      74              :     /*
      75              :      * pg_recvlogical uses dbname only; others use connection_string only.
      76              :      * (Note: both variables will be NULL if there's no command line options.)
      77              :      */
      78              :     Assert(dbname == NULL || connection_string == NULL);
      79              : 
      80              :     /*
      81              :      * Merge the connection info inputs given in form of connection string,
      82              :      * options and default values (dbname=replication, replication=true, etc.)
                      :      *
                      :      * Both arrays are sized argcount + 1: argcount covers the seven fixed
                      :      * entries plus every non-empty option from the connection string, and
                      :      * the extra slot presumably serves as the NULL terminator (the arrays
                      :      * come zeroed from pg_malloc0_array -- confirm) when a password entry
                      :      * is stored.
      83              :      */
      84          426 :     i = 0;
      85          426 :     if (connection_string)
      86              :     {
      87            6 :         conn_opts = PQconninfoParse(connection_string, &err_msg);
      88            6 :         if (conn_opts == NULL)
      89            0 :             pg_fatal("%s", err_msg);
      90              : 
      91          318 :         for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
      92              :         {
      93          312 :             if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
      94           10 :                 argcount++;
      95              :         }
      96              : 
      97            6 :         keywords = pg_malloc0_array(const char *, argcount + 1);
      98            6 :         values = pg_malloc0_array(const char *, argcount + 1);
      99              : 
     100              :         /*
     101              :          * Set dbname here already, so it can be overridden by a dbname in the
     102              :          * connection string.
     103              :          */
     104            6 :         keywords[i] = "dbname";
     105            6 :         values[i] = "replication";
     106            6 :         i++;
     107              : 
     108          318 :         for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
     109              :         {
     110          312 :             if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
     111              :             {
     112           10 :                 keywords[i] = conn_opt->keyword;
     113           10 :                 values[i] = conn_opt->val;
     114           10 :                 i++;
     115              :             }
     116              :         }
     117              :     }
     118              :     else
     119              :     {
     120          420 :         keywords = pg_malloc0_array(const char *, argcount + 1);
     121          420 :         values = pg_malloc0_array(const char *, argcount + 1);
     122          420 :         keywords[i] = "dbname";
     123          420 :         values[i] = (dbname == NULL) ? "replication" : dbname;
     124          420 :         i++;
     125              :     }
     126              : 
     127          426 :     keywords[i] = "replication";
     128          426 :     values[i] = (dbname == NULL) ? "true" : "database";
     129          426 :     i++;
     130          426 :     keywords[i] = "fallback_application_name";
     131          426 :     values[i] = progname;
     132          426 :     i++;
     133              : 
     134          426 :     if (dbhost)
     135              :     {
     136          167 :         keywords[i] = "host";
     137          167 :         values[i] = dbhost;
     138          167 :         i++;
     139              :     }
     140          426 :     if (dbuser)
     141              :     {
     142            7 :         keywords[i] = "user";
     143            7 :         values[i] = dbuser;
     144            7 :         i++;
     145              :     }
     146          426 :     if (dbport)
     147              :     {
     148          167 :         keywords[i] = "port";
     149          167 :         values[i] = dbport;
     150          167 :         i++;
     151              :     }
     152              : 
     153              :     /* If -W was given, force prompt for password, but only the first time */
     154          426 :     need_password = (dbgetpassword == 1 && !password);
     155              : 
     156              :     do
     157              :     {
     158              :         /* Get a new password if appropriate */
     159          426 :         if (need_password)
     160              :         {
     161            0 :             free(password);
     162            0 :             password = simple_prompt("Password: ", false);
     163            0 :             need_password = false;
     164              :         }
     165              : 
     166              :         /* Use (or reuse, on a subsequent connection) password if we have it */
     167          426 :         if (password)
     168              :         {
     169            0 :             keywords[i] = "password";
     170            0 :             values[i] = password;
     171              :         }
     172              :         else
     173              :         {
     174          426 :             keywords[i] = NULL;
     175          426 :             values[i] = NULL;
     176              :         }
     177              : 
     178              :         /*
     179              :          * Only expand dbname when we did not already parse the argument as a
     180              :          * connection string ourselves.
     181              :          */
     182          426 :         tmpconn = PQconnectdbParams(keywords, values, !connection_string);
     183              : 
     184              :         /*
     185              :          * If there is too little memory even to allocate the PGconn object
     186              :          * and PQconnectdbParams returns NULL, we call exit(1) directly.
     187              :          */
     188          426 :         if (!tmpconn)
     189            0 :             pg_fatal("could not connect to server");
     190              : 
     191              :         /* If we need a password and -w wasn't given, loop back and get one */
     192          428 :         if (PQstatus(tmpconn) == CONNECTION_BAD &&
     193            2 :             PQconnectionNeedsPassword(tmpconn) &&
     194            0 :             dbgetpassword != -1)
     195              :         {
     196            0 :             PQfinish(tmpconn);
     197            0 :             need_password = true;
     198              :         }
     199              :     }
     200          426 :     while (need_password);
     201              : 
     202          426 :     if (PQstatus(tmpconn) != CONNECTION_OK)
     203              :     {
     204            2 :         pg_log_error("%s", PQerrorMessage(tmpconn));
     205            2 :         PQfinish(tmpconn);
     206            2 :         free(values);
     207            2 :         free(keywords);
     208            2 :         PQconninfoFree(conn_opts);
     209            2 :         return NULL;
     210              :     }
     211              : 
     212              :     /* Connection ok! */
     213          424 :     free(values);
     214          424 :     free(keywords);
     215          424 :     PQconninfoFree(conn_opts);
     216              : 
     217              :     /*
     218              :      * Set always-secure search path, so malicious users can't get control.
     219              :      * The capacity to run normal SQL queries was added in PostgreSQL 10, so
     220              :      * the search path cannot be changed (by us or attackers) on earlier
     221              :      * versions.
     222              :      */
     223          424 :     if (dbname != NULL && PQserverVersion(tmpconn) >= 100000)
     224              :     {
     225              :         PGresult   *res;
     226              : 
     227           60 :         res = PQexec(tmpconn, ALWAYS_SECURE_SEARCH_PATH_SQL);
     228           60 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
     229              :         {
     230            0 :             pg_log_error("could not clear \"search_path\": %s",
     231              :                          PQerrorMessage(tmpconn));
     232            0 :             PQclear(res);
     233            0 :             PQfinish(tmpconn);
     234            0 :             exit(1);
     235              :         }
     236           60 :         PQclear(res);
     237              :     }
     238              : 
     239              :     /*
     240              :      * Ensure we have the same value of integer_datetimes (now always "on") as
     241              :      * the server we are connecting to.
     242              :      */
     243          424 :     tmpparam = PQparameterStatus(tmpconn, "integer_datetimes");
     244          424 :     if (!tmpparam)
     245              :     {
     246            0 :         pg_log_error("could not determine server setting for \"integer_datetimes\"");
     247            0 :         PQfinish(tmpconn);
     248            0 :         exit(1);
     249              :     }
     250              : 
     251          424 :     if (strcmp(tmpparam, "on") != 0)
     252              :     {
     253            0 :         pg_log_error("\"integer_datetimes\" compile flag does not match server");
     254            0 :         PQfinish(tmpconn);
     255            0 :         exit(1);
     256              :     }
     257              : 
     258              :     /*
     259              :      * Retrieve the source data directory mode and use it to construct a umask
     260              :      * for creating directories and files.
     261              :      */
     262          424 :     if (!RetrieveDataDirCreatePerm(tmpconn))
     263              :     {
     264            0 :         PQfinish(tmpconn);
     265            0 :         exit(1);
     266              :     }
     267              : 
     268          424 :     return tmpconn;
     269              : }
     270              : 
     271              : /*
     272              :  * From version 10, explicitly set wal segment size using SHOW wal_segment_size
     273              :  * since ControlFile is not accessible here.
                      :  *
                      :  * On success WalSegSz holds the size in bytes and true is returned.  For
                      :  * servers older than MINIMUM_VERSION_FOR_SHOW_CMD, DEFAULT_XLOG_SEG_SIZE is
                      :  * used without querying.  false is returned, after logging, when the query
                      :  * fails, the result shape is unexpected, the value cannot be parsed, or the
                      :  * size is rejected by IsValidWalSegSize().
                      :  *
                      :  * The value is read as an integer followed by a unit of at most two
                      :  * characters; only "MB" and "GB" change the multiplier, any other unit
                      :  * leaves it at 1.
                      :  *
                      :  * NOTE(review): xlog_val * multiplier is computed in int, so a large value
                      :  * reported with the "GB" unit could overflow before the validity check --
                      :  * confirm whether that input is possible.
     274              :  */
     275              : bool
     276          206 : RetrieveWalSegSize(PGconn *conn)
     277              : {
     278              :     PGresult   *res;
     279              :     char        xlog_unit[3];
     280              :     int         xlog_val,
     281          206 :                 multiplier = 1;
     282              : 
     283              :     /* check connection existence */
     284              :     Assert(conn != NULL);
     285              : 
     286              :     /* for previous versions set the default xlog seg size */
     287          206 :     if (PQserverVersion(conn) < MINIMUM_VERSION_FOR_SHOW_CMD)
     288              :     {
     289            0 :         WalSegSz = DEFAULT_XLOG_SEG_SIZE;
     290            0 :         return true;
     291              :     }
     292              : 
     293          206 :     res = PQexec(conn, "SHOW wal_segment_size");
     294          206 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     295              :     {
     296            0 :         pg_log_error("could not send replication command \"%s\": %s",
     297              :                      "SHOW wal_segment_size", PQerrorMessage(conn));
     298              : 
     299            0 :         PQclear(res);
     300            0 :         return false;
     301              :     }
     302          206 :     if (PQntuples(res) != 1 || PQnfields(res) < 1)
     303              :     {
     304            0 :         pg_log_error("could not fetch WAL segment size: got %d rows and %d fields, expected %d rows and %d or more fields",
     305              :                      PQntuples(res), PQnfields(res), 1, 1);
     306              : 
     307            0 :         PQclear(res);
     308            0 :         return false;
     309              :     }
     310              : 
     311              :     /* fetch xlog value and unit from the result */
     312          206 :     if (sscanf(PQgetvalue(res, 0, 0), "%d%2s", &xlog_val, xlog_unit) != 2)
     313              :     {
     314            0 :         pg_log_error("WAL segment size could not be parsed");
     315            0 :         PQclear(res);
     316            0 :         return false;
     317              :     }
     318              : 
     319          206 :     PQclear(res);
     320              : 
     321              :     /* set the multiplier based on unit to convert xlog_val to bytes */
     322          206 :     if (strcmp(xlog_unit, "MB") == 0)
     323          206 :         multiplier = 1024 * 1024;
     324            0 :     else if (strcmp(xlog_unit, "GB") == 0)
     325            0 :         multiplier = 1024 * 1024 * 1024;
     326              : 
     327              :     /* convert and set WalSegSz */
     328          206 :     WalSegSz = xlog_val * multiplier;
     329              : 
     330          206 :     if (!IsValidWalSegSize(WalSegSz))
     331              :     {
     332            0 :         pg_log_error(ngettext("remote server reported invalid WAL segment size (%d byte)",
     333              :                               "remote server reported invalid WAL segment size (%d bytes)",
     334              :                               WalSegSz),
     335              :                      WalSegSz);
     336            0 :         pg_log_error_detail("The WAL segment size must be a power of two between 1 MB and 1 GB.");
     337            0 :         return false;
     338              :     }
     339              : 
     340          206 :     return true;
     341              : }
     342              : 
     343              : /*
     344              :  * RetrieveDataDirCreatePerm
     345              :  *
     346              :  * This function is used to determine the privileges on the server's PG data
     347              :  * directory and, based on that, set what the permissions will be for
     348              :  * directories and files we create.
     349              :  *
     350              :  * PG11 added support for (optionally) group read/execute rights to be set on
     351              :  * the data directory.  Prior to PG11, only the owner was allowed to have rights
     352              :  * on the data directory.
                      :  *
                      :  * Returns true without contacting the server when its version is below
                      :  * MINIMUM_VERSION_FOR_GROUP_ACCESS.  Otherwise the result of
                      :  * "SHOW data_directory_mode" is parsed as an octal number and handed to
                      :  * SetDataDirectoryCreatePerm().  Returns false, after logging, when the
                      :  * query fails, the result shape is unexpected, or the value does not parse.
                      :  *
                      :  * NOTE(review): the "%o" conversion is specified for a pointer to unsigned
                      :  * int while data_directory_mode is declared int -- confirm this is
                      :  * acceptable here.
     353              :  */
     354              : static bool
     355          424 : RetrieveDataDirCreatePerm(PGconn *conn)
     356              : {
     357              :     PGresult   *res;
     358              :     int         data_directory_mode;
     359              : 
     360              :     /* check connection existence */
     361              :     Assert(conn != NULL);
     362              : 
     363              :     /* for previous versions leave the default group access */
     364          424 :     if (PQserverVersion(conn) < MINIMUM_VERSION_FOR_GROUP_ACCESS)
     365            0 :         return true;
     366              : 
     367          424 :     res = PQexec(conn, "SHOW data_directory_mode");
     368          424 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     369              :     {
     370            0 :         pg_log_error("could not send replication command \"%s\": %s",
     371              :                      "SHOW data_directory_mode", PQerrorMessage(conn));
     372              : 
     373            0 :         PQclear(res);
     374            0 :         return false;
     375              :     }
     376          424 :     if (PQntuples(res) != 1 || PQnfields(res) < 1)
     377              :     {
     378            0 :         pg_log_error("could not fetch group access flag: got %d rows and %d fields, expected %d rows and %d or more fields",
     379              :                      PQntuples(res), PQnfields(res), 1, 1);
     380              : 
     381            0 :         PQclear(res);
     382            0 :         return false;
     383              :     }
     384              : 
     385          424 :     if (sscanf(PQgetvalue(res, 0, 0), "%o", &data_directory_mode) != 1)
     386              :     {
     387            0 :         pg_log_error("group access flag could not be parsed: %s",
     388              :                      PQgetvalue(res, 0, 0));
     389              : 
     390            0 :         PQclear(res);
     391            0 :         return false;
     392              :     }
     393              : 
     394          424 :     SetDataDirectoryCreatePerm(data_directory_mode);
     395              : 
     396          424 :     PQclear(res);
     397          424 :     return true;
     398              : }
     399              : 
     400              : /*
     401              :  * Run IDENTIFY_SYSTEM through a given connection and give back to caller
     402              :  * some result information if requested:
     403              :  * - System identifier
     404              :  * - Current timeline ID
     405              :  * - Start LSN position
     406              :  * - Database name (NULL in servers prior to 9.4)
                      :  *
                      :  * Each output pointer may be NULL, in which case that item is skipped.
                      :  * *sysid and *db_name receive copies made with pg_strdup(); releasing them
                      :  * is presumably up to the caller.  *db_name is set to NULL when the server
                      :  * is older than 9.4 or the database name column is null.  The LSN is read
                      :  * as two hexadecimal halves and combined into a 64-bit value.
                      :  *
                      :  * Returns true on success.  Returns false, after logging, when the command
                      :  * fails, fewer rows or fields than required come back, or the LSN cannot
                      :  * be parsed.
                      :  *
                      :  * NOTE(review): when a later check fails after *sysid has been assigned,
                      :  * the copy is not freed here -- confirm callers do not rely on that.
     407              :  */
     408              : bool
     409          431 : RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli,
     410              :                   XLogRecPtr *startpos, char **db_name)
     411              : {
     412              :     PGresult   *res;
     413              :     uint32      hi,
     414              :                 lo;
     415              : 
     416              :     /* Check connection existence */
     417              :     Assert(conn != NULL);
     418              : 
     419          431 :     res = PQexec(conn, "IDENTIFY_SYSTEM");
     420          431 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     421              :     {
     422            0 :         pg_log_error("could not send replication command \"%s\": %s",
     423              :                      "IDENTIFY_SYSTEM", PQerrorMessage(conn));
     424              : 
     425            0 :         PQclear(res);
     426            0 :         return false;
     427              :     }
     428          431 :     if (PQntuples(res) != 1 || PQnfields(res) < 3)
     429              :     {
     430            0 :         pg_log_error("could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields",
     431              :                      PQntuples(res), PQnfields(res), 1, 3);
     432              : 
     433            0 :         PQclear(res);
     434            0 :         return false;
     435              :     }
     436              : 
     437              :     /* Get system identifier */
     438          431 :     if (sysid != NULL)
     439          362 :         *sysid = pg_strdup(PQgetvalue(res, 0, 0));
     440              : 
     441              :     /* Get timeline ID to start streaming from */
     442          431 :     if (starttli != NULL)
     443          362 :         *starttli = atoi(PQgetvalue(res, 0, 1));
     444              : 
     445              :     /* Get LSN start position if necessary */
     446          431 :     if (startpos != NULL)
     447              :     {
     448            7 :         if (sscanf(PQgetvalue(res, 0, 2), "%X/%08X", &hi, &lo) != 2)
     449              :         {
     450            0 :             pg_log_error("could not parse write-ahead log location \"%s\"",
     451              :                          PQgetvalue(res, 0, 2));
     452              : 
     453            0 :             PQclear(res);
     454            0 :             return false;
     455              :         }
     456            7 :         *startpos = ((uint64) hi) << 32 | lo;
     457              :     }
     458              : 
     459              :     /* Get database name, only available in 9.4 and newer versions */
     460          431 :     if (db_name != NULL)
     461              :     {
     462           69 :         *db_name = NULL;
     463           69 :         if (PQserverVersion(conn) >= 90400)
     464              :         {
     465           69 :             if (PQnfields(res) < 4)
     466              :             {
     467            0 :                 pg_log_error("could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields",
     468              :                              PQntuples(res), PQnfields(res), 1, 4);
     469              : 
     470            0 :                 PQclear(res);
     471            0 :                 return false;
     472              :             }
     473           69 :             if (!PQgetisnull(res, 0, 3))
     474           59 :                 *db_name = pg_strdup(PQgetvalue(res, 0, 3));
     475              :         }
     476              :     }
     477              : 
     478          431 :     PQclear(res);
     479          431 :     return true;
     480              : }
     481              : 
     482              : /*
     483              :  * Run READ_REPLICATION_SLOT through a given connection and give back to
     484              :  * caller some result information if requested for this slot:
     485              :  * - Start LSN position, InvalidXLogRecPtr if unknown.
     486              :  * - Current timeline ID, 0 if unknown.
     487              :  * Returns false on failure, and true otherwise.
                      :  *
                      :  * Both outputs are optional (pass NULL to skip) and are reset to
                      :  * InvalidXLogRecPtr / 0 on entry, so they keep those values on every
                      :  * failure path.  The slot name is quoted with AppendQuotedIdentifier()
                      :  * before being sent.
                      :  *
                      :  * A slot that does not exist is treated as a failure; it is detected
                      :  * through a NULL in the first (slot type) column.  A NULL restart LSN or
                      :  * timeline column is not a failure and leaves the respective output at its
                      :  * "unknown" value.
     488              :  */
     489              : bool
     490            3 : GetSlotInformation(PGconn *conn, const char *slot_name,
     491              :                    XLogRecPtr *restart_lsn, TimeLineID *restart_tli)
     492              : {
     493              :     PGresult   *res;
     494              :     PQExpBuffer query;
     495            3 :     XLogRecPtr  lsn_loc = InvalidXLogRecPtr;
     496            3 :     TimeLineID  tli_loc = 0;
     497              : 
     498            3 :     if (restart_lsn)
     499            3 :         *restart_lsn = lsn_loc;
     500            3 :     if (restart_tli)
     501            3 :         *restart_tli = tli_loc;
     502              : 
     503            3 :     query = createPQExpBuffer();
     504            3 :     appendPQExpBufferStr(query, "READ_REPLICATION_SLOT ");
     505            3 :     AppendQuotedIdentifier(query, slot_name);
     506            3 :     res = PQexec(conn, query->data);
     507            3 :     destroyPQExpBuffer(query);
     508              : 
     509            3 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     510              :     {
     511            0 :         pg_log_error("could not send replication command \"%s\": %s",
     512              :                      "READ_REPLICATION_SLOT", PQerrorMessage(conn));
     513            0 :         PQclear(res);
     514            0 :         return false;
     515              :     }
     516              : 
     517              :     /* The command should always return precisely one tuple and three fields */
     518            3 :     if (PQntuples(res) != 1 || PQnfields(res) != 3)
     519              :     {
     520            0 :         pg_log_error("could not read replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields",
     521              :                      slot_name, PQntuples(res), PQnfields(res), 1, 3);
     522            0 :         PQclear(res);
     523            0 :         return false;
     524              :     }
     525              : 
     526              :     /*
     527              :      * When the slot doesn't exist, the command returns a tuple with NULL
     528              :      * values.  This checks only the slot type field.
     529              :      */
     530            3 :     if (PQgetisnull(res, 0, 0))
     531              :     {
     532            1 :         pg_log_error("replication slot \"%s\" does not exist", slot_name);
     533            1 :         PQclear(res);
     534            1 :         return false;
     535              :     }
     536              : 
     537              :     /*
     538              :      * Note that this cannot happen as READ_REPLICATION_SLOT supports only
     539              :      * physical slots, but play it safe.
     540              :      */
     541            2 :     if (strcmp(PQgetvalue(res, 0, 0), "physical") != 0)
     542              :     {
     543            0 :         pg_log_error("expected a physical replication slot, got type \"%s\" instead",
     544              :                      PQgetvalue(res, 0, 0));
     545            0 :         PQclear(res);
     546            0 :         return false;
     547              :     }
     548              : 
     549              :     /* restart LSN */
     550            2 :     if (!PQgetisnull(res, 0, 1))
     551              :     {
     552              :         uint32      hi,
     553              :                     lo;
     554              : 
     555            2 :         if (sscanf(PQgetvalue(res, 0, 1), "%X/%08X", &hi, &lo) != 2)
     556              :         {
     557            0 :             pg_log_error("could not parse restart_lsn \"%s\" for replication slot \"%s\"",
     558              :                          PQgetvalue(res, 0, 1), slot_name);
     559            0 :             PQclear(res);
     560            0 :             return false;
     561              :         }
     562            2 :         lsn_loc = ((uint64) hi) << 32 | lo;
     563              :     }
     564              : 
     565              :     /* current TLI */
     566            2 :     if (!PQgetisnull(res, 0, 2))
     567            2 :         tli_loc = (TimeLineID) atoll(PQgetvalue(res, 0, 2));
     568              : 
     569            2 :     PQclear(res);
     570              : 
     571              :     /* Assign results if requested */
     572            2 :     if (restart_lsn)
     573            2 :         *restart_lsn = lsn_loc;
     574            2 :     if (restart_tli)
     575            2 :         *restart_tli = tli_loc;
     576              : 
     577            2 :     return true;
     578              : }
     579              : 
     580              : /*
     581              :  * Create a replication slot for the given connection. This function
     582              :  * returns true in case of success.
     583              :  */
     584              : bool
     585          183 : CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
     586              :                       bool is_temporary, bool is_physical, bool reserve_wal,
     587              :                       bool slot_exists_ok, bool two_phase, bool failover)
     588              : {
     589              :     PQExpBuffer query;
     590              :     PGresult   *res;
     591          183 :     bool        use_new_option_syntax = (PQserverVersion(conn) >= 150000);
     592              : 
     593          183 :     query = createPQExpBuffer();
     594              : 
     595              :     Assert((is_physical && plugin == NULL) ||
     596              :            (!is_physical && plugin != NULL));
     597              :     Assert(!(two_phase && is_physical));
     598              :     Assert(!(failover && is_physical));
     599              :     Assert(slot_name != NULL);
     600              : 
     601              :     /* Build base portion of query */
     602          183 :     appendPQExpBufferStr(query, "CREATE_REPLICATION_SLOT ");
     603          183 :     AppendQuotedIdentifier(query, slot_name);
     604          183 :     if (is_temporary)
     605          148 :         appendPQExpBufferStr(query, " TEMPORARY");
     606          183 :     if (is_physical)
     607          152 :         appendPQExpBufferStr(query, " PHYSICAL");
     608              :     else
     609              :     {
     610           31 :         appendPQExpBufferStr(query, " LOGICAL ");
     611           31 :         AppendQuotedIdentifier(query, plugin);
     612              :     }
     613              : 
     614              :     /* Add any requested options */
     615          183 :     if (use_new_option_syntax)
     616          183 :         appendPQExpBufferStr(query, " (");
     617          183 :     if (is_physical)
     618              :     {
     619          152 :         if (reserve_wal)
     620          151 :             AppendPlainCommandOption(query, use_new_option_syntax,
     621              :                                      "RESERVE_WAL");
     622              :     }
     623              :     else
     624              :     {
     625           31 :         if (failover && PQserverVersion(conn) >= 170000)
     626            1 :             AppendPlainCommandOption(query, use_new_option_syntax,
     627              :                                      "FAILOVER");
     628              : 
     629           31 :         if (two_phase && PQserverVersion(conn) >= 150000)
     630            1 :             AppendPlainCommandOption(query, use_new_option_syntax,
     631              :                                      "TWO_PHASE");
     632              : 
     633           31 :         if (PQserverVersion(conn) >= 100000)
     634              :         {
     635              :             /* pg_recvlogical doesn't use an exported snapshot, so suppress */
     636           31 :             if (use_new_option_syntax)
     637           31 :                 AppendStringCommandOption(query, use_new_option_syntax,
     638              :                                           "SNAPSHOT", "nothing");
     639              :             else
     640            0 :                 AppendPlainCommandOption(query, use_new_option_syntax,
     641              :                                          "NOEXPORT_SNAPSHOT");
     642              :         }
     643              :     }
     644          183 :     if (use_new_option_syntax)
     645              :     {
     646              :         /* Suppress option list if it would be empty, otherwise terminate */
     647          183 :         if (query->data[query->len - 1] == '(')
     648              :         {
     649            1 :             query->len -= 2;
     650            1 :             query->data[query->len] = '\0';
     651              :         }
     652              :         else
     653          182 :             appendPQExpBufferChar(query, ')');
     654              :     }
     655              : 
     656              :     /* Now run the query */
     657          183 :     res = PQexec(conn, query->data);
     658          183 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     659              :     {
     660            1 :         const char *sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
     661              : 
     662            1 :         if (slot_exists_ok &&
     663            0 :             sqlstate &&
     664            0 :             strcmp(sqlstate, ERRCODE_DUPLICATE_OBJECT) == 0)
     665              :         {
     666            0 :             destroyPQExpBuffer(query);
     667            0 :             PQclear(res);
     668            0 :             return true;
     669              :         }
     670              :         else
     671              :         {
     672            1 :             pg_log_error("could not send replication command \"%s\": %s",
     673              :                          query->data, PQerrorMessage(conn));
     674              : 
     675            1 :             destroyPQExpBuffer(query);
     676            1 :             PQclear(res);
     677            1 :             return false;
     678              :         }
     679              :     }
     680              : 
     681          182 :     if (PQntuples(res) != 1 || PQnfields(res) != 4)
     682              :     {
     683            0 :         pg_log_error("could not create replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields",
     684              :                      slot_name,
     685              :                      PQntuples(res), PQnfields(res), 1, 4);
     686              : 
     687            0 :         destroyPQExpBuffer(query);
     688            0 :         PQclear(res);
     689            0 :         return false;
     690              :     }
     691              : 
     692          182 :     destroyPQExpBuffer(query);
     693          182 :     PQclear(res);
     694          182 :     return true;
     695              : }
     696              : 
     697              : /*
     698              :  * Drop a replication slot for the given connection. This function
     699              :  * returns true in case of success.
     700              :  */
     701              : bool
     702            4 : DropReplicationSlot(PGconn *conn, const char *slot_name)
     703              : {
     704              :     PQExpBuffer query;
     705              :     PGresult   *res;
     706              : 
     707              :     Assert(slot_name != NULL);
     708              : 
     709            4 :     query = createPQExpBuffer();
     710              : 
     711              :     /* Build query */
     712            4 :     appendPQExpBufferStr(query, "DROP_REPLICATION_SLOT ");
     713            4 :     AppendQuotedIdentifier(query, slot_name);
     714            4 :     res = PQexec(conn, query->data);
     715            4 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
     716              :     {
     717            0 :         pg_log_error("could not send replication command \"%s\": %s",
     718              :                      query->data, PQerrorMessage(conn));
     719              : 
     720            0 :         destroyPQExpBuffer(query);
     721            0 :         PQclear(res);
     722            0 :         return false;
     723              :     }
     724              : 
     725            4 :     if (PQntuples(res) != 0 || PQnfields(res) != 0)
     726              :     {
     727            0 :         pg_log_error("could not drop replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields",
     728              :                      slot_name,
     729              :                      PQntuples(res), PQnfields(res), 0, 0);
     730              : 
     731            0 :         destroyPQExpBuffer(query);
     732            0 :         PQclear(res);
     733            0 :         return false;
     734              :     }
     735              : 
     736            4 :     destroyPQExpBuffer(query);
     737            4 :     PQclear(res);
     738            4 :     return true;
     739              : }
     740              : 
     741              : /*
     742              :  * Append a suitably-quoted identifier or string literal to buf.
     743              :  * "quote" should be either a double-quote or single-quote character.
     744              :  *
     745              :  * Caution: this quoting logic is sufficient for identifiers and literals
     746              :  * in the replication grammar, but not always in regular SQL.  Specifically,
     747              :  * it'd fail for a string literal if standard_conforming_strings is off.
     748              :  */
     749              : void
     750         1356 : AppendQuotedString(PQExpBuffer buf, const char *str, char quote)
     751              : {
     752         1356 :     appendPQExpBufferChar(buf, quote);
     753        17957 :     while (*str)
     754              :     {
     755        16601 :         char        c = *str++;
     756              : 
     757        16601 :         if (c == quote)
     758            0 :             appendPQExpBufferChar(buf, c);
     759        16601 :         appendPQExpBufferChar(buf, c);
     760              :     }
     761         1356 :     appendPQExpBufferChar(buf, quote);
     762         1356 : }
     763              : 
     764              : /*
     765              :  * Append a "plain" option - one with no value - to a server command that
     766              :  * is being constructed.
     767              :  *
     768              :  * In the old syntax, all options were parser keywords, so you could just
     769              :  * write things like SOME_COMMAND OPTION1 OPTION2 'opt2value' OPTION3 42. The
     770              :  * new syntax uses a comma-separated list surrounded by parentheses, so the
     771              :  * equivalent is SOME_COMMAND (OPTION1, OPTION2 'optvalue', OPTION3 42).
     772              :  *
     773              :  * Note: we assume option names do not require quotes.  Do not use this
     774              :  * with option names coming from outside sources.
     775              :  */
     776              : void
     777         1479 : AppendPlainCommandOption(PQExpBuffer buf, bool use_new_option_syntax,
     778              :                          const char *option_name)
     779              : {
     780         1479 :     if (buf->len > 0 && buf->data[buf->len - 1] != '(')
     781              :     {
     782         1099 :         if (use_new_option_syntax)
     783         1099 :             appendPQExpBufferStr(buf, ", ");
     784              :         else
     785            0 :             appendPQExpBufferChar(buf, ' ');
     786              :     }
     787              : 
     788         1479 :     appendPQExpBuffer(buf, " %s", option_name);
     789         1479 : }
     790              : 
     791              : /*
     792              :  * Append an option with an associated string value to a server command that
     793              :  * is being constructed.
     794              :  *
     795              :  * See comments for AppendPlainCommandOption, above.
     796              :  */
     797              : void
     798          872 : AppendStringCommandOption(PQExpBuffer buf, bool use_new_option_syntax,
     799              :                           const char *option_name, const char *option_value)
     800              : {
     801          872 :     AppendPlainCommandOption(buf, use_new_option_syntax, option_name);
     802              : 
     803          872 :     if (option_value != NULL)
     804              :     {
     805          872 :         appendPQExpBufferChar(buf, ' ');
     806          872 :         AppendQuotedLiteral(buf, option_value);
     807              :     }
     808          872 : }
     809              : 
     810              : /*
     811              :  * Append an option with an associated integer value to a server command that
     812              :  * is being constructed.
     813              :  *
     814              :  * See comments for AppendPlainCommandOption, above.
     815              :  */
     816              : void
     817          190 : AppendIntegerCommandOption(PQExpBuffer buf, bool use_new_option_syntax,
     818              :                            const char *option_name, int32 option_value)
     819              : {
     820          190 :     AppendPlainCommandOption(buf, use_new_option_syntax, option_name);
     821              : 
     822          190 :     appendPQExpBuffer(buf, " %d", option_value);
     823          190 : }
     824              : 
     825              : /*
     826              :  * Frontend version of GetCurrentTimestamp(), since we are not linked with
     827              :  * backend code.
     828              :  */
     829              : TimestampTz
     830         1167 : feGetCurrentTimestamp(void)
     831              : {
     832              :     TimestampTz result;
     833              :     struct timeval tp;
     834              : 
     835         1167 :     gettimeofday(&tp, NULL);
     836              : 
     837         1167 :     result = (TimestampTz) tp.tv_sec -
     838              :         ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);
     839         1167 :     result = (result * USECS_PER_SEC) + tp.tv_usec;
     840              : 
     841         1167 :     return result;
     842              : }
     843              : 
     844              : /*
     845              :  * Frontend version of TimestampDifference(), since we are not linked with
     846              :  * backend code.
     847              :  */
     848              : void
     849          652 : feTimestampDifference(TimestampTz start_time, TimestampTz stop_time,
     850              :                       long *secs, int *microsecs)
     851              : {
     852          652 :     TimestampTz diff = stop_time - start_time;
     853              : 
     854          652 :     if (diff <= 0)
     855              :     {
     856            0 :         *secs = 0;
     857            0 :         *microsecs = 0;
     858              :     }
     859              :     else
     860              :     {
     861          652 :         *secs = (long) (diff / USECS_PER_SEC);
     862          652 :         *microsecs = (int) (diff % USECS_PER_SEC);
     863              :     }
     864          652 : }
     865              : 
     866              : /*
     867              :  * Frontend version of TimestampDifferenceExceeds(), since we are not
     868              :  * linked with backend code.
     869              :  */
     870              : bool
     871         1611 : feTimestampDifferenceExceeds(TimestampTz start_time,
     872              :                              TimestampTz stop_time,
     873              :                              int msec)
     874              : {
     875         1611 :     TimestampTz diff = stop_time - start_time;
     876              : 
     877         1611 :     return (diff >= msec * INT64CONST(1000));
     878              : }
     879              : 
     880              : /*
     881              :  * Converts an int64 to network byte order.
     882              :  */
     883              : void
     884          740 : fe_sendint64(int64 i, char *buf)
     885              : {
     886          740 :     uint64      n64 = pg_hton64(i);
     887              : 
     888          740 :     memcpy(buf, &n64, sizeof(n64));
     889          740 : }
     890              : 
     891              : /*
     892              :  * Converts an int64 from network byte order to native format.
     893              :  */
     894              : int64
     895         1830 : fe_recvint64(char *buf)
     896              : {
     897              :     uint64      n64;
     898              : 
     899         1830 :     memcpy(&n64, buf, sizeof(n64));
     900              : 
     901         1830 :     return pg_ntoh64(n64);
     902              : }
        

Generated by: LCOV version 2.0-1