LCOV - code coverage report
Current view: top level - src/backend/replication/libpqwalreceiver - libpqwalreceiver.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 83.2 % 423 352
Test Date: 2026-02-17 17:20:33 Functions: 95.5 % 22 21
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * libpqwalreceiver.c
       4              :  *
       5              :  * This file contains the libpq-specific parts of walreceiver. It's
       6              :  * loaded as a dynamic module to avoid linking the main server binary with
       7              :  * libpq.
       8              :  *
       9              :  * Apart from walreceiver, the libpq-specific routines are now being used by
      10              :  * logical replication workers and slot synchronization.
      11              :  *
      12              :  * Portions Copyright (c) 2010-2026, PostgreSQL Global Development Group
      13              :  *
      14              :  *
      15              :  * IDENTIFICATION
      16              :  *    src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
      17              :  *
      18              :  *-------------------------------------------------------------------------
      19              :  */
      20              : #include "postgres.h"
      21              : 
      22              : #include <unistd.h>
      23              : #include <sys/time.h>
      24              : 
      25              : #include "common/connect.h"
      26              : #include "funcapi.h"
      27              : #include "libpq-fe.h"
      28              : #include "libpq/libpq-be-fe-helpers.h"
      29              : #include "mb/pg_wchar.h"
      30              : #include "miscadmin.h"
      31              : #include "pgstat.h"
      32              : #include "pqexpbuffer.h"
      33              : #include "replication/walreceiver.h"
      34              : #include "storage/latch.h"
      35              : #include "utils/builtins.h"
      36              : #include "utils/memutils.h"
      37              : #include "utils/pg_lsn.h"
      38              : #include "utils/tuplestore.h"
      39              : 
      40         1020 : PG_MODULE_MAGIC_EXT(
      41              :                     .name = "libpqwalreceiver",
      42              :                     .version = PG_VERSION
      43              : );
      44              : 
      45              : struct WalReceiverConn
      46              : {
      47              :     /* Current connection to the primary, if any */
      48              :     PGconn     *streamConn;
      49              :     /* Used to remember if the connection is logical or physical */
      50              :     bool        logical;
      51              :     /* Buffer for currently read records */
      52              :     char       *recvBuf;
      53              : };
      54              : 
      55              : /* Prototypes for interface functions */
      56              : static WalReceiverConn *libpqrcv_connect(const char *conninfo,
      57              :                                          bool replication, bool logical,
      58              :                                          bool must_use_password,
      59              :                                          const char *appname, char **err);
      60              : static void libpqrcv_check_conninfo(const char *conninfo,
      61              :                                     bool must_use_password);
      62              : static char *libpqrcv_get_conninfo(WalReceiverConn *conn);
      63              : static void libpqrcv_get_senderinfo(WalReceiverConn *conn,
      64              :                                     char **sender_host, int *sender_port);
      65              : static char *libpqrcv_identify_system(WalReceiverConn *conn,
      66              :                                       TimeLineID *primary_tli);
      67              : static char *libpqrcv_get_dbname_from_conninfo(const char *connInfo);
      68              : static char *libpqrcv_get_option_from_conninfo(const char *connInfo,
      69              :                                                const char *keyword);
      70              : static int  libpqrcv_server_version(WalReceiverConn *conn);
      71              : static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
      72              :                                              TimeLineID tli, char **filename,
      73              :                                              char **content, int *len);
      74              : static bool libpqrcv_startstreaming(WalReceiverConn *conn,
      75              :                                     const WalRcvStreamOptions *options);
      76              : static void libpqrcv_endstreaming(WalReceiverConn *conn,
      77              :                                   TimeLineID *next_tli);
      78              : static int  libpqrcv_receive(WalReceiverConn *conn, char **buffer,
      79              :                              pgsocket *wait_fd);
      80              : static void libpqrcv_send(WalReceiverConn *conn, const char *buffer,
      81              :                           int nbytes);
      82              : static char *libpqrcv_create_slot(WalReceiverConn *conn,
      83              :                                   const char *slotname,
      84              :                                   bool temporary,
      85              :                                   bool two_phase,
      86              :                                   bool failover,
      87              :                                   CRSSnapshotAction snapshot_action,
      88              :                                   XLogRecPtr *lsn);
      89              : static void libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
      90              :                                 const bool *failover, const bool *two_phase);
      91              : static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn);
      92              : static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn,
      93              :                                        const char *query,
      94              :                                        const int nRetTypes,
      95              :                                        const Oid *retTypes);
      96              : static void libpqrcv_disconnect(WalReceiverConn *conn);
      97              : 
      98              : static WalReceiverFunctionsType PQWalReceiverFunctions = {
      99              :     .walrcv_connect = libpqrcv_connect,
     100              :     .walrcv_check_conninfo = libpqrcv_check_conninfo,
     101              :     .walrcv_get_conninfo = libpqrcv_get_conninfo,
     102              :     .walrcv_get_senderinfo = libpqrcv_get_senderinfo,
     103              :     .walrcv_identify_system = libpqrcv_identify_system,
     104              :     .walrcv_server_version = libpqrcv_server_version,
     105              :     .walrcv_readtimelinehistoryfile = libpqrcv_readtimelinehistoryfile,
     106              :     .walrcv_startstreaming = libpqrcv_startstreaming,
     107              :     .walrcv_endstreaming = libpqrcv_endstreaming,
     108              :     .walrcv_receive = libpqrcv_receive,
     109              :     .walrcv_send = libpqrcv_send,
     110              :     .walrcv_create_slot = libpqrcv_create_slot,
     111              :     .walrcv_alter_slot = libpqrcv_alter_slot,
     112              :     .walrcv_get_dbname_from_conninfo = libpqrcv_get_dbname_from_conninfo,
     113              :     .walrcv_get_backend_pid = libpqrcv_get_backend_pid,
     114              :     .walrcv_exec = libpqrcv_exec,
     115              :     .walrcv_disconnect = libpqrcv_disconnect
     116              : };
     117              : 
     118              : /* Prototypes for private functions */
     119              : static char *stringlist_to_identifierstr(PGconn *conn, List *strings);
     120              : 
     121              : /*
     122              :  * Module initialization function
     123              :  */
     124              : void
     125         1020 : _PG_init(void)
     126              : {
     127         1020 :     if (WalReceiverFunctions != NULL)
     128            0 :         elog(ERROR, "libpqwalreceiver already loaded");
     129         1020 :     WalReceiverFunctions = &PQWalReceiverFunctions;
     130         1020 : }
     131              : 
     132              : /*
     133              :  * Establish the connection to the primary server.
     134              :  *
     135              :  * This function can be used for both replication and regular connections.
     136              :  * If it is a replication connection, it could be either logical or physical
     137              :  * based on input argument 'logical'.
     138              :  *
     139              :  * If an error occurs, this function will normally return NULL and set *err
     140              :  * to a palloc'ed error message. However, if must_use_password is true and
     141              :  * the connection fails to use the password, this function will ereport(ERROR).
     142              :  * We do this because in that case the error includes a detail and a hint for
     143              :  * consistency with other parts of the system, and it's not worth adding the
     144              :  * machinery to pass all of those back to the caller just to cover this one
     145              :  * case.
     146              :  */
     147              : static WalReceiverConn *
     148          961 : libpqrcv_connect(const char *conninfo, bool replication, bool logical,
     149              :                  bool must_use_password, const char *appname, char **err)
     150              : {
     151              :     WalReceiverConn *conn;
     152              :     const char *keys[6];
     153              :     const char *vals[6];
     154          961 :     int         i = 0;
     155          961 :     char       *options_val = NULL;
     156              : 
     157              :     /*
     158              :      * Re-validate connection string. The validation already happened at DDL
     159              :      * time, but the subscription owner may have changed. If we don't recheck
     160              :      * with the correct must_use_password, it's possible that the connection
     161              :      * will obtain the password from a different source, such as PGPASSFILE or
     162              :      * PGPASSWORD.
     163              :      */
     164          961 :     libpqrcv_check_conninfo(conninfo, must_use_password);
     165              : 
     166              :     /*
     167              :      * We use the expand_dbname parameter to process the connection string (or
     168              :      * URI), and pass some extra options.
     169              :      */
     170          953 :     keys[i] = "dbname";
     171          953 :     vals[i] = conninfo;
     172              : 
     173              :     /* We can not have logical without replication */
     174              :     Assert(replication || !logical);
     175              : 
     176          953 :     if (replication)
     177              :     {
     178          940 :         keys[++i] = "replication";
     179          940 :         vals[i] = logical ? "database" : "true";
     180              : 
     181          940 :         if (logical)
     182              :         {
     183          706 :             char       *opt = NULL;
     184              : 
     185              :             /* Tell the publisher to translate to our encoding */
     186          706 :             keys[++i] = "client_encoding";
     187          706 :             vals[i] = GetDatabaseEncodingName();
     188              : 
     189              :             /*
     190              :              * Force assorted GUC parameters to settings that ensure that the
     191              :              * publisher will output data values in a form that is unambiguous
     192              :              * to the subscriber.  (We don't want to modify the subscriber's
     193              :              * GUC settings, since that might surprise user-defined code
     194              :              * running in the subscriber, such as triggers.)  This should
     195              :              * match what pg_dump does.
     196              :              */
     197          706 :             opt = libpqrcv_get_option_from_conninfo(conninfo, "options");
     198          706 :             options_val = psprintf("%s -c datestyle=ISO -c intervalstyle=postgres -c extra_float_digits=3",
     199              :                                    (opt == NULL) ? "" : opt);
     200          706 :             keys[++i] = "options";
     201          706 :             vals[i] = options_val;
     202          706 :             if (opt != NULL)
     203            9 :                 pfree(opt);
     204              :         }
     205              :         else
     206              :         {
     207              :             /*
     208              :              * The database name is ignored by the server in replication mode,
     209              :              * but specify "replication" for .pgpass lookup.
     210              :              */
     211          234 :             keys[++i] = "dbname";
     212          234 :             vals[i] = "replication";
     213              :         }
     214              :     }
     215              : 
     216          953 :     keys[++i] = "fallback_application_name";
     217          953 :     vals[i] = appname;
     218              : 
     219          953 :     keys[++i] = NULL;
     220          953 :     vals[i] = NULL;
     221              : 
     222              :     Assert(i < lengthof(keys));
     223              : 
     224          953 :     conn = palloc0_object(WalReceiverConn);
     225          952 :     conn->streamConn =
     226          953 :         libpqsrv_connect_params(keys, vals,
     227              :                                  /* expand_dbname = */ true,
     228              :                                 WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
     229              : 
     230          952 :     if (options_val != NULL)
     231          705 :         pfree(options_val);
     232              : 
     233          952 :     if (PQstatus(conn->streamConn) != CONNECTION_OK)
     234          122 :         goto bad_connection_errmsg;
     235              : 
     236          830 :     if (must_use_password && !PQconnectionUsedPassword(conn->streamConn))
     237              :     {
     238            0 :         libpqsrv_disconnect(conn->streamConn);
     239            0 :         pfree(conn);
     240              : 
     241            0 :         ereport(ERROR,
     242              :                 (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
     243              :                  errmsg("password is required"),
     244              :                  errdetail("Non-superuser cannot connect if the server does not request a password."),
     245              :                  errhint("Target server's authentication method must be changed, or set password_required=false in the subscription parameters.")));
     246              :     }
     247              : 
     248          830 :     PQsetNoticeReceiver(conn->streamConn, libpqsrv_notice_receiver,
     249              :                         "received message via replication");
     250              : 
     251              :     /*
     252              :      * Set always-secure search path for the cases where the connection is
     253              :      * used to run SQL queries, so malicious users can't get control.
     254              :      */
     255          830 :     if (!replication || logical)
     256              :     {
     257              :         PGresult   *res;
     258              : 
     259          690 :         res = libpqsrv_exec(conn->streamConn,
     260              :                             ALWAYS_SECURE_SEARCH_PATH_SQL,
     261              :                             WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
     262          689 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
     263              :         {
     264            0 :             PQclear(res);
     265            0 :             *err = psprintf(_("could not clear search path: %s"),
     266            0 :                             pchomp(PQerrorMessage(conn->streamConn)));
     267            0 :             goto bad_connection;
     268              :         }
     269          689 :         PQclear(res);
     270              :     }
     271              : 
     272          829 :     conn->logical = logical;
     273              : 
     274          829 :     return conn;
     275              : 
     276              :     /* error path, using libpq's error message */
     277          122 : bad_connection_errmsg:
     278          122 :     *err = pchomp(PQerrorMessage(conn->streamConn));
     279              : 
     280              :     /* error path, error already set */
     281          122 : bad_connection:
     282          122 :     libpqsrv_disconnect(conn->streamConn);
     283          122 :     pfree(conn);
     284          122 :     return NULL;
     285              : }
     286              : 
     287              : /*
     288              :  * Validate connection info string.
     289              :  *
     290              :  * If the connection string can't be parsed, this function will raise
     291              :  * an error. If must_use_password is true, the function raises an error
     292              :  * if no password is provided in the connection string. In any other case
     293              :  * it successfully completes.
     294              :  */
     295              : static void
     296         1153 : libpqrcv_check_conninfo(const char *conninfo, bool must_use_password)
     297              : {
     298         1153 :     PQconninfoOption *opts = NULL;
     299              :     PQconninfoOption *opt;
     300         1153 :     char       *err = NULL;
     301              : 
     302         1153 :     opts = PQconninfoParse(conninfo, &err);
     303         1153 :     if (opts == NULL)
     304              :     {
     305              :         /* The error string is malloc'd, so we must free it explicitly */
     306            9 :         char       *errcopy = err ? pstrdup(err) : "out of memory";
     307              : 
     308            9 :         PQfreemem(err);
     309            9 :         ereport(ERROR,
     310              :                 (errcode(ERRCODE_SYNTAX_ERROR),
     311              :                  errmsg("invalid connection string syntax: %s", errcopy)));
     312              :     }
     313              : 
     314         1144 :     if (must_use_password)
     315              :     {
     316           16 :         bool        uses_password = false;
     317              : 
     318          592 :         for (opt = opts; opt->keyword != NULL; ++opt)
     319              :         {
     320              :             /* Ignore connection options that are not present. */
     321          581 :             if (opt->val == NULL)
     322          545 :                 continue;
     323              : 
     324           36 :             if (strcmp(opt->keyword, "password") == 0 && opt->val[0] != '\0')
     325              :             {
     326            5 :                 uses_password = true;
     327            5 :                 break;
     328              :             }
     329              :         }
     330              : 
     331           16 :         if (!uses_password)
     332              :         {
     333              :             /* malloc'd, so we must free it explicitly */
     334           11 :             PQconninfoFree(opts);
     335              : 
     336           11 :             ereport(ERROR,
     337              :                     (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
     338              :                      errmsg("password is required"),
     339              :                      errdetail("Non-superusers must provide a password in the connection string.")));
     340              :         }
     341              :     }
     342              : 
     343         1133 :     PQconninfoFree(opts);
     344         1133 : }
     345              : 
     346              : /*
     347              :  * Return a user-displayable conninfo string.  Any security-sensitive fields
     348              :  * are obfuscated.
     349              :  */
     350              : static char *
     351          140 : libpqrcv_get_conninfo(WalReceiverConn *conn)
     352              : {
     353              :     PQconninfoOption *conn_opts;
     354              :     PQconninfoOption *conn_opt;
     355              :     PQExpBufferData buf;
     356              :     char       *retval;
     357              : 
     358              :     Assert(conn->streamConn != NULL);
     359              : 
     360          140 :     initPQExpBuffer(&buf);
     361          140 :     conn_opts = PQconninfo(conn->streamConn);
     362              : 
     363          140 :     if (conn_opts == NULL)
     364            0 :         ereport(ERROR,
     365              :                 (errcode(ERRCODE_OUT_OF_MEMORY),
     366              :                  errmsg("could not parse connection string: %s",
     367              :                         _("out of memory"))));
     368              : 
     369              :     /* build a clean connection string from pieces */
     370         7280 :     for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
     371              :     {
     372              :         bool        obfuscate;
     373              : 
     374              :         /* Skip debug and empty options */
     375         7140 :         if (strchr(conn_opt->dispchar, 'D') ||
     376         6580 :             conn_opt->val == NULL ||
     377         2672 :             conn_opt->val[0] == '\0')
     378         4608 :             continue;
     379              : 
     380              :         /* Obfuscate security-sensitive options */
     381         2532 :         obfuscate = strchr(conn_opt->dispchar, '*') != NULL;
     382              : 
     383         5064 :         appendPQExpBuffer(&buf, "%s%s=%s",
     384         2532 :                           buf.len == 0 ? "" : " ",
     385              :                           conn_opt->keyword,
     386              :                           obfuscate ? "********" : conn_opt->val);
     387              :     }
     388              : 
     389          140 :     PQconninfoFree(conn_opts);
     390              : 
     391          140 :     retval = PQExpBufferDataBroken(buf) ? NULL : pstrdup(buf.data);
     392          140 :     termPQExpBuffer(&buf);
     393          140 :     return retval;
     394              : }
     395              : 
     396              : /*
     397              :  * Provides information of sender this WAL receiver is connected to.
     398              :  */
     399              : static void
     400          140 : libpqrcv_get_senderinfo(WalReceiverConn *conn, char **sender_host,
     401              :                         int *sender_port)
     402              : {
     403          140 :     char       *ret = NULL;
     404              : 
     405          140 :     *sender_host = NULL;
     406          140 :     *sender_port = 0;
     407              : 
     408              :     Assert(conn->streamConn != NULL);
     409              : 
     410          140 :     ret = PQhost(conn->streamConn);
     411          140 :     if (ret && strlen(ret) != 0)
     412          140 :         *sender_host = pstrdup(ret);
     413              : 
     414          140 :     ret = PQport(conn->streamConn);
     415          140 :     if (ret && strlen(ret) != 0)
     416          140 :         *sender_port = atoi(ret);
     417          140 : }
     418              : 
     419              : /*
     420              :  * Check that primary's system identifier matches ours, and fetch the current
     421              :  * timeline ID of the primary.
     422              :  */
     423              : static char *
     424          376 : libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
     425              : {
     426              :     PGresult   *res;
     427              :     char       *primary_sysid;
     428              : 
     429              :     /*
     430              :      * Get the system identifier and timeline ID as a DataRow message from the
     431              :      * primary server.
     432              :      */
     433          376 :     res = libpqsrv_exec(conn->streamConn,
     434              :                         "IDENTIFY_SYSTEM",
     435              :                         WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
     436          376 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     437            0 :         ereport(ERROR,
     438              :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
     439              :                  errmsg("could not receive database system identifier and timeline ID from "
     440              :                         "the primary server: %s",
     441              :                         pchomp(PQerrorMessage(conn->streamConn)))));
     442              : 
     443              :     /*
     444              :      * IDENTIFY_SYSTEM returns 3 columns in 9.3 and earlier, and 4 columns in
     445              :      * 9.4 and onwards.
     446              :      */
     447          376 :     if (PQnfields(res) < 3 || PQntuples(res) != 1)
     448            0 :         ereport(ERROR,
     449              :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
     450              :                  errmsg("invalid response from primary server"),
     451              :                  errdetail("Could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields.",
     452              :                            PQntuples(res), PQnfields(res), 1, 3)));
     453          376 :     primary_sysid = pstrdup(PQgetvalue(res, 0, 0));
     454          376 :     *primary_tli = pg_strtoint32(PQgetvalue(res, 0, 1));
     455          376 :     PQclear(res);
     456              : 
     457          376 :     return primary_sysid;
     458              : }
     459              : 
     460              : /*
     461              :  * Thin wrapper around libpq to obtain server version.
     462              :  */
     463              : static int
     464          983 : libpqrcv_server_version(WalReceiverConn *conn)
     465              : {
     466          983 :     return PQserverVersion(conn->streamConn);
     467              : }
     468              : 
     469              : /*
     470              :  * Get database name from the primary server's conninfo.
     471              :  *
     472              :  * If dbname is not found in connInfo, return NULL value.
     473              :  */
     474              : static char *
     475           14 : libpqrcv_get_dbname_from_conninfo(const char *connInfo)
     476              : {
     477           14 :     return libpqrcv_get_option_from_conninfo(connInfo, "dbname");
     478              : }
     479              : 
     480              : /*
     481              :  * Get the value of the option with the given keyword from the primary
     482              :  * server's conninfo.
     483              :  *
     484              :  * If the option is not found in connInfo, return NULL value.
     485              :  */
     486              : static char *
     487          720 : libpqrcv_get_option_from_conninfo(const char *connInfo, const char *keyword)
     488              : {
     489              :     PQconninfoOption *opts;
     490          720 :     char       *option = NULL;
     491          720 :     char       *err = NULL;
     492              : 
     493          720 :     opts = PQconninfoParse(connInfo, &err);
     494          720 :     if (opts == NULL)
     495              :     {
     496              :         /* The error string is malloc'd, so we must free it explicitly */
     497            0 :         char       *errcopy = err ? pstrdup(err) : "out of memory";
     498              : 
     499            0 :         PQfreemem(err);
     500            0 :         ereport(ERROR,
     501              :                 (errcode(ERRCODE_SYNTAX_ERROR),
     502              :                  errmsg("invalid connection string syntax: %s", errcopy)));
     503              :     }
     504              : 
     505        37440 :     for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
     506              :     {
     507              :         /*
     508              :          * If the same option appears multiple times, then the last one will
     509              :          * be returned
     510              :          */
     511        36720 :         if (strcmp(opt->keyword, keyword) == 0 && opt->val &&
     512           22 :             *opt->val)
     513              :         {
     514           22 :             if (option)
     515            0 :                 pfree(option);
     516              : 
     517           22 :             option = pstrdup(opt->val);
     518              :         }
     519              :     }
     520              : 
     521          720 :     PQconninfoFree(opts);
     522          720 :     return option;
     523              : }
     524              : 
     525              : /*
     526              :  * Start streaming WAL data from given streaming options.
     527              :  *
     528              :  * Returns true if we switched successfully to copy-both mode. False
     529              :  * means the server received the command and executed it successfully, but
     530              :  * didn't switch to copy-mode.  That means that there was no WAL on the
     531              :  * requested timeline and starting point, because the server switched to
     532              :  * another timeline at or before the requested starting point. On failure,
     533              :  * throws an ERROR.
     534              :  */
     535              : static bool
     536          562 : libpqrcv_startstreaming(WalReceiverConn *conn,
     537              :                         const WalRcvStreamOptions *options)
     538              : {
     539              :     StringInfoData cmd;
     540              :     PGresult   *res;
     541              : 
     542              :     Assert(options->logical == conn->logical);
     543              :     Assert(options->slotname || !options->logical);
     544              : 
     545          562 :     initStringInfo(&cmd);
     546              : 
     547              :     /* Build the command. */
     548          562 :     appendStringInfoString(&cmd, "START_REPLICATION");
     549          562 :     if (options->slotname != NULL)
     550          455 :         appendStringInfo(&cmd, " SLOT \"%s\"",
     551          455 :                          options->slotname);
     552              : 
     553          562 :     if (options->logical)
     554          409 :         appendStringInfoString(&cmd, " LOGICAL");
     555              : 
     556          562 :     appendStringInfo(&cmd, " %X/%08X", LSN_FORMAT_ARGS(options->startpoint));
     557              : 
     558              :     /*
     559              :      * Additional options are different depending on if we are doing logical
     560              :      * or physical replication.
     561              :      */
     562          562 :     if (options->logical)
     563              :     {
     564              :         char       *pubnames_str;
     565              :         List       *pubnames;
     566              :         char       *pubnames_literal;
     567              : 
     568          409 :         appendStringInfoString(&cmd, " (");
     569              : 
     570          409 :         appendStringInfo(&cmd, "proto_version '%u'",
     571          409 :                          options->proto.logical.proto_version);
     572              : 
     573          409 :         if (options->proto.logical.streaming_str)
     574          401 :             appendStringInfo(&cmd, ", streaming '%s'",
     575          401 :                              options->proto.logical.streaming_str);
     576              : 
     577          416 :         if (options->proto.logical.twophase &&
     578            7 :             PQserverVersion(conn->streamConn) >= 150000)
     579            7 :             appendStringInfoString(&cmd, ", two_phase 'on'");
     580              : 
     581          818 :         if (options->proto.logical.origin &&
     582          409 :             PQserverVersion(conn->streamConn) >= 160000)
     583          409 :             appendStringInfo(&cmd, ", origin '%s'",
     584          409 :                              options->proto.logical.origin);
     585              : 
     586          409 :         pubnames = options->proto.logical.publication_names;
     587          409 :         pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
     588          409 :         if (!pubnames_str)
     589            0 :             ereport(ERROR,
     590              :                     (errcode(ERRCODE_OUT_OF_MEMORY),    /* likely guess */
     591              :                      errmsg("could not start WAL streaming: %s",
     592              :                             pchomp(PQerrorMessage(conn->streamConn)))));
     593          409 :         pubnames_literal = PQescapeLiteral(conn->streamConn, pubnames_str,
     594              :                                            strlen(pubnames_str));
     595          409 :         if (!pubnames_literal)
     596            0 :             ereport(ERROR,
     597              :                     (errcode(ERRCODE_OUT_OF_MEMORY),    /* likely guess */
     598              :                      errmsg("could not start WAL streaming: %s",
     599              :                             pchomp(PQerrorMessage(conn->streamConn)))));
     600          409 :         appendStringInfo(&cmd, ", publication_names %s", pubnames_literal);
     601          409 :         PQfreemem(pubnames_literal);
     602          409 :         pfree(pubnames_str);
     603              : 
     604          419 :         if (options->proto.logical.binary &&
     605           10 :             PQserverVersion(conn->streamConn) >= 140000)
     606           10 :             appendStringInfoString(&cmd, ", binary 'true'");
     607              : 
     608          409 :         appendStringInfoChar(&cmd, ')');
     609              :     }
     610              :     else
     611          153 :         appendStringInfo(&cmd, " TIMELINE %u",
     612          153 :                          options->proto.physical.startpointTLI);
     613              : 
     614              :     /* Start streaming. */
     615          562 :     res = libpqsrv_exec(conn->streamConn,
     616          562 :                         cmd.data,
     617              :                         WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
     618          562 :     pfree(cmd.data);
     619              : 
     620          562 :     if (PQresultStatus(res) == PGRES_COMMAND_OK)
     621              :     {
     622            0 :         PQclear(res);
     623            0 :         return false;
     624              :     }
     625          562 :     else if (PQresultStatus(res) != PGRES_COPY_BOTH)
     626            1 :         ereport(ERROR,
     627              :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
     628              :                  errmsg("could not start WAL streaming: %s",
     629              :                         pchomp(PQerrorMessage(conn->streamConn)))));
     630          561 :     PQclear(res);
     631          561 :     return true;
     632              : }
     633              : 
     634              : /*
     635              :  * Stop streaming WAL data. Returns the next timeline's ID in *next_tli, as
     636              :  * reported by the server, or 0 if it did not report it.
     637              :  */
     638              : static void
     639          238 : libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
     640              : {
     641              :     PGresult   *res;
     642              : 
     643              :     /*
     644              :      * Send copy-end message.  As in libpqsrv_exec, this could theoretically
     645              :      * block, but the risk seems small.
     646              :      */
     647          437 :     if (PQputCopyEnd(conn->streamConn, NULL) <= 0 ||
     648          199 :         PQflush(conn->streamConn))
     649           39 :         ereport(ERROR,
     650              :                 (errcode(ERRCODE_CONNECTION_FAILURE),
     651              :                  errmsg("could not send end-of-streaming message to primary: %s",
     652              :                         pchomp(PQerrorMessage(conn->streamConn)))));
     653              : 
     654          199 :     *next_tli = 0;
     655              : 
     656              :     /*
     657              :      * After COPY is finished, we should receive a result set indicating the
     658              :      * next timeline's ID, or just CommandComplete if the server was shut
     659              :      * down.
     660              :      *
     661              :      * If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is
     662              :      * also possible in case we aborted the copy in mid-stream.
     663              :      */
     664          199 :     res = libpqsrv_get_result(conn->streamConn,
     665              :                               WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
     666          199 :     if (PQresultStatus(res) == PGRES_TUPLES_OK)
     667              :     {
     668              :         /*
     669              :          * Read the next timeline's ID. The server also sends the timeline's
     670              :          * starting point, but it is ignored.
     671              :          */
     672           13 :         if (PQnfields(res) < 2 || PQntuples(res) != 1)
     673            0 :             ereport(ERROR,
     674              :                     (errcode(ERRCODE_PROTOCOL_VIOLATION),
     675              :                      errmsg("unexpected result set after end-of-streaming")));
     676           13 :         *next_tli = pg_strtoint32(PQgetvalue(res, 0, 0));
     677           13 :         PQclear(res);
     678              : 
     679              :         /* the result set should be followed by CommandComplete */
     680           13 :         res = libpqsrv_get_result(conn->streamConn,
     681              :                                   WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
     682              :     }
     683          186 :     else if (PQresultStatus(res) == PGRES_COPY_OUT)
     684              :     {
     685          186 :         PQclear(res);
     686              : 
     687              :         /* End the copy */
     688          186 :         if (PQendcopy(conn->streamConn))
     689            0 :             ereport(ERROR,
     690              :                     (errcode(ERRCODE_CONNECTION_FAILURE),
     691              :                      errmsg("error while shutting down streaming COPY: %s",
     692              :                             pchomp(PQerrorMessage(conn->streamConn)))));
     693              : 
     694              :         /* CommandComplete should follow */
     695          186 :         res = libpqsrv_get_result(conn->streamConn,
     696              :                                   WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
     697              :     }
     698              : 
     699          199 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
     700            0 :         ereport(ERROR,
     701              :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
     702              :                  errmsg("error reading result of streaming command: %s",
     703              :                         pchomp(PQerrorMessage(conn->streamConn)))));
     704          199 :     PQclear(res);
     705              : 
     706              :     /* Verify that there are no more results */
     707          199 :     res = libpqsrv_get_result(conn->streamConn,
     708              :                               WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
     709          199 :     if (res != NULL)
     710            0 :         ereport(ERROR,
     711              :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
     712              :                  errmsg("unexpected result after CommandComplete: %s",
     713              :                         pchomp(PQerrorMessage(conn->streamConn)))));
     714          199 : }
     715              : 
     716              : /*
     717              :  * Fetch the timeline history file for 'tli' from primary.
     718              :  */
     719              : static void
     720           12 : libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
     721              :                                  TimeLineID tli, char **filename,
     722              :                                  char **content, int *len)
     723              : {
     724              :     PGresult   *res;
     725              :     char        cmd[64];
     726              : 
     727              :     Assert(!conn->logical);
     728              : 
     729              :     /*
     730              :      * Request the primary to send over the history file for given timeline.
     731              :      */
     732           12 :     snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli);
     733           12 :     res = libpqsrv_exec(conn->streamConn,
     734              :                         cmd,
     735              :                         WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
     736           12 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     737            0 :         ereport(ERROR,
     738              :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
     739              :                  errmsg("could not receive timeline history file from "
     740              :                         "the primary server: %s",
     741              :                         pchomp(PQerrorMessage(conn->streamConn)))));
     742           12 :     if (PQnfields(res) != 2 || PQntuples(res) != 1)
     743            0 :         ereport(ERROR,
     744              :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
     745              :                  errmsg("invalid response from primary server"),
     746              :                  errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.",
     747              :                            PQntuples(res), PQnfields(res))));
     748           12 :     *filename = pstrdup(PQgetvalue(res, 0, 0));
     749              : 
     750           12 :     *len = PQgetlength(res, 0, 1);
     751           12 :     *content = palloc(*len);
     752           12 :     memcpy(*content, PQgetvalue(res, 0, 1), *len);
     753           12 :     PQclear(res);
     754           12 : }
     755              : 
     756              : /*
     757              :  * Disconnect connection to primary, if any.
     758              :  */
     759              : static void
     760          829 : libpqrcv_disconnect(WalReceiverConn *conn)
     761              : {
     762          829 :     libpqsrv_disconnect(conn->streamConn);
     763          829 :     PQfreemem(conn->recvBuf);
     764          829 :     pfree(conn);
     765          829 : }
     766              : 
     767              : /*
     768              :  * Receive a message available from XLOG stream.
     769              :  *
     770              :  * Returns:
     771              :  *
     772              :  *   If data was received, returns the length of the data. *buffer is set to
     773              :  *   point to a buffer holding the received message. The buffer is only valid
     774              :  *   until the next libpqrcv_* call.
     775              :  *
     776              :  *   If no data was available immediately, returns 0, and *wait_fd is set to a
     777              :  *   socket descriptor which can be waited on before trying again.
     778              :  *
     779              :  *   -1 if the server ended the COPY.
     780              :  *
     781              :  * ereports on error.
     782              :  */
     783              : static int
     784       368671 : libpqrcv_receive(WalReceiverConn *conn, char **buffer,
     785              :                  pgsocket *wait_fd)
     786              : {
     787              :     int         rawlen;
     788              : 
     789       368671 :     PQfreemem(conn->recvBuf);
     790       368671 :     conn->recvBuf = NULL;
     791              : 
     792              :     /* Try to receive a CopyData message */
     793       368671 :     rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
     794       368671 :     if (rawlen == 0)
     795              :     {
     796              :         /* Try consuming some data. */
     797       179860 :         if (PQconsumeInput(conn->streamConn) == 0)
     798           41 :             ereport(ERROR,
     799              :                     (errcode(ERRCODE_CONNECTION_FAILURE),
     800              :                      errmsg("could not receive data from WAL stream: %s",
     801              :                             pchomp(PQerrorMessage(conn->streamConn)))));
     802              : 
     803              :         /* Now that we've consumed some input, try again */
     804       179819 :         rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
     805       179819 :         if (rawlen == 0)
     806              :         {
     807              :             /* Tell caller to try again when our socket is ready. */
     808        67061 :             *wait_fd = PQsocket(conn->streamConn);
     809        67061 :             return 0;
     810              :         }
     811              :     }
     812       301569 :     if (rawlen == -1)           /* end-of-streaming or error */
     813              :     {
     814              :         PGresult   *res;
     815              : 
     816          249 :         res = libpqsrv_get_result(conn->streamConn,
     817              :                                   WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
     818          249 :         if (PQresultStatus(res) == PGRES_COMMAND_OK)
     819              :         {
     820          233 :             PQclear(res);
     821              : 
     822              :             /* Verify that there are no more results. */
     823          233 :             res = libpqsrv_get_result(conn->streamConn,
     824              :                                       WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
     825          233 :             if (res != NULL)
     826              :             {
     827           39 :                 PQclear(res);
     828              : 
     829              :                 /*
     830              :                  * If the other side closed the connection orderly (otherwise
     831              :                  * we'd seen an error, or PGRES_COPY_IN) don't report an error
     832              :                  * here, but let callers deal with it.
     833              :                  */
     834           39 :                 if (PQstatus(conn->streamConn) == CONNECTION_BAD)
     835           39 :                     return -1;
     836              : 
     837            0 :                 ereport(ERROR,
     838              :                         (errcode(ERRCODE_PROTOCOL_VIOLATION),
     839              :                          errmsg("unexpected result after CommandComplete: %s",
     840              :                                 PQerrorMessage(conn->streamConn))));
     841              :             }
     842              : 
     843          194 :             return -1;
     844              :         }
     845           16 :         else if (PQresultStatus(res) == PGRES_COPY_IN)
     846              :         {
     847           13 :             PQclear(res);
     848           13 :             return -1;
     849              :         }
     850              :         else
     851            3 :             ereport(ERROR,
     852              :                     (errcode(ERRCODE_PROTOCOL_VIOLATION),
     853              :                      errmsg("could not receive data from WAL stream: %s",
     854              :                             pchomp(PQerrorMessage(conn->streamConn)))));
     855              :     }
     856       301320 :     if (rawlen < -1)
     857            0 :         ereport(ERROR,
     858              :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
     859              :                  errmsg("could not receive data from WAL stream: %s",
     860              :                         pchomp(PQerrorMessage(conn->streamConn)))));
     861              : 
     862              :     /* Return received messages to caller */
     863       301320 :     *buffer = conn->recvBuf;
     864       301320 :     return rawlen;
     865              : }
     866              : 
     867              : /*
     868              :  * Send a message to XLOG stream.
     869              :  *
     870              :  * ereports on error.
     871              :  */
     872              : static void
     873        98032 : libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
     874              : {
     875       196064 :     if (PQputCopyData(conn->streamConn, buffer, nbytes) <= 0 ||
     876        98032 :         PQflush(conn->streamConn))
     877            0 :         ereport(ERROR,
     878              :                 (errcode(ERRCODE_CONNECTION_FAILURE),
     879              :                  errmsg("could not send data to WAL stream: %s",
     880              :                         pchomp(PQerrorMessage(conn->streamConn)))));
     881        98032 : }
     882              : 
     883              : /*
     884              :  * Create new replication slot.
     885              :  * Returns the name of the exported snapshot for logical slot or NULL for
     886              :  * physical slot.
     887              :  */
     888              : static char *
     889          313 : libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
     890              :                      bool temporary, bool two_phase, bool failover,
     891              :                      CRSSnapshotAction snapshot_action, XLogRecPtr *lsn)
     892              : {
     893              :     PGresult   *res;
     894              :     StringInfoData cmd;
     895              :     char       *snapshot;
     896              :     int         use_new_options_syntax;
     897              : 
     898          313 :     use_new_options_syntax = (PQserverVersion(conn->streamConn) >= 150000);
     899              : 
     900          313 :     initStringInfo(&cmd);
     901              : 
     902          313 :     appendStringInfo(&cmd, "CREATE_REPLICATION_SLOT \"%s\"", slotname);
     903              : 
     904          313 :     if (temporary)
     905            0 :         appendStringInfoString(&cmd, " TEMPORARY");
     906              : 
     907          313 :     if (conn->logical)
     908              :     {
     909          313 :         appendStringInfoString(&cmd, " LOGICAL pgoutput ");
     910          313 :         if (use_new_options_syntax)
     911          313 :             appendStringInfoChar(&cmd, '(');
     912          313 :         if (two_phase)
     913              :         {
     914            1 :             appendStringInfoString(&cmd, "TWO_PHASE");
     915            1 :             if (use_new_options_syntax)
     916            1 :                 appendStringInfoString(&cmd, ", ");
     917              :             else
     918            0 :                 appendStringInfoChar(&cmd, ' ');
     919              :         }
     920              : 
     921          313 :         if (failover)
     922              :         {
     923            9 :             appendStringInfoString(&cmd, "FAILOVER");
     924            9 :             if (use_new_options_syntax)
     925            9 :                 appendStringInfoString(&cmd, ", ");
     926              :             else
     927            0 :                 appendStringInfoChar(&cmd, ' ');
     928              :         }
     929              : 
     930          313 :         if (use_new_options_syntax)
     931              :         {
     932          313 :             switch (snapshot_action)
     933              :             {
     934            0 :                 case CRS_EXPORT_SNAPSHOT:
     935            0 :                     appendStringInfoString(&cmd, "SNAPSHOT 'export'");
     936            0 :                     break;
     937          114 :                 case CRS_NOEXPORT_SNAPSHOT:
     938          114 :                     appendStringInfoString(&cmd, "SNAPSHOT 'nothing'");
     939          114 :                     break;
     940          199 :                 case CRS_USE_SNAPSHOT:
     941          199 :                     appendStringInfoString(&cmd, "SNAPSHOT 'use'");
     942          199 :                     break;
     943              :             }
     944              :         }
     945              :         else
     946              :         {
     947            0 :             switch (snapshot_action)
     948              :             {
     949            0 :                 case CRS_EXPORT_SNAPSHOT:
     950            0 :                     appendStringInfoString(&cmd, "EXPORT_SNAPSHOT");
     951            0 :                     break;
     952            0 :                 case CRS_NOEXPORT_SNAPSHOT:
     953            0 :                     appendStringInfoString(&cmd, "NOEXPORT_SNAPSHOT");
     954            0 :                     break;
     955            0 :                 case CRS_USE_SNAPSHOT:
     956            0 :                     appendStringInfoString(&cmd, "USE_SNAPSHOT");
     957            0 :                     break;
     958              :             }
     959              :         }
     960              : 
     961          313 :         if (use_new_options_syntax)
     962          313 :             appendStringInfoChar(&cmd, ')');
     963              :     }
     964              :     else
     965              :     {
     966            0 :         if (use_new_options_syntax)
     967            0 :             appendStringInfoString(&cmd, " PHYSICAL (RESERVE_WAL)");
     968              :         else
     969            0 :             appendStringInfoString(&cmd, " PHYSICAL RESERVE_WAL");
     970              :     }
     971              : 
     972          313 :     res = libpqsrv_exec(conn->streamConn,
     973          313 :                         cmd.data,
     974              :                         WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
     975          313 :     pfree(cmd.data);
     976              : 
     977          313 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     978            0 :         ereport(ERROR,
     979              :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
     980              :                  errmsg("could not create replication slot \"%s\": %s",
     981              :                         slotname, pchomp(PQerrorMessage(conn->streamConn)))));
     982              : 
     983          313 :     if (lsn)
     984          199 :         *lsn = DatumGetLSN(DirectFunctionCall1Coll(pg_lsn_in, InvalidOid,
     985          199 :                                                    CStringGetDatum(PQgetvalue(res, 0, 1))));
     986              : 
     987          313 :     if (!PQgetisnull(res, 0, 2))
     988            0 :         snapshot = pstrdup(PQgetvalue(res, 0, 2));
     989              :     else
     990          313 :         snapshot = NULL;
     991              : 
     992          313 :     PQclear(res);
     993              : 
     994          313 :     return snapshot;
     995              : }
     996              : 
     997              : /*
     998              :  * Change the definition of the replication slot.
     999              :  */
    1000              : static void
    1001            5 : libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
    1002              :                     const bool *failover, const bool *two_phase)
    1003              : {
    1004              :     StringInfoData cmd;
    1005              :     PGresult   *res;
    1006              : 
    1007            5 :     initStringInfo(&cmd);
    1008            5 :     appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( ",
    1009              :                      quote_identifier(slotname));
    1010              : 
    1011            5 :     if (failover)
    1012            4 :         appendStringInfo(&cmd, "FAILOVER %s",
    1013            4 :                          *failover ? "true" : "false");
    1014              : 
    1015            5 :     if (failover && two_phase)
    1016            0 :         appendStringInfoString(&cmd, ", ");
    1017              : 
    1018            5 :     if (two_phase)
    1019            1 :         appendStringInfo(&cmd, "TWO_PHASE %s",
    1020            1 :                          *two_phase ? "true" : "false");
    1021              : 
    1022            5 :     appendStringInfoString(&cmd, " );");
    1023              : 
    1024            5 :     res = libpqsrv_exec(conn->streamConn, cmd.data,
    1025              :                         WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
    1026            5 :     pfree(cmd.data);
    1027              : 
    1028            5 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
    1029            0 :         ereport(ERROR,
    1030              :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
    1031              :                  errmsg("could not alter replication slot \"%s\": %s",
    1032              :                         slotname, pchomp(PQerrorMessage(conn->streamConn)))));
    1033              : 
    1034            5 :     PQclear(res);
    1035            5 : }
    1036              : 
    1037              : /*
    1038              :  * Return PID of remote backend process.
    1039              :  */
    1040              : static pid_t
    1041            0 : libpqrcv_get_backend_pid(WalReceiverConn *conn)
    1042              : {
    1043            0 :     return PQbackendPID(conn->streamConn);
    1044              : }
    1045              : 
    1046              : /*
    1047              :  * Convert tuple query result to tuplestore.
    1048              :  */
    1049              : static void
    1050         1160 : libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres,
    1051              :                        const int nRetTypes, const Oid *retTypes)
    1052              : {
    1053              :     int         tupn;
    1054              :     int         coln;
    1055         1160 :     int         nfields = PQnfields(pgres);
    1056              :     HeapTuple   tuple;
    1057              :     AttInMetadata *attinmeta;
    1058              :     MemoryContext rowcontext;
    1059              :     MemoryContext oldcontext;
    1060              : 
    1061              :     /* Make sure we got expected number of fields. */
    1062         1160 :     if (nfields != nRetTypes)
    1063            0 :         ereport(ERROR,
    1064              :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
    1065              :                  errmsg("invalid query response"),
    1066              :                  errdetail("Expected %d fields, got %d fields.",
    1067              :                            nRetTypes, nfields)));
    1068              : 
    1069         1160 :     walres->tuplestore = tuplestore_begin_heap(true, false, work_mem);
    1070              : 
    1071              :     /* Create tuple descriptor corresponding to expected result. */
    1072         1160 :     walres->tupledesc = CreateTemplateTupleDesc(nRetTypes);
    1073         4282 :     for (coln = 0; coln < nRetTypes; coln++)
    1074         3122 :         TupleDescInitEntry(walres->tupledesc, (AttrNumber) coln + 1,
    1075         3122 :                            PQfname(pgres, coln), retTypes[coln], -1, 0);
    1076         1160 :     attinmeta = TupleDescGetAttInMetadata(walres->tupledesc);
    1077              : 
    1078              :     /* No point in doing more here if there were no tuples returned. */
    1079         1160 :     if (PQntuples(pgres) == 0)
    1080           35 :         return;
    1081              : 
    1082              :     /* Create temporary context for local allocations. */
    1083         1125 :     rowcontext = AllocSetContextCreate(CurrentMemoryContext,
    1084              :                                        "libpqrcv query result context",
    1085              :                                        ALLOCSET_DEFAULT_SIZES);
    1086              : 
    1087              :     /* Process returned rows. */
    1088         2620 :     for (tupn = 0; tupn < PQntuples(pgres); tupn++)
    1089              :     {
    1090              :         char       *cstrs[MaxTupleAttributeNumber];
    1091              : 
    1092         1495 :         CHECK_FOR_INTERRUPTS();
    1093              : 
    1094              :         /* Do the allocations in temporary context. */
    1095         1495 :         oldcontext = MemoryContextSwitchTo(rowcontext);
    1096              : 
    1097              :         /*
    1098              :          * Fill cstrs with null-terminated strings of column values.
    1099              :          */
    1100         6209 :         for (coln = 0; coln < nfields; coln++)
    1101              :         {
    1102         4714 :             if (PQgetisnull(pgres, tupn, coln))
    1103          613 :                 cstrs[coln] = NULL;
    1104              :             else
    1105         4101 :                 cstrs[coln] = PQgetvalue(pgres, tupn, coln);
    1106              :         }
    1107              : 
    1108              :         /* Convert row to a tuple, and add it to the tuplestore */
    1109         1495 :         tuple = BuildTupleFromCStrings(attinmeta, cstrs);
    1110         1495 :         tuplestore_puttuple(walres->tuplestore, tuple);
    1111              : 
    1112              :         /* Clean up */
    1113         1495 :         MemoryContextSwitchTo(oldcontext);
    1114         1495 :         MemoryContextReset(rowcontext);
    1115              :     }
    1116              : 
    1117         1125 :     MemoryContextDelete(rowcontext);
    1118              : }
    1119              : 
    1120              : /*
    1121              :  * Public interface for sending generic queries (and commands).
    1122              :  *
    1123              :  * This can only be called from process connected to database.
    1124              :  */
    1125              : static WalRcvExecResult *
    1126         2014 : libpqrcv_exec(WalReceiverConn *conn, const char *query,
    1127              :               const int nRetTypes, const Oid *retTypes)
    1128              : {
    1129         2014 :     PGresult   *pgres = NULL;
    1130         2014 :     WalRcvExecResult *walres = palloc0_object(WalRcvExecResult);
    1131              :     char       *diag_sqlstate;
    1132              : 
    1133         2014 :     if (MyDatabaseId == InvalidOid)
    1134            0 :         ereport(ERROR,
    1135              :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1136              :                  errmsg("the query interface requires a database connection")));
    1137              : 
    1138         2014 :     pgres = libpqsrv_exec(conn->streamConn,
    1139              :                           query,
    1140              :                           WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
    1141              : 
    1142         2014 :     switch (PQresultStatus(pgres))
    1143              :     {
    1144         1160 :         case PGRES_TUPLES_OK:
    1145              :         case PGRES_SINGLE_TUPLE:
    1146              :         case PGRES_TUPLES_CHUNK:
    1147         1160 :             walres->status = WALRCV_OK_TUPLES;
    1148         1160 :             libpqrcv_processTuples(pgres, walres, nRetTypes, retTypes);
    1149         1160 :             break;
    1150              : 
    1151            0 :         case PGRES_COPY_IN:
    1152            0 :             walres->status = WALRCV_OK_COPY_IN;
    1153            0 :             break;
    1154              : 
    1155          196 :         case PGRES_COPY_OUT:
    1156          196 :             walres->status = WALRCV_OK_COPY_OUT;
    1157          196 :             break;
    1158              : 
    1159            0 :         case PGRES_COPY_BOTH:
    1160            0 :             walres->status = WALRCV_OK_COPY_BOTH;
    1161            0 :             break;
    1162              : 
    1163          655 :         case PGRES_COMMAND_OK:
    1164          655 :             walres->status = WALRCV_OK_COMMAND;
    1165          655 :             break;
    1166              : 
    1167              :             /* Empty query is considered error. */
    1168            0 :         case PGRES_EMPTY_QUERY:
    1169            0 :             walres->status = WALRCV_ERROR;
    1170            0 :             walres->err = _("empty query");
    1171            0 :             break;
    1172              : 
    1173            0 :         case PGRES_PIPELINE_SYNC:
    1174              :         case PGRES_PIPELINE_ABORTED:
    1175            0 :             walres->status = WALRCV_ERROR;
    1176            0 :             walres->err = _("unexpected pipeline mode");
    1177            0 :             break;
    1178              : 
    1179            3 :         case PGRES_NONFATAL_ERROR:
    1180              :         case PGRES_FATAL_ERROR:
    1181              :         case PGRES_BAD_RESPONSE:
    1182            3 :             walres->status = WALRCV_ERROR;
    1183            3 :             walres->err = pchomp(PQerrorMessage(conn->streamConn));
    1184            3 :             diag_sqlstate = PQresultErrorField(pgres, PG_DIAG_SQLSTATE);
    1185            3 :             if (diag_sqlstate)
    1186            3 :                 walres->sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
    1187              :                                                  diag_sqlstate[1],
    1188              :                                                  diag_sqlstate[2],
    1189              :                                                  diag_sqlstate[3],
    1190              :                                                  diag_sqlstate[4]);
    1191            3 :             break;
    1192              :     }
    1193              : 
    1194         2014 :     PQclear(pgres);
    1195              : 
    1196         2014 :     return walres;
    1197              : }
    1198              : 
    1199              : /*
    1200              :  * Given a List of strings, return it as single comma separated
    1201              :  * string, quoting identifiers as needed.
    1202              :  *
    1203              :  * This is essentially the reverse of SplitIdentifierString.
    1204              :  *
    1205              :  * The caller should free the result.
    1206              :  */
    1207              : static char *
    1208          409 : stringlist_to_identifierstr(PGconn *conn, List *strings)
    1209              : {
    1210              :     ListCell   *lc;
    1211              :     StringInfoData res;
    1212          409 :     bool        first = true;
    1213              : 
    1214          409 :     initStringInfo(&res);
    1215              : 
    1216         1069 :     foreach(lc, strings)
    1217              :     {
    1218          660 :         char       *val = strVal(lfirst(lc));
    1219              :         char       *val_escaped;
    1220              : 
    1221          660 :         if (first)
    1222          409 :             first = false;
    1223              :         else
    1224          251 :             appendStringInfoChar(&res, ',');
    1225              : 
    1226          660 :         val_escaped = PQescapeIdentifier(conn, val, strlen(val));
    1227          660 :         if (!val_escaped)
    1228              :         {
    1229            0 :             free(res.data);
    1230            0 :             return NULL;
    1231              :         }
    1232          660 :         appendStringInfoString(&res, val_escaped);
    1233          660 :         PQfreemem(val_escaped);
    1234              :     }
    1235              : 
    1236          409 :     return res.data;
    1237              : }
        

Generated by: LCOV version 2.0-1