LCOV - code coverage report
Current view: top level - src/bin/pg_basebackup - streamutil.c (source / functions) Hit Total Coverage
Test: PostgreSQL 17devel Lines: 228 321 71.0 %
Date: 2023-10-02 07:10:39 Functions: 15 15 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * streamutil.c - utility functions for pg_basebackup, pg_receivewal and
       4             :  *                  pg_recvlogical
       5             :  *
       6             :  * Author: Magnus Hagander <magnus@hagander.net>
       7             :  *
       8             :  * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *        src/bin/pg_basebackup/streamutil.c
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres_fe.h"
      16             : 
      17             : #include <sys/time.h>
      18             : #include <unistd.h>
      19             : 
      20             : #include "access/xlog_internal.h"
      21             : #include "common/connect.h"
      22             : #include "common/fe_memutils.h"
      23             : #include "common/file_perm.h"
      24             : #include "common/logging.h"
      25             : #include "common/string.h"
      26             : #include "datatype/timestamp.h"
      27             : #include "port/pg_bswap.h"
      28             : #include "pqexpbuffer.h"
      29             : #include "receivelog.h"
      30             : #include "streamutil.h"
      31             : 
/*
 * SQLSTATE value for "duplicate object".
 * NOTE(review): presumably compared against PG_DIAG_SQLSTATE when slot
 * creation fails (see CreateReplicationSlot) -- confirm against the rest of
 * the file.
 */
#define ERRCODE_DUPLICATE_OBJECT  "42710"

/* WAL segment size in bytes; assigned by RetrieveWalSegSize(). */
int         WalSegSz;

static bool RetrieveDataDirCreatePerm(PGconn *conn);

/* SHOW command for replication connection was introduced in version 10 */
#define MINIMUM_VERSION_FOR_SHOW_CMD 100000

/*
 * Group access is supported from version 11.
 */
#define MINIMUM_VERSION_FOR_GROUP_ACCESS 110000

/*
 * Connection inputs consumed by GetConnection().  "dbname" and
 * "connection_string" are mutually exclusive (GetConnection() asserts that at
 * most one of them is set).  "progname" is passed to the server as
 * fallback_application_name.
 */
const char *progname;
char       *connection_string = NULL;
char       *dbhost = NULL;
char       *dbuser = NULL;
char       *dbport = NULL;
char       *dbname = NULL;
int         dbgetpassword = 0;  /* 0=auto, -1=never, 1=always */
/* Password obtained from the prompt; kept so later connections can reuse it */
static char *password = NULL;
/*
 * NOTE(review): global connection handle; not referenced by the functions
 * visible in this part of the file (they take their own "conn" parameter) --
 * presumably owned by the calling programs.
 */
PGconn     *conn = NULL;
      55             : 
      56             : /*
      57             :  * Connect to the server. Returns a valid PGconn pointer if connected,
      58             :  * or NULL on non-permanent error. On permanent error, the function will
      59             :  * call exit(1) directly.
      60             :  */
      61             : PGconn *
      62         582 : GetConnection(void)
      63             : {
      64             :     PGconn     *tmpconn;
      65         582 :     int         argcount = 7;   /* dbname, replication, fallback_app_name,
      66             :                                  * host, user, port, password */
      67             :     int         i;
      68             :     const char **keywords;
      69             :     const char **values;
      70             :     const char *tmpparam;
      71             :     bool        need_password;
      72         582 :     PQconninfoOption *conn_opts = NULL;
      73             :     PQconninfoOption *conn_opt;
      74         582 :     char       *err_msg = NULL;
      75             : 
      76             :     /* pg_recvlogical uses dbname only; others use connection_string only. */
      77             :     Assert(dbname == NULL || connection_string == NULL);
      78             : 
      79             :     /*
      80             :      * Merge the connection info inputs given in form of connection string,
      81             :      * options and default values (dbname=replication, replication=true, etc.)
      82             :      */
      83         582 :     i = 0;
      84         582 :     if (connection_string)
      85             :     {
      86           4 :         conn_opts = PQconninfoParse(connection_string, &err_msg);
      87           4 :         if (conn_opts == NULL)
      88           0 :             pg_fatal("%s", err_msg);
      89             : 
      90         164 :         for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
      91             :         {
      92         160 :             if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
      93          12 :                 argcount++;
      94             :         }
      95             : 
      96           4 :         keywords = pg_malloc0((argcount + 1) * sizeof(*keywords));
      97           4 :         values = pg_malloc0((argcount + 1) * sizeof(*values));
      98             : 
      99             :         /*
     100             :          * Set dbname here already, so it can be overridden by a dbname in the
     101             :          * connection string.
     102             :          */
     103           4 :         keywords[i] = "dbname";
     104           4 :         values[i] = "replication";
     105           4 :         i++;
     106             : 
     107         164 :         for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
     108             :         {
     109         160 :             if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
     110             :             {
     111          12 :                 keywords[i] = conn_opt->keyword;
     112          12 :                 values[i] = conn_opt->val;
     113          12 :                 i++;
     114             :             }
     115             :         }
     116             :     }
     117             :     else
     118             :     {
     119         578 :         keywords = pg_malloc0((argcount + 1) * sizeof(*keywords));
     120         578 :         values = pg_malloc0((argcount + 1) * sizeof(*values));
     121         578 :         keywords[i] = "dbname";
     122         578 :         values[i] = dbname;
     123         578 :         i++;
     124             :     }
     125             : 
     126         582 :     keywords[i] = "replication";
     127         582 :     values[i] = dbname == NULL ? "true" : "database";
     128         582 :     i++;
     129         582 :     keywords[i] = "fallback_application_name";
     130         582 :     values[i] = progname;
     131         582 :     i++;
     132             : 
     133         582 :     if (dbhost)
     134             :     {
     135         214 :         keywords[i] = "host";
     136         214 :         values[i] = dbhost;
     137         214 :         i++;
     138             :     }
     139         582 :     if (dbuser)
     140             :     {
     141          14 :         keywords[i] = "user";
     142          14 :         values[i] = dbuser;
     143          14 :         i++;
     144             :     }
     145         582 :     if (dbport)
     146             :     {
     147         214 :         keywords[i] = "port";
     148         214 :         values[i] = dbport;
     149         214 :         i++;
     150             :     }
     151             : 
     152             :     /* If -W was given, force prompt for password, but only the first time */
     153         582 :     need_password = (dbgetpassword == 1 && !password);
     154             : 
     155             :     do
     156             :     {
     157             :         /* Get a new password if appropriate */
     158         582 :         if (need_password)
     159             :         {
     160           0 :             free(password);
     161           0 :             password = simple_prompt("Password: ", false);
     162           0 :             need_password = false;
     163             :         }
     164             : 
     165             :         /* Use (or reuse, on a subsequent connection) password if we have it */
     166         582 :         if (password)
     167             :         {
     168           0 :             keywords[i] = "password";
     169           0 :             values[i] = password;
     170             :         }
     171             :         else
     172             :         {
     173         582 :             keywords[i] = NULL;
     174         582 :             values[i] = NULL;
     175             :         }
     176             : 
     177             :         /*
     178             :          * Only expand dbname when we did not already parse the argument as a
     179             :          * connection string ourselves.
     180             :          */
     181         582 :         tmpconn = PQconnectdbParams(keywords, values, !connection_string);
     182             : 
     183             :         /*
     184             :          * If there is too little memory even to allocate the PGconn object
     185             :          * and PQconnectdbParams returns NULL, we call exit(1) directly.
     186             :          */
     187         582 :         if (!tmpconn)
     188           0 :             pg_fatal("could not connect to server");
     189             : 
     190             :         /* If we need a password and -w wasn't given, loop back and get one */
     191         586 :         if (PQstatus(tmpconn) == CONNECTION_BAD &&
     192           4 :             PQconnectionNeedsPassword(tmpconn) &&
     193           0 :             dbgetpassword != -1)
     194             :         {
     195           0 :             PQfinish(tmpconn);
     196           0 :             need_password = true;
     197             :         }
     198             :     }
     199         582 :     while (need_password);
     200             : 
     201         582 :     if (PQstatus(tmpconn) != CONNECTION_OK)
     202             :     {
     203           4 :         pg_log_error("%s", PQerrorMessage(tmpconn));
     204           4 :         PQfinish(tmpconn);
     205           4 :         free(values);
     206           4 :         free(keywords);
     207           4 :         PQconninfoFree(conn_opts);
     208           4 :         return NULL;
     209             :     }
     210             : 
     211             :     /* Connection ok! */
     212         578 :     free(values);
     213         578 :     free(keywords);
     214         578 :     PQconninfoFree(conn_opts);
     215             : 
     216             :     /*
     217             :      * Set always-secure search path, so malicious users can't get control.
     218             :      * The capacity to run normal SQL queries was added in PostgreSQL 10, so
     219             :      * the search path cannot be changed (by us or attackers) on earlier
     220             :      * versions.
     221             :      */
     222         578 :     if (dbname != NULL && PQserverVersion(tmpconn) >= 100000)
     223             :     {
     224             :         PGresult   *res;
     225             : 
     226          94 :         res = PQexec(tmpconn, ALWAYS_SECURE_SEARCH_PATH_SQL);
     227          94 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
     228             :         {
     229           0 :             pg_log_error("could not clear search_path: %s",
     230             :                          PQerrorMessage(tmpconn));
     231           0 :             PQclear(res);
     232           0 :             PQfinish(tmpconn);
     233           0 :             exit(1);
     234             :         }
     235          94 :         PQclear(res);
     236             :     }
     237             : 
     238             :     /*
     239             :      * Ensure we have the same value of integer_datetimes (now always "on") as
     240             :      * the server we are connecting to.
     241             :      */
     242         578 :     tmpparam = PQparameterStatus(tmpconn, "integer_datetimes");
     243         578 :     if (!tmpparam)
     244             :     {
     245           0 :         pg_log_error("could not determine server setting for integer_datetimes");
     246           0 :         PQfinish(tmpconn);
     247           0 :         exit(1);
     248             :     }
     249             : 
     250         578 :     if (strcmp(tmpparam, "on") != 0)
     251             :     {
     252           0 :         pg_log_error("integer_datetimes compile flag does not match server");
     253           0 :         PQfinish(tmpconn);
     254           0 :         exit(1);
     255             :     }
     256             : 
     257             :     /*
     258             :      * Retrieve the source data directory mode and use it to construct a umask
     259             :      * for creating directories and files.
     260             :      */
     261         578 :     if (!RetrieveDataDirCreatePerm(tmpconn))
     262             :     {
     263           0 :         PQfinish(tmpconn);
     264           0 :         exit(1);
     265             :     }
     266             : 
     267         578 :     return tmpconn;
     268             : }
     269             : 
     270             : /*
     271             :  * From version 10, explicitly set wal segment size using SHOW wal_segment_size
     272             :  * since ControlFile is not accessible here.
     273             :  */
     274             : bool
     275         282 : RetrieveWalSegSize(PGconn *conn)
     276             : {
     277             :     PGresult   *res;
     278             :     char        xlog_unit[3];
     279             :     int         xlog_val,
     280         282 :                 multiplier = 1;
     281             : 
     282             :     /* check connection existence */
     283             :     Assert(conn != NULL);
     284             : 
     285             :     /* for previous versions set the default xlog seg size */
     286         282 :     if (PQserverVersion(conn) < MINIMUM_VERSION_FOR_SHOW_CMD)
     287             :     {
     288           0 :         WalSegSz = DEFAULT_XLOG_SEG_SIZE;
     289           0 :         return true;
     290             :     }
     291             : 
     292         282 :     res = PQexec(conn, "SHOW wal_segment_size");
     293         282 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     294             :     {
     295           0 :         pg_log_error("could not send replication command \"%s\": %s",
     296             :                      "SHOW wal_segment_size", PQerrorMessage(conn));
     297             : 
     298           0 :         PQclear(res);
     299           0 :         return false;
     300             :     }
     301         282 :     if (PQntuples(res) != 1 || PQnfields(res) < 1)
     302             :     {
     303           0 :         pg_log_error("could not fetch WAL segment size: got %d rows and %d fields, expected %d rows and %d or more fields",
     304             :                      PQntuples(res), PQnfields(res), 1, 1);
     305             : 
     306           0 :         PQclear(res);
     307           0 :         return false;
     308             :     }
     309             : 
     310             :     /* fetch xlog value and unit from the result */
     311         282 :     if (sscanf(PQgetvalue(res, 0, 0), "%d%2s", &xlog_val, xlog_unit) != 2)
     312             :     {
     313           0 :         pg_log_error("WAL segment size could not be parsed");
     314           0 :         PQclear(res);
     315           0 :         return false;
     316             :     }
     317             : 
     318         282 :     PQclear(res);
     319             : 
     320             :     /* set the multiplier based on unit to convert xlog_val to bytes */
     321         282 :     if (strcmp(xlog_unit, "MB") == 0)
     322         282 :         multiplier = 1024 * 1024;
     323           0 :     else if (strcmp(xlog_unit, "GB") == 0)
     324           0 :         multiplier = 1024 * 1024 * 1024;
     325             : 
     326             :     /* convert and set WalSegSz */
     327         282 :     WalSegSz = xlog_val * multiplier;
     328             : 
     329         282 :     if (!IsValidWalSegSize(WalSegSz))
     330             :     {
     331           0 :         pg_log_error(ngettext("remote server reported invalid WAL segment size (%d byte)",
     332             :                               "remote server reported invalid WAL segment size (%d bytes)",
     333             :                               WalSegSz),
     334             :                      WalSegSz);
     335           0 :         pg_log_error_detail("The WAL segment size must be a power of two between 1 MB and 1 GB.");
     336           0 :         return false;
     337             :     }
     338             : 
     339         282 :     return true;
     340             : }
     341             : 
     342             : /*
     343             :  * RetrieveDataDirCreatePerm
     344             :  *
     345             :  * This function is used to determine the privileges on the server's PG data
     346             :  * directory and, based on that, set what the permissions will be for
     347             :  * directories and files we create.
     348             :  *
     349             :  * PG11 added support for (optionally) group read/execute rights to be set on
     350             :  * the data directory.  Prior to PG11, only the owner was allowed to have rights
     351             :  * on the data directory.
     352             :  */
     353             : static bool
     354         578 : RetrieveDataDirCreatePerm(PGconn *conn)
     355             : {
     356             :     PGresult   *res;
     357             :     int         data_directory_mode;
     358             : 
     359             :     /* check connection existence */
     360             :     Assert(conn != NULL);
     361             : 
     362             :     /* for previous versions leave the default group access */
     363         578 :     if (PQserverVersion(conn) < MINIMUM_VERSION_FOR_GROUP_ACCESS)
     364           0 :         return true;
     365             : 
     366         578 :     res = PQexec(conn, "SHOW data_directory_mode");
     367         578 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     368             :     {
     369           0 :         pg_log_error("could not send replication command \"%s\": %s",
     370             :                      "SHOW data_directory_mode", PQerrorMessage(conn));
     371             : 
     372           0 :         PQclear(res);
     373           0 :         return false;
     374             :     }
     375         578 :     if (PQntuples(res) != 1 || PQnfields(res) < 1)
     376             :     {
     377           0 :         pg_log_error("could not fetch group access flag: got %d rows and %d fields, expected %d rows and %d or more fields",
     378             :                      PQntuples(res), PQnfields(res), 1, 1);
     379             : 
     380           0 :         PQclear(res);
     381           0 :         return false;
     382             :     }
     383             : 
     384         578 :     if (sscanf(PQgetvalue(res, 0, 0), "%o", &data_directory_mode) != 1)
     385             :     {
     386           0 :         pg_log_error("group access flag could not be parsed: %s",
     387             :                      PQgetvalue(res, 0, 0));
     388             : 
     389           0 :         PQclear(res);
     390           0 :         return false;
     391             :     }
     392             : 
     393         578 :     SetDataDirectoryCreatePerm(data_directory_mode);
     394             : 
     395         578 :     PQclear(res);
     396         578 :     return true;
     397             : }
     398             : 
     399             : /*
     400             :  * Run IDENTIFY_SYSTEM through a given connection and give back to caller
     401             :  * some result information if requested:
     402             :  * - System identifier
     403             :  * - Current timeline ID
     404             :  * - Start LSN position
     405             :  * - Database name (NULL in servers prior to 9.4)
     406             :  */
     407             : bool
     408         594 : RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli,
     409             :                   XLogRecPtr *startpos, char **db_name)
     410             : {
     411             :     PGresult   *res;
     412             :     uint32      hi,
     413             :                 lo;
     414             : 
     415             :     /* Check connection existence */
     416             :     Assert(conn != NULL);
     417             : 
     418         594 :     res = PQexec(conn, "IDENTIFY_SYSTEM");
     419         594 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     420             :     {
     421           0 :         pg_log_error("could not send replication command \"%s\": %s",
     422             :                      "IDENTIFY_SYSTEM", PQerrorMessage(conn));
     423             : 
     424           0 :         PQclear(res);
     425           0 :         return false;
     426             :     }
     427         594 :     if (PQntuples(res) != 1 || PQnfields(res) < 3)
     428             :     {
     429           0 :         pg_log_error("could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields",
     430             :                      PQntuples(res), PQnfields(res), 1, 3);
     431             : 
     432           0 :         PQclear(res);
     433           0 :         return false;
     434             :     }
     435             : 
     436             :     /* Get system identifier */
     437         594 :     if (sysid != NULL)
     438         482 :         *sysid = pg_strdup(PQgetvalue(res, 0, 0));
     439             : 
     440             :     /* Get timeline ID to start streaming from */
     441         594 :     if (starttli != NULL)
     442         482 :         *starttli = atoi(PQgetvalue(res, 0, 1));
     443             : 
     444             :     /* Get LSN start position if necessary */
     445         594 :     if (startpos != NULL)
     446             :     {
     447          14 :         if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &hi, &lo) != 2)
     448             :         {
     449           0 :             pg_log_error("could not parse write-ahead log location \"%s\"",
     450             :                          PQgetvalue(res, 0, 2));
     451             : 
     452           0 :             PQclear(res);
     453           0 :             return false;
     454             :         }
     455          14 :         *startpos = ((uint64) hi) << 32 | lo;
     456             :     }
     457             : 
     458             :     /* Get database name, only available in 9.4 and newer versions */
     459         594 :     if (db_name != NULL)
     460             :     {
     461         112 :         *db_name = NULL;
     462         112 :         if (PQserverVersion(conn) >= 90400)
     463             :         {
     464         112 :             if (PQnfields(res) < 4)
     465             :             {
     466           0 :                 pg_log_error("could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields",
     467             :                              PQntuples(res), PQnfields(res), 1, 4);
     468             : 
     469           0 :                 PQclear(res);
     470           0 :                 return false;
     471             :             }
     472         112 :             if (!PQgetisnull(res, 0, 3))
     473          94 :                 *db_name = pg_strdup(PQgetvalue(res, 0, 3));
     474             :         }
     475             :     }
     476             : 
     477         594 :     PQclear(res);
     478         594 :     return true;
     479             : }
     480             : 
     481             : /*
     482             :  * Run READ_REPLICATION_SLOT through a given connection and give back to
     483             :  * caller some result information if requested for this slot:
     484             :  * - Start LSN position, InvalidXLogRecPtr if unknown.
     485             :  * - Current timeline ID, 0 if unknown.
     486             :  * Returns false on failure, and true otherwise.
     487             :  */
     488             : bool
     489           6 : GetSlotInformation(PGconn *conn, const char *slot_name,
     490             :                    XLogRecPtr *restart_lsn, TimeLineID *restart_tli)
     491             : {
     492             :     PGresult   *res;
     493             :     PQExpBuffer query;
     494           6 :     XLogRecPtr  lsn_loc = InvalidXLogRecPtr;
     495           6 :     TimeLineID  tli_loc = 0;
     496             : 
     497           6 :     if (restart_lsn)
     498           6 :         *restart_lsn = lsn_loc;
     499           6 :     if (restart_tli)
     500           6 :         *restart_tli = tli_loc;
     501             : 
     502           6 :     query = createPQExpBuffer();
     503           6 :     appendPQExpBuffer(query, "READ_REPLICATION_SLOT %s", slot_name);
     504           6 :     res = PQexec(conn, query->data);
     505           6 :     destroyPQExpBuffer(query);
     506             : 
     507           6 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     508             :     {
     509           0 :         pg_log_error("could not send replication command \"%s\": %s",
     510             :                      "READ_REPLICATION_SLOT", PQerrorMessage(conn));
     511           0 :         PQclear(res);
     512           0 :         return false;
     513             :     }
     514             : 
     515             :     /* The command should always return precisely one tuple and three fields */
     516           6 :     if (PQntuples(res) != 1 || PQnfields(res) != 3)
     517             :     {
     518           0 :         pg_log_error("could not read replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields",
     519             :                      slot_name, PQntuples(res), PQnfields(res), 1, 3);
     520           0 :         PQclear(res);
     521           0 :         return false;
     522             :     }
     523             : 
     524             :     /*
     525             :      * When the slot doesn't exist, the command returns a tuple with NULL
     526             :      * values.  This checks only the slot type field.
     527             :      */
     528           6 :     if (PQgetisnull(res, 0, 0))
     529             :     {
     530           2 :         pg_log_error("replication slot \"%s\" does not exist", slot_name);
     531           2 :         PQclear(res);
     532           2 :         return false;
     533             :     }
     534             : 
     535             :     /*
     536             :      * Note that this cannot happen as READ_REPLICATION_SLOT supports only
     537             :      * physical slots, but play it safe.
     538             :      */
     539           4 :     if (strcmp(PQgetvalue(res, 0, 0), "physical") != 0)
     540             :     {
     541           0 :         pg_log_error("expected a physical replication slot, got type \"%s\" instead",
     542             :                      PQgetvalue(res, 0, 0));
     543           0 :         PQclear(res);
     544           0 :         return false;
     545             :     }
     546             : 
     547             :     /* restart LSN */
     548           4 :     if (!PQgetisnull(res, 0, 1))
     549             :     {
     550             :         uint32      hi,
     551             :                     lo;
     552             : 
     553           4 :         if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &hi, &lo) != 2)
     554             :         {
     555           0 :             pg_log_error("could not parse restart_lsn \"%s\" for replication slot \"%s\"",
     556             :                          PQgetvalue(res, 0, 1), slot_name);
     557           0 :             PQclear(res);
     558           0 :             return false;
     559             :         }
     560           4 :         lsn_loc = ((uint64) hi) << 32 | lo;
     561             :     }
     562             : 
     563             :     /* current TLI */
     564           4 :     if (!PQgetisnull(res, 0, 2))
     565           4 :         tli_loc = (TimeLineID) atol(PQgetvalue(res, 0, 2));
     566             : 
     567           4 :     PQclear(res);
     568             : 
     569             :     /* Assign results if requested */
     570           4 :     if (restart_lsn)
     571           4 :         *restart_lsn = lsn_loc;
     572           4 :     if (restart_tli)
     573           4 :         *restart_tli = tli_loc;
     574             : 
     575           4 :     return true;
     576             : }
     577             : 
     578             : /*
     579             :  * Create a replication slot for the given connection. This function
     580             :  * returns true in case of success.
     581             :  */
     582             : bool
     583         236 : CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
     584             :                       bool is_temporary, bool is_physical, bool reserve_wal,
     585             :                       bool slot_exists_ok, bool two_phase)
     586             : {
     587             :     PQExpBuffer query;
     588             :     PGresult   *res;
     589         236 :     bool        use_new_option_syntax = (PQserverVersion(conn) >= 150000);
     590             : 
     591         236 :     query = createPQExpBuffer();
     592             : 
     593             :     Assert((is_physical && plugin == NULL) ||
     594             :            (!is_physical && plugin != NULL));
     595             :     Assert(!(two_phase && is_physical));
     596             :     Assert(slot_name != NULL);
     597             : 
     598             :     /* Build base portion of query */
     599         236 :     appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\"", slot_name);
     600         236 :     if (is_temporary)
     601         184 :         appendPQExpBufferStr(query, " TEMPORARY");
     602         236 :     if (is_physical)
     603         190 :         appendPQExpBufferStr(query, " PHYSICAL");
     604             :     else
     605          46 :         appendPQExpBuffer(query, " LOGICAL \"%s\"", plugin);
     606             : 
     607             :     /* Add any requested options */
     608         236 :     if (use_new_option_syntax)
     609         236 :         appendPQExpBufferStr(query, " (");
     610         236 :     if (is_physical)
     611             :     {
     612         190 :         if (reserve_wal)
     613         188 :             AppendPlainCommandOption(query, use_new_option_syntax,
     614             :                                      "RESERVE_WAL");
     615             :     }
     616             :     else
     617             :     {
     618          46 :         if (two_phase && PQserverVersion(conn) >= 150000)
     619           2 :             AppendPlainCommandOption(query, use_new_option_syntax,
     620             :                                      "TWO_PHASE");
     621             : 
     622          46 :         if (PQserverVersion(conn) >= 100000)
     623             :         {
     624             :             /* pg_recvlogical doesn't use an exported snapshot, so suppress */
     625          46 :             if (use_new_option_syntax)
     626          46 :                 AppendStringCommandOption(query, use_new_option_syntax,
     627             :                                           "SNAPSHOT", "nothing");
     628             :             else
     629           0 :                 AppendPlainCommandOption(query, use_new_option_syntax,
     630             :                                          "NOEXPORT_SNAPSHOT");
     631             :         }
     632             :     }
     633         236 :     if (use_new_option_syntax)
     634             :     {
     635             :         /* Suppress option list if it would be empty, otherwise terminate */
     636         236 :         if (query->data[query->len - 1] == '(')
     637             :         {
     638           2 :             query->len -= 2;
     639           2 :             query->data[query->len] = '\0';
     640             :         }
     641             :         else
     642         234 :             appendPQExpBufferChar(query, ')');
     643             :     }
     644             : 
     645             :     /* Now run the query */
     646         236 :     res = PQexec(conn, query->data);
     647         236 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     648             :     {
     649           2 :         const char *sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
     650             : 
     651           2 :         if (slot_exists_ok &&
     652           0 :             sqlstate &&
     653           0 :             strcmp(sqlstate, ERRCODE_DUPLICATE_OBJECT) == 0)
     654             :         {
     655           0 :             destroyPQExpBuffer(query);
     656           0 :             PQclear(res);
     657           0 :             return true;
     658             :         }
     659             :         else
     660             :         {
     661           2 :             pg_log_error("could not send replication command \"%s\": %s",
     662             :                          query->data, PQerrorMessage(conn));
     663             : 
     664           2 :             destroyPQExpBuffer(query);
     665           2 :             PQclear(res);
     666           2 :             return false;
     667             :         }
     668             :     }
     669             : 
     670         234 :     if (PQntuples(res) != 1 || PQnfields(res) != 4)
     671             :     {
     672           0 :         pg_log_error("could not create replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields",
     673             :                      slot_name,
     674             :                      PQntuples(res), PQnfields(res), 1, 4);
     675             : 
     676           0 :         destroyPQExpBuffer(query);
     677           0 :         PQclear(res);
     678           0 :         return false;
     679             :     }
     680             : 
     681         234 :     destroyPQExpBuffer(query);
     682         234 :     PQclear(res);
     683         234 :     return true;
     684             : }
     685             : 
     686             : /*
     687             :  * Drop a replication slot for the given connection. This function
     688             :  * returns true in case of success.
     689             :  */
     690             : bool
     691           4 : DropReplicationSlot(PGconn *conn, const char *slot_name)
     692             : {
     693             :     PQExpBuffer query;
     694             :     PGresult   *res;
     695             : 
     696             :     Assert(slot_name != NULL);
     697             : 
     698           4 :     query = createPQExpBuffer();
     699             : 
     700             :     /* Build query */
     701           4 :     appendPQExpBuffer(query, "DROP_REPLICATION_SLOT \"%s\"",
     702             :                       slot_name);
     703           4 :     res = PQexec(conn, query->data);
     704           4 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
     705             :     {
     706           0 :         pg_log_error("could not send replication command \"%s\": %s",
     707             :                      query->data, PQerrorMessage(conn));
     708             : 
     709           0 :         destroyPQExpBuffer(query);
     710           0 :         PQclear(res);
     711           0 :         return false;
     712             :     }
     713             : 
     714           4 :     if (PQntuples(res) != 0 || PQnfields(res) != 0)
     715             :     {
     716           0 :         pg_log_error("could not drop replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields",
     717             :                      slot_name,
     718             :                      PQntuples(res), PQnfields(res), 0, 0);
     719             : 
     720           0 :         destroyPQExpBuffer(query);
     721           0 :         PQclear(res);
     722           0 :         return false;
     723             :     }
     724             : 
     725           4 :     destroyPQExpBuffer(query);
     726           4 :     PQclear(res);
     727           4 :     return true;
     728             : }
     729             : 
     730             : /*
     731             :  * Append a "plain" option - one with no value - to a server command that
     732             :  * is being constructed.
     733             :  *
     734             :  * In the old syntax, all options were parser keywords, so you could just
     735             :  * write things like SOME_COMMAND OPTION1 OPTION2 'opt2value' OPTION3 42. The
     736             :  * new syntax uses a comma-separated list surrounded by parentheses, so the
     737             :  * equivalent is SOME_COMMAND (OPTION1, OPTION2 'optvalue', OPTION3 42).
     738             :  */
     739             : void
     740        1982 : AppendPlainCommandOption(PQExpBuffer buf, bool use_new_option_syntax,
     741             :                          char *option_name)
     742             : {
     743        1982 :     if (buf->len > 0 && buf->data[buf->len - 1] != '(')
     744             :     {
     745        1480 :         if (use_new_option_syntax)
     746        1480 :             appendPQExpBufferStr(buf, ", ");
     747             :         else
     748           0 :             appendPQExpBufferChar(buf, ' ');
     749             :     }
     750             : 
     751        1982 :     appendPQExpBuffer(buf, " %s", option_name);
     752        1982 : }
     753             : 
     754             : /*
     755             :  * Append an option with an associated string value to a server command that
     756             :  * is being constructed.
     757             :  *
     758             :  * See comments for AppendPlainCommandOption, above.
     759             :  */
     760             : void
     761        1192 : AppendStringCommandOption(PQExpBuffer buf, bool use_new_option_syntax,
     762             :                           char *option_name, char *option_value)
     763             : {
     764        1192 :     AppendPlainCommandOption(buf, use_new_option_syntax, option_name);
     765             : 
     766        1192 :     if (option_value != NULL)
     767             :     {
     768        1192 :         size_t      length = strlen(option_value);
     769        1192 :         char       *escaped_value = palloc(1 + 2 * length);
     770             : 
     771        1192 :         PQescapeStringConn(conn, escaped_value, option_value, length, NULL);
     772        1192 :         appendPQExpBuffer(buf, " '%s'", escaped_value);
     773        1192 :         pfree(escaped_value);
     774             :     }
     775        1192 : }
     776             : 
     777             : /*
     778             :  * Append an option with an associated integer value to a server command
     779             :  * is being constructed.
     780             :  *
     781             :  * See comments for AppendPlainCommandOption, above.
     782             :  */
     783             : void
     784         256 : AppendIntegerCommandOption(PQExpBuffer buf, bool use_new_option_syntax,
     785             :                            char *option_name, int32 option_value)
     786             : {
     787         256 :     AppendPlainCommandOption(buf, use_new_option_syntax, option_name);
     788             : 
     789         256 :     appendPQExpBuffer(buf, " %d", option_value);
     790         256 : }
     791             : 
     792             : /*
     793             :  * Frontend version of GetCurrentTimestamp(), since we are not linked with
     794             :  * backend code.
     795             :  */
     796             : TimestampTz
     797        4428 : feGetCurrentTimestamp(void)
     798             : {
     799             :     TimestampTz result;
     800             :     struct timeval tp;
     801             : 
     802        4428 :     gettimeofday(&tp, NULL);
     803             : 
     804        4428 :     result = (TimestampTz) tp.tv_sec -
     805             :         ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);
     806        4428 :     result = (result * USECS_PER_SEC) + tp.tv_usec;
     807             : 
     808        4428 :     return result;
     809             : }
     810             : 
     811             : /*
     812             :  * Frontend version of TimestampDifference(), since we are not linked with
     813             :  * backend code.
     814             :  */
     815             : void
     816        3420 : feTimestampDifference(TimestampTz start_time, TimestampTz stop_time,
     817             :                       long *secs, int *microsecs)
     818             : {
     819        3420 :     TimestampTz diff = stop_time - start_time;
     820             : 
     821        3420 :     if (diff <= 0)
     822             :     {
     823           0 :         *secs = 0;
     824           0 :         *microsecs = 0;
     825             :     }
     826             :     else
     827             :     {
     828        3420 :         *secs = (long) (diff / USECS_PER_SEC);
     829        3420 :         *microsecs = (int) (diff % USECS_PER_SEC);
     830             :     }
     831        3420 : }
     832             : 
     833             : /*
     834             :  * Frontend version of TimestampDifferenceExceeds(), since we are not
     835             :  * linked with backend code.
     836             :  */
     837             : bool
     838        5800 : feTimestampDifferenceExceeds(TimestampTz start_time,
     839             :                              TimestampTz stop_time,
     840             :                              int msec)
     841             : {
     842        5800 :     TimestampTz diff = stop_time - start_time;
     843             : 
     844        5800 :     return (diff >= msec * INT64CONST(1000));
     845             : }
     846             : 
     847             : /*
     848             :  * Converts an int64 to network byte order.
     849             :  */
     850             : void
     851         992 : fe_sendint64(int64 i, char *buf)
     852             : {
     853         992 :     uint64      n64 = pg_hton64(i);
     854             : 
     855         992 :     memcpy(buf, &n64, sizeof(n64));
     856         992 : }
     857             : 
     858             : /*
     859             :  * Converts an int64 from network byte order to native format.
     860             :  */
     861             : int64
     862        6692 : fe_recvint64(char *buf)
     863             : {
     864             :     uint64      n64;
     865             : 
     866        6692 :     memcpy(&n64, buf, sizeof(n64));
     867             : 
     868        6692 :     return pg_ntoh64(n64);
     869             : }

Generated by: LCOV version 1.14