LCOV - code coverage report
Current view: top level - src/backend/replication/libpqwalreceiver - libpqwalreceiver.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 83.5 % 424 354
Test Date: 2026-04-06 21:16:29 Functions: 95.5 % 22 21
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * libpqwalreceiver.c
       4              :  *
       5              :  * This file contains the libpq-specific parts of walreceiver. It's
       6              :  * loaded as a dynamic module to avoid linking the main server binary with
       7              :  * libpq.
       8              :  *
       9              :  * Apart from walreceiver, the libpq-specific routines are now being used by
      10              :  * logical replication workers and slot synchronization.
      11              :  *
      12              :  * Portions Copyright (c) 2010-2026, PostgreSQL Global Development Group
      13              :  *
      14              :  *
      15              :  * IDENTIFICATION
      16              :  *    src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
      17              :  *
      18              :  *-------------------------------------------------------------------------
      19              :  */
      20              : #include "postgres.h"
      21              : 
      22              : #include <unistd.h>
      23              : #include <sys/time.h>
      24              : 
      25              : #include "common/connect.h"
      26              : #include "funcapi.h"
      27              : #include "libpq-fe.h"
      28              : #include "libpq/libpq-be-fe-helpers.h"
      29              : #include "mb/pg_wchar.h"
      30              : #include "miscadmin.h"
      31              : #include "pgstat.h"
      32              : #include "pqexpbuffer.h"
      33              : #include "replication/walreceiver.h"
      34              : #include "storage/latch.h"
      35              : #include "utils/builtins.h"
      36              : #include "utils/memutils.h"
      37              : #include "utils/pg_lsn.h"
      38              : #include "utils/tuplestore.h"
      39              : 
      40         1124 : PG_MODULE_MAGIC_EXT(
      41              :                     .name = "libpqwalreceiver",
      42              :                     .version = PG_VERSION
      43              : );
      44              : 
      45              : struct WalReceiverConn
      46              : {
      47              :     /* Current connection to the primary, if any */
      48              :     PGconn     *streamConn;
      49              :     /* Used to remember if the connection is logical or physical */
      50              :     bool        logical;
      51              :     /* Buffer for currently read records */
      52              :     char       *recvBuf;
      53              : };
      54              : 
      55              : /* Prototypes for interface functions */
      56              : static WalReceiverConn *libpqrcv_connect(const char *conninfo,
      57              :                                          bool replication, bool logical,
      58              :                                          bool must_use_password,
      59              :                                          const char *appname, char **err);
      60              : static void libpqrcv_check_conninfo(const char *conninfo,
      61              :                                     bool must_use_password);
      62              : static char *libpqrcv_get_conninfo(WalReceiverConn *conn);
      63              : static void libpqrcv_get_senderinfo(WalReceiverConn *conn,
      64              :                                     char **sender_host, int *sender_port);
      65              : static char *libpqrcv_identify_system(WalReceiverConn *conn,
      66              :                                       TimeLineID *primary_tli);
      67              : static char *libpqrcv_get_dbname_from_conninfo(const char *connInfo);
      68              : static char *libpqrcv_get_option_from_conninfo(const char *connInfo,
      69              :                                                const char *keyword);
      70              : static int  libpqrcv_server_version(WalReceiverConn *conn);
      71              : static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
      72              :                                              TimeLineID tli, char **filename,
      73              :                                              char **content, int *len);
      74              : static bool libpqrcv_startstreaming(WalReceiverConn *conn,
      75              :                                     const WalRcvStreamOptions *options);
      76              : static void libpqrcv_endstreaming(WalReceiverConn *conn,
      77              :                                   TimeLineID *next_tli);
      78              : static int  libpqrcv_receive(WalReceiverConn *conn, char **buffer,
      79              :                              pgsocket *wait_fd);
      80              : static void libpqrcv_send(WalReceiverConn *conn, const char *buffer,
      81              :                           int nbytes);
      82              : static char *libpqrcv_create_slot(WalReceiverConn *conn,
      83              :                                   const char *slotname,
      84              :                                   bool temporary,
      85              :                                   bool two_phase,
      86              :                                   bool failover,
      87              :                                   CRSSnapshotAction snapshot_action,
      88              :                                   XLogRecPtr *lsn);
      89              : static void libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
      90              :                                 const bool *failover, const bool *two_phase);
      91              : static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn);
      92              : static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn,
      93              :                                        const char *query,
      94              :                                        const int nRetTypes,
      95              :                                        const Oid *retTypes);
      96              : static void libpqrcv_disconnect(WalReceiverConn *conn);
      97              : 
      98              : static WalReceiverFunctionsType PQWalReceiverFunctions = {
      99              :     .walrcv_connect = libpqrcv_connect,
     100              :     .walrcv_check_conninfo = libpqrcv_check_conninfo,
     101              :     .walrcv_get_conninfo = libpqrcv_get_conninfo,
     102              :     .walrcv_get_senderinfo = libpqrcv_get_senderinfo,
     103              :     .walrcv_identify_system = libpqrcv_identify_system,
     104              :     .walrcv_server_version = libpqrcv_server_version,
     105              :     .walrcv_readtimelinehistoryfile = libpqrcv_readtimelinehistoryfile,
     106              :     .walrcv_startstreaming = libpqrcv_startstreaming,
     107              :     .walrcv_endstreaming = libpqrcv_endstreaming,
     108              :     .walrcv_receive = libpqrcv_receive,
     109              :     .walrcv_send = libpqrcv_send,
     110              :     .walrcv_create_slot = libpqrcv_create_slot,
     111              :     .walrcv_alter_slot = libpqrcv_alter_slot,
     112              :     .walrcv_get_dbname_from_conninfo = libpqrcv_get_dbname_from_conninfo,
     113              :     .walrcv_get_backend_pid = libpqrcv_get_backend_pid,
     114              :     .walrcv_exec = libpqrcv_exec,
     115              :     .walrcv_disconnect = libpqrcv_disconnect
     116              : };
     117              : 
     118              : /* Prototypes for private functions */
     119              : static char *stringlist_to_identifierstr(PGconn *conn, List *strings);
     120              : 
     121              : /*
     122              :  * Module initialization function
     123              :  */
     124              : void
     125         1124 : _PG_init(void)
     126              : {
     127         1124 :     if (WalReceiverFunctions != NULL)
     128            0 :         elog(ERROR, "libpqwalreceiver already loaded");
     129         1124 :     WalReceiverFunctions = &PQWalReceiverFunctions;
     130         1124 : }
     131              : 
     132              : /*
     133              :  * Establish the connection to the primary server.
     134              :  *
     135              :  * This function can be used for both replication and regular connections.
     136              :  * If it is a replication connection, it could be either logical or physical
     137              :  * based on input argument 'logical'.
     138              :  *
     139              :  * If an error occurs, this function will normally return NULL and set *err
     140              :  * to a palloc'ed error message. However, if must_use_password is true and
     141              :  * the connection fails to use the password, this function will ereport(ERROR).
     142              :  * We do this because in that case the error includes a detail and a hint for
     143              :  * consistency with other parts of the system, and it's not worth adding the
     144              :  * machinery to pass all of those back to the caller just to cover this one
     145              :  * case.
     146              :  */
     147              : static WalReceiverConn *
     148         1050 : libpqrcv_connect(const char *conninfo, bool replication, bool logical,
     149              :                  bool must_use_password, const char *appname, char **err)
     150              : {
     151              :     WalReceiverConn *conn;
     152              :     const char *keys[6];
     153              :     const char *vals[6];
     154         1050 :     int         i = 0;
     155         1050 :     char       *options_val = NULL;
     156              : 
     157              :     /*
     158              :      * Re-validate connection string. The validation already happened at DDL
     159              :      * time, but the subscription owner may have changed. If we don't recheck
     160              :      * with the correct must_use_password, it's possible that the connection
     161              :      * will obtain the password from a different source, such as PGPASSFILE or
     162              :      * PGPASSWORD.
     163              :      */
     164         1050 :     libpqrcv_check_conninfo(conninfo, must_use_password);
     165              : 
     166              :     /*
     167              :      * We use the expand_dbname parameter to process the connection string (or
     168              :      * URI), and pass some extra options.
     169              :      */
     170         1038 :     keys[i] = "dbname";
     171         1038 :     vals[i] = conninfo;
     172              : 
     173              :     /* We can not have logical without replication */
     174              :     Assert(replication || !logical);
     175              : 
     176         1038 :     if (replication)
     177              :     {
     178         1023 :         keys[++i] = "replication";
     179         1023 :         vals[i] = logical ? "database" : "true";
     180              : 
     181         1023 :         if (logical)
     182              :         {
     183          782 :             char       *opt = NULL;
     184              : 
     185              :             /* Tell the publisher to translate to our encoding */
     186          782 :             keys[++i] = "client_encoding";
     187          782 :             vals[i] = GetDatabaseEncodingName();
     188              : 
     189              :             /*
     190              :              * Force assorted GUC parameters to settings that ensure that the
     191              :              * publisher will output data values in a form that is unambiguous
     192              :              * to the subscriber.  (We don't want to modify the subscriber's
     193              :              * GUC settings, since that might surprise user-defined code
     194              :              * running in the subscriber, such as triggers.)  This should
     195              :              * match what pg_dump does.
     196              :              */
     197          782 :             opt = libpqrcv_get_option_from_conninfo(conninfo, "options");
     198          782 :             options_val = psprintf("%s -c datestyle=ISO -c intervalstyle=postgres -c extra_float_digits=3",
     199              :                                    (opt == NULL) ? "" : opt);
     200          782 :             keys[++i] = "options";
     201          782 :             vals[i] = options_val;
     202          782 :             if (opt != NULL)
     203            9 :                 pfree(opt);
     204              :         }
     205              :         else
     206              :         {
     207              :             /*
     208              :              * The database name is ignored by the server in replication mode,
     209              :              * but specify "replication" for .pgpass lookup.
     210              :              */
     211          241 :             keys[++i] = "dbname";
     212          241 :             vals[i] = "replication";
     213              :         }
     214              :     }
     215              : 
     216         1038 :     keys[++i] = "fallback_application_name";
     217         1038 :     vals[i] = appname;
     218              : 
     219         1038 :     keys[++i] = NULL;
     220         1038 :     vals[i] = NULL;
     221              : 
     222              :     Assert(i < lengthof(keys));
     223              : 
     224         1038 :     conn = palloc0_object(WalReceiverConn);
     225         1038 :     conn->streamConn =
     226         1038 :         libpqsrv_connect_params(keys, vals,
     227              :                                  /* expand_dbname = */ true,
     228              :                                 WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
     229              : 
     230         1038 :     if (options_val != NULL)
     231          782 :         pfree(options_val);
     232              : 
     233         1038 :     if (PQstatus(conn->streamConn) != CONNECTION_OK)
     234          133 :         goto bad_connection_errmsg;
     235              : 
     236          905 :     if (must_use_password && !PQconnectionUsedPassword(conn->streamConn))
     237              :     {
     238            0 :         libpqsrv_disconnect(conn->streamConn);
     239            0 :         pfree(conn);
     240              : 
     241            0 :         ereport(ERROR,
     242              :                 (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
     243              :                  errmsg("password is required"),
     244              :                  errdetail("Non-superuser cannot connect if the server does not request a password."),
     245              :                  errhint("Target server's authentication method must be changed, or set password_required=false in the subscription parameters.")));
     246              :     }
     247              : 
     248          905 :     PQsetNoticeReceiver(conn->streamConn, libpqsrv_notice_receiver,
     249              :                         "received message via replication");
     250              : 
     251              :     /*
     252              :      * Set always-secure search path for the cases where the connection is
     253              :      * used to run SQL queries, so malicious users can't get control.
     254              :      */
     255          905 :     if (!replication || logical)
     256              :     {
     257              :         PGresult   *res;
     258              : 
     259          756 :         res = libpqsrv_exec(conn->streamConn,
     260              :                             ALWAYS_SECURE_SEARCH_PATH_SQL,
     261              :                             WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
     262          756 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
     263              :         {
     264            0 :             PQclear(res);
     265            0 :             *err = psprintf(_("could not clear search path: %s"),
     266            0 :                             pchomp(PQerrorMessage(conn->streamConn)));
     267            0 :             goto bad_connection;
     268              :         }
     269          756 :         PQclear(res);
     270              :     }
     271              : 
     272          905 :     conn->logical = logical;
     273              : 
     274          905 :     return conn;
     275              : 
     276              :     /* error path, using libpq's error message */
     277          133 : bad_connection_errmsg:
     278          133 :     *err = pchomp(PQerrorMessage(conn->streamConn));
     279              : 
     280              :     /* error path, error already set */
     281          133 : bad_connection:
     282          133 :     libpqsrv_disconnect(conn->streamConn);
     283          133 :     pfree(conn);
     284          133 :     return NULL;
     285              : }
     286              : 
     287              : /*
     288              :  * Validate connection info string.
     289              :  *
     290              :  * If the connection string can't be parsed, this function will raise
     291              :  * an error. If must_use_password is true, the function raises an error
     292              :  * if no password is provided in the connection string. In any other case
     293              :  * it successfully completes.
     294              :  */
     295              : static void
     296         1275 : libpqrcv_check_conninfo(const char *conninfo, bool must_use_password)
     297              : {
     298         1275 :     PQconninfoOption *opts = NULL;
     299              :     PQconninfoOption *opt;
     300         1275 :     char       *err = NULL;
     301              : 
     302         1275 :     opts = PQconninfoParse(conninfo, &err);
     303         1275 :     if (opts == NULL)
     304              :     {
     305              :         /* The error string is malloc'd, so we must free it explicitly */
     306           12 :         char       *errcopy = err ? pstrdup(err) : "out of memory";
     307              : 
     308           12 :         PQfreemem(err);
     309           12 :         ereport(ERROR,
     310              :                 (errcode(ERRCODE_SYNTAX_ERROR),
     311              :                  errmsg("invalid connection string syntax: %s", errcopy)));
     312              :     }
     313              : 
     314         1263 :     if (must_use_password)
     315              :     {
     316           27 :         bool        uses_password = false;
     317              : 
     318          892 :         for (opt = opts; opt->keyword != NULL; ++opt)
     319              :         {
     320              :             /* Ignore connection options that are not present. */
     321          876 :             if (opt->val == NULL)
     322          816 :                 continue;
     323              : 
     324           60 :             if (strcmp(opt->keyword, "password") == 0 && opt->val[0] != '\0')
     325              :             {
     326           11 :                 uses_password = true;
     327           11 :                 break;
     328              :             }
     329              :         }
     330              : 
     331           27 :         if (!uses_password)
     332              :         {
     333              :             /* malloc'd, so we must free it explicitly */
     334           16 :             PQconninfoFree(opts);
     335              : 
     336           16 :             ereport(ERROR,
     337              :                     (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
     338              :                      errmsg("password is required"),
     339              :                      errdetail("Non-superusers must provide a password in the connection string.")));
     340              :         }
     341              :     }
     342              : 
     343         1247 :     PQconninfoFree(opts);
     344         1247 : }
     345              : 
     346              : /*
     347              :  * Return a user-displayable conninfo string.  Any security-sensitive fields
     348              :  * are obfuscated.
     349              :  */
     350              : static char *
     351          149 : libpqrcv_get_conninfo(WalReceiverConn *conn)
     352              : {
     353              :     PQconninfoOption *conn_opts;
     354              :     PQconninfoOption *conn_opt;
     355              :     PQExpBufferData buf;
     356              :     char       *retval;
     357              : 
     358              :     Assert(conn->streamConn != NULL);
     359              : 
     360          149 :     initPQExpBuffer(&buf);
     361          149 :     conn_opts = PQconninfo(conn->streamConn);
     362              : 
     363          149 :     if (conn_opts == NULL)
     364            0 :         ereport(ERROR,
     365              :                 (errcode(ERRCODE_OUT_OF_MEMORY),
     366              :                  errmsg("could not parse connection string: %s",
     367              :                         _("out of memory"))));
     368              : 
     369              :     /* build a clean connection string from pieces */
     370         7897 :     for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
     371              :     {
     372              :         bool        obfuscate;
     373              : 
     374              :         /* Skip debug and empty options */
     375         7748 :         if (strchr(conn_opt->dispchar, 'D') ||
     376         7152 :             conn_opt->val == NULL ||
     377         2844 :             conn_opt->val[0] == '\0')
     378         5053 :             continue;
     379              : 
     380              :         /* Obfuscate security-sensitive options */
     381         2695 :         obfuscate = strchr(conn_opt->dispchar, '*') != NULL;
     382              : 
     383         5390 :         appendPQExpBuffer(&buf, "%s%s=%s",
     384         2695 :                           buf.len == 0 ? "" : " ",
     385              :                           conn_opt->keyword,
     386              :                           obfuscate ? "********" : conn_opt->val);
     387              :     }
     388              : 
     389          149 :     PQconninfoFree(conn_opts);
     390              : 
     391          149 :     retval = PQExpBufferDataBroken(buf) ? NULL : pstrdup(buf.data);
     392          149 :     termPQExpBuffer(&buf);
     393          149 :     return retval;
     394              : }
     395              : 
     396              : /*
     397              :  * Provides information of sender this WAL receiver is connected to.
     398              :  */
     399              : static void
     400          149 : libpqrcv_get_senderinfo(WalReceiverConn *conn, char **sender_host,
     401              :                         int *sender_port)
     402              : {
     403          149 :     char       *ret = NULL;
     404              : 
     405          149 :     *sender_host = NULL;
     406          149 :     *sender_port = 0;
     407              : 
     408              :     Assert(conn->streamConn != NULL);
     409              : 
     410          149 :     ret = PQhost(conn->streamConn);
     411          149 :     if (ret && strlen(ret) != 0)
     412          149 :         *sender_host = pstrdup(ret);
     413              : 
     414          149 :     ret = PQport(conn->streamConn);
     415          149 :     if (ret && strlen(ret) != 0)
     416          149 :         *sender_port = atoi(ret);
     417          149 : }
     418              : 
     419              : /*
     420              :  * Check that primary's system identifier matches ours, and fetch the current
     421              :  * timeline ID of the primary.
     422              :  */
     423              : static char *
     424          420 : libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
     425              : {
     426              :     PGresult   *res;
     427              :     char       *primary_sysid;
     428              : 
     429              :     /*
     430              :      * Get the system identifier and timeline ID as a DataRow message from the
     431              :      * primary server.
     432              :      */
     433          420 :     res = libpqsrv_exec(conn->streamConn,
     434              :                         "IDENTIFY_SYSTEM",
     435              :                         WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
     436          419 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     437            0 :         ereport(ERROR,
     438              :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
     439              :                  errmsg("could not receive database system identifier and timeline ID from "
     440              :                         "the primary server: %s",
     441              :                         pchomp(PQerrorMessage(conn->streamConn)))));
     442              : 
     443              :     /*
     444              :      * IDENTIFY_SYSTEM returns 3 columns in 9.3 and earlier, and 4 columns in
     445              :      * 9.4 and onwards.
     446              :      */
     447          419 :     if (PQnfields(res) < 3 || PQntuples(res) != 1)
     448            0 :         ereport(ERROR,
     449              :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
     450              :                  errmsg("invalid response from primary server"),
     451              :                  errdetail("Could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields.",
     452              :                            PQntuples(res), PQnfields(res), 1, 3)));
     453          419 :     primary_sysid = pstrdup(PQgetvalue(res, 0, 0));
     454          419 :     *primary_tli = pg_strtoint32(PQgetvalue(res, 0, 1));
     455          419 :     PQclear(res);
     456              : 
     457          419 :     return primary_sysid;
     458              : }
     459              : 
     460              : /*
     461              :  * Thin wrapper around libpq to obtain server version.
     462              :  */
     463              : static int
     464         1075 : libpqrcv_server_version(WalReceiverConn *conn)
     465              : {
     466         1075 :     return PQserverVersion(conn->streamConn);
     467              : }
     468              : 
     469              : /*
     470              :  * Get database name from the primary server's conninfo.
     471              :  *
     472              :  * If dbname is not found in connInfo, return NULL value.
     473              :  */
     474              : static char *
     475           16 : libpqrcv_get_dbname_from_conninfo(const char *connInfo)
     476              : {
     477           16 :     return libpqrcv_get_option_from_conninfo(connInfo, "dbname");
     478              : }
     479              : 
     480              : /*
     481              :  * Get the value of the option with the given keyword from the primary
     482              :  * server's conninfo.
     483              :  *
     484              :  * If the option is not found in connInfo, return NULL value.
     485              :  */
     486              : static char *
     487          798 : libpqrcv_get_option_from_conninfo(const char *connInfo, const char *keyword)
     488              : {
     489              :     PQconninfoOption *opts;
     490          798 :     char       *option = NULL;
     491          798 :     char       *err = NULL;
     492              : 
     493          798 :     opts = PQconninfoParse(connInfo, &err);
     494          798 :     if (opts == NULL)
     495              :     {
     496              :         /* The error string is malloc'd, so we must free it explicitly */
     497            0 :         char       *errcopy = err ? pstrdup(err) : "out of memory";
     498              : 
     499            0 :         PQfreemem(err);
     500            0 :         ereport(ERROR,
     501              :                 (errcode(ERRCODE_SYNTAX_ERROR),
     502              :                  errmsg("invalid connection string syntax: %s", errcopy)));
     503              :     }
     504              : 
     505        42294 :     for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
     506              :     {
     507              :         /*
     508              :          * If the same option appears multiple times, then the last one will
     509              :          * be returned
     510              :          */
     511        41496 :         if (strcmp(opt->keyword, keyword) == 0 && opt->val &&
     512           24 :             *opt->val)
     513              :         {
     514           24 :             if (option)
     515            0 :                 pfree(option);
     516              : 
     517           24 :             option = pstrdup(opt->val);
     518              :         }
     519              :     }
     520              : 
     521          798 :     PQconninfoFree(opts);
     522          798 :     return option;
     523              : }
     524              : 
     525              : /*
     526              :  * Start streaming WAL data from given streaming options.
     527              :  *
     528              :  * Returns true if we switched successfully to copy-both mode. False
     529              :  * means the server received the command and executed it successfully, but
     530              :  * didn't switch to copy-mode.  That means that there was no WAL on the
     531              :  * requested timeline and starting point, because the server switched to
     532              :  * another timeline at or before the requested starting point. On failure,
     533              :  * throws an ERROR.
     534              :  */
     535              : static bool
     536          619 : libpqrcv_startstreaming(WalReceiverConn *conn,
     537              :                         const WalRcvStreamOptions *options)
     538              : {
     539              :     StringInfoData cmd;
     540              :     PGresult   *res;
     541              : 
     542              :     Assert(options->logical == conn->logical);
     543              :     Assert(options->slotname || !options->logical);
     544              : 
     545          619 :     initStringInfo(&cmd);
     546              : 
     547              :     /* Build the command. */
     548          619 :     appendStringInfoString(&cmd, "START_REPLICATION");
     549          619 :     if (options->slotname != NULL)
     550          510 :         appendStringInfo(&cmd, " SLOT \"%s\"",
     551          510 :                          options->slotname);
     552              : 
     553          619 :     if (options->logical)
     554          459 :         appendStringInfoString(&cmd, " LOGICAL");
     555              : 
     556          619 :     appendStringInfo(&cmd, " %X/%08X", LSN_FORMAT_ARGS(options->startpoint));
     557              : 
     558              :     /*
     559              :      * Additional options are different depending on if we are doing logical
     560              :      * or physical replication.
     561              :      */
     562          619 :     if (options->logical)
     563              :     {
     564              :         char       *pubnames_str;
     565              :         List       *pubnames;
     566              :         char       *pubnames_literal;
     567              : 
     568          459 :         appendStringInfoString(&cmd, " (");
     569              : 
     570          459 :         appendStringInfo(&cmd, "proto_version '%u'",
     571          459 :                          options->proto.logical.proto_version);
     572              : 
     573          459 :         if (options->proto.logical.streaming_str)
     574          451 :             appendStringInfo(&cmd, ", streaming '%s'",
     575          451 :                              options->proto.logical.streaming_str);
     576              : 
     577          468 :         if (options->proto.logical.twophase &&
     578            9 :             PQserverVersion(conn->streamConn) >= 150000)
     579            9 :             appendStringInfoString(&cmd, ", two_phase 'on'");
     580              : 
     581          918 :         if (options->proto.logical.origin &&
     582          459 :             PQserverVersion(conn->streamConn) >= 160000)
     583          459 :             appendStringInfo(&cmd, ", origin '%s'",
     584          459 :                              options->proto.logical.origin);
     585              : 
     586          459 :         pubnames = options->proto.logical.publication_names;
     587          459 :         pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
     588          459 :         if (!pubnames_str)
     589            0 :             ereport(ERROR,
     590              :                     (errcode(ERRCODE_OUT_OF_MEMORY),    /* likely guess */
     591              :                      errmsg("could not start WAL streaming: %s",
     592              :                             pchomp(PQerrorMessage(conn->streamConn)))));
     593          459 :         pubnames_literal = PQescapeLiteral(conn->streamConn, pubnames_str,
     594              :                                            strlen(pubnames_str));
     595          459 :         if (!pubnames_literal)
     596            0 :             ereport(ERROR,
     597              :                     (errcode(ERRCODE_OUT_OF_MEMORY),    /* likely guess */
     598              :                      errmsg("could not start WAL streaming: %s",
     599              :                             pchomp(PQerrorMessage(conn->streamConn)))));
     600          459 :         appendStringInfo(&cmd, ", publication_names %s", pubnames_literal);
     601          459 :         PQfreemem(pubnames_literal);
     602          459 :         pfree(pubnames_str);
     603              : 
     604          470 :         if (options->proto.logical.binary &&
     605           11 :             PQserverVersion(conn->streamConn) >= 140000)
     606           11 :             appendStringInfoString(&cmd, ", binary 'true'");
     607              : 
     608          459 :         appendStringInfoChar(&cmd, ')');
     609              :     }
     610              :     else
     611          160 :         appendStringInfo(&cmd, " TIMELINE %u",
     612          160 :                          options->proto.physical.startpointTLI);
     613              : 
     614              :     /* Start streaming. */
     615          619 :     res = libpqsrv_exec(conn->streamConn,
     616          619 :                         cmd.data,
     617              :                         WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
     618          619 :     pfree(cmd.data);
     619              : 
     620          619 :     if (PQresultStatus(res) == PGRES_COMMAND_OK)
     621              :     {
     622            0 :         PQclear(res);
     623            0 :         return false;
     624              :     }
     625          619 :     else if (PQresultStatus(res) != PGRES_COPY_BOTH)
     626            2 :         ereport(ERROR,
     627              :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
     628              :                  errmsg("could not start WAL streaming: %s",
     629              :                         pchomp(PQerrorMessage(conn->streamConn)))));
     630          617 :     PQclear(res);
     631          617 :     return true;
     632              : }
     633              : 
     634              : /*
     635              :  * Stop streaming WAL data. Returns the next timeline's ID in *next_tli, as
     636              :  * reported by the server, or 0 if it did not report it.
     637              :  */
     638              : static void
     639          254 : libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
     640              : {
     641              :     PGresult   *res;
     642              : 
     643              :     /*
     644              :      * Send copy-end message.  As in libpqsrv_exec, this could theoretically
     645              :      * block, but the risk seems small.
     646              :      */
     647          466 :     if (PQputCopyEnd(conn->streamConn, NULL) <= 0 ||
     648          212 :         PQflush(conn->streamConn))
     649           42 :         ereport(ERROR,
     650              :                 (errcode(ERRCODE_CONNECTION_FAILURE),
     651              :                  errmsg("could not send end-of-streaming message to primary: %s",
     652              :                         pchomp(PQerrorMessage(conn->streamConn)))));
     653              : 
     654          212 :     *next_tli = 0;
     655              : 
     656              :     /*
     657              :      * After COPY is finished, we should receive a result set indicating the
     658              :      * next timeline's ID, or just CommandComplete if the server was shut
     659              :      * down.
     660              :      *
     661              :      * If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is
     662              :      * also possible in case we aborted the copy in mid-stream.
     663              :      */
     664          212 :     res = libpqsrv_get_result(conn->streamConn,
     665              :                               WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
     666          212 :     if (PQresultStatus(res) == PGRES_TUPLES_OK)
     667              :     {
     668              :         /*
     669              :          * Read the next timeline's ID. The server also sends the timeline's
     670              :          * starting point, but it is ignored.
     671              :          */
     672           12 :         if (PQnfields(res) < 2 || PQntuples(res) != 1)
     673            0 :             ereport(ERROR,
     674              :                     (errcode(ERRCODE_PROTOCOL_VIOLATION),
     675              :                      errmsg("unexpected result set after end-of-streaming")));
     676           12 :         *next_tli = pg_strtoint32(PQgetvalue(res, 0, 0));
     677           12 :         PQclear(res);
     678              : 
     679              :         /* the result set should be followed by CommandComplete */
     680           12 :         res = libpqsrv_get_result(conn->streamConn,
     681              :                                   WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
     682              :     }
     683          200 :     else if (PQresultStatus(res) == PGRES_COPY_OUT)
     684              :     {
     685          200 :         PQclear(res);
     686              : 
     687              :         /* End the copy */
     688          200 :         if (PQendcopy(conn->streamConn))
     689            0 :             ereport(ERROR,
     690              :                     (errcode(ERRCODE_CONNECTION_FAILURE),
     691              :                      errmsg("error while shutting down streaming COPY: %s",
     692              :                             pchomp(PQerrorMessage(conn->streamConn)))));
     693              : 
     694              :         /* CommandComplete should follow */
     695          200 :         res = libpqsrv_get_result(conn->streamConn,
     696              :                                   WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
     697              :     }
     698              : 
     699          212 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
     700            0 :         ereport(ERROR,
     701              :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
     702              :                  errmsg("error reading result of streaming command: %s",
     703              :                         pchomp(PQerrorMessage(conn->streamConn)))));
     704          212 :     PQclear(res);
     705              : 
     706              :     /* Verify that there are no more results */
     707          212 :     res = libpqsrv_get_result(conn->streamConn,
     708              :                               WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
     709          212 :     if (res != NULL)
     710            0 :         ereport(ERROR,
     711              :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
     712              :                  errmsg("unexpected result after CommandComplete: %s",
     713              :                         pchomp(PQerrorMessage(conn->streamConn)))));
     714          212 : }
     715              : 
     716              : /*
     717              :  * Fetch the timeline history file for 'tli' from primary.
     718              :  */
     719              : static void
     720           11 : libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
     721              :                                  TimeLineID tli, char **filename,
     722              :                                  char **content, int *len)
     723              : {
     724              :     PGresult   *res;
     725              :     char        cmd[64];
     726              : 
     727              :     Assert(!conn->logical);
     728              : 
     729              :     /*
     730              :      * Request the primary to send over the history file for given timeline.
     731              :      */
     732           11 :     snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli);
     733           11 :     res = libpqsrv_exec(conn->streamConn,
     734              :                         cmd,
     735              :                         WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
     736           11 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     737            0 :         ereport(ERROR,
     738              :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
     739              :                  errmsg("could not receive timeline history file from "
     740              :                         "the primary server: %s",
     741              :                         pchomp(PQerrorMessage(conn->streamConn)))));
     742           11 :     if (PQnfields(res) != 2 || PQntuples(res) != 1)
     743            0 :         ereport(ERROR,
     744              :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
     745              :                  errmsg("invalid response from primary server"),
     746              :                  errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.",
     747              :                            PQntuples(res), PQnfields(res))));
     748           11 :     *filename = pstrdup(PQgetvalue(res, 0, 0));
     749              : 
     750           11 :     *len = PQgetlength(res, 0, 1);
     751           11 :     *content = palloc(*len);
     752           11 :     memcpy(*content, PQgetvalue(res, 0, 1), *len);
     753           11 :     PQclear(res);
     754           11 : }
     755              : 
     756              : /*
     757              :  * Disconnect connection to primary, if any.
     758              :  */
     759              : static void
     760          905 : libpqrcv_disconnect(WalReceiverConn *conn)
     761              : {
     762          905 :     libpqsrv_disconnect(conn->streamConn);
     763          905 :     PQfreemem(conn->recvBuf);
     764          905 :     pfree(conn);
     765          905 : }
     766              : 
     767              : /*
     768              :  * Receive a message available from XLOG stream.
     769              :  *
     770              :  * Returns:
     771              :  *
     772              :  *   If data was received, returns the length of the data. *buffer is set to
     773              :  *   point to a buffer holding the received message. The buffer is only valid
     774              :  *   until the next libpqrcv_* call.
     775              :  *
     776              :  *   If no data was available immediately, returns 0, and *wait_fd is set to a
     777              :  *   socket descriptor which can be waited on before trying again.
     778              :  *
     779              :  *   -1 if the server ended the COPY.
     780              :  *
     781              :  * ereports on error.
     782              :  */
     783              : static int
     784       423185 : libpqrcv_receive(WalReceiverConn *conn, char **buffer,
     785              :                  pgsocket *wait_fd)
     786              : {
     787              :     int         rawlen;
     788              : 
     789       423185 :     PQfreemem(conn->recvBuf);
     790       423185 :     conn->recvBuf = NULL;
     791              : 
     792              :     /* Try to receive a CopyData message */
     793       423185 :     rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
     794       423185 :     if (rawlen == 0)
     795              :     {
     796              :         /* Try consuming some data. */
     797       220009 :         if (PQconsumeInput(conn->streamConn) == 0)
     798           47 :             ereport(ERROR,
     799              :                     (errcode(ERRCODE_CONNECTION_FAILURE),
     800              :                      errmsg("could not receive data from WAL stream: %s",
     801              :                             pchomp(PQerrorMessage(conn->streamConn)))));
     802              : 
     803              :         /* Now that we've consumed some input, try again */
     804       219962 :         rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
     805       219962 :         if (rawlen == 0)
     806              :         {
     807              :             /* Tell caller to try again when our socket is ready. */
     808        84290 :             *wait_fd = PQsocket(conn->streamConn);
     809        84290 :             return 0;
     810              :         }
     811              :     }
     812       338848 :     if (rawlen == -1)           /* end-of-streaming or error */
     813              :     {
     814              :         PGresult   *res;
     815              : 
     816          272 :         res = libpqsrv_get_result(conn->streamConn,
     817              :                                   WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
     818          272 :         if (PQresultStatus(res) == PGRES_COMMAND_OK)
     819              :         {
     820          255 :             PQclear(res);
     821              : 
     822              :             /* Verify that there are no more results. */
     823          255 :             res = libpqsrv_get_result(conn->streamConn,
     824              :                                       WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
     825          255 :             if (res != NULL)
     826              :             {
     827           43 :                 PQclear(res);
     828              : 
     829              :                 /*
     830              :                  * If the other side closed the connection orderly (otherwise
     831              :                  * we'd seen an error, or PGRES_COPY_IN) don't report an error
     832              :                  * here, but let callers deal with it.
     833              :                  */
     834           43 :                 if (PQstatus(conn->streamConn) == CONNECTION_BAD)
     835           43 :                     return -1;
     836              : 
     837            0 :                 ereport(ERROR,
     838              :                         (errcode(ERRCODE_PROTOCOL_VIOLATION),
     839              :                          errmsg("unexpected result after CommandComplete: %s",
     840              :                                 PQerrorMessage(conn->streamConn))));
     841              :             }
     842              : 
     843          212 :             return -1;
     844              :         }
     845           17 :         else if (PQresultStatus(res) == PGRES_COPY_IN)
     846              :         {
     847           12 :             PQclear(res);
     848           12 :             return -1;
     849              :         }
     850              :         else
     851            5 :             ereport(ERROR,
     852              :                     (errcode(ERRCODE_PROTOCOL_VIOLATION),
     853              :                      errmsg("could not receive data from WAL stream: %s",
     854              :                             pchomp(PQerrorMessage(conn->streamConn)))));
     855              :     }
     856       338576 :     if (rawlen < -1)
     857            0 :         ereport(ERROR,
     858              :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
     859              :                  errmsg("could not receive data from WAL stream: %s",
     860              :                         pchomp(PQerrorMessage(conn->streamConn)))));
     861              : 
     862              :     /* Return received messages to caller */
     863       338576 :     *buffer = conn->recvBuf;
     864       338576 :     return rawlen;
     865              : }
     866              : 
     867              : /*
     868              :  * Send a message to XLOG stream.
     869              :  *
     870              :  * ereports on error.
     871              :  */
     872              : static void
     873       123541 : libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
     874              : {
     875       247081 :     if (PQputCopyData(conn->streamConn, buffer, nbytes) <= 0 ||
     876       123540 :         PQflush(conn->streamConn))
     877            2 :         ereport(ERROR,
     878              :                 (errcode(ERRCODE_CONNECTION_FAILURE),
     879              :                  errmsg("could not send data to WAL stream: %s",
     880              :                         pchomp(PQerrorMessage(conn->streamConn)))));
     881       123539 : }
     882              : 
     883              : /*
     884              :  * Create new replication slot.
     885              :  * Returns the name of the exported snapshot for logical slot or NULL for
     886              :  * physical slot.
     887              :  */
     888              : static char *
     889          336 : libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
     890              :                      bool temporary, bool two_phase, bool failover,
     891              :                      CRSSnapshotAction snapshot_action, XLogRecPtr *lsn)
     892              : {
     893              :     PGresult   *res;
     894              :     StringInfoData cmd;
     895              :     char       *snapshot;
     896              :     int         use_new_options_syntax;
     897              : 
     898          336 :     use_new_options_syntax = (PQserverVersion(conn->streamConn) >= 150000);
     899              : 
     900          336 :     initStringInfo(&cmd);
     901              : 
     902          336 :     appendStringInfo(&cmd, "CREATE_REPLICATION_SLOT \"%s\"", slotname);
     903              : 
     904          336 :     if (temporary)
     905            0 :         appendStringInfoString(&cmd, " TEMPORARY");
     906              : 
     907          336 :     if (conn->logical)
     908              :     {
     909          336 :         appendStringInfoString(&cmd, " LOGICAL pgoutput ");
     910          336 :         if (use_new_options_syntax)
     911          336 :             appendStringInfoChar(&cmd, '(');
     912          336 :         if (two_phase)
     913              :         {
     914            1 :             appendStringInfoString(&cmd, "TWO_PHASE");
     915            1 :             if (use_new_options_syntax)
     916            1 :                 appendStringInfoString(&cmd, ", ");
     917              :             else
     918            0 :                 appendStringInfoChar(&cmd, ' ');
     919              :         }
     920              : 
     921          336 :         if (failover)
     922              :         {
     923           11 :             appendStringInfoString(&cmd, "FAILOVER");
     924           11 :             if (use_new_options_syntax)
     925           11 :                 appendStringInfoString(&cmd, ", ");
     926              :             else
     927            0 :                 appendStringInfoChar(&cmd, ' ');
     928              :         }
     929              : 
     930          336 :         if (use_new_options_syntax)
     931              :         {
     932          336 :             switch (snapshot_action)
     933              :             {
     934            0 :                 case CRS_EXPORT_SNAPSHOT:
     935            0 :                     appendStringInfoString(&cmd, "SNAPSHOT 'export'");
     936            0 :                     break;
     937          120 :                 case CRS_NOEXPORT_SNAPSHOT:
     938          120 :                     appendStringInfoString(&cmd, "SNAPSHOT 'nothing'");
     939          120 :                     break;
     940          216 :                 case CRS_USE_SNAPSHOT:
     941          216 :                     appendStringInfoString(&cmd, "SNAPSHOT 'use'");
     942          216 :                     break;
     943              :             }
     944              :         }
     945              :         else
     946              :         {
     947            0 :             switch (snapshot_action)
     948              :             {
     949            0 :                 case CRS_EXPORT_SNAPSHOT:
     950            0 :                     appendStringInfoString(&cmd, "EXPORT_SNAPSHOT");
     951            0 :                     break;
     952            0 :                 case CRS_NOEXPORT_SNAPSHOT:
     953            0 :                     appendStringInfoString(&cmd, "NOEXPORT_SNAPSHOT");
     954            0 :                     break;
     955            0 :                 case CRS_USE_SNAPSHOT:
     956            0 :                     appendStringInfoString(&cmd, "USE_SNAPSHOT");
     957            0 :                     break;
     958              :             }
     959              :         }
     960              : 
     961          336 :         if (use_new_options_syntax)
     962          336 :             appendStringInfoChar(&cmd, ')');
     963              :     }
     964              :     else
     965              :     {
     966            0 :         if (use_new_options_syntax)
     967            0 :             appendStringInfoString(&cmd, " PHYSICAL (RESERVE_WAL)");
     968              :         else
     969            0 :             appendStringInfoString(&cmd, " PHYSICAL RESERVE_WAL");
     970              :     }
     971              : 
     972          336 :     res = libpqsrv_exec(conn->streamConn,
     973          336 :                         cmd.data,
     974              :                         WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
     975          336 :     pfree(cmd.data);
     976              : 
     977          336 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     978            0 :         ereport(ERROR,
     979              :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
     980              :                  errmsg("could not create replication slot \"%s\": %s",
     981              :                         slotname, pchomp(PQerrorMessage(conn->streamConn)))));
     982              : 
     983          336 :     if (lsn)
     984          216 :         *lsn = DatumGetLSN(DirectFunctionCall1Coll(pg_lsn_in, InvalidOid,
     985          216 :                                                    CStringGetDatum(PQgetvalue(res, 0, 1))));
     986              : 
     987          336 :     if (!PQgetisnull(res, 0, 2))
     988            0 :         snapshot = pstrdup(PQgetvalue(res, 0, 2));
     989              :     else
     990          336 :         snapshot = NULL;
     991              : 
     992          336 :     PQclear(res);
     993              : 
     994          336 :     return snapshot;
     995              : }
     996              : 
     997              : /*
     998              :  * Change the definition of the replication slot.
     999              :  */
    1000              : static void
    1001            5 : libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
    1002              :                     const bool *failover, const bool *two_phase)
    1003              : {
    1004              :     StringInfoData cmd;
    1005              :     PGresult   *res;
    1006              : 
    1007            5 :     initStringInfo(&cmd);
    1008            5 :     appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( ",
    1009              :                      quote_identifier(slotname));
    1010              : 
    1011            5 :     if (failover)
    1012            4 :         appendStringInfo(&cmd, "FAILOVER %s",
    1013            4 :                          *failover ? "true" : "false");
    1014              : 
    1015            5 :     if (failover && two_phase)
    1016            0 :         appendStringInfoString(&cmd, ", ");
    1017              : 
    1018            5 :     if (two_phase)
    1019            1 :         appendStringInfo(&cmd, "TWO_PHASE %s",
    1020            1 :                          *two_phase ? "true" : "false");
    1021              : 
    1022            5 :     appendStringInfoString(&cmd, " );");
    1023              : 
    1024            5 :     res = libpqsrv_exec(conn->streamConn, cmd.data,
    1025              :                         WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
    1026            5 :     pfree(cmd.data);
    1027              : 
    1028            5 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1029            0 :         ereport(ERROR,
    1030              :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
    1031              :                  errmsg("could not alter replication slot \"%s\": %s",
    1032              :                         slotname, pchomp(PQerrorMessage(conn->streamConn)))));
    1033              : 
    1034            5 :     PQclear(res);
    1035            5 : }
    1036              : 
    1037              : /*
    1038              :  * Return PID of remote backend process.
    1039              :  */
    1040              : static pid_t
    1041            0 : libpqrcv_get_backend_pid(WalReceiverConn *conn)
    1042              : {
    1043            0 :     return PQbackendPID(conn->streamConn);
    1044              : }
    1045              : 
    1046              : /*
    1047              :  * Convert tuple query result to tuplestore.
    1048              :  */
    1049              : static void
    1050         1261 : libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres,
    1051              :                        const int nRetTypes, const Oid *retTypes)
    1052              : {
    1053              :     int         tupn;
    1054              :     int         coln;
    1055         1261 :     int         nfields = PQnfields(pgres);
    1056              :     HeapTuple   tuple;
    1057              :     AttInMetadata *attinmeta;
    1058              :     MemoryContext rowcontext;
    1059              :     MemoryContext oldcontext;
    1060              : 
    1061              :     /* Make sure we got expected number of fields. */
    1062         1261 :     if (nfields != nRetTypes)
    1063            0 :         ereport(ERROR,
    1064              :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
    1065              :                  errmsg("invalid query response"),
    1066              :                  errdetail("Expected %d fields, got %d fields.",
    1067              :                            nRetTypes, nfields)));
    1068              : 
    1069         1261 :     walres->tuplestore = tuplestore_begin_heap(true, false, work_mem);
    1070              : 
    1071              :     /* Create tuple descriptor corresponding to expected result. */
    1072         1261 :     walres->tupledesc = CreateTemplateTupleDesc(nRetTypes);
    1073         4748 :     for (coln = 0; coln < nRetTypes; coln++)
    1074         3487 :         TupleDescInitEntry(walres->tupledesc, (AttrNumber) coln + 1,
    1075         3487 :                            PQfname(pgres, coln), retTypes[coln], -1, 0);
    1076         1261 :     TupleDescFinalize(walres->tupledesc);
    1077         1261 :     attinmeta = TupleDescGetAttInMetadata(walres->tupledesc);
    1078              : 
    1079              :     /* No point in doing more here if there were no tuples returned. */
    1080         1261 :     if (PQntuples(pgres) == 0)
    1081           35 :         return;
    1082              : 
    1083              :     /* Create temporary context for local allocations. */
    1084         1226 :     rowcontext = AllocSetContextCreate(CurrentMemoryContext,
    1085              :                                        "libpqrcv query result context",
    1086              :                                        ALLOCSET_DEFAULT_SIZES);
    1087              : 
    1088              :     /* Process returned rows. */
    1089         2839 :     for (tupn = 0; tupn < PQntuples(pgres); tupn++)
    1090              :     {
    1091              :         char       *cstrs[MaxTupleAttributeNumber];
    1092              : 
    1093         1613 :         CHECK_FOR_INTERRUPTS();
    1094              : 
    1095              :         /* Do the allocations in temporary context. */
    1096         1613 :         oldcontext = MemoryContextSwitchTo(rowcontext);
    1097              : 
    1098              :         /*
    1099              :          * Fill cstrs with null-terminated strings of column values.
    1100              :          */
    1101         6804 :         for (coln = 0; coln < nfields; coln++)
    1102              :         {
    1103         5191 :             if (PQgetisnull(pgres, tupn, coln))
    1104          706 :                 cstrs[coln] = NULL;
    1105              :             else
    1106         4485 :                 cstrs[coln] = PQgetvalue(pgres, tupn, coln);
    1107              :         }
    1108              : 
    1109              :         /* Convert row to a tuple, and add it to the tuplestore */
    1110         1613 :         tuple = BuildTupleFromCStrings(attinmeta, cstrs);
    1111         1613 :         tuplestore_puttuple(walres->tuplestore, tuple);
    1112              : 
    1113              :         /* Clean up */
    1114         1613 :         MemoryContextSwitchTo(oldcontext);
    1115         1613 :         MemoryContextReset(rowcontext);
    1116              :     }
    1117              : 
    1118         1226 :     MemoryContextDelete(rowcontext);
    1119              : }
    1120              : 
    1121              : /*
    1122              :  * Public interface for sending generic queries (and commands).
    1123              :  *
    1124              :  * This can only be called from process connected to database.
    1125              :  */
    1126              : static WalRcvExecResult *
    1127         2186 : libpqrcv_exec(WalReceiverConn *conn, const char *query,
    1128              :               const int nRetTypes, const Oid *retTypes)
    1129              : {
    1130         2186 :     PGresult   *pgres = NULL;
    1131         2186 :     WalRcvExecResult *walres = palloc0_object(WalRcvExecResult);
    1132              :     char       *diag_sqlstate;
    1133              : 
    1134         2186 :     if (MyDatabaseId == InvalidOid)
    1135            0 :         ereport(ERROR,
    1136              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1137              :                  errmsg("the query interface requires a database connection")));
    1138              : 
    1139         2186 :     pgres = libpqsrv_exec(conn->streamConn,
    1140              :                           query,
    1141              :                           WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
    1142              : 
    1143         2186 :     switch (PQresultStatus(pgres))
    1144              :     {
    1145         1261 :         case PGRES_TUPLES_OK:
    1146              :         case PGRES_SINGLE_TUPLE:
    1147              :         case PGRES_TUPLES_CHUNK:
    1148         1261 :             walres->status = WALRCV_OK_TUPLES;
    1149         1261 :             libpqrcv_processTuples(pgres, walres, nRetTypes, retTypes);
    1150         1261 :             break;
    1151              : 
    1152            0 :         case PGRES_COPY_IN:
    1153            0 :             walres->status = WALRCV_OK_COPY_IN;
    1154            0 :             break;
    1155              : 
    1156          214 :         case PGRES_COPY_OUT:
    1157          214 :             walres->status = WALRCV_OK_COPY_OUT;
    1158          214 :             break;
    1159              : 
    1160            0 :         case PGRES_COPY_BOTH:
    1161            0 :             walres->status = WALRCV_OK_COPY_BOTH;
    1162            0 :             break;
    1163              : 
    1164          708 :         case PGRES_COMMAND_OK:
    1165          708 :             walres->status = WALRCV_OK_COMMAND;
    1166          708 :             break;
    1167              : 
    1168              :             /* Empty query is considered error. */
    1169            0 :         case PGRES_EMPTY_QUERY:
    1170            0 :             walres->status = WALRCV_ERROR;
    1171            0 :             walres->err = _("empty query");
    1172            0 :             break;
    1173              : 
    1174            0 :         case PGRES_PIPELINE_SYNC:
    1175              :         case PGRES_PIPELINE_ABORTED:
    1176            0 :             walres->status = WALRCV_ERROR;
    1177            0 :             walres->err = _("unexpected pipeline mode");
    1178            0 :             break;
    1179              : 
    1180            3 :         case PGRES_NONFATAL_ERROR:
    1181              :         case PGRES_FATAL_ERROR:
    1182              :         case PGRES_BAD_RESPONSE:
    1183            3 :             walres->status = WALRCV_ERROR;
    1184            3 :             walres->err = pchomp(PQerrorMessage(conn->streamConn));
    1185            3 :             diag_sqlstate = PQresultErrorField(pgres, PG_DIAG_SQLSTATE);
    1186            3 :             if (diag_sqlstate)
    1187            1 :                 walres->sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
    1188              :                                                  diag_sqlstate[1],
    1189              :                                                  diag_sqlstate[2],
    1190              :                                                  diag_sqlstate[3],
    1191              :                                                  diag_sqlstate[4]);
    1192            3 :             break;
    1193              :     }
    1194              : 
    1195         2186 :     PQclear(pgres);
    1196              : 
    1197         2186 :     return walres;
    1198              : }
    1199              : 
    1200              : /*
    1201              :  * Given a List of strings, return it as single comma separated
    1202              :  * string, quoting identifiers as needed.
    1203              :  *
    1204              :  * This is essentially the reverse of SplitIdentifierString.
    1205              :  *
    1206              :  * The caller should free the result.
    1207              :  */
    1208              : static char *
    1209          459 : stringlist_to_identifierstr(PGconn *conn, List *strings)
    1210              : {
    1211              :     ListCell   *lc;
    1212              :     StringInfoData res;
    1213          459 :     bool        first = true;
    1214              : 
    1215          459 :     initStringInfo(&res);
    1216              : 
    1217         1177 :     foreach(lc, strings)
    1218              :     {
    1219          718 :         char       *val = strVal(lfirst(lc));
    1220              :         char       *val_escaped;
    1221              : 
    1222          718 :         if (first)
    1223          459 :             first = false;
    1224              :         else
    1225          259 :             appendStringInfoChar(&res, ',');
    1226              : 
    1227          718 :         val_escaped = PQescapeIdentifier(conn, val, strlen(val));
    1228          718 :         if (!val_escaped)
    1229              :         {
    1230            0 :             free(res.data);
    1231            0 :             return NULL;
    1232              :         }
    1233          718 :         appendStringInfoString(&res, val_escaped);
    1234          718 :         PQfreemem(val_escaped);
    1235              :     }
    1236              : 
    1237          459 :     return res.data;
    1238              : }
        

Generated by: LCOV version 2.0-1