LCOV - code coverage report
Current view: top level - src/bin/pg_basebackup - streamutil.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 71.2 % 323 230
Test Date: 2026-03-03 14:15:12 Functions: 100.0 % 15 15
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * streamutil.c - utility functions for pg_basebackup, pg_receivewal and
       4              :  *                  pg_recvlogical
       5              :  *
       6              :  * Author: Magnus Hagander <magnus@hagander.net>
       7              :  *
       8              :  * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
       9              :  *
      10              :  * IDENTIFICATION
      11              :  *        src/bin/pg_basebackup/streamutil.c
      12              :  *-------------------------------------------------------------------------
      13              :  */
      14              : 
      15              : #include "postgres_fe.h"
      16              : 
      17              : #include <sys/time.h>
      18              : #include <unistd.h>
      19              : 
      20              : #include "access/xlog_internal.h"
      21              : #include "common/connect.h"
      22              : #include "common/file_perm.h"
      23              : #include "common/logging.h"
      24              : #include "common/string.h"
      25              : #include "datatype/timestamp.h"
      26              : #include "port/pg_bswap.h"
      27              : #include "pqexpbuffer.h"
      28              : #include "streamutil.h"
      29              : 
      30              : #define ERRCODE_DUPLICATE_OBJECT  "42710"   /* SQLSTATE for duplicate_object; not used in the code visible here */
      31              : 
      32              : int         WalSegSz;   /* WAL segment size in bytes, set by RetrieveWalSegSize() */
      33              : 
      34              : static bool RetrieveDataDirCreatePerm(PGconn *conn);    /* defined below, called by GetConnection() */
      35              : 
      36              : /* SHOW command for replication connection was introduced in version 10 */
      37              : #define MINIMUM_VERSION_FOR_SHOW_CMD 100000
      38              : 
      39              : /*
      40              :  * Group access is supported from version 11.
      41              :  */
      42              : #define MINIMUM_VERSION_FOR_GROUP_ACCESS 110000
      43              : 
      44              : const char *progname;   /* sent as fallback_application_name by GetConnection() */
      45              : char       *connection_string = NULL;   /* parsed with PQconninfoParse() when set */
      46              : char       *dbhost = NULL;  /* passed as the host keyword when set */
      47              : char       *dbuser = NULL;  /* passed as the user keyword when set */
      48              : char       *dbport = NULL;  /* passed as the port keyword when set */
      49              : char       *dbname = NULL;  /* when set, replication=database is requested */
      50              : int         dbgetpassword = 0;  /* 0=auto, -1=never, 1=always */
      51              : static char *password = NULL;   /* prompted password, kept for reuse on later connections */
      52              : PGconn     *conn = NULL;    /* NOTE(review): shadowed by the conn parameter of the functions below */
      53              : 
      54              : /*
      55              :  * Connect to the server using the file-level connection settings. Returns
      56              :  * a valid PGconn pointer if connected, or NULL (after logging the libpq
      57              :  * error) if the attempt fails. Any other failure exits the process.
      58              :  */
      59              : PGconn *
      60          404 : GetConnection(void)
      61              : {
      62              :     PGconn     *tmpconn;
      63          404 :     int         argcount = 7;   /* dbname, replication, fallback_app_name,
      64              :                                  * host, user, port, password */
      65              :     int         i;
      66              :     const char **keywords;
      67              :     const char **values;
      68              :     const char *tmpparam;
      69              :     bool        need_password;
      70          404 :     PQconninfoOption *conn_opts = NULL;
      71              :     PQconninfoOption *conn_opt;
      72          404 :     char       *err_msg = NULL;
      73              : 
      74              :     /*
      75              :      * pg_recvlogical uses dbname only; others use connection_string only.
      76              :      * (Note: both variables will be NULL if there's no command line options.)
      77              :      */
      78              :     Assert(dbname == NULL || connection_string == NULL);
      79              : 
      80              :     /*
      81              :      * Merge the connection info inputs given in form of connection string,
      82              :      * options and default values (dbname=replication, replication=true, etc.)
      83              :      */
      84          404 :     i = 0;
      85          404 :     if (connection_string)
      86              :     {
      87            4 :         conn_opts = PQconninfoParse(connection_string, &err_msg);
      88            4 :         if (conn_opts == NULL)
      89            0 :             pg_fatal("%s", err_msg);
      90              : 
      91          208 :         for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
      92              :         {
      93          204 :             if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
      94            8 :                 argcount++; /* one more slot per option that has a value */
      95              :         }
      96              : 
      97            4 :         keywords = pg_malloc0_array(const char *, argcount + 1);    /* extra slot for the NULL terminator */
      98            4 :         values = pg_malloc0_array(const char *, argcount + 1);
      99              : 
     100              :         /*
     101              :          * Set dbname here already, so it can be overridden by a dbname in the
     102              :          * connection string.
     103              :          */
     104            4 :         keywords[i] = "dbname";
     105            4 :         values[i] = "replication";
     106            4 :         i++;
     107              : 
     108          208 :         for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
     109              :         {
     110          204 :             if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
     111              :             {
     112            8 :                 keywords[i] = conn_opt->keyword;
     113            8 :                 values[i] = conn_opt->val;  /* points into conn_opts, which is freed only after connecting */
     114            8 :                 i++;
     115              :             }
     116              :         }
     117              :     }
     118              :     else
     119              :     {
     120          400 :         keywords = pg_malloc0_array(const char *, argcount + 1);
     121          400 :         values = pg_malloc0_array(const char *, argcount + 1);
     122          400 :         keywords[i] = "dbname";
     123          400 :         values[i] = (dbname == NULL) ? "replication" : dbname;
     124          400 :         i++;
     125              :     }
     126              : 
     127          404 :     keywords[i] = "replication";
     128          404 :     values[i] = (dbname == NULL) ? "true" : "database"; /* database mode only when dbname is set */
     129          404 :     i++;
     130          404 :     keywords[i] = "fallback_application_name";
     131          404 :     values[i] = progname;
     132          404 :     i++;
     133              : 
     134          404 :     if (dbhost)
     135              :     {
     136          151 :         keywords[i] = "host";
     137          151 :         values[i] = dbhost;
     138          151 :         i++;
     139              :     }
     140          404 :     if (dbuser)
     141              :     {
     142            7 :         keywords[i] = "user";
     143            7 :         values[i] = dbuser;
     144            7 :         i++;
     145              :     }
     146          404 :     if (dbport)
     147              :     {
     148          151 :         keywords[i] = "port";
     149          151 :         values[i] = dbport;
     150          151 :         i++;
     151              :     }
     152              : 
     153              :     /* If -W was given, force prompt for password, but only the first time */
     154          404 :     need_password = (dbgetpassword == 1 && !password);
     155              : 
     156              :     do
     157              :     {
     158              :         /* Get a new password if appropriate */
     159          404 :         if (need_password)
     160              :         {
     161            0 :             free(password);
     162            0 :             password = simple_prompt("Password: ", false);
     163            0 :             need_password = false;
     164              :         }
     165              : 
     166              :         /* Use (or reuse, on a subsequent connection) password if we have it */
     167          404 :         if (password)
     168              :         {
     169            0 :             keywords[i] = "password";   /* i is not advanced, so a retry reuses this slot */
     170            0 :             values[i] = password;
     171              :         }
     172              :         else
     173              :         {
     174          404 :             keywords[i] = NULL; /* end both lists at the first unused slot */
     175          404 :             values[i] = NULL;
     176              :         }
     177              : 
     178              :         /*
     179              :          * Only expand dbname when we did not already parse the argument as a
     180              :          * connection string ourselves.
     181              :          */
     182          404 :         tmpconn = PQconnectdbParams(keywords, values, !connection_string);
     183              : 
     184              :         /*
     185              :          * If there is too little memory even to allocate the PGconn object
     186              :          * and PQconnectdbParams returns NULL, we call exit(1) directly.
     187              :          */
     188          404 :         if (!tmpconn)
     189            0 :             pg_fatal("could not connect to server");
     190              : 
     191              :         /* If we need a password and -w wasn't given, loop back and get one */
     192          406 :         if (PQstatus(tmpconn) == CONNECTION_BAD &&
     193            2 :             PQconnectionNeedsPassword(tmpconn) &&
     194            0 :             dbgetpassword != -1)
     195              :         {
     196            0 :             PQfinish(tmpconn);
     197            0 :             need_password = true;
     198              :         }
     199              :     }
     200          404 :     while (need_password);
     201              : 
     202          404 :     if (PQstatus(tmpconn) != CONNECTION_OK)
     203              :     {
     204            2 :         pg_log_error("%s", PQerrorMessage(tmpconn));
     205            2 :         PQfinish(tmpconn);
     206            2 :         free(values);
     207            2 :         free(keywords);
     208            2 :         PQconninfoFree(conn_opts);
     209            2 :         return NULL;
     210              :     }
     211              : 
     212              :     /* Connection ok! */
     213          402 :     free(values);
     214          402 :     free(keywords);
     215          402 :     PQconninfoFree(conn_opts);
     216              : 
     217              :     /*
     218              :      * Set always-secure search path, so malicious users can't get control.
     219              :      * The capacity to run normal SQL queries was added in PostgreSQL 10, so
     220              :      * the search path cannot be changed (by us or attackers) on earlier
     221              :      * versions.
     222              :      */
     223          402 :     if (dbname != NULL && PQserverVersion(tmpconn) >= 100000)   /* NOTE(review): 100000 equals MINIMUM_VERSION_FOR_SHOW_CMD */
     224              :     {
     225              :         PGresult   *res;
     226              : 
     227           56 :         res = PQexec(tmpconn, ALWAYS_SECURE_SEARCH_PATH_SQL);
     228           56 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
     229              :         {
     230            0 :             pg_log_error("could not clear \"search_path\": %s",
     231              :                          PQerrorMessage(tmpconn));
     232            0 :             PQclear(res);
     233            0 :             PQfinish(tmpconn);
     234            0 :             exit(1);
     235              :         }
     236           56 :         PQclear(res);
     237              :     }
     238              : 
     239              :     /*
     240              :      * Ensure we have the same value of integer_datetimes (now always "on") as
     241              :      * the server we are connecting to.
     242              :      */
     243          402 :     tmpparam = PQparameterStatus(tmpconn, "integer_datetimes");
     244          402 :     if (!tmpparam)
     245              :     {
     246            0 :         pg_log_error("could not determine server setting for \"integer_datetimes\"");
     247            0 :         PQfinish(tmpconn);
     248            0 :         exit(1);
     249              :     }
     250              : 
     251          402 :     if (strcmp(tmpparam, "on") != 0)
     252              :     {
     253            0 :         pg_log_error("\"integer_datetimes\" compile flag does not match server");
     254            0 :         PQfinish(tmpconn);
     255            0 :         exit(1);
     256              :     }
     257              : 
     258              :     /*
     259              :      * Retrieve the source data directory mode and use it to construct a umask
     260              :      * for creating directories and files.
     261              :      */
     262          402 :     if (!RetrieveDataDirCreatePerm(tmpconn))
     263              :     {
     264            0 :         PQfinish(tmpconn);
     265            0 :         exit(1);
     266              :     }
     267              : 
     268          402 :     return tmpconn;
     269              : }
     270              : 
     271              : /*
     272              :  * Set WalSegSz. From version 10 on this uses SHOW wal_segment_size, since
     273              :  * ControlFile is not accessible here. Returns false (after logging) on failure.
     274              :  */
     275              : bool
     276          197 : RetrieveWalSegSize(PGconn *conn)
     277              : {
     278              :     PGresult   *res;
     279              :     char        xlog_unit[3];   /* room for the two characters of %2s plus the terminator */
     280              :     int         xlog_val,
     281          197 :                 multiplier = 1;
     282              : 
     283              :     /* check connection existence */
     284              :     Assert(conn != NULL);
     285              : 
     286              :     /* for previous versions set the default xlog seg size */
     287          197 :     if (PQserverVersion(conn) < MINIMUM_VERSION_FOR_SHOW_CMD)
     288              :     {
     289            0 :         WalSegSz = DEFAULT_XLOG_SEG_SIZE;
     290            0 :         return true;
     291              :     }
     292              : 
     293          197 :     res = PQexec(conn, "SHOW wal_segment_size");
     294          197 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     295              :     {
     296            0 :         pg_log_error("could not send replication command \"%s\": %s",
     297              :                      "SHOW wal_segment_size", PQerrorMessage(conn));
     298              : 
     299            0 :         PQclear(res);
     300            0 :         return false;
     301              :     }
     302          197 :     if (PQntuples(res) != 1 || PQnfields(res) < 1)
     303              :     {
     304            0 :         pg_log_error("could not fetch WAL segment size: got %d rows and %d fields, expected %d rows and %d or more fields",
     305              :                      PQntuples(res), PQnfields(res), 1, 1);
     306              : 
     307            0 :         PQclear(res);
     308            0 :         return false;
     309              :     }
     310              : 
     311              :     /* fetch xlog value and unit from the result; both must be present */
     312          197 :     if (sscanf(PQgetvalue(res, 0, 0), "%d%2s", &xlog_val, xlog_unit) != 2)
     313              :     {
     314            0 :         pg_log_error("WAL segment size could not be parsed");
     315            0 :         PQclear(res);
     316            0 :         return false;
     317              :     }
     318              : 
     319          197 :     PQclear(res);
     320              : 
     321              :     /* set the multiplier based on unit to convert xlog_val to bytes (stays 1 unless MB or GB) */
     322          197 :     if (strcmp(xlog_unit, "MB") == 0)
     323          197 :         multiplier = 1024 * 1024;
     324            0 :     else if (strcmp(xlog_unit, "GB") == 0)
     325            0 :         multiplier = 1024 * 1024 * 1024;
     326              : 
     327              :     /* convert and set WalSegSz */
     328          197 :     WalSegSz = xlog_val * multiplier;   /* NOTE(review): int multiply of a server-supplied value; only range-checked afterwards */
     329              : 
     330          197 :     if (!IsValidWalSegSize(WalSegSz))
     331              :     {
     332            0 :         pg_log_error(ngettext("remote server reported invalid WAL segment size (%d byte)",
     333              :                               "remote server reported invalid WAL segment size (%d bytes)",
     334              :                               WalSegSz),
     335              :                      WalSegSz);
     336            0 :         pg_log_error_detail("The WAL segment size must be a power of two between 1 MB and 1 GB.");
     337            0 :         return false;
     338              :     }
     339              : 
     340          197 :     return true;
     341              : }
     342              : 
     343              : /*
     344              :  * RetrieveDataDirCreatePerm
     345              :  *
     346              :  * Read data_directory_mode from the server and pass it to
     347              :  * SetDataDirectoryCreatePerm(), to set what the permissions will be for
     348              :  * directories and files we create.  Returns false, after logging, on failure.
     349              :  *
     350              :  * PG11 added support for (optionally) group read/execute rights to be set on
     351              :  * the data directory.  Prior to PG11, only the owner was allowed to have rights
     352              :  * on the data directory.
     353              :  */
     354              : static bool
     355          402 : RetrieveDataDirCreatePerm(PGconn *conn)
     356              : {
     357              :     PGresult   *res;
     358              :     int         data_directory_mode;
     359              : 
     360              :     /* check connection existence */
     361              :     Assert(conn != NULL);
     362              : 
     363              :     /* for previous versions leave the default group access */
     364          402 :     if (PQserverVersion(conn) < MINIMUM_VERSION_FOR_GROUP_ACCESS)
     365            0 :         return true;
     366              : 
     367          402 :     res = PQexec(conn, "SHOW data_directory_mode");
     368          402 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     369              :     {
     370            0 :         pg_log_error("could not send replication command \"%s\": %s",
     371              :                      "SHOW data_directory_mode", PQerrorMessage(conn));
     372              : 
     373            0 :         PQclear(res);
     374            0 :         return false;
     375              :     }
     376          402 :     if (PQntuples(res) != 1 || PQnfields(res) < 1)
     377              :     {
     378            0 :         pg_log_error("could not fetch group access flag: got %d rows and %d fields, expected %d rows and %d or more fields",
     379              :                      PQntuples(res), PQnfields(res), 1, 1);
     380              : 
     381            0 :         PQclear(res);
     382            0 :         return false;
     383              :     }
     384              : 
     385          402 :     if (sscanf(PQgetvalue(res, 0, 0), "%o", &data_directory_mode) != 1) /* the mode is parsed as octal */
     386              :     {
     387            0 :         pg_log_error("group access flag could not be parsed: %s",
     388              :                      PQgetvalue(res, 0, 0));
     389              : 
     390            0 :         PQclear(res);
     391            0 :         return false;
     392              :     }
     393              : 
     394          402 :     SetDataDirectoryCreatePerm(data_directory_mode);
     395              : 
     396          402 :     PQclear(res);
     397          402 :     return true;
     398              : }
     399              : 
     400              : /*
     401              :  * Run IDENTIFY_SYSTEM through a given connection.  Returns false, after
     402              :  * logging, on failure.  Otherwise fills in each output whose pointer is set:
     403              :  * - System identifier (pg_strdup copy)
     404              :  * - Current timeline ID
     405              :  * - Start LSN position
     406              :  * - Database name (pg_strdup copy; NULL if the field is null or before 9.4)
     407              :  */
     408              : bool
     409          409 : RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli,
     410              :                   XLogRecPtr *startpos, char **db_name)
     411              : {
     412              :     PGresult   *res;
     413              :     uint32      hi,
     414              :                 lo;
     415              : 
     416              :     /* Check connection existence */
     417              :     Assert(conn != NULL);
     418              : 
     419          409 :     res = PQexec(conn, "IDENTIFY_SYSTEM");
     420          409 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     421              :     {
     422            0 :         pg_log_error("could not send replication command \"%s\": %s",
     423              :                      "IDENTIFY_SYSTEM", PQerrorMessage(conn));
     424              : 
     425            0 :         PQclear(res);
     426            0 :         return false;
     427              :     }
     428          409 :     if (PQntuples(res) != 1 || PQnfields(res) < 3)
     429              :     {
     430            0 :         pg_log_error("could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields",
     431              :                      PQntuples(res), PQnfields(res), 1, 3);
     432              : 
     433            0 :         PQclear(res);
     434            0 :         return false;
     435              :     }
     436              : 
     437              :     /* Get system identifier */
     438          409 :     if (sysid != NULL)
     439          344 :         *sysid = pg_strdup(PQgetvalue(res, 0, 0));
     440              : 
     441              :     /* Get timeline ID to start streaming from */
     442          409 :     if (starttli != NULL)
     443          344 :         *starttli = atoi(PQgetvalue(res, 0, 1));    /* NOTE(review): atoi() does not report malformed input */
     444              : 
     445              :     /* Get LSN start position if necessary */
     446          409 :     if (startpos != NULL)
     447              :     {
     448            7 :         if (sscanf(PQgetvalue(res, 0, 2), "%X/%08X", &hi, &lo) != 2)
     449              :         {
     450            0 :             pg_log_error("could not parse write-ahead log location \"%s\"",
     451              :                          PQgetvalue(res, 0, 2));
     452              : 
     453            0 :             PQclear(res);
     454            0 :             return false;   /* NOTE(review): a *sysid copy made above is not freed on this path */
     455              :         }
     456            7 :         *startpos = ((uint64) hi) << 32 | lo;   /* high half in the upper 32 bits, low half below */
     457              :     }
     458              : 
     459              :     /* Get database name, only available in 9.4 and newer versions */
     460          409 :     if (db_name != NULL)
     461              :     {
     462           65 :         *db_name = NULL;
     463           65 :         if (PQserverVersion(conn) >= 90400)
     464              :         {
     465           65 :             if (PQnfields(res) < 4)
     466              :             {
     467            0 :                 pg_log_error("could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields",
     468              :                              PQntuples(res), PQnfields(res), 1, 4);
     469              : 
     470            0 :                 PQclear(res);
     471            0 :                 return false;   /* NOTE(review): a *sysid copy made above is not freed on this path */
     472              :             }
     473           65 :             if (!PQgetisnull(res, 0, 3))
     474           55 :                 *db_name = pg_strdup(PQgetvalue(res, 0, 3));
     475              :         }
     476              :     }
     477              : 
     478          409 :     PQclear(res);
     479          409 :     return true;
     480              : }
     481              : 
     482              : /*
     483              :  * Run READ_REPLICATION_SLOT through a given connection and give back to
     484              :  * caller some result information if requested for this slot:
     485              :  * - Start LSN position, InvalidXLogRecPtr if unknown or on failure.
     486              :  * - Current timeline ID, 0 if unknown or on failure.
     487              :  * Returns false on failure or if the slot does not exist, true otherwise.
     488              :  */
     489              : bool
     490            3 : GetSlotInformation(PGconn *conn, const char *slot_name,
     491              :                    XLogRecPtr *restart_lsn, TimeLineID *restart_tli)
     492              : {
     493              :     PGresult   *res;
     494              :     PQExpBuffer query;
     495            3 :     XLogRecPtr  lsn_loc = InvalidXLogRecPtr;
     496            3 :     TimeLineID  tli_loc = 0;
     497              : 
     498            3 :     if (restart_lsn)
     499            3 :         *restart_lsn = lsn_loc; /* reset outputs first so every early return leaves them defined */
     500            3 :     if (restart_tli)
     501            3 :         *restart_tli = tli_loc;
     502              : 
     503            3 :     query = createPQExpBuffer();
     504            3 :     appendPQExpBuffer(query, "READ_REPLICATION_SLOT %s", slot_name);    /* NOTE(review): slot_name is not double-quoted here, unlike in CreateReplicationSlot() */
     505            3 :     res = PQexec(conn, query->data);
     506            3 :     destroyPQExpBuffer(query);
     507              : 
     508            3 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     509              :     {
     510            0 :         pg_log_error("could not send replication command \"%s\": %s",
     511              :                      "READ_REPLICATION_SLOT", PQerrorMessage(conn));
     512            0 :         PQclear(res);
     513            0 :         return false;
     514              :     }
     515              : 
     516              :     /* The command should always return precisely one tuple and three fields */
     517            3 :     if (PQntuples(res) != 1 || PQnfields(res) != 3)
     518              :     {
     519            0 :         pg_log_error("could not read replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields",
     520              :                      slot_name, PQntuples(res), PQnfields(res), 1, 3);
     521            0 :         PQclear(res);
     522            0 :         return false;
     523              :     }
     524              : 
     525              :     /*
     526              :      * When the slot doesn't exist, the command returns a tuple with NULL
     527              :      * values.  This checks only the slot type field.
     528              :      */
     529            3 :     if (PQgetisnull(res, 0, 0))
     530              :     {
     531            1 :         pg_log_error("replication slot \"%s\" does not exist", slot_name);
     532            1 :         PQclear(res);
     533            1 :         return false;
     534              :     }
     535              : 
     536              :     /*
     537              :      * Note that this cannot happen as READ_REPLICATION_SLOT supports only
     538              :      * physical slots, but play it safe.
     539              :      */
     540            2 :     if (strcmp(PQgetvalue(res, 0, 0), "physical") != 0)
     541              :     {
     542            0 :         pg_log_error("expected a physical replication slot, got type \"%s\" instead",
     543              :                      PQgetvalue(res, 0, 0));
     544            0 :         PQclear(res);
     545            0 :         return false;
     546              :     }
     547              : 
     548              :     /* restart LSN; a null field keeps InvalidXLogRecPtr */
     549            2 :     if (!PQgetisnull(res, 0, 1))
     550              :     {
     551              :         uint32      hi,
     552              :                     lo;
     553              : 
     554            2 :         if (sscanf(PQgetvalue(res, 0, 1), "%X/%08X", &hi, &lo) != 2)
     555              :         {
     556            0 :             pg_log_error("could not parse restart_lsn \"%s\" for replication slot \"%s\"",
     557              :                          PQgetvalue(res, 0, 1), slot_name);
     558            0 :             PQclear(res);
     559            0 :             return false;
     560              :         }
     561            2 :         lsn_loc = ((uint64) hi) << 32 | lo;
     562              :     }
     563              : 
     564              :     /* current TLI; a null field keeps 0 */
     565            2 :     if (!PQgetisnull(res, 0, 2))
     566            2 :         tli_loc = (TimeLineID) atoll(PQgetvalue(res, 0, 2));    /* NOTE(review): atoll() does not report malformed input */
     567              : 
     568            2 :     PQclear(res);
     569              : 
     570              :     /* Assign results if requested */
     571            2 :     if (restart_lsn)
     572            2 :         *restart_lsn = lsn_loc;
     573            2 :     if (restart_tli)
     574            2 :         *restart_tli = tli_loc;
     575              : 
     576            2 :     return true;
     577              : }
     578              : 
     579              : /*
     580              :  * Create a replication slot for the given connection. This function
     581              :  * returns true in case of success.
     582              :  */
     583              : bool
     584          172 : CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
     585              :                       bool is_temporary, bool is_physical, bool reserve_wal,
     586              :                       bool slot_exists_ok, bool two_phase, bool failover)
     587              : {
     588              :     PQExpBuffer query;
     589              :     PGresult   *res;
     590          172 :     bool        use_new_option_syntax = (PQserverVersion(conn) >= 150000);
     591              : 
     592          172 :     query = createPQExpBuffer();
     593              : 
     594              :     Assert((is_physical && plugin == NULL) ||
     595              :            (!is_physical && plugin != NULL));
     596              :     Assert(!(two_phase && is_physical));
     597              :     Assert(!(failover && is_physical));
     598              :     Assert(slot_name != NULL);
     599              : 
     600              :     /* Build base portion of query */
     601          172 :     appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\"", slot_name);
     602          172 :     if (is_temporary)
     603          140 :         appendPQExpBufferStr(query, " TEMPORARY");
     604          172 :     if (is_physical)
     605          143 :         appendPQExpBufferStr(query, " PHYSICAL");
     606              :     else
     607           29 :         appendPQExpBuffer(query, " LOGICAL \"%s\"", plugin);
     608              : 
     609              :     /* Add any requested options */
     610          172 :     if (use_new_option_syntax)
     611          172 :         appendPQExpBufferStr(query, " (");
     612          172 :     if (is_physical)
     613              :     {
     614          143 :         if (reserve_wal)
     615          142 :             AppendPlainCommandOption(query, use_new_option_syntax,
     616              :                                      "RESERVE_WAL");
     617              :     }
     618              :     else
     619              :     {
     620           29 :         if (failover && PQserverVersion(conn) >= 170000)
     621            1 :             AppendPlainCommandOption(query, use_new_option_syntax,
     622              :                                      "FAILOVER");
     623              : 
     624           29 :         if (two_phase && PQserverVersion(conn) >= 150000)
     625            1 :             AppendPlainCommandOption(query, use_new_option_syntax,
     626              :                                      "TWO_PHASE");
     627              : 
     628           29 :         if (PQserverVersion(conn) >= 100000)
     629              :         {
     630              :             /* pg_recvlogical doesn't use an exported snapshot, so suppress */
     631           29 :             if (use_new_option_syntax)
     632           29 :                 AppendStringCommandOption(query, use_new_option_syntax,
     633              :                                           "SNAPSHOT", "nothing");
     634              :             else
     635            0 :                 AppendPlainCommandOption(query, use_new_option_syntax,
     636              :                                          "NOEXPORT_SNAPSHOT");
     637              :         }
     638              :     }
     639          172 :     if (use_new_option_syntax)
     640              :     {
     641              :         /* Suppress option list if it would be empty, otherwise terminate */
     642          172 :         if (query->data[query->len - 1] == '(')
     643              :         {
     644            1 :             query->len -= 2;
     645            1 :             query->data[query->len] = '\0';
     646              :         }
     647              :         else
     648          171 :             appendPQExpBufferChar(query, ')');
     649              :     }
     650              : 
     651              :     /* Now run the query */
     652          172 :     res = PQexec(conn, query->data);
     653          172 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     654              :     {
     655            1 :         const char *sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
     656              : 
     657            1 :         if (slot_exists_ok &&
     658            0 :             sqlstate &&
     659            0 :             strcmp(sqlstate, ERRCODE_DUPLICATE_OBJECT) == 0)
     660              :         {
     661            0 :             destroyPQExpBuffer(query);
     662            0 :             PQclear(res);
     663            0 :             return true;
     664              :         }
     665              :         else
     666              :         {
     667            1 :             pg_log_error("could not send replication command \"%s\": %s",
     668              :                          query->data, PQerrorMessage(conn));
     669              : 
     670            1 :             destroyPQExpBuffer(query);
     671            1 :             PQclear(res);
     672            1 :             return false;
     673              :         }
     674              :     }
     675              : 
     676          171 :     if (PQntuples(res) != 1 || PQnfields(res) != 4)
     677              :     {
     678            0 :         pg_log_error("could not create replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields",
     679              :                      slot_name,
     680              :                      PQntuples(res), PQnfields(res), 1, 4);
     681              : 
     682            0 :         destroyPQExpBuffer(query);
     683            0 :         PQclear(res);
     684            0 :         return false;
     685              :     }
     686              : 
     687          171 :     destroyPQExpBuffer(query);
     688          171 :     PQclear(res);
     689          171 :     return true;
     690              : }
     691              : 
     692              : /*
     693              :  * Drop a replication slot for the given connection. This function
     694              :  * returns true in case of success.
     695              :  */
     696              : bool
     697            4 : DropReplicationSlot(PGconn *conn, const char *slot_name)
     698              : {
     699              :     PQExpBuffer query;
     700              :     PGresult   *res;
     701              : 
     702              :     Assert(slot_name != NULL);
     703              : 
     704            4 :     query = createPQExpBuffer();
     705              : 
     706              :     /* Build query */
     707            4 :     appendPQExpBuffer(query, "DROP_REPLICATION_SLOT \"%s\"",
     708              :                       slot_name);
     709            4 :     res = PQexec(conn, query->data);
     710            4 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
     711              :     {
     712            0 :         pg_log_error("could not send replication command \"%s\": %s",
     713              :                      query->data, PQerrorMessage(conn));
     714              : 
     715            0 :         destroyPQExpBuffer(query);
     716            0 :         PQclear(res);
     717            0 :         return false;
     718              :     }
     719              : 
     720            4 :     if (PQntuples(res) != 0 || PQnfields(res) != 0)
     721              :     {
     722            0 :         pg_log_error("could not drop replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields",
     723              :                      slot_name,
     724              :                      PQntuples(res), PQnfields(res), 0, 0);
     725              : 
     726            0 :         destroyPQExpBuffer(query);
     727            0 :         PQclear(res);
     728            0 :         return false;
     729              :     }
     730              : 
     731            4 :     destroyPQExpBuffer(query);
     732            4 :     PQclear(res);
     733            4 :     return true;
     734              : }
     735              : 
     736              : /*
     737              :  * Append a "plain" option - one with no value - to a server command that
     738              :  * is being constructed.
     739              :  *
     740              :  * In the old syntax, all options were parser keywords, so you could just
     741              :  * write things like SOME_COMMAND OPTION1 OPTION2 'opt2value' OPTION3 42. The
     742              :  * new syntax uses a comma-separated list surrounded by parentheses, so the
     743              :  * equivalent is SOME_COMMAND (OPTION1, OPTION2 'optvalue', OPTION3 42).
     744              :  */
     745              : void
     746         1413 : AppendPlainCommandOption(PQExpBuffer buf, bool use_new_option_syntax,
     747              :                          char *option_name)
     748              : {
     749         1413 :     if (buf->len > 0 && buf->data[buf->len - 1] != '(')
     750              :     {
     751         1053 :         if (use_new_option_syntax)
     752         1053 :             appendPQExpBufferStr(buf, ", ");
     753              :         else
     754            0 :             appendPQExpBufferChar(buf, ' ');
     755              :     }
     756              : 
     757         1413 :     appendPQExpBuffer(buf, " %s", option_name);
     758         1413 : }
     759              : 
     760              : /*
     761              :  * Append an option with an associated string value to a server command that
     762              :  * is being constructed.
     763              :  *
     764              :  * See comments for AppendPlainCommandOption, above.
     765              :  */
     766              : void
     767          834 : AppendStringCommandOption(PQExpBuffer buf, bool use_new_option_syntax,
     768              :                           char *option_name, char *option_value)
     769              : {
     770          834 :     AppendPlainCommandOption(buf, use_new_option_syntax, option_name);
     771              : 
     772          834 :     if (option_value != NULL)
     773              :     {
     774          834 :         size_t      length = strlen(option_value);
     775          834 :         char       *escaped_value = palloc(1 + 2 * length);
     776              : 
     777          834 :         PQescapeStringConn(conn, escaped_value, option_value, length, NULL);
     778          834 :         appendPQExpBuffer(buf, " '%s'", escaped_value);
     779          834 :         pfree(escaped_value);
     780              :     }
     781          834 : }
     782              : 
     783              : /*
     784              :  * Append an option with an associated integer value to a server command
     785              :  * is being constructed.
     786              :  *
     787              :  * See comments for AppendPlainCommandOption, above.
     788              :  */
     789              : void
     790          181 : AppendIntegerCommandOption(PQExpBuffer buf, bool use_new_option_syntax,
     791              :                            char *option_name, int32 option_value)
     792              : {
     793          181 :     AppendPlainCommandOption(buf, use_new_option_syntax, option_name);
     794              : 
     795          181 :     appendPQExpBuffer(buf, " %d", option_value);
     796          181 : }
     797              : 
     798              : /*
     799              :  * Frontend version of GetCurrentTimestamp(), since we are not linked with
     800              :  * backend code.
     801              :  */
     802              : TimestampTz
     803          895 : feGetCurrentTimestamp(void)
     804              : {
     805              :     TimestampTz result;
     806              :     struct timeval tp;
     807              : 
     808          895 :     gettimeofday(&tp, NULL);
     809              : 
     810          895 :     result = (TimestampTz) tp.tv_sec -
     811              :         ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);
     812          895 :     result = (result * USECS_PER_SEC) + tp.tv_usec;
     813              : 
     814          895 :     return result;
     815              : }
     816              : 
     817              : /*
     818              :  * Frontend version of TimestampDifference(), since we are not linked with
     819              :  * backend code.
     820              :  */
     821              : void
     822          612 : feTimestampDifference(TimestampTz start_time, TimestampTz stop_time,
     823              :                       long *secs, int *microsecs)
     824              : {
     825          612 :     TimestampTz diff = stop_time - start_time;
     826              : 
     827          612 :     if (diff <= 0)
     828              :     {
     829            0 :         *secs = 0;
     830            0 :         *microsecs = 0;
     831              :     }
     832              :     else
     833              :     {
     834          612 :         *secs = (long) (diff / USECS_PER_SEC);
     835          612 :         *microsecs = (int) (diff % USECS_PER_SEC);
     836              :     }
     837          612 : }
     838              : 
     839              : /*
     840              :  * Frontend version of TimestampDifferenceExceeds(), since we are not
     841              :  * linked with backend code.
     842              :  */
     843              : bool
     844         1080 : feTimestampDifferenceExceeds(TimestampTz start_time,
     845              :                              TimestampTz stop_time,
     846              :                              int msec)
     847              : {
     848         1080 :     TimestampTz diff = stop_time - start_time;
     849              : 
     850         1080 :     return (diff >= msec * INT64CONST(1000));
     851              : }
     852              : 
     853              : /*
     854              :  * Converts an int64 to network byte order.
     855              :  */
     856              : void
     857          696 : fe_sendint64(int64 i, char *buf)
     858              : {
     859          696 :     uint64      n64 = pg_hton64(i);
     860              : 
     861          696 :     memcpy(buf, &n64, sizeof(n64));
     862          696 : }
     863              : 
     864              : /*
     865              :  * Converts an int64 from network byte order to native format.
     866              :  */
     867              : int64
     868         1574 : fe_recvint64(char *buf)
     869              : {
     870              :     uint64      n64;
     871              : 
     872         1574 :     memcpy(&n64, buf, sizeof(n64));
     873              : 
     874         1574 :     return pg_ntoh64(n64);
     875              : }
        

Generated by: LCOV version 2.0-1