LCOV - code coverage report
Current view: top level - src/bin/pg_basebackup - streamutil.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 228 321 71.0 %
Date: 2025-04-01 15:15:16 Functions: 15 15 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * streamutil.c - utility functions for pg_basebackup, pg_receivewal and
       4             :  *                  pg_recvlogical
       5             :  *
       6             :  * Author: Magnus Hagander <magnus@hagander.net>
       7             :  *
       8             :  * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *        src/bin/pg_basebackup/streamutil.c
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres_fe.h"
      16             : 
      17             : #include <sys/time.h>
      18             : #include <unistd.h>
      19             : 
      20             : #include "access/xlog_internal.h"
      21             : #include "common/connect.h"
      22             : #include "common/file_perm.h"
      23             : #include "common/logging.h"
      24             : #include "common/string.h"
      25             : #include "datatype/timestamp.h"
      26             : #include "port/pg_bswap.h"
      27             : #include "pqexpbuffer.h"
      28             : #include "streamutil.h"
      29             : 
      30             : #define ERRCODE_DUPLICATE_OBJECT  "42710"
      31             : 
      32             : int         WalSegSz;
      33             : 
      34             : static bool RetrieveDataDirCreatePerm(PGconn *conn);
      35             : 
      36             : /* SHOW command for replication connection was introduced in version 10 */
      37             : #define MINIMUM_VERSION_FOR_SHOW_CMD 100000
      38             : 
      39             : /*
      40             :  * Group access is supported from version 11.
      41             :  */
      42             : #define MINIMUM_VERSION_FOR_GROUP_ACCESS 110000
      43             : 
      44             : const char *progname;
      45             : char       *connection_string = NULL;
      46             : char       *dbhost = NULL;
      47             : char       *dbuser = NULL;
      48             : char       *dbport = NULL;
      49             : char       *dbname = NULL;
      50             : int         dbgetpassword = 0;  /* 0=auto, -1=never, 1=always */
      51             : static char *password = NULL;
      52             : PGconn     *conn = NULL;
      53             : 
      54             : /*
      55             :  * Connect to the server. Returns a valid PGconn pointer if connected,
      56             :  * or NULL on non-permanent error. On permanent error, the function will
      57             :  * call exit(1) directly.
      58             :  */
      59             : PGconn *
      60         748 : GetConnection(void)
      61             : {
      62             :     PGconn     *tmpconn;
      63         748 :     int         argcount = 7;   /* dbname, replication, fallback_app_name,
      64             :                                  * host, user, port, password */
      65             :     int         i;
      66             :     const char **keywords;
      67             :     const char **values;
      68             :     const char *tmpparam;
      69             :     bool        need_password;
      70         748 :     PQconninfoOption *conn_opts = NULL;
      71             :     PQconninfoOption *conn_opt;
      72         748 :     char       *err_msg = NULL;
      73             : 
      74             :     /*
      75             :      * pg_recvlogical uses dbname only; others use connection_string only.
      76             :      * (Note: both variables will be NULL if there are no command-line options.)
      77             :      */
      78             :     Assert(dbname == NULL || connection_string == NULL);
      79             : 
      80             :     /*
      81             :      * Merge the connection info inputs given in form of connection string,
      82             :      * options and default values (dbname=replication, replication=true, etc.)
      83             :      */
      84         748 :     i = 0;
      85         748 :     if (connection_string)
      86             :     {
      87           8 :         conn_opts = PQconninfoParse(connection_string, &err_msg);
      88           8 :         if (conn_opts == NULL)
      89           0 :             pg_fatal("%s", err_msg);
      90             : 
      91         384 :         for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
      92             :         {
      93         376 :             if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
      94          16 :                 argcount++;
      95             :         }
      96             : 
      97           8 :         keywords = pg_malloc0((argcount + 1) * sizeof(*keywords));
      98           8 :         values = pg_malloc0((argcount + 1) * sizeof(*values));
      99             : 
     100             :         /*
     101             :          * Set dbname here already, so it can be overridden by a dbname in the
     102             :          * connection string.
     103             :          */
     104           8 :         keywords[i] = "dbname";
     105           8 :         values[i] = "replication";
     106           8 :         i++;
     107             : 
     108         384 :         for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
     109             :         {
     110         376 :             if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
     111             :             {
     112          16 :                 keywords[i] = conn_opt->keyword;
     113          16 :                 values[i] = conn_opt->val;
     114          16 :                 i++;
     115             :             }
     116             :         }
     117             :     }
     118             :     else
     119             :     {
     120         740 :         keywords = pg_malloc0((argcount + 1) * sizeof(*keywords));
     121         740 :         values = pg_malloc0((argcount + 1) * sizeof(*values));
     122         740 :         keywords[i] = "dbname";
     123         740 :         values[i] = (dbname == NULL) ? "replication" : dbname;
     124         740 :         i++;
     125             :     }
     126             : 
     127         748 :     keywords[i] = "replication";
     128         748 :     values[i] = (dbname == NULL) ? "true" : "database";
     129         748 :     i++;
     130         748 :     keywords[i] = "fallback_application_name";
     131         748 :     values[i] = progname;
     132         748 :     i++;
     133             : 
     134         748 :     if (dbhost)
     135             :     {
     136         262 :         keywords[i] = "host";
     137         262 :         values[i] = dbhost;
     138         262 :         i++;
     139             :     }
     140         748 :     if (dbuser)
     141             :     {
     142          14 :         keywords[i] = "user";
     143          14 :         values[i] = dbuser;
     144          14 :         i++;
     145             :     }
     146         748 :     if (dbport)
     147             :     {
     148         262 :         keywords[i] = "port";
     149         262 :         values[i] = dbport;
     150         262 :         i++;
     151             :     }
     152             : 
     153             :     /* If -W was given, force prompt for password, but only the first time */
     154         748 :     need_password = (dbgetpassword == 1 && !password);
     155             : 
     156             :     do
     157             :     {
     158             :         /* Get a new password if appropriate */
     159         748 :         if (need_password)
     160             :         {
     161           0 :             free(password);
     162           0 :             password = simple_prompt("Password: ", false);
     163           0 :             need_password = false;
     164             :         }
     165             : 
     166             :         /* Use (or reuse, on a subsequent connection) password if we have it */
     167         748 :         if (password)
     168             :         {
     169           0 :             keywords[i] = "password";
     170           0 :             values[i] = password;
     171             :         }
     172             :         else
     173             :         {
     174         748 :             keywords[i] = NULL;
     175         748 :             values[i] = NULL;
     176             :         }
     177             : 
     178             :         /*
     179             :          * Only expand dbname when we did not already parse the argument as a
     180             :          * connection string ourselves.
     181             :          */
     182         748 :         tmpconn = PQconnectdbParams(keywords, values, !connection_string);
     183             : 
     184             :         /*
     185             :          * If there is too little memory even to allocate the PGconn object
     186             :          * and PQconnectdbParams returns NULL, we call exit(1) directly.
     187             :          */
     188         748 :         if (!tmpconn)
     189           0 :             pg_fatal("could not connect to server");
     190             : 
     191             :         /* If we need a password and -w wasn't given, loop back and get one */
     192         752 :         if (PQstatus(tmpconn) == CONNECTION_BAD &&
     193           4 :             PQconnectionNeedsPassword(tmpconn) &&
     194           0 :             dbgetpassword != -1)
     195             :         {
     196           0 :             PQfinish(tmpconn);
     197           0 :             need_password = true;
     198             :         }
     199             :     }
     200         748 :     while (need_password);
     201             : 
     202         748 :     if (PQstatus(tmpconn) != CONNECTION_OK)
     203             :     {
     204           4 :         pg_log_error("%s", PQerrorMessage(tmpconn));
     205           4 :         PQfinish(tmpconn);
     206           4 :         free(values);
     207           4 :         free(keywords);
     208           4 :         PQconninfoFree(conn_opts);
     209           4 :         return NULL;
     210             :     }
     211             : 
     212             :     /* Connection ok! */
     213         744 :     free(values);
     214         744 :     free(keywords);
     215         744 :     PQconninfoFree(conn_opts);
     216             : 
     217             :     /*
     218             :      * Set always-secure search path, so malicious users can't get control.
     219             :      * The capacity to run normal SQL queries was added in PostgreSQL 10, so
     220             :      * the search path cannot be changed (by us or attackers) on earlier
     221             :      * versions.
     222             :      */
     223         744 :     if (dbname != NULL && PQserverVersion(tmpconn) >= 100000)
     224             :     {
     225             :         PGresult   *res;
     226             : 
     227          96 :         res = PQexec(tmpconn, ALWAYS_SECURE_SEARCH_PATH_SQL);
     228          96 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
     229             :         {
     230           0 :             pg_log_error("could not clear \"search_path\": %s",
     231             :                          PQerrorMessage(tmpconn));
     232           0 :             PQclear(res);
     233           0 :             PQfinish(tmpconn);
     234           0 :             exit(1);
     235             :         }
     236          96 :         PQclear(res);
     237             :     }
     238             : 
     239             :     /*
     240             :      * Ensure we have the same value of integer_datetimes (now always "on") as
     241             :      * the server we are connecting to.
     242             :      */
     243         744 :     tmpparam = PQparameterStatus(tmpconn, "integer_datetimes");
     244         744 :     if (!tmpparam)
     245             :     {
     246           0 :         pg_log_error("could not determine server setting for \"integer_datetimes\"");
     247           0 :         PQfinish(tmpconn);
     248           0 :         exit(1);
     249             :     }
     250             : 
     251         744 :     if (strcmp(tmpparam, "on") != 0)
     252             :     {
     253           0 :         pg_log_error("\"integer_datetimes\" compile flag does not match server");
     254           0 :         PQfinish(tmpconn);
     255           0 :         exit(1);
     256             :     }
     257             : 
     258             :     /*
     259             :      * Retrieve the source data directory mode and use it to construct a umask
     260             :      * for creating directories and files.
     261             :      */
     262         744 :     if (!RetrieveDataDirCreatePerm(tmpconn))
     263             :     {
     264           0 :         PQfinish(tmpconn);
     265           0 :         exit(1);
     266             :     }
     267             : 
     268         744 :     return tmpconn;
     269             : }
     270             : 
     271             : /*
     272             :  * From version 10, explicitly set the WAL segment size using SHOW wal_segment_size
     273             :  * since ControlFile is not accessible here.
     274             :  */
     275             : bool
     276         370 : RetrieveWalSegSize(PGconn *conn)
     277             : {
     278             :     PGresult   *res;
     279             :     char        xlog_unit[3];
     280             :     int         xlog_val,
     281         370 :                 multiplier = 1;
     282             : 
     283             :     /* check connection existence */
     284             :     Assert(conn != NULL);
     285             : 
     286             :     /* for previous versions set the default xlog seg size */
     287         370 :     if (PQserverVersion(conn) < MINIMUM_VERSION_FOR_SHOW_CMD)
     288             :     {
     289           0 :         WalSegSz = DEFAULT_XLOG_SEG_SIZE;
     290           0 :         return true;
     291             :     }
     292             : 
     293         370 :     res = PQexec(conn, "SHOW wal_segment_size");
     294         370 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     295             :     {
     296           0 :         pg_log_error("could not send replication command \"%s\": %s",
     297             :                      "SHOW wal_segment_size", PQerrorMessage(conn));
     298             : 
     299           0 :         PQclear(res);
     300           0 :         return false;
     301             :     }
     302         370 :     if (PQntuples(res) != 1 || PQnfields(res) < 1)
     303             :     {
     304           0 :         pg_log_error("could not fetch WAL segment size: got %d rows and %d fields, expected %d rows and %d or more fields",
     305             :                      PQntuples(res), PQnfields(res), 1, 1);
     306             : 
     307           0 :         PQclear(res);
     308           0 :         return false;
     309             :     }
     310             : 
     311             :     /* fetch xlog value and unit from the result */
     312         370 :     if (sscanf(PQgetvalue(res, 0, 0), "%d%2s", &xlog_val, xlog_unit) != 2)
     313             :     {
     314           0 :         pg_log_error("WAL segment size could not be parsed");
     315           0 :         PQclear(res);
     316           0 :         return false;
     317             :     }
     318             : 
     319         370 :     PQclear(res);
     320             : 
     321             :     /* set the multiplier based on unit to convert xlog_val to bytes */
     322         370 :     if (strcmp(xlog_unit, "MB") == 0)
     323         370 :         multiplier = 1024 * 1024;
     324           0 :     else if (strcmp(xlog_unit, "GB") == 0)
     325           0 :         multiplier = 1024 * 1024 * 1024;
     326             : 
     327             :     /* convert and set WalSegSz */
     328         370 :     WalSegSz = xlog_val * multiplier;
     329             : 
     330         370 :     if (!IsValidWalSegSize(WalSegSz))
     331             :     {
     332           0 :         pg_log_error(ngettext("remote server reported invalid WAL segment size (%d byte)",
     333             :                               "remote server reported invalid WAL segment size (%d bytes)",
     334             :                               WalSegSz),
     335             :                      WalSegSz);
     336           0 :         pg_log_error_detail("The WAL segment size must be a power of two between 1 MB and 1 GB.");
     337           0 :         return false;
     338             :     }
     339             : 
     340         370 :     return true;
     341             : }
     342             : 
     343             : /*
     344             :  * RetrieveDataDirCreatePerm
     345             :  *
     346             :  * This function is used to determine the privileges on the server's PG data
     347             :  * directory and, based on that, set what the permissions will be for
     348             :  * directories and files we create.
     349             :  *
     350             :  * PG11 added support for (optionally) group read/execute rights to be set on
     351             :  * the data directory.  Prior to PG11, only the owner was allowed to have rights
     352             :  * on the data directory.
     353             :  */
     354             : static bool
     355         744 : RetrieveDataDirCreatePerm(PGconn *conn)
     356             : {
     357             :     PGresult   *res;
     358             :     int         data_directory_mode;
     359             : 
     360             :     /* check connection existence */
     361             :     Assert(conn != NULL);
     362             : 
     363             :     /* for previous versions leave the default group access */
     364         744 :     if (PQserverVersion(conn) < MINIMUM_VERSION_FOR_GROUP_ACCESS)
     365           0 :         return true;
     366             : 
     367         744 :     res = PQexec(conn, "SHOW data_directory_mode");
     368         744 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     369             :     {
     370           0 :         pg_log_error("could not send replication command \"%s\": %s",
     371             :                      "SHOW data_directory_mode", PQerrorMessage(conn));
     372             : 
     373           0 :         PQclear(res);
     374           0 :         return false;
     375             :     }
     376         744 :     if (PQntuples(res) != 1 || PQnfields(res) < 1)
     377             :     {
     378           0 :         pg_log_error("could not fetch group access flag: got %d rows and %d fields, expected %d rows and %d or more fields",
     379             :                      PQntuples(res), PQnfields(res), 1, 1);
     380             : 
     381           0 :         PQclear(res);
     382           0 :         return false;
     383             :     }
     384             : 
     385         744 :     if (sscanf(PQgetvalue(res, 0, 0), "%o", &data_directory_mode) != 1)
     386             :     {
     387           0 :         pg_log_error("group access flag could not be parsed: %s",
     388             :                      PQgetvalue(res, 0, 0));
     389             : 
     390           0 :         PQclear(res);
     391           0 :         return false;
     392             :     }
     393             : 
     394         744 :     SetDataDirectoryCreatePerm(data_directory_mode);
     395             : 
     396         744 :     PQclear(res);
     397         744 :     return true;
     398             : }
     399             : 
     400             : /*
     401             :  * Run IDENTIFY_SYSTEM through a given connection and give back to caller
     402             :  * some result information if requested:
     403             :  * - System identifier
     404             :  * - Current timeline ID
     405             :  * - Start LSN position
     406             :  * - Database name (NULL in servers prior to 9.4)
     407             :  */
     408             : bool
     409         760 : RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli,
     410             :                   XLogRecPtr *startpos, char **db_name)
     411             : {
     412             :     PGresult   *res;
     413             :     uint32      hi,
     414             :                 lo;
     415             : 
     416             :     /* Check connection existence */
     417             :     Assert(conn != NULL);
     418             : 
     419         760 :     res = PQexec(conn, "IDENTIFY_SYSTEM");
     420         760 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     421             :     {
     422           0 :         pg_log_error("could not send replication command \"%s\": %s",
     423             :                      "IDENTIFY_SYSTEM", PQerrorMessage(conn));
     424             : 
     425           0 :         PQclear(res);
     426           0 :         return false;
     427             :     }
     428         760 :     if (PQntuples(res) != 1 || PQnfields(res) < 3)
     429             :     {
     430           0 :         pg_log_error("could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields",
     431             :                      PQntuples(res), PQnfields(res), 1, 3);
     432             : 
     433           0 :         PQclear(res);
     434           0 :         return false;
     435             :     }
     436             : 
     437             :     /* Get system identifier */
     438         760 :     if (sysid != NULL)
     439         644 :         *sysid = pg_strdup(PQgetvalue(res, 0, 0));
     440             : 
     441             :     /* Get timeline ID to start streaming from */
     442         760 :     if (starttli != NULL)
     443         644 :         *starttli = atoi(PQgetvalue(res, 0, 1));
     444             : 
     445             :     /* Get LSN start position if necessary */
     446         760 :     if (startpos != NULL)
     447             :     {
     448          14 :         if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &hi, &lo) != 2)
     449             :         {
     450           0 :             pg_log_error("could not parse write-ahead log location \"%s\"",
     451             :                          PQgetvalue(res, 0, 2));
     452             : 
     453           0 :             PQclear(res);
     454           0 :             return false;
     455             :         }
     456          14 :         *startpos = ((uint64) hi) << 32 | lo;
     457             :     }
     458             : 
     459             :     /* Get database name, only available in 9.4 and newer versions */
     460         760 :     if (db_name != NULL)
     461             :     {
     462         116 :         *db_name = NULL;
     463         116 :         if (PQserverVersion(conn) >= 90400)
     464             :         {
     465         116 :             if (PQnfields(res) < 4)
     466             :             {
     467           0 :                 pg_log_error("could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields",
     468             :                              PQntuples(res), PQnfields(res), 1, 4);
     469             : 
     470           0 :                 PQclear(res);
     471           0 :                 return false;
     472             :             }
     473         116 :             if (!PQgetisnull(res, 0, 3))
     474          96 :                 *db_name = pg_strdup(PQgetvalue(res, 0, 3));
     475             :         }
     476             :     }
     477             : 
     478         760 :     PQclear(res);
     479         760 :     return true;
     480             : }
     481             : 
     482             : /*
     483             :  * Run READ_REPLICATION_SLOT through a given connection and give back to
     484             :  * caller some result information if requested for this slot:
     485             :  * - Start LSN position, InvalidXLogRecPtr if unknown.
     486             :  * - Current timeline ID, 0 if unknown.
     487             :  * Returns false on failure, and true otherwise.
     488             :  */
     489             : bool
     490           6 : GetSlotInformation(PGconn *conn, const char *slot_name,
     491             :                    XLogRecPtr *restart_lsn, TimeLineID *restart_tli)
     492             : {
     493             :     PGresult   *res;
     494             :     PQExpBuffer query;
     495           6 :     XLogRecPtr  lsn_loc = InvalidXLogRecPtr;
     496           6 :     TimeLineID  tli_loc = 0;
     497             : 
     498           6 :     if (restart_lsn)
     499           6 :         *restart_lsn = lsn_loc;
     500           6 :     if (restart_tli)
     501           6 :         *restart_tli = tli_loc;
     502             : 
     503           6 :     query = createPQExpBuffer();
     504           6 :     appendPQExpBuffer(query, "READ_REPLICATION_SLOT %s", slot_name);
     505           6 :     res = PQexec(conn, query->data);
     506           6 :     destroyPQExpBuffer(query);
     507             : 
     508           6 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     509             :     {
     510           0 :         pg_log_error("could not send replication command \"%s\": %s",
     511             :                      "READ_REPLICATION_SLOT", PQerrorMessage(conn));
     512           0 :         PQclear(res);
     513           0 :         return false;
     514             :     }
     515             : 
     516             :     /* The command should always return precisely one tuple and three fields */
     517           6 :     if (PQntuples(res) != 1 || PQnfields(res) != 3)
     518             :     {
     519           0 :         pg_log_error("could not read replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields",
     520             :                      slot_name, PQntuples(res), PQnfields(res), 1, 3);
     521           0 :         PQclear(res);
     522           0 :         return false;
     523             :     }
     524             : 
     525             :     /*
     526             :      * When the slot doesn't exist, the command returns a tuple with NULL
     527             :      * values.  This checks only the slot type field.
     528             :      */
     529           6 :     if (PQgetisnull(res, 0, 0))
     530             :     {
     531           2 :         pg_log_error("replication slot \"%s\" does not exist", slot_name);
     532           2 :         PQclear(res);
     533           2 :         return false;
     534             :     }
     535             : 
     536             :     /*
     537             :      * Note that this cannot happen as READ_REPLICATION_SLOT supports only
     538             :      * physical slots, but play it safe.
     539             :      */
     540           4 :     if (strcmp(PQgetvalue(res, 0, 0), "physical") != 0)
     541             :     {
     542           0 :         pg_log_error("expected a physical replication slot, got type \"%s\" instead",
     543             :                      PQgetvalue(res, 0, 0));
     544           0 :         PQclear(res);
     545           0 :         return false;
     546             :     }
     547             : 
     548             :     /* restart LSN */
     549           4 :     if (!PQgetisnull(res, 0, 1))
     550             :     {
     551             :         uint32      hi,
     552             :                     lo;
     553             : 
     554           4 :         if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &hi, &lo) != 2)
     555             :         {
     556           0 :             pg_log_error("could not parse restart_lsn \"%s\" for replication slot \"%s\"",
     557             :                          PQgetvalue(res, 0, 1), slot_name);
     558           0 :             PQclear(res);
     559           0 :             return false;
     560             :         }
     561           4 :         lsn_loc = ((uint64) hi) << 32 | lo;
     562             :     }
     563             : 
     564             :     /* current TLI */
     565           4 :     if (!PQgetisnull(res, 0, 2))
     566           4 :         tli_loc = (TimeLineID) atoll(PQgetvalue(res, 0, 2));
     567             : 
     568           4 :     PQclear(res);
     569             : 
     570             :     /* Assign results if requested */
     571           4 :     if (restart_lsn)
     572           4 :         *restart_lsn = lsn_loc;
     573           4 :     if (restart_tli)
     574           4 :         *restart_tli = tli_loc;
     575             : 
     576           4 :     return true;
     577             : }
     578             : 
     579             : /*
     580             :  * Create a replication slot for the given connection. This function
     581             :  * returns true in case of success.
                     :  *
                     :  * The command text is assembled as
                     :  *     CREATE_REPLICATION_SLOT "slot_name" [TEMPORARY]
                     :  *         {PHYSICAL | LOGICAL "plugin"}
                     :  * followed by any options.  slot_name and plugin are placed between the
                     :  * double quotes exactly as given; no escaping is applied to them here.
                     :  *
                     :  * Options added:
                     :  *   - physical slot: RESERVE_WAL when reserve_wal is set;
                     :  *   - logical slot: TWO_PHASE when two_phase is set and the server reports
                     :  *     version >= 150000; snapshot export is suppressed when the server
                     :  *     reports version >= 100000.
                     :  * Servers reporting version >= 150000 get the parenthesized option list.
                     :  *
                     :  * The assertions require plugin to be NULL exactly when is_physical is
                     :  * true, two_phase to be used only with logical slots, and slot_name to be
                     :  * non-NULL.
                     :  *
                     :  * Returns true when the server answers with one row of four fields, and
                     :  * also when slot_exists_ok is set and the command fails with SQLSTATE
                     :  * ERRCODE_DUPLICATE_OBJECT.  Every other outcome is reported with
                     :  * pg_log_error() and returns false.  The query buffer and the result are
                     :  * released on every return path.
     582             :  */
     583             : bool
     584         314 : CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
     585             :                       bool is_temporary, bool is_physical, bool reserve_wal,
     586             :                       bool slot_exists_ok, bool two_phase)
     587             : {
     588             :     PQExpBuffer query;
     589             :     PGresult   *res;
     590         314 :     bool        use_new_option_syntax = (PQserverVersion(conn) >= 150000);
     591             : 
     592         314 :     query = createPQExpBuffer();
     593             : 
     594             :     Assert((is_physical && plugin == NULL) ||
     595             :            (!is_physical && plugin != NULL));
     596             :     Assert(!(two_phase && is_physical));
     597             :     Assert(slot_name != NULL);
     598             : 
     599             :     /* Build base portion of query */
     600         314 :     appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\"", slot_name);
     601         314 :     if (is_temporary)
     602         260 :         appendPQExpBufferStr(query, " TEMPORARY");
     603         314 :     if (is_physical)
     604         266 :         appendPQExpBufferStr(query, " PHYSICAL");
     605             :     else
     606          48 :         appendPQExpBuffer(query, " LOGICAL \"%s\"", plugin);
     607             : 
     608             :     /* Add any requested options */
     609         314 :     if (use_new_option_syntax)
     610         314 :         appendPQExpBufferStr(query, " (");
     611         314 :     if (is_physical)
     612             :     {
     613         266 :         if (reserve_wal)
     614         264 :             AppendPlainCommandOption(query, use_new_option_syntax,
     615             :                                      "RESERVE_WAL");
     616             :     }
     617             :     else
     618             :     {
     619          48 :         if (two_phase && PQserverVersion(conn) >= 150000)
     620           2 :             AppendPlainCommandOption(query, use_new_option_syntax,
     621             :                                      "TWO_PHASE");
     622             : 
     623          48 :         if (PQserverVersion(conn) >= 100000)
     624             :         {
     625             :             /* pg_recvlogical doesn't use an exported snapshot, so suppress */
     626          48 :             if (use_new_option_syntax)
     627          48 :                 AppendStringCommandOption(query, use_new_option_syntax,
     628             :                                           "SNAPSHOT", "nothing");
     629             :             else
     630           0 :                 AppendPlainCommandOption(query, use_new_option_syntax,
     631             :                                          "NOEXPORT_SNAPSHOT");
     632             :         }
     633             :     }
     634         314 :     if (use_new_option_syntax)
     635             :     {
     636             :         /*
                     :          * Suppress option list if it would be empty, otherwise terminate.
                     :          * If nothing was appended, the buffer still ends with the " (" added
                     :          * above, so truncate those two characters.
                     :          */
     637         314 :         if (query->data[query->len - 1] == '(')
     638             :         {
     639           2 :             query->len -= 2;
     640           2 :             query->data[query->len] = '\0';
     641             :         }
     642             :         else
     643         312 :             appendPQExpBufferChar(query, ')');
     644             :     }
     645             : 
     646             :     /* Now run the query */
     647         314 :     res = PQexec(conn, query->data);
     648         314 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     649             :     {
     650           2 :         const char *sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
     651             : 
     652           2 :         if (slot_exists_ok &&
     653           0 :             sqlstate &&
     654           0 :             strcmp(sqlstate, ERRCODE_DUPLICATE_OBJECT) == 0)
     655             :         {
     656           0 :             destroyPQExpBuffer(query);
     657           0 :             PQclear(res);
     658           0 :             return true;
     659             :         }
     660             :         else
     661             :         {
     662           2 :             pg_log_error("could not send replication command \"%s\": %s",
     663             :                          query->data, PQerrorMessage(conn));
     664             : 
     665           2 :             destroyPQExpBuffer(query);
     666           2 :             PQclear(res);
     667           2 :             return false;
     668             :         }
     669             :     }
     670             : 
     671         312 :     if (PQntuples(res) != 1 || PQnfields(res) != 4)
     672             :     {
     673           0 :         pg_log_error("could not create replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields",
     674             :                      slot_name,
     675             :                      PQntuples(res), PQnfields(res), 1, 4);
     676             : 
     677           0 :         destroyPQExpBuffer(query);
     678           0 :         PQclear(res);
     679           0 :         return false;
     680             :     }
     681             : 
     682         312 :     destroyPQExpBuffer(query);
     683         312 :     PQclear(res);
     684         312 :     return true;
     685             : }
     686             : 
     687             : /*
     688             :  * Drop a replication slot for the given connection. This function
     689             :  * returns true in case of success.
                     :  *
                     :  * Sends DROP_REPLICATION_SLOT "slot_name".  slot_name must be non-NULL
                     :  * (asserted) and is placed between the double quotes exactly as given.
                     :  *
                     :  * Success means the result status is PGRES_COMMAND_OK and the result has
                     :  * no rows and no fields.  Any other outcome is reported with
                     :  * pg_log_error() and returns false.  The query buffer and the result are
                     :  * released on every return path.
     690             :  */
     691             : bool
     692           6 : DropReplicationSlot(PGconn *conn, const char *slot_name)
     693             : {
     694             :     PQExpBuffer query;
     695             :     PGresult   *res;
     696             : 
     697             :     Assert(slot_name != NULL);
     698             : 
     699           6 :     query = createPQExpBuffer();
     700             : 
     701             :     /* Build query */
     702           6 :     appendPQExpBuffer(query, "DROP_REPLICATION_SLOT \"%s\"",
     703             :                       slot_name);
     704           6 :     res = PQexec(conn, query->data);
     705           6 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
     706             :     {
     707           0 :         pg_log_error("could not send replication command \"%s\": %s",
     708             :                      query->data, PQerrorMessage(conn));
     709             : 
     710           0 :         destroyPQExpBuffer(query);
     711           0 :         PQclear(res);
     712           0 :         return false;
     713             :     }
     714             : 
     715           6 :     if (PQntuples(res) != 0 || PQnfields(res) != 0)
     716             :     {
     717           0 :         pg_log_error("could not drop replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields",
     718             :                      slot_name,
     719             :                      PQntuples(res), PQnfields(res), 0, 0);
     720             : 
     721           0 :         destroyPQExpBuffer(query);
     722           0 :         PQclear(res);
     723           0 :         return false;
     724             :     }
     725             : 
     726           6 :     destroyPQExpBuffer(query);
     727           6 :     PQclear(res);
     728           6 :     return true;
     729             : }
     730             : 
     731             : /*
     732             :  * Append a "plain" option - one with no value - to a server command that
     733             :  * is being constructed.
     734             :  *
     735             :  * In the old syntax, all options were parser keywords, so you could just
     736             :  * write things like SOME_COMMAND OPTION1 OPTION2 'opt2value' OPTION3 42. The
     737             :  * new syntax uses a comma-separated list surrounded by parentheses, so the
     738             :  * equivalent is SOME_COMMAND (OPTION1, OPTION2 'opt2value', OPTION3 42).
                     :  *
                     :  * A separator is written first unless the buffer is empty or its last
                     :  * character is '(' (i.e. this is the first option of a parenthesized
                     :  * list): ", " when use_new_option_syntax is true, a single space
                     :  * otherwise.  The option name is then appended preceded by one space.
     739             :  */
     740             : void
     741        2634 : AppendPlainCommandOption(PQExpBuffer buf, bool use_new_option_syntax,
     742             :                          char *option_name)
     743             : {
     744        2634 :     if (buf->len > 0 && buf->data[buf->len - 1] != '(')
     745             :     {
     746        1968 :         if (use_new_option_syntax)
     747        1968 :             appendPQExpBufferStr(buf, ", ");
     748             :         else
     749           0 :             appendPQExpBufferChar(buf, ' ');
     750             :     }
     751             : 
     752        2634 :     appendPQExpBuffer(buf, " %s", option_name);
     753        2634 : }
     754             : 
     755             : /*
     756             :  * Append an option with an associated string value to a server command that
     757             :  * is being constructed.
     758             :  *
                     :  * The option name is appended via AppendPlainCommandOption().  When
                     :  * option_value is not NULL it is escaped with PQescapeStringConn() into a
                     :  * temporary buffer of 1 + 2 * strlen(option_value) bytes and appended as
                     :  * a single-quoted literal; when it is NULL only the name is appended.
                     :  *
                     :  * The final (error) argument of PQescapeStringConn() is NULL and its
                     :  * return value is unused, so an escaping failure is not detected here.
                     :  *
                     :  * NOTE(review): "conn" is neither a parameter nor a local of this
                     :  * function, so it must be declared at file or program scope elsewhere;
                     :  * confirm which connection that is.
                     :  *
     759             :  * See comments for AppendPlainCommandOption, above.
     760             :  */
     761             : void
     762        1556 : AppendStringCommandOption(PQExpBuffer buf, bool use_new_option_syntax,
     763             :                           char *option_name, char *option_value)
     764             : {
     765        1556 :     AppendPlainCommandOption(buf, use_new_option_syntax, option_name);
     766             : 
     767        1556 :     if (option_value != NULL)
     768             :     {
     769        1556 :         size_t      length = strlen(option_value);
     770        1556 :         char       *escaped_value = palloc(1 + 2 * length);
     771             : 
     772        1556 :         PQescapeStringConn(conn, escaped_value, option_value, length, NULL);
     773        1556 :         appendPQExpBuffer(buf, " '%s'", escaped_value);
     774        1556 :         pfree(escaped_value);
     775             :     }
     776        1556 : }
     777             : 
     778             : /*
     779             :  * Append an option with an associated integer value to a server command
     780             :  * that is being constructed.
     781             :  *
                     :  * The option name is appended via AppendPlainCommandOption(), followed by
                     :  * a space and option_value formatted with %d.
                     :  *
     782             :  * See comments for AppendPlainCommandOption, above.
     783             :  */
     784             : void
     785         338 : AppendIntegerCommandOption(PQExpBuffer buf, bool use_new_option_syntax,
     786             :                            char *option_name, int32 option_value)
     787             : {
     788         338 :     AppendPlainCommandOption(buf, use_new_option_syntax, option_name);
     789             : 
     790         338 :     appendPQExpBuffer(buf, " %d", option_value);
     791         338 : }
     792             : 
     793             : /*
     794             :  * Frontend version of GetCurrentTimestamp(), since we are not linked with
     795             :  * backend code.
                     :  *
                     :  * Reads the wall clock with gettimeofday() (its return value is not
                     :  * checked), shifts the seconds by the number of days between
                     :  * UNIX_EPOCH_JDATE and POSTGRES_EPOCH_JDATE, scales by USECS_PER_SEC and
                     :  * adds the tv_usec part.  The result is therefore a count of
                     :  * 1/USECS_PER_SEC-second units (presumably microseconds) relative to the
                     :  * Postgres epoch.
     796             :  */
     797             : TimestampTz
     798        5518 : feGetCurrentTimestamp(void)
     799             : {
     800             :     TimestampTz result;
     801             :     struct timeval tp;
     802             : 
     803        5518 :     gettimeofday(&tp, NULL);
     804             : 
     805        5518 :     result = (TimestampTz) tp.tv_sec -
     806             :         ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);
     807        5518 :     result = (result * USECS_PER_SEC) + tp.tv_usec;
     808             : 
     809        5518 :     return result;
     810             : }
     811             : 
     812             : /*
     813             :  * Frontend version of TimestampDifference(), since we are not linked with
     814             :  * backend code.
                     :  *
                     :  * Splits stop_time - start_time into whole seconds (*secs) and the
                     :  * remainder in 1/USECS_PER_SEC-second units (*microsecs).  If stop_time
                     :  * is not later than start_time, both outputs are set to zero rather than
                     :  * going negative.
     815             :  */
     816             : void
     817        4432 : feTimestampDifference(TimestampTz start_time, TimestampTz stop_time,
     818             :                       long *secs, int *microsecs)
     819             : {
     820        4432 :     TimestampTz diff = stop_time - start_time;
     821             : 
     822        4432 :     if (diff <= 0)
     823             :     {
     824           0 :         *secs = 0;
     825           0 :         *microsecs = 0;
     826             :     }
     827             :     else
     828             :     {
     829        4432 :         *secs = (long) (diff / USECS_PER_SEC);
     830        4432 :         *microsecs = (int) (diff % USECS_PER_SEC);
     831             :     }
     832        4432 : }
     833             : 
     834             : /*
     835             :  * Frontend version of TimestampDifferenceExceeds(), since we are not
     836             :  * linked with backend code.
                     :  *
                     :  * Returns true when stop_time - start_time is greater than or equal to
                     :  * msec * 1000, i.e. a difference exactly equal to the threshold counts as
                     :  * exceeding it.  The factor of 1000 converts msec to the timestamps'
                     :  * unit (presumably milliseconds to microseconds).
     837             :  */
     838             : bool
     839        6924 : feTimestampDifferenceExceeds(TimestampTz start_time,
     840             :                              TimestampTz stop_time,
     841             :                              int msec)
     842             : {
     843        6924 :     TimestampTz diff = stop_time - start_time;
     844             : 
     845        6924 :     return (diff >= msec * INT64CONST(1000));
     846             : }
     847             : 
     848             : /*
     849             :  * Converts an int64 to network byte order.
                     :  *
                     :  * The converted value is copied into buf with memcpy(), so buf must have
                     :  * room for sizeof(uint64) bytes.
     850             :  */
     851             : void
     852        1296 : fe_sendint64(int64 i, char *buf)
     853             : {
     854        1296 :     uint64      n64 = pg_hton64(i);
     855             : 
     856        1296 :     memcpy(buf, &n64, sizeof(n64));
     857        1296 : }
     858             : 
     859             : /*
     860             :  * Converts an int64 from network byte order to native format.
                     :  *
                     :  * sizeof(uint64) bytes are copied out of buf with memcpy() before the
                     :  * conversion, so buf must hold at least that many bytes.
     861             :  */
     862             : int64
     863        8068 : fe_recvint64(char *buf)
     864             : {
     865             :     uint64      n64;
     866             : 
     867        8068 :     memcpy(&n64, buf, sizeof(n64));
     868             : 
     869        8068 :     return pg_ntoh64(n64);
     870             : }

Generated by: LCOV version 1.14