LCOV - code coverage report
Current view: top level - src/bin/pg_basebackup - streamutil.c (source / functions) Hit Total Coverage
Test: PostgreSQL 17devel Lines: 247 343 72.0 %
Date: 2024-03-29 05:11:05 Functions: 17 17 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * streamutil.c - utility functions for pg_basebackup, pg_receivewal and
       4             :  *                  pg_recvlogical
       5             :  *
       6             :  * Author: Magnus Hagander <magnus@hagander.net>
       7             :  *
       8             :  * Portions Copyright (c) 1996-2024, PostgreSQL Global Development Group
       9             :  *
      10             :  * IDENTIFICATION
      11             :  *        src/bin/pg_basebackup/streamutil.c
      12             :  *-------------------------------------------------------------------------
      13             :  */
      14             : 
      15             : #include "postgres_fe.h"
      16             : 
      17             : #include <sys/time.h>
      18             : #include <unistd.h>
      19             : 
      20             : #include "access/xlog_internal.h"
      21             : #include "common/connect.h"
      22             : #include "common/fe_memutils.h"
      23             : #include "common/file_perm.h"
      24             : #include "common/logging.h"
      25             : #include "common/string.h"
      26             : #include "datatype/timestamp.h"
      27             : #include "port/pg_bswap.h"
      28             : #include "pqexpbuffer.h"
      29             : #include "receivelog.h"
      30             : #include "streamutil.h"
      31             : 
      32             : #define ERRCODE_DUPLICATE_OBJECT  "42710"
      33             : 
      34             : int         WalSegSz;
      35             : 
      36             : static bool RetrieveDataDirCreatePerm(PGconn *conn);
      37             : static char *FindDbnameInConnParams(PQconninfoOption *conn_opts);
      38             : 
      39             : /* SHOW command for replication connection was introduced in version 10 */
      40             : #define MINIMUM_VERSION_FOR_SHOW_CMD 100000
      41             : 
      42             : /*
      43             :  * Group access is supported from version 11.
      44             :  */
      45             : #define MINIMUM_VERSION_FOR_GROUP_ACCESS 110000
      46             : 
      47             : const char *progname;
      48             : char       *connection_string = NULL;
      49             : char       *dbhost = NULL;
      50             : char       *dbuser = NULL;
      51             : char       *dbport = NULL;
      52             : char       *dbname = NULL;
      53             : int         dbgetpassword = 0;  /* 0=auto, -1=never, 1=always */
      54             : static char *password = NULL;
      55             : PGconn     *conn = NULL;
      56             : 
      57             : /*
      58             :  * Connect to the server. Returns a valid PGconn pointer if connected,
      59             :  * or NULL on non-permanent error. On permanent error, the function will
      60             :  * call exit(1) directly.
      61             :  */
      62             : PGconn *
      63         676 : GetConnection(void)
      64             : {
      65             :     PGconn     *tmpconn;
      66         676 :     int         argcount = 7;   /* dbname, replication, fallback_app_name,
      67             :                                  * host, user, port, password */
      68             :     int         i;
      69             :     const char **keywords;
      70             :     const char **values;
      71             :     const char *tmpparam;
      72             :     bool        need_password;
      73         676 :     PQconninfoOption *conn_opts = NULL;
      74             :     PQconninfoOption *conn_opt;
      75         676 :     char       *err_msg = NULL;
      76             : 
      77             :     /* pg_recvlogical uses dbname only; others use connection_string only. */
      78             :     Assert(dbname == NULL || connection_string == NULL);
      79             : 
      80             :     /*
      81             :      * Merge the connection info inputs given in form of connection string,
      82             :      * options and default values (dbname=replication, replication=true, etc.)
      83             :      */
      84         676 :     i = 0;
      85         676 :     if (connection_string)
      86             :     {
      87           8 :         conn_opts = PQconninfoParse(connection_string, &err_msg);
      88           8 :         if (conn_opts == NULL)
      89           0 :             pg_fatal("%s", err_msg);
      90             : 
      91         328 :         for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
      92             :         {
      93         320 :             if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
      94          16 :                 argcount++;
      95             :         }
      96             : 
      97           8 :         keywords = pg_malloc0((argcount + 1) * sizeof(*keywords));
      98           8 :         values = pg_malloc0((argcount + 1) * sizeof(*values));
      99             : 
     100             :         /*
     101             :          * Set dbname here already, so it can be overridden by a dbname in the
     102             :          * connection string.
     103             :          */
     104           8 :         keywords[i] = "dbname";
     105           8 :         values[i] = "replication";
     106           8 :         i++;
     107             : 
     108         328 :         for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
     109             :         {
     110         320 :             if (conn_opt->val != NULL && conn_opt->val[0] != '\0')
     111             :             {
     112          16 :                 keywords[i] = conn_opt->keyword;
     113          16 :                 values[i] = conn_opt->val;
     114          16 :                 i++;
     115             :             }
     116             :         }
     117             :     }
     118             :     else
     119             :     {
     120         668 :         keywords = pg_malloc0((argcount + 1) * sizeof(*keywords));
     121         668 :         values = pg_malloc0((argcount + 1) * sizeof(*values));
     122         668 :         keywords[i] = "dbname";
     123         668 :         values[i] = dbname;
     124         668 :         i++;
     125             :     }
     126             : 
     127         676 :     keywords[i] = "replication";
     128         676 :     values[i] = dbname == NULL ? "true" : "database";
     129         676 :     i++;
     130         676 :     keywords[i] = "fallback_application_name";
     131         676 :     values[i] = progname;
     132         676 :     i++;
     133             : 
     134         676 :     if (dbhost)
     135             :     {
     136         246 :         keywords[i] = "host";
     137         246 :         values[i] = dbhost;
     138         246 :         i++;
     139             :     }
     140         676 :     if (dbuser)
     141             :     {
     142          14 :         keywords[i] = "user";
     143          14 :         values[i] = dbuser;
     144          14 :         i++;
     145             :     }
     146         676 :     if (dbport)
     147             :     {
     148         246 :         keywords[i] = "port";
     149         246 :         values[i] = dbport;
     150         246 :         i++;
     151             :     }
     152             : 
     153             :     /* If -W was given, force prompt for password, but only the first time */
     154         676 :     need_password = (dbgetpassword == 1 && !password);
     155             : 
     156             :     do
     157             :     {
     158             :         /* Get a new password if appropriate */
     159         676 :         if (need_password)
     160             :         {
     161           0 :             free(password);
     162           0 :             password = simple_prompt("Password: ", false);
     163           0 :             need_password = false;
     164             :         }
     165             : 
     166             :         /* Use (or reuse, on a subsequent connection) password if we have it */
     167         676 :         if (password)
     168             :         {
     169           0 :             keywords[i] = "password";
     170           0 :             values[i] = password;
     171             :         }
     172             :         else
     173             :         {
     174         676 :             keywords[i] = NULL;
     175         676 :             values[i] = NULL;
     176             :         }
     177             : 
     178             :         /*
     179             :          * Only expand dbname when we did not already parse the argument as a
     180             :          * connection string ourselves.
     181             :          */
     182         676 :         tmpconn = PQconnectdbParams(keywords, values, !connection_string);
     183             : 
     184             :         /*
     185             :          * If there is too little memory even to allocate the PGconn object
     186             :          * and PQconnectdbParams returns NULL, we call exit(1) directly.
     187             :          */
     188         676 :         if (!tmpconn)
     189           0 :             pg_fatal("could not connect to server");
     190             : 
     191             :         /* If we need a password and -w wasn't given, loop back and get one */
     192         680 :         if (PQstatus(tmpconn) == CONNECTION_BAD &&
     193           4 :             PQconnectionNeedsPassword(tmpconn) &&
     194           0 :             dbgetpassword != -1)
     195             :         {
     196           0 :             PQfinish(tmpconn);
     197           0 :             need_password = true;
     198             :         }
     199             :     }
     200         676 :     while (need_password);
     201             : 
     202         676 :     if (PQstatus(tmpconn) != CONNECTION_OK)
     203             :     {
     204           4 :         pg_log_error("%s", PQerrorMessage(tmpconn));
     205           4 :         PQfinish(tmpconn);
     206           4 :         free(values);
     207           4 :         free(keywords);
     208           4 :         PQconninfoFree(conn_opts);
     209           4 :         return NULL;
     210             :     }
     211             : 
     212             :     /* Connection ok! */
     213         672 :     free(values);
     214         672 :     free(keywords);
     215         672 :     PQconninfoFree(conn_opts);
     216             : 
     217             :     /*
     218             :      * Set always-secure search path, so malicious users can't get control.
     219             :      * The capacity to run normal SQL queries was added in PostgreSQL 10, so
     220             :      * the search path cannot be changed (by us or attackers) on earlier
     221             :      * versions.
     222             :      */
     223         672 :     if (dbname != NULL && PQserverVersion(tmpconn) >= 100000)
     224             :     {
     225             :         PGresult   *res;
     226             : 
     227          94 :         res = PQexec(tmpconn, ALWAYS_SECURE_SEARCH_PATH_SQL);
     228          94 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
     229             :         {
     230           0 :             pg_log_error("could not clear search_path: %s",
     231             :                          PQerrorMessage(tmpconn));
     232           0 :             PQclear(res);
     233           0 :             PQfinish(tmpconn);
     234           0 :             exit(1);
     235             :         }
     236          94 :         PQclear(res);
     237             :     }
     238             : 
     239             :     /*
     240             :      * Ensure we have the same value of integer_datetimes (now always "on") as
     241             :      * the server we are connecting to.
     242             :      */
     243         672 :     tmpparam = PQparameterStatus(tmpconn, "integer_datetimes");
     244         672 :     if (!tmpparam)
     245             :     {
     246           0 :         pg_log_error("could not determine server setting for integer_datetimes");
     247           0 :         PQfinish(tmpconn);
     248           0 :         exit(1);
     249             :     }
     250             : 
     251         672 :     if (strcmp(tmpparam, "on") != 0)
     252             :     {
     253           0 :         pg_log_error("integer_datetimes compile flag does not match server");
     254           0 :         PQfinish(tmpconn);
     255           0 :         exit(1);
     256             :     }
     257             : 
     258             :     /*
     259             :      * Retrieve the source data directory mode and use it to construct a umask
     260             :      * for creating directories and files.
     261             :      */
     262         672 :     if (!RetrieveDataDirCreatePerm(tmpconn))
     263             :     {
     264           0 :         PQfinish(tmpconn);
     265           0 :         exit(1);
     266             :     }
     267             : 
     268         672 :     return tmpconn;
     269             : }
     270             : 
     271             : /*
     272             :  * FindDbnameInConnParams
     273             :  *
     274             :  * This is a helper function for GetDbnameFromConnectionOptions(). Extract
     275             :  * the value of dbname from PQconninfoOption parameters, if it's present.
     276             :  * Returns a strdup'd result or NULL.
     277             :  */
     278             : static char *
     279           6 : FindDbnameInConnParams(PQconninfoOption *conn_opts)
     280             : {
     281             :     PQconninfoOption *conn_opt;
     282             : 
     283          42 :     for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
     284             :     {
     285          42 :         if (strcmp(conn_opt->keyword, "dbname") == 0 &&
     286           6 :             conn_opt->val != NULL && conn_opt->val[0] != '\0')
     287           6 :             return pg_strdup(conn_opt->val);
     288             :     }
     289           0 :     return NULL;
     290             : }
     291             : 
     292             : /*
     293             :  * GetDbnameFromConnectionOptions
     294             :  *
     295             :  * This is a special purpose function to retrieve the dbname from either the
     296             :  * connection_string specified by the user or from the environment variables.
     297             :  *
     298             :  * We follow GetConnection() to fetch the dbname from various connection
     299             :  * options.
     300             :  *
     301             :  * Returns NULL, if dbname is not specified by the user in the above
     302             :  * mentioned connection options.
     303             :  */
     304             : char *
     305           6 : GetDbnameFromConnectionOptions(void)
     306             : {
     307             :     PQconninfoOption *conn_opts;
     308           6 :     char       *err_msg = NULL;
     309             :     char       *dbname;
     310             : 
     311             :     /* First try to get the dbname from connection string. */
     312           6 :     if (connection_string)
     313             :     {
     314           2 :         conn_opts = PQconninfoParse(connection_string, &err_msg);
     315           2 :         if (conn_opts == NULL)
     316           0 :             pg_fatal("%s", err_msg);
     317             : 
     318           2 :         dbname = FindDbnameInConnParams(conn_opts);
     319             : 
     320           2 :         PQconninfoFree(conn_opts);
     321           2 :         if (dbname)
     322           2 :             return dbname;
     323             :     }
     324             : 
     325             :     /*
     326             :      * Next try to get the dbname from default values that are available from
     327             :      * the environment.
     328             :      */
     329           4 :     conn_opts = PQconndefaults();
     330           4 :     if (conn_opts == NULL)
     331           0 :         pg_fatal("out of memory");
     332             : 
     333           4 :     dbname = FindDbnameInConnParams(conn_opts);
     334             : 
     335           4 :     PQconninfoFree(conn_opts);
     336           4 :     return dbname;
     337             : }
     338             : 
     339             : /*
     340             :  * From version 10, explicitly set wal segment size using SHOW wal_segment_size
     341             :  * since ControlFile is not accessible here.
     342             :  */
     343             : bool
     344         330 : RetrieveWalSegSize(PGconn *conn)
     345             : {
     346             :     PGresult   *res;
     347             :     char        xlog_unit[3];
     348             :     int         xlog_val,
     349         330 :                 multiplier = 1;
     350             : 
     351             :     /* check connection existence */
     352             :     Assert(conn != NULL);
     353             : 
     354             :     /* for previous versions set the default xlog seg size */
     355         330 :     if (PQserverVersion(conn) < MINIMUM_VERSION_FOR_SHOW_CMD)
     356             :     {
     357           0 :         WalSegSz = DEFAULT_XLOG_SEG_SIZE;
     358           0 :         return true;
     359             :     }
     360             : 
     361         330 :     res = PQexec(conn, "SHOW wal_segment_size");
     362         330 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     363             :     {
     364           0 :         pg_log_error("could not send replication command \"%s\": %s",
     365             :                      "SHOW wal_segment_size", PQerrorMessage(conn));
     366             : 
     367           0 :         PQclear(res);
     368           0 :         return false;
     369             :     }
     370         330 :     if (PQntuples(res) != 1 || PQnfields(res) < 1)
     371             :     {
     372           0 :         pg_log_error("could not fetch WAL segment size: got %d rows and %d fields, expected %d rows and %d or more fields",
     373             :                      PQntuples(res), PQnfields(res), 1, 1);
     374             : 
     375           0 :         PQclear(res);
     376           0 :         return false;
     377             :     }
     378             : 
     379             :     /* fetch xlog value and unit from the result */
     380         330 :     if (sscanf(PQgetvalue(res, 0, 0), "%d%2s", &xlog_val, xlog_unit) != 2)
     381             :     {
     382           0 :         pg_log_error("WAL segment size could not be parsed");
     383           0 :         PQclear(res);
     384           0 :         return false;
     385             :     }
     386             : 
     387         330 :     PQclear(res);
     388             : 
     389             :     /* set the multiplier based on unit to convert xlog_val to bytes */
     390         330 :     if (strcmp(xlog_unit, "MB") == 0)
     391         330 :         multiplier = 1024 * 1024;
     392           0 :     else if (strcmp(xlog_unit, "GB") == 0)
     393           0 :         multiplier = 1024 * 1024 * 1024;
     394             : 
     395             :     /* convert and set WalSegSz */
     396         330 :     WalSegSz = xlog_val * multiplier;
     397             : 
     398         330 :     if (!IsValidWalSegSize(WalSegSz))
     399             :     {
     400           0 :         pg_log_error(ngettext("remote server reported invalid WAL segment size (%d byte)",
     401             :                               "remote server reported invalid WAL segment size (%d bytes)",
     402             :                               WalSegSz),
     403             :                      WalSegSz);
     404           0 :         pg_log_error_detail("The WAL segment size must be a power of two between 1 MB and 1 GB.");
     405           0 :         return false;
     406             :     }
     407             : 
     408         330 :     return true;
     409             : }
     410             : 
     411             : /*
     412             :  * RetrieveDataDirCreatePerm
     413             :  *
     414             :  * This function is used to determine the privileges on the server's PG data
     415             :  * directory and, based on that, set what the permissions will be for
     416             :  * directories and files we create.
     417             :  *
     418             :  * PG11 added support for (optionally) group read/execute rights to be set on
     419             :  * the data directory.  Prior to PG11, only the owner was allowed to have rights
     420             :  * on the data directory.
     421             :  */
     422             : static bool
     423         672 : RetrieveDataDirCreatePerm(PGconn *conn)
     424             : {
     425             :     PGresult   *res;
     426             :     int         data_directory_mode;
     427             : 
     428             :     /* check connection existence */
     429             :     Assert(conn != NULL);
     430             : 
     431             :     /* for previous versions leave the default group access */
     432         672 :     if (PQserverVersion(conn) < MINIMUM_VERSION_FOR_GROUP_ACCESS)
     433           0 :         return true;
     434             : 
     435         672 :     res = PQexec(conn, "SHOW data_directory_mode");
     436         672 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     437             :     {
     438           0 :         pg_log_error("could not send replication command \"%s\": %s",
     439             :                      "SHOW data_directory_mode", PQerrorMessage(conn));
     440             : 
     441           0 :         PQclear(res);
     442           0 :         return false;
     443             :     }
     444         672 :     if (PQntuples(res) != 1 || PQnfields(res) < 1)
     445             :     {
     446           0 :         pg_log_error("could not fetch group access flag: got %d rows and %d fields, expected %d rows and %d or more fields",
     447             :                      PQntuples(res), PQnfields(res), 1, 1);
     448             : 
     449           0 :         PQclear(res);
     450           0 :         return false;
     451             :     }
     452             : 
     453         672 :     if (sscanf(PQgetvalue(res, 0, 0), "%o", &data_directory_mode) != 1)
     454             :     {
     455           0 :         pg_log_error("group access flag could not be parsed: %s",
     456             :                      PQgetvalue(res, 0, 0));
     457             : 
     458           0 :         PQclear(res);
     459           0 :         return false;
     460             :     }
     461             : 
     462         672 :     SetDataDirectoryCreatePerm(data_directory_mode);
     463             : 
     464         672 :     PQclear(res);
     465         672 :     return true;
     466             : }
     467             : 
     468             : /*
     469             :  * Run IDENTIFY_SYSTEM through a given connection and give back to caller
     470             :  * some result information if requested:
     471             :  * - System identifier
     472             :  * - Current timeline ID
     473             :  * - Start LSN position
     474             :  * - Database name (NULL in servers prior to 9.4)
     475             :  */
     476             : bool
     477         688 : RunIdentifySystem(PGconn *conn, char **sysid, TimeLineID *starttli,
     478             :                   XLogRecPtr *startpos, char **db_name)
     479             : {
     480             :     PGresult   *res;
     481             :     uint32      hi,
     482             :                 lo;
     483             : 
     484             :     /* Check connection existence */
     485             :     Assert(conn != NULL);
     486             : 
     487         688 :     res = PQexec(conn, "IDENTIFY_SYSTEM");
     488         688 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     489             :     {
     490           0 :         pg_log_error("could not send replication command \"%s\": %s",
     491             :                      "IDENTIFY_SYSTEM", PQerrorMessage(conn));
     492             : 
     493           0 :         PQclear(res);
     494           0 :         return false;
     495             :     }
     496         688 :     if (PQntuples(res) != 1 || PQnfields(res) < 3)
     497             :     {
     498           0 :         pg_log_error("could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields",
     499             :                      PQntuples(res), PQnfields(res), 1, 3);
     500             : 
     501           0 :         PQclear(res);
     502           0 :         return false;
     503             :     }
     504             : 
     505             :     /* Get system identifier */
     506         688 :     if (sysid != NULL)
     507         576 :         *sysid = pg_strdup(PQgetvalue(res, 0, 0));
     508             : 
     509             :     /* Get timeline ID to start streaming from */
     510         688 :     if (starttli != NULL)
     511         576 :         *starttli = atoi(PQgetvalue(res, 0, 1));
     512             : 
     513             :     /* Get LSN start position if necessary */
     514         688 :     if (startpos != NULL)
     515             :     {
     516          14 :         if (sscanf(PQgetvalue(res, 0, 2), "%X/%X", &hi, &lo) != 2)
     517             :         {
     518           0 :             pg_log_error("could not parse write-ahead log location \"%s\"",
     519             :                          PQgetvalue(res, 0, 2));
     520             : 
     521           0 :             PQclear(res);
     522           0 :             return false;
     523             :         }
     524          14 :         *startpos = ((uint64) hi) << 32 | lo;
     525             :     }
     526             : 
     527             :     /* Get database name, only available in 9.4 and newer versions */
     528         688 :     if (db_name != NULL)
     529             :     {
     530         112 :         *db_name = NULL;
     531         112 :         if (PQserverVersion(conn) >= 90400)
     532             :         {
     533         112 :             if (PQnfields(res) < 4)
     534             :             {
     535           0 :                 pg_log_error("could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields",
     536             :                              PQntuples(res), PQnfields(res), 1, 4);
     537             : 
     538           0 :                 PQclear(res);
     539           0 :                 return false;
     540             :             }
     541         112 :             if (!PQgetisnull(res, 0, 3))
     542          94 :                 *db_name = pg_strdup(PQgetvalue(res, 0, 3));
     543             :         }
     544             :     }
     545             : 
     546         688 :     PQclear(res);
     547         688 :     return true;
     548             : }
     549             : 
     550             : /*
     551             :  * Run READ_REPLICATION_SLOT through a given connection and give back to
     552             :  * caller some result information if requested for this slot:
     553             :  * - Start LSN position, InvalidXLogRecPtr if unknown.
     554             :  * - Current timeline ID, 0 if unknown.
     555             :  * Returns false on failure, and true otherwise.
     556             :  */
     557             : bool
     558           6 : GetSlotInformation(PGconn *conn, const char *slot_name,
     559             :                    XLogRecPtr *restart_lsn, TimeLineID *restart_tli)
     560             : {
     561             :     PGresult   *res;
     562             :     PQExpBuffer query;
     563           6 :     XLogRecPtr  lsn_loc = InvalidXLogRecPtr;
     564           6 :     TimeLineID  tli_loc = 0;
     565             : 
     566           6 :     if (restart_lsn)
     567           6 :         *restart_lsn = lsn_loc;
     568           6 :     if (restart_tli)
     569           6 :         *restart_tli = tli_loc;
     570             : 
     571           6 :     query = createPQExpBuffer();
     572           6 :     appendPQExpBuffer(query, "READ_REPLICATION_SLOT %s", slot_name);
     573           6 :     res = PQexec(conn, query->data);
     574           6 :     destroyPQExpBuffer(query);
     575             : 
     576           6 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     577             :     {
     578           0 :         pg_log_error("could not send replication command \"%s\": %s",
     579             :                      "READ_REPLICATION_SLOT", PQerrorMessage(conn));
     580           0 :         PQclear(res);
     581           0 :         return false;
     582             :     }
     583             : 
     584             :     /* The command should always return precisely one tuple and three fields */
     585           6 :     if (PQntuples(res) != 1 || PQnfields(res) != 3)
     586             :     {
     587           0 :         pg_log_error("could not read replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields",
     588             :                      slot_name, PQntuples(res), PQnfields(res), 1, 3);
     589           0 :         PQclear(res);
     590           0 :         return false;
     591             :     }
     592             : 
     593             :     /*
     594             :      * When the slot doesn't exist, the command returns a tuple with NULL
     595             :      * values.  This checks only the slot type field.
     596             :      */
     597           6 :     if (PQgetisnull(res, 0, 0))
     598             :     {
     599           2 :         pg_log_error("replication slot \"%s\" does not exist", slot_name);
     600           2 :         PQclear(res);
     601           2 :         return false;
     602             :     }
     603             : 
     604             :     /*
     605             :      * Note that this cannot happen as READ_REPLICATION_SLOT supports only
     606             :      * physical slots, but play it safe.
     607             :      */
     608           4 :     if (strcmp(PQgetvalue(res, 0, 0), "physical") != 0)
     609             :     {
     610           0 :         pg_log_error("expected a physical replication slot, got type \"%s\" instead",
     611             :                      PQgetvalue(res, 0, 0));
     612           0 :         PQclear(res);
     613           0 :         return false;
     614             :     }
     615             : 
     616             :     /* restart LSN */
     617           4 :     if (!PQgetisnull(res, 0, 1))
     618             :     {
     619             :         uint32      hi,
     620             :                     lo;
     621             : 
     622           4 :         if (sscanf(PQgetvalue(res, 0, 1), "%X/%X", &hi, &lo) != 2)
     623             :         {
     624           0 :             pg_log_error("could not parse restart_lsn \"%s\" for replication slot \"%s\"",
     625             :                          PQgetvalue(res, 0, 1), slot_name);
     626           0 :             PQclear(res);
     627           0 :             return false;
     628             :         }
     629           4 :         lsn_loc = ((uint64) hi) << 32 | lo;
     630             :     }
     631             : 
     632             :     /* current TLI */
     633           4 :     if (!PQgetisnull(res, 0, 2))
     634           4 :         tli_loc = (TimeLineID) atol(PQgetvalue(res, 0, 2));
     635             : 
     636           4 :     PQclear(res);
     637             : 
     638             :     /* Assign results if requested */
     639           4 :     if (restart_lsn)
     640           4 :         *restart_lsn = lsn_loc;
     641           4 :     if (restart_tli)
     642           4 :         *restart_tli = tli_loc;
     643             : 
     644           4 :     return true;
     645             : }
     646             : 
     647             : /*
     648             :  * Create a replication slot for the given connection. This function
     649             :  * returns true in case of success.
     650             :  */
     651             : bool
     652         282 : CreateReplicationSlot(PGconn *conn, const char *slot_name, const char *plugin,
     653             :                       bool is_temporary, bool is_physical, bool reserve_wal,
     654             :                       bool slot_exists_ok, bool two_phase)
     655             : {
     656             :     PQExpBuffer query;
     657             :     PGresult   *res;
     658         282 :     bool        use_new_option_syntax = (PQserverVersion(conn) >= 150000);
     659             : 
     660         282 :     query = createPQExpBuffer();
     661             : 
     662             :     Assert((is_physical && plugin == NULL) ||
     663             :            (!is_physical && plugin != NULL));
     664             :     Assert(!(two_phase && is_physical));
     665             :     Assert(slot_name != NULL);
     666             : 
     667             :     /* Build base portion of query */
     668         282 :     appendPQExpBuffer(query, "CREATE_REPLICATION_SLOT \"%s\"", slot_name);
     669         282 :     if (is_temporary)
     670         230 :         appendPQExpBufferStr(query, " TEMPORARY");
     671         282 :     if (is_physical)
     672         236 :         appendPQExpBufferStr(query, " PHYSICAL");
     673             :     else
     674          46 :         appendPQExpBuffer(query, " LOGICAL \"%s\"", plugin);
     675             : 
     676             :     /* Add any requested options */
     677         282 :     if (use_new_option_syntax)
     678         282 :         appendPQExpBufferStr(query, " (");
     679         282 :     if (is_physical)
     680             :     {
     681         236 :         if (reserve_wal)
     682         234 :             AppendPlainCommandOption(query, use_new_option_syntax,
     683             :                                      "RESERVE_WAL");
     684             :     }
     685             :     else
     686             :     {
     687          46 :         if (two_phase && PQserverVersion(conn) >= 150000)
     688           2 :             AppendPlainCommandOption(query, use_new_option_syntax,
     689             :                                      "TWO_PHASE");
     690             : 
     691          46 :         if (PQserverVersion(conn) >= 100000)
     692             :         {
     693             :             /* pg_recvlogical doesn't use an exported snapshot, so suppress */
     694          46 :             if (use_new_option_syntax)
     695          46 :                 AppendStringCommandOption(query, use_new_option_syntax,
     696             :                                           "SNAPSHOT", "nothing");
     697             :             else
     698           0 :                 AppendPlainCommandOption(query, use_new_option_syntax,
     699             :                                          "NOEXPORT_SNAPSHOT");
     700             :         }
     701             :     }
     702         282 :     if (use_new_option_syntax)
     703             :     {
     704             :         /* Suppress option list if it would be empty, otherwise terminate */
     705         282 :         if (query->data[query->len - 1] == '(')
     706             :         {
     707           2 :             query->len -= 2;
     708           2 :             query->data[query->len] = '\0';
     709             :         }
     710             :         else
     711         280 :             appendPQExpBufferChar(query, ')');
     712             :     }
     713             : 
     714             :     /* Now run the query */
     715         282 :     res = PQexec(conn, query->data);
     716         282 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     717             :     {
     718           2 :         const char *sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
     719             : 
     720           2 :         if (slot_exists_ok &&
     721           0 :             sqlstate &&
     722           0 :             strcmp(sqlstate, ERRCODE_DUPLICATE_OBJECT) == 0)
     723             :         {
     724           0 :             destroyPQExpBuffer(query);
     725           0 :             PQclear(res);
     726           0 :             return true;
     727             :         }
     728             :         else
     729             :         {
     730           2 :             pg_log_error("could not send replication command \"%s\": %s",
     731             :                          query->data, PQerrorMessage(conn));
     732             : 
     733           2 :             destroyPQExpBuffer(query);
     734           2 :             PQclear(res);
     735           2 :             return false;
     736             :         }
     737             :     }
     738             : 
     739         280 :     if (PQntuples(res) != 1 || PQnfields(res) != 4)
     740             :     {
     741           0 :         pg_log_error("could not create replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields",
     742             :                      slot_name,
     743             :                      PQntuples(res), PQnfields(res), 1, 4);
     744             : 
     745           0 :         destroyPQExpBuffer(query);
     746           0 :         PQclear(res);
     747           0 :         return false;
     748             :     }
     749             : 
     750         280 :     destroyPQExpBuffer(query);
     751         280 :     PQclear(res);
     752         280 :     return true;
     753             : }
     754             : 
     755             : /*
     756             :  * Drop a replication slot for the given connection. This function
     757             :  * returns true in case of success.
     758             :  */
     759             : bool
     760           4 : DropReplicationSlot(PGconn *conn, const char *slot_name)
     761             : {
     762             :     PQExpBuffer query;
     763             :     PGresult   *res;
     764             : 
     765             :     Assert(slot_name != NULL);
     766             : 
     767           4 :     query = createPQExpBuffer();
     768             : 
     769             :     /* Build query */
     770           4 :     appendPQExpBuffer(query, "DROP_REPLICATION_SLOT \"%s\"",
     771             :                       slot_name);
     772           4 :     res = PQexec(conn, query->data);
     773           4 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
     774             :     {
     775           0 :         pg_log_error("could not send replication command \"%s\": %s",
     776             :                      query->data, PQerrorMessage(conn));
     777             : 
     778           0 :         destroyPQExpBuffer(query);
     779           0 :         PQclear(res);
     780           0 :         return false;
     781             :     }
     782             : 
     783           4 :     if (PQntuples(res) != 0 || PQnfields(res) != 0)
     784             :     {
     785           0 :         pg_log_error("could not drop replication slot \"%s\": got %d rows and %d fields, expected %d rows and %d fields",
     786             :                      slot_name,
     787             :                      PQntuples(res), PQnfields(res), 0, 0);
     788             : 
     789           0 :         destroyPQExpBuffer(query);
     790           0 :         PQclear(res);
     791           0 :         return false;
     792             :     }
     793             : 
     794           4 :     destroyPQExpBuffer(query);
     795           4 :     PQclear(res);
     796           4 :     return true;
     797             : }
     798             : 
     799             : /*
     800             :  * Append a "plain" option - one with no value - to a server command that
     801             :  * is being constructed.
     802             :  *
     803             :  * In the old syntax, all options were parser keywords, so you could just
     804             :  * write things like SOME_COMMAND OPTION1 OPTION2 'opt2value' OPTION3 42. The
     805             :  * new syntax uses a comma-separated list surrounded by parentheses, so the
     806             :  * equivalent is SOME_COMMAND (OPTION1, OPTION2 'optvalue', OPTION3 42).
     807             :  */
     808             : void
     809        2318 : AppendPlainCommandOption(PQExpBuffer buf, bool use_new_option_syntax,
     810             :                          char *option_name)
     811             : {
     812        2318 :     if (buf->len > 0 && buf->data[buf->len - 1] != '(')
     813             :     {
     814        1724 :         if (use_new_option_syntax)
     815        1724 :             appendPQExpBufferStr(buf, ", ");
     816             :         else
     817           0 :             appendPQExpBufferChar(buf, ' ');
     818             :     }
     819             : 
     820        2318 :     appendPQExpBuffer(buf, " %s", option_name);
     821        2318 : }
     822             : 
     823             : /*
     824             :  * Append an option with an associated string value to a server command that
     825             :  * is being constructed.
     826             :  *
     827             :  * See comments for AppendPlainCommandOption, above.
     828             :  */
     829             : void
     830        1376 : AppendStringCommandOption(PQExpBuffer buf, bool use_new_option_syntax,
     831             :                           char *option_name, char *option_value)
     832             : {
     833        1376 :     AppendPlainCommandOption(buf, use_new_option_syntax, option_name);
     834             : 
     835        1376 :     if (option_value != NULL)
     836             :     {
     837        1376 :         size_t      length = strlen(option_value);
     838        1376 :         char       *escaped_value = palloc(1 + 2 * length);
     839             : 
     840        1376 :         PQescapeStringConn(conn, escaped_value, option_value, length, NULL);
     841        1376 :         appendPQExpBuffer(buf, " '%s'", escaped_value);
     842        1376 :         pfree(escaped_value);
     843             :     }
     844        1376 : }
     845             : 
     846             : /*
     847             :  * Append an option with an associated integer value to a server command
     848             :  * is being constructed.
     849             :  *
     850             :  * See comments for AppendPlainCommandOption, above.
     851             :  */
     852             : void
     853         302 : AppendIntegerCommandOption(PQExpBuffer buf, bool use_new_option_syntax,
     854             :                            char *option_name, int32 option_value)
     855             : {
     856         302 :     AppendPlainCommandOption(buf, use_new_option_syntax, option_name);
     857             : 
     858         302 :     appendPQExpBuffer(buf, " %d", option_value);
     859         302 : }
     860             : 
     861             : /*
     862             :  * Frontend version of GetCurrentTimestamp(), since we are not linked with
     863             :  * backend code.
     864             :  */
     865             : TimestampTz
     866        4350 : feGetCurrentTimestamp(void)
     867             : {
     868             :     TimestampTz result;
     869             :     struct timeval tp;
     870             : 
     871        4350 :     gettimeofday(&tp, NULL);
     872             : 
     873        4350 :     result = (TimestampTz) tp.tv_sec -
     874             :         ((POSTGRES_EPOCH_JDATE - UNIX_EPOCH_JDATE) * SECS_PER_DAY);
     875        4350 :     result = (result * USECS_PER_SEC) + tp.tv_usec;
     876             : 
     877        4350 :     return result;
     878             : }
     879             : 
     880             : /*
     881             :  * Frontend version of TimestampDifference(), since we are not linked with
     882             :  * backend code.
     883             :  */
     884             : void
     885        3328 : feTimestampDifference(TimestampTz start_time, TimestampTz stop_time,
     886             :                       long *secs, int *microsecs)
     887             : {
     888        3328 :     TimestampTz diff = stop_time - start_time;
     889             : 
     890        3328 :     if (diff <= 0)
     891             :     {
     892           0 :         *secs = 0;
     893           0 :         *microsecs = 0;
     894             :     }
     895             :     else
     896             :     {
     897        3328 :         *secs = (long) (diff / USECS_PER_SEC);
     898        3328 :         *microsecs = (int) (diff % USECS_PER_SEC);
     899             :     }
     900        3328 : }
     901             : 
     902             : /*
     903             :  * Frontend version of TimestampDifferenceExceeds(), since we are not
     904             :  * linked with backend code.
     905             :  */
     906             : bool
     907        5592 : feTimestampDifferenceExceeds(TimestampTz start_time,
     908             :                              TimestampTz stop_time,
     909             :                              int msec)
     910             : {
     911        5592 :     TimestampTz diff = stop_time - start_time;
     912             : 
     913        5592 :     return (diff >= msec * INT64CONST(1000));
     914             : }
     915             : 
     916             : /*
     917             :  * Converts an int64 to network byte order.
     918             :  */
     919             : void
     920        1176 : fe_sendint64(int64 i, char *buf)
     921             : {
     922        1176 :     uint64      n64 = pg_hton64(i);
     923             : 
     924        1176 :     memcpy(buf, &n64, sizeof(n64));
     925        1176 : }
     926             : 
     927             : /*
     928             :  * Converts an int64 from network byte order to native format.
     929             :  */
     930             : int64
     931        6350 : fe_recvint64(char *buf)
     932             : {
     933             :     uint64      n64;
     934             : 
     935        6350 :     memcpy(&n64, buf, sizeof(n64));
     936             : 
     937        6350 :     return pg_ntoh64(n64);
     938             : }

Generated by: LCOV version 1.14