LCOV - code coverage report
Current view: top level - src/backend/replication/libpqwalreceiver - libpqwalreceiver.c (source / functions) Hit Total Coverage
Test: PostgreSQL 15devel Lines: 312 393 79.4 %
Date: 2021-11-29 04:09:17 Functions: 20 21 95.2 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * libpqwalreceiver.c
       4             :  *
       5             :  * This file contains the libpq-specific parts of walreceiver. It's
       6             :  * loaded as a dynamic module to avoid linking the main server binary with
       7             :  * libpq.
       8             :  *
       9             :  * Portions Copyright (c) 2010-2021, PostgreSQL Global Development Group
      10             :  *
      11             :  *
      12             :  * IDENTIFICATION
      13             :  *    src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
      14             :  *
      15             :  *-------------------------------------------------------------------------
      16             :  */
      17             : #include "postgres.h"
      18             : 
      19             : #include <unistd.h>
      20             : #include <sys/time.h>
      21             : 
      22             : #include "access/xlog.h"
      23             : #include "catalog/pg_type.h"
      24             : #include "common/connect.h"
      25             : #include "funcapi.h"
      26             : #include "libpq-fe.h"
      27             : #include "mb/pg_wchar.h"
      28             : #include "miscadmin.h"
      29             : #include "pgstat.h"
      30             : #include "pqexpbuffer.h"
      31             : #include "replication/walreceiver.h"
      32             : #include "utils/builtins.h"
      33             : #include "utils/memutils.h"
      34             : #include "utils/pg_lsn.h"
      35             : #include "utils/tuplestore.h"
      36             : 
      37         704 : PG_MODULE_MAGIC;
      38             : 
      39             : void        _PG_init(void);
      40             : 
      41             : struct WalReceiverConn   /* per-connection state; allocated and returned by libpqrcv_connect() */
      42             : {
      43             :     /* Current libpq connection to the primary, if any */
      44             :     PGconn     *streamConn;
      45             :     /* true for a logical replication connection, false for a physical one (copied from libpqrcv_connect()'s "logical" argument) */
      46             :     bool        logical;
      47             :     /* Buffer for currently read records (not touched by the functions visible in this part of the listing) */
      48             :     char       *recvBuf;
      49             : };
      50             : 
      51             : /* Prototypes for interface functions */
      52             : static WalReceiverConn *libpqrcv_connect(const char *conninfo,
      53             :                                          bool logical, const char *appname,
      54             :                                          char **err);
      55             : static void libpqrcv_check_conninfo(const char *conninfo);
      56             : static char *libpqrcv_get_conninfo(WalReceiverConn *conn);
      57             : static void libpqrcv_get_senderinfo(WalReceiverConn *conn,
      58             :                                     char **sender_host, int *sender_port);
      59             : static char *libpqrcv_identify_system(WalReceiverConn *conn,
      60             :                                       TimeLineID *primary_tli);
      61             : static int  libpqrcv_server_version(WalReceiverConn *conn);
      62             : static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
      63             :                                              TimeLineID tli, char **filename,
      64             :                                              char **content, int *len);
      65             : static bool libpqrcv_startstreaming(WalReceiverConn *conn,
      66             :                                     const WalRcvStreamOptions *options);
      67             : static void libpqrcv_endstreaming(WalReceiverConn *conn,
      68             :                                   TimeLineID *next_tli);
      69             : static int  libpqrcv_receive(WalReceiverConn *conn, char **buffer,
      70             :                              pgsocket *wait_fd);
      71             : static void libpqrcv_send(WalReceiverConn *conn, const char *buffer,
      72             :                           int nbytes);
      73             : static char *libpqrcv_create_slot(WalReceiverConn *conn,
      74             :                                   const char *slotname,
      75             :                                   bool temporary,
      76             :                                   bool two_phase,
      77             :                                   CRSSnapshotAction snapshot_action,
      78             :                                   XLogRecPtr *lsn);
      79             : static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn);
      80             : static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn,
      81             :                                        const char *query,
      82             :                                        const int nRetTypes,
      83             :                                        const Oid *retTypes);
      84             : static void libpqrcv_disconnect(WalReceiverConn *conn);
      85             : 
      86             : static WalReceiverFunctionsType PQWalReceiverFunctions = {   /* positional initializer: entries must stay in the member order of WalReceiverFunctionsType (declared outside this file) */
      87             :     libpqrcv_connect,
      88             :     libpqrcv_check_conninfo,
      89             :     libpqrcv_get_conninfo,
      90             :     libpqrcv_get_senderinfo,
      91             :     libpqrcv_identify_system,
      92             :     libpqrcv_server_version,
      93             :     libpqrcv_readtimelinehistoryfile,
      94             :     libpqrcv_startstreaming,
      95             :     libpqrcv_endstreaming,
      96             :     libpqrcv_receive,
      97             :     libpqrcv_send,
      98             :     libpqrcv_create_slot,
      99             :     libpqrcv_get_backend_pid,
     100             :     libpqrcv_exec,
     101             :     libpqrcv_disconnect
     102             : };   /* installed into WalReceiverFunctions by _PG_init() */
     103             : 
     104             : /* Prototypes for private functions */
     105             : static PGresult *libpqrcv_PQexec(PGconn *streamConn, const char *query);
     106             : static PGresult *libpqrcv_PQgetResult(PGconn *streamConn);
     107             : static char *stringlist_to_identifierstr(PGconn *conn, List *strings);
     108             : 
     109             : /*
     110             :  * Module initialization function
                     :  *
                     :  * Publishes this module's callback table by pointing WalReceiverFunctions
                     :  * at PQWalReceiverFunctions.  Raises ERROR if WalReceiverFunctions is
                     :  * already non-NULL, i.e. a callback table has been installed before.
     111             :  */
     112             : void
     113         704 : _PG_init(void)
     114             : {
     115         704 :     if (WalReceiverFunctions != NULL)
     116           0 :         elog(ERROR, "libpqwalreceiver already loaded");
     117         704 :     WalReceiverFunctions = &PQWalReceiverFunctions;
     118         704 : }
     119             : 
     120             : /*
     121             :  * Establish the connection to the primary server for XLOG streaming
     122             :  *
                     :  * conninfo is a libpq connection string or URI; it is passed as "dbname"
                     :  * with expand_dbname enabled so that libpq expands it.  logical selects
                     :  * replication=database (plus client_encoding and GUC-forcing "options")
                     :  * instead of replication=true.  appname is sent as
                     :  * fallback_application_name.
                     :  *
                     :  * The connection is made with the non-blocking libpq API so that the
                     :  * process latch can be serviced (ProcessWalRcvInterrupts) while waiting.
                     :  *
     123             :  * Returns NULL on error and fills the err with palloc'ed error message.
                     :  * For a logical connection, raises ERROR instead if the search path
                     :  * cannot be cleared after the connection has been established.
                     :  *
                     :  * NOTE(review): the NULL-returning paths neither PQfinish() the PGconn nor
                     :  * pfree() the WalReceiverConn in this function.
     124             :  */
     125             : static WalReceiverConn *
     126         684 : libpqrcv_connect(const char *conninfo, bool logical, const char *appname,
     127             :                  char **err)
     128             : {
     129             :     WalReceiverConn *conn;
     130             :     PostgresPollingStatusType status;
     131             :     const char *keys[6];   /* worst case (logical): 5 keywords + NULL terminator */
     132             :     const char *vals[6];
     133         684 :     int         i = 0;
     134             : 
     135             :     /*
     136             :      * We use the expand_dbname parameter to process the connection string (or
     137             :      * URI), and pass some extra options.
     138             :      */
     139         684 :     keys[i] = "dbname";
     140         684 :     vals[i] = conninfo;
     141         684 :     keys[++i] = "replication";
     142         684 :     vals[i] = logical ? "database" : "true";
     143         684 :     if (!logical)
     144             :     {
     145             :         /*
     146             :          * The database name is ignored by the server in replication mode, but
     147             :          * specify "replication" for .pgpass lookup.
     148             :          */
     149         250 :         keys[++i] = "dbname";
     150         250 :         vals[i] = "replication";
     151             :     }
     152         684 :     keys[++i] = "fallback_application_name";
     153         684 :     vals[i] = appname;
     154         684 :     if (logical)
     155             :     {
     156             :         /* Tell the publisher to translate to our encoding */
     157         434 :         keys[++i] = "client_encoding";
     158         434 :         vals[i] = GetDatabaseEncodingName();
     159             : 
     160             :         /*
     161             :          * Force assorted GUC parameters to settings that ensure that the
     162             :          * publisher will output data values in a form that is unambiguous to
     163             :          * the subscriber.  (We don't want to modify the subscriber's GUC
     164             :          * settings, since that might surprise user-defined code running in
     165             :          * the subscriber, such as triggers.)  This should match what pg_dump
     166             :          * does.
     167             :          */
     168         434 :         keys[++i] = "options";
     169         434 :         vals[i] = "-c datestyle=ISO -c intervalstyle=postgres -c extra_float_digits=3";
     170             :     }
     171         684 :     keys[++i] = NULL;
     172         684 :     vals[i] = NULL;
     173             : 
     174             :     Assert(i < lengthof(keys));   /* compare against the element count; sizeof(keys) is the byte size and could never trip */
     175             : 
     176         684 :     conn = palloc0(sizeof(WalReceiverConn));
     177         684 :     conn->streamConn = PQconnectStartParams(keys, vals,
     178             :                                              /* expand_dbname = */ true);
     179         684 :     if (PQstatus(conn->streamConn) == CONNECTION_BAD)
     180             :     {
     181          82 :         *err = pchomp(PQerrorMessage(conn->streamConn));
     182          82 :         return NULL;
     183             :     }
     184             : 
     185             :     /*
     186             :      * Poll connection until we have OK or FAILED status.
     187             :      *
     188             :      * Per spec for PQconnectPoll, first wait till socket is write-ready.
     189             :      */
     190         602 :     status = PGRES_POLLING_WRITING;
     191             :     do
     192             :     {
     193             :         int         io_flag;
     194             :         int         rc;
     195             : 
     196        1682 :         if (status == PGRES_POLLING_READING)
     197         602 :             io_flag = WL_SOCKET_READABLE;
     198             : #ifdef WIN32
     199             :         /* Windows needs a different test while waiting for connection-made */
     200             :         else if (PQstatus(conn->streamConn) == CONNECTION_STARTED)
     201             :             io_flag = WL_SOCKET_CONNECTED;
     202             : #endif
     203             :         else
     204        1080 :             io_flag = WL_SOCKET_WRITEABLE;
     205             : 
     206        1682 :         rc = WaitLatchOrSocket(MyLatch,
     207             :                                WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag,
     208        1682 :                                PQsocket(conn->streamConn),
     209             :                                0,
     210             :                                WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
     211             : 
     212             :         /* Interrupted? */
     213        1682 :         if (rc & WL_LATCH_SET)
     214             :         {
     215         482 :             ResetLatch(MyLatch);
     216         482 :             ProcessWalRcvInterrupts();
     217             :         }
     218             : 
     219             :         /* If socket is ready, advance the libpq state machine */
     220        1678 :         if (rc & io_flag)
     221        1200 :             status = PQconnectPoll(conn->streamConn);
     222        1678 :     } while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED);
     223             : 
     224         598 :     if (PQstatus(conn->streamConn) != CONNECTION_OK)
     225             :     {
     226          14 :         *err = pchomp(PQerrorMessage(conn->streamConn));
     227          14 :         return NULL;
     228             :     }
     229             : 
     230         584 :     if (logical)
     231             :     {
     232             :         PGresult   *res;
     233             : 
     234         430 :         res = libpqrcv_PQexec(conn->streamConn,
     235             :                               ALWAYS_SECURE_SEARCH_PATH_SQL);
     236         430 :         if (PQresultStatus(res) != PGRES_TUPLES_OK)
     237             :         {
     238           0 :             PQclear(res);   /* the message reported below is read from the connection, not from res */
     239           0 :             ereport(ERROR,
     240             :                     (errmsg("could not clear search path: %s",
     241             :                             pchomp(PQerrorMessage(conn->streamConn)))));
     242             :         }
     243         430 :         PQclear(res);
     244             :     }
     245             : 
     246         584 :     conn->logical = logical;
     247             : 
     248         584 :     return conn;
     249             : }
     250             : 
     251             : /*
     252             :  * Validate connection info string (just try to parse it)
                     :  *
                     :  * No connection is attempted.  If PQconninfoParse() rejects the string,
                     :  * raises ERROR with ERRCODE_SYNTAX_ERROR; the message carries libpq's
                     :  * error text, or "out of memory" when libpq supplied none.  On success the
                     :  * parsed option array is freed and nothing is returned.
     253             :  */
     254             : static void
     255         138 : libpqrcv_check_conninfo(const char *conninfo)
     256             : {
     257         138 :     PQconninfoOption *opts = NULL;
     258         138 :     char       *err = NULL;
     259             : 
     260         138 :     opts = PQconninfoParse(conninfo, &err);
     261         138 :     if (opts == NULL)
     262             :     {
     263             :         /* The error string is malloc'd, so we must free it explicitly */
     264           8 :         char       *errcopy = err ? pstrdup(err) : "out of memory";   /* copy first: err is freed before ereport() */
     265             : 
     266           8 :         PQfreemem(err);
     267           8 :         ereport(ERROR,
     268             :                 (errcode(ERRCODE_SYNTAX_ERROR),
     269             :                  errmsg("invalid connection string syntax: %s", errcopy)));
     270             :     }
     271             : 
     272         130 :     PQconninfoFree(opts);
     273         130 : }
     274             : 
     275             : /*
     276             :  * Return a user-displayable conninfo string.  Any security-sensitive fields
     277             :  * are obfuscated.
                     :  *
                     :  * The string is rebuilt from PQconninfo() as space-separated
                     :  * "keyword=value" pairs.  Options whose dispchar contains 'D', or whose
                     :  * value is NULL or empty, are left out; options whose dispchar contains
                     :  * '*' have their value replaced by "********".
                     :  *
                     :  * Returns a pstrdup'ed string, or NULL if the PQExpBuffer ended up broken.
                     :  * Raises ERROR if PQconninfo() returns NULL.
     278             :  */
     279             : static char *
     280         154 : libpqrcv_get_conninfo(WalReceiverConn *conn)
     281             : {
     282             :     PQconninfoOption *conn_opts;
     283             :     PQconninfoOption *conn_opt;
     284             :     PQExpBufferData buf;
     285             :     char       *retval;
     286             : 
     287             :     Assert(conn->streamConn != NULL);
     288             : 
     289         154 :     initPQExpBuffer(&buf);
     290         154 :     conn_opts = PQconninfo(conn->streamConn);
     291             : 
     292         154 :     if (conn_opts == NULL)
     293           0 :         ereport(ERROR,
     294             :                 (errcode(ERRCODE_OUT_OF_MEMORY),
     295             :                  errmsg("could not parse connection string: %s",
     296             :                         _("out of memory"))));
     297             : 
     298             :     /* build a clean connection string from pieces */
     299        5698 :     for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)   /* array ends at the entry with a NULL keyword */
     300             :     {
     301             :         bool        obfuscate;
     302             : 
     303             :         /* Skip debug and empty options */
     304        5544 :         if (strchr(conn_opt->dispchar, 'D') ||
     305        5390 :             conn_opt->val == NULL ||
     306        2310 :             conn_opt->val[0] == '\0')
     307        3388 :             continue;
     308             : 
     309             :         /* Obfuscate security-sensitive options */
     310        2156 :         obfuscate = strchr(conn_opt->dispchar, '*') != NULL;
     311             : 
     312        4312 :         appendPQExpBuffer(&buf, "%s%s=%s",
     313        2156 :                           buf.len == 0 ? "" : " ",   /* separator before every pair except the first */
     314             :                           conn_opt->keyword,
     315             :                           obfuscate ? "********" : conn_opt->val);
     316             :     }
     317             : 
     318         154 :     PQconninfoFree(conn_opts);
     319             : 
     320         154 :     retval = PQExpBufferDataBroken(buf) ? NULL : pstrdup(buf.data);   /* copy out before the buffer is released */
     321         154 :     termPQExpBuffer(&buf);
     322         154 :     return retval;
     323             : }
     324             : 
     325             : /*
     326             :  * Provide information about the sender this WAL receiver is connected to.
                     :  *
                     :  * *sender_host is set to a pstrdup'ed copy of PQhost() when that is a
                     :  * non-empty string, else NULL.  *sender_port is set to atoi() of PQport()
                     :  * when that is a non-empty string, else 0.
     327             :  */
     328             : static void
     329         154 : libpqrcv_get_senderinfo(WalReceiverConn *conn, char **sender_host,
     330             :                         int *sender_port)
     331             : {
     332         154 :     char       *ret = NULL;
     333             : 
     334         154 :     *sender_host = NULL;   /* defaults reported when libpq has nothing to offer */
     335         154 :     *sender_port = 0;
     336             : 
     337             :     Assert(conn->streamConn != NULL);
     338             : 
     339         154 :     ret = PQhost(conn->streamConn);
     340         154 :     if (ret && strlen(ret) != 0)
     341         154 :         *sender_host = pstrdup(ret);
     342             : 
     343         154 :     ret = PQport(conn->streamConn);
     344         154 :     if (ret && strlen(ret) != 0)
     345         154 :         *sender_port = atoi(ret);
     346         154 : }
     347             : 
     348             : /*
     349             :  * Fetch the primary's system identifier and its current timeline ID by
     350             :  * running the IDENTIFY_SYSTEM replication command.
                     :  *
                     :  * Returns a pstrdup'ed copy of the system identifier (column 0 of the
                     :  * single result row) and stores the timeline ID (column 1) in
                     :  * *primary_tli.  No comparison against the local system identifier is
                     :  * made here; that is left to the caller.
                     :  *
                     :  * Raises ERROR (ERRCODE_PROTOCOL_VIOLATION) if the command does not return
                     :  * PGRES_TUPLES_OK, or if the result is not exactly one row with at least
                     :  * three fields.
     351             :  */
     352             : static char *
     353         294 : libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
     354             : {
     355             :     PGresult   *res;
     356             :     char       *primary_sysid;
     357             : 
     358             :     /*
     359             :      * Get the system identifier and timeline ID as a DataRow message from the
     360             :      * primary server.
     361             :      */
     362         294 :     res = libpqrcv_PQexec(conn->streamConn, "IDENTIFY_SYSTEM");
     363         294 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     364             :     {
     365           0 :         PQclear(res);
     366           0 :         ereport(ERROR,
     367             :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
     368             :                  errmsg("could not receive database system identifier and timeline ID from "
     369             :                         "the primary server: %s",
     370             :                         pchomp(PQerrorMessage(conn->streamConn)))));
     371             :     }
     372         294 :     if (PQnfields(res) < 3 || PQntuples(res) != 1)
     373             :     {
     374           0 :         int         ntuples = PQntuples(res);   /* captured before res is cleared */
     375           0 :         int         nfields = PQnfields(res);
     376             : 
     377           0 :         PQclear(res);
     378           0 :         ereport(ERROR,
     379             :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
     380             :                  errmsg("invalid response from primary server"),
     381             :                  errdetail("Could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields.",
     382             :                            ntuples, nfields, 1, 3)));   /* expected: 1 row, 3 or more fields -- matches the check above */
     383             :     }
     384         294 :     primary_sysid = pstrdup(PQgetvalue(res, 0, 0));   /* copy: PQgetvalue() storage goes away with PQclear() */
     385         294 :     *primary_tli = pg_strtoint32(PQgetvalue(res, 0, 1));
     386         294 :     PQclear(res);
     387             : 
     388         294 :     return primary_sysid;
     389             : }
     390             : 
     391             : /*
     392             :  * Thin wrapper around libpq to obtain server version.
                     :  *
                     :  * Returns PQserverVersion()'s result for conn->streamConn unchanged; the
                     :  * other functions in this file compare such values against numbers like
                     :  * 140000 and 150000.
     393             :  */
     394             : static int
     395         452 : libpqrcv_server_version(WalReceiverConn *conn)
     396             : {
     397         452 :     return PQserverVersion(conn->streamConn);
     398             : }
     399             : 
     400             : /*
     401             :  * Start streaming WAL data from given streaming options.
     402             :  *
                     :  * The command sent has the shape
                     :  *   START_REPLICATION [SLOT "name"] [LOGICAL] X/X (opt, ...)   -- logical
                     :  *   START_REPLICATION [SLOT "name"] X/X TIMELINE n             -- physical
                     :  * For logical replication the option list always carries proto_version
                     :  * and publication_names; streaming and binary are added only when
                     :  * requested and the server version is >= 140000, two_phase only when
                     :  * requested and the server version is >= 150000.
                     :  *
     403             :  * Returns true if we switched successfully to copy-both mode. False
     404             :  * means the server received the command and executed it successfully, but
     405             :  * didn't switch to copy-mode.  That means that there was no WAL on the
     406             :  * requested timeline and starting point, because the server switched to
     407             :  * another timeline at or before the requested starting point. On failure,
     408             :  * throws an ERROR.
                     :  *
                     :  * NOTE(review): the slot name is placed between double quotes without
                     :  * any escaping -- presumably slot names can never contain '"'; confirm.
     409             :  */
     410             : static bool
     411         456 : libpqrcv_startstreaming(WalReceiverConn *conn,
     412             :                         const WalRcvStreamOptions *options)
     413             : {
     414             :     StringInfoData cmd;
     415             :     PGresult   *res;
     416             : 
     417             :     Assert(options->logical == conn->logical);
     418             :     Assert(options->slotname || !options->logical);   /* logical streaming requires a slot name */
     419             : 
     420         456 :     initStringInfo(&cmd);
     421             : 
     422             :     /* Build the command. */
     423         456 :     appendStringInfoString(&cmd, "START_REPLICATION");
     424         456 :     if (options->slotname != NULL)
     425         306 :         appendStringInfo(&cmd, " SLOT \"%s\"",
     426             :                          options->slotname);
     427             : 
     428         456 :     if (options->logical)
     429         282 :         appendStringInfoString(&cmd, " LOGICAL");
     430             : 
     431         456 :     appendStringInfo(&cmd, " %X/%X", LSN_FORMAT_ARGS(options->startpoint));
     432             : 
     433             :     /*
     434             :      * Additional options are different depending on if we are doing logical
     435             :      * or physical replication.
     436             :      */
     437         456 :     if (options->logical)
     438             :     {
     439             :         char       *pubnames_str;
     440             :         List       *pubnames;
     441             :         char       *pubnames_literal;
     442             : 
     443         282 :         appendStringInfoString(&cmd, " (");
     444             : 
     445         282 :         appendStringInfo(&cmd, "proto_version '%u'",
     446             :                          options->proto.logical.proto_version);
     447             : 
     448         314 :         if (options->proto.logical.streaming &&
     449          32 :             PQserverVersion(conn->streamConn) >= 140000)
     450          32 :             appendStringInfoString(&cmd, ", streaming 'on'");
     451             : 
     452         286 :         if (options->proto.logical.twophase &&
     453           4 :             PQserverVersion(conn->streamConn) >= 150000)
     454           4 :             appendStringInfoString(&cmd, ", two_phase 'on'");
     455             : 
     456         282 :         pubnames = options->proto.logical.publication_names;
     457         282 :         pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
     458         282 :         if (!pubnames_str)
     459           0 :             ereport(ERROR,
     460             :                     (errcode(ERRCODE_OUT_OF_MEMORY),    /* likely guess */
     461             :                      errmsg("could not start WAL streaming: %s",
     462             :                             pchomp(PQerrorMessage(conn->streamConn)))));
     463         282 :         pubnames_literal = PQescapeLiteral(conn->streamConn, pubnames_str,
     464             :                                            strlen(pubnames_str));
     465         282 :         if (!pubnames_literal)
     466           0 :             ereport(ERROR,
     467             :                     (errcode(ERRCODE_OUT_OF_MEMORY),    /* likely guess */
     468             :                      errmsg("could not start WAL streaming: %s",
     469             :                             pchomp(PQerrorMessage(conn->streamConn)))));
     470         282 :         appendStringInfo(&cmd, ", publication_names %s", pubnames_literal);
     471         282 :         PQfreemem(pubnames_literal);   /* allocated by libpq, so released with PQfreemem() rather than pfree() */
     472         282 :         pfree(pubnames_str);
     473             : 
     474         292 :         if (options->proto.logical.binary &&
     475          10 :             PQserverVersion(conn->streamConn) >= 140000)
     476          10 :             appendStringInfoString(&cmd, ", binary 'true'");
     477             : 
     478         282 :         appendStringInfoChar(&cmd, ')');
     479             :     }
     480             :     else
     481         174 :         appendStringInfo(&cmd, " TIMELINE %u",
     482             :                          options->proto.physical.startpointTLI);
     483             : 
     484             :     /* Start streaming. */
     485         456 :     res = libpqrcv_PQexec(conn->streamConn, cmd.data);
     486         456 :     pfree(cmd.data);
     487             : 
     488         456 :     if (PQresultStatus(res) == PGRES_COMMAND_OK)   /* command accepted, but no switch to copy-both mode */
     489             :     {
     490           0 :         PQclear(res);
     491           0 :         return false;
     492             :     }
     493         456 :     else if (PQresultStatus(res) != PGRES_COPY_BOTH)
     494             :     {
     495           0 :         PQclear(res);
     496           0 :         ereport(ERROR,
     497             :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
     498             :                  errmsg("could not start WAL streaming: %s",
     499             :                         pchomp(PQerrorMessage(conn->streamConn)))));
     500             :     }
     501         456 :     PQclear(res);
     502         456 :     return true;
     503             : }
     504             : 
     505             : /*
     506             :  * Stop streaming WAL data. Returns the next timeline's ID in *next_tli, as
     507             :  * reported by the server, or 0 if it did not report it.
                     :  *
                     :  * Accepted result sequences after the copy-end message has been sent:
                     :  *   PGRES_TUPLES_OK (1 row, >= 2 fields), PGRES_COMMAND_OK, end of results
                     :  *   PGRES_COPY_OUT, PQendcopy(), PGRES_COMMAND_OK, end of results
                     :  *   PGRES_COMMAND_OK, end of results
                     :  * Anything else raises ERROR, as does a failure to send the copy-end
                     :  * message.
                     :  *
                     :  * NOTE(review): the ERROR paths that follow a libpqrcv_PQgetResult() call
                     :  * (apart from the PGRES_COPY_OUT branch, which clears first) do not
                     :  * PQclear() the result before reporting.
     508             :  */
     509             : static void
     510         228 : libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
     511             : {
     512             :     PGresult   *res;
     513             : 
     514             :     /*
     515             :      * Send copy-end message.  As in libpqrcv_PQexec, this could theoretically
     516             :      * block, but the risk seems small.
     517             :      */
     518         410 :     if (PQputCopyEnd(conn->streamConn, NULL) <= 0 ||
     519         182 :         PQflush(conn->streamConn))
     520          46 :         ereport(ERROR,
     521             :                 (errcode(ERRCODE_CONNECTION_FAILURE),
     522             :                  errmsg("could not send end-of-streaming message to primary: %s",
     523             :                         pchomp(PQerrorMessage(conn->streamConn)))));
     524             : 
     525         182 :     *next_tli = 0;   /* stays 0 unless the server sends a result set below */
     526             : 
     527             :     /*
     528             :      * After COPY is finished, we should receive a result set indicating the
     529             :      * next timeline's ID, or just CommandComplete if the server was shut
     530             :      * down.
     531             :      *
     532             :      * If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is
     533             :      * also possible in case we aborted the copy in mid-stream.
     534             :      */
     535         182 :     res = libpqrcv_PQgetResult(conn->streamConn);
     536         182 :     if (PQresultStatus(res) == PGRES_TUPLES_OK)
     537             :     {
     538             :         /*
     539             :          * Read the next timeline's ID. The server also sends the timeline's
     540             :          * starting point, but it is ignored.
     541             :          */
     542          20 :         if (PQnfields(res) < 2 || PQntuples(res) != 1)
     543           0 :             ereport(ERROR,
     544             :                     (errcode(ERRCODE_PROTOCOL_VIOLATION),
     545             :                      errmsg("unexpected result set after end-of-streaming")));
     546          20 :         *next_tli = pg_strtoint32(PQgetvalue(res, 0, 0));
     547          20 :         PQclear(res);
     548             : 
     549             :         /* the result set should be followed by CommandComplete */
     550          20 :         res = libpqrcv_PQgetResult(conn->streamConn);
     551             :     }
     552         162 :     else if (PQresultStatus(res) == PGRES_COPY_OUT)
     553             :     {
     554         162 :         PQclear(res);
     555             : 
     556             :         /* End the copy */
     557         162 :         if (PQendcopy(conn->streamConn))
     558           0 :             ereport(ERROR,
     559             :                     (errcode(ERRCODE_CONNECTION_FAILURE),
     560             :                      errmsg("error while shutting down streaming COPY: %s",
     561             :                             pchomp(PQerrorMessage(conn->streamConn)))));
     562             : 
     563             :         /* CommandComplete should follow */
     564         162 :         res = libpqrcv_PQgetResult(conn->streamConn);
     565             :     }
     566             : 
     567         182 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)   /* also reached directly when the first result was neither of the two cases above */
     568           0 :         ereport(ERROR,
     569             :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
     570             :                  errmsg("error reading result of streaming command: %s",
     571             :                         pchomp(PQerrorMessage(conn->streamConn)))));
     572         182 :     PQclear(res);
     573             : 
     574             :     /* Verify that there are no more results */
     575         182 :     res = libpqrcv_PQgetResult(conn->streamConn);
     576         182 :     if (res != NULL)
     577           0 :         ereport(ERROR,
     578             :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
     579             :                  errmsg("unexpected result after CommandComplete: %s",
     580             :                         pchomp(PQerrorMessage(conn->streamConn)))));
     581         182 : }
     582             : 
     583             : /*
     584             :  * Fetch the timeline history file for 'tli' from the primary.
                     :  *
                     :  * Issues TIMELINE_HISTORY and expects exactly one row with two fields.  On
                     :  * success *filename is a pstrdup'd copy of field 0, *content is a palloc'd
                     :  * copy of field 1 (copied by length; no terminator is appended), and *len
                     :  * is that field's length in bytes.  Any other response raises ERROR.
     585             :  */
     586             : static void
     587          18 : libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
     588             :                                  TimeLineID tli, char **filename,
     589             :                                  char **content, int *len)
     590             : {
     591             :     PGresult   *res;
     592             :     char        cmd[64];
     593             : 
     594             :     Assert(!conn->logical);
     595             : 
     596             :     /*
     597             :      * Ask the primary to send over the history file for the given timeline.
     598             :      */
     599          18 :     snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli);
     600          18 :     res = libpqrcv_PQexec(conn->streamConn, cmd);
     601          18 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     602             :     {
     603           0 :         PQclear(res);
     604           0 :         ereport(ERROR,
     605             :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
     606             :                  errmsg("could not receive timeline history file from "
     607             :                         "the primary server: %s",
     608             :                         pchomp(PQerrorMessage(conn->streamConn)))));
     609             :     }
     610          18 :     if (PQnfields(res) != 2 || PQntuples(res) != 1)
     611             :     {
     612           0 :         int         ntuples = PQntuples(res);
     613           0 :         int         nfields = PQnfields(res);
     614             : 
     615           0 :         PQclear(res);
     616           0 :         ereport(ERROR,
     617             :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
     618             :                  errmsg("invalid response from primary server"),
     619             :                  errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.",
     620             :                            ntuples, nfields)));
     621             :     }
     622          18 :     *filename = pstrdup(PQgetvalue(res, 0, 0));
     623             : 
     624          18 :     *len = PQgetlength(res, 0, 1);
     625          18 :     *content = palloc(*len);
     626          18 :     memcpy(*content, PQgetvalue(res, 0, 1), *len);
     627          18 :     PQclear(res);
     628          18 : }
     629             : 
     630             : /*
     631             :  * Send a query and wait for the results by using the asynchronous libpq
     632             :  * functions and socket readiness events.
     633             :  *
     634             :  * We must not use the regular blocking libpq functions like PQexec()
     635             :  * since they are uninterruptible by signals on some platforms, such as
     636             :  * Windows.
     637             :  *
     638             :  * The function is modeled on PQexec() in libpq, but only implements
     639             :  * those parts that are in use in the walreceiver API.
     640             :  *
                     :  * Returns the last PGresult collected; earlier ones are cleared.  Collection
                     :  * stops early once a COPY_IN/COPY_OUT/COPY_BOTH result arrives or the
                     :  * connection status becomes CONNECTION_BAD.
                     :  *
     641             :  * May return NULL, rather than an error result, on failure.
     642             :  */
     643             : static PGresult *
     644        2602 : libpqrcv_PQexec(PGconn *streamConn, const char *query)
     645             : {
     646        2602 :     PGresult   *lastResult = NULL;
     647             : 
     648             :     /*
     649             :      * PQexec() silently discards any prior query results on the connection.
     650             :      * This is not required for this function as it's expected that the caller
     651             :      * (which is this library in all cases) will behave correctly and we don't
     652             :      * have to be backwards compatible with old libpq.
     653             :      */
     654             : 
     655             :     /*
     656             :      * Submit the query.  Since we don't use non-blocking mode, this could
     657             :      * theoretically block.  In practice, since we don't send very long query
     658             :      * strings, the risk seems negligible.
     659             :      */
     660        2602 :     if (!PQsendQuery(streamConn, query))
     661           0 :         return NULL;
     662             : 
     663             :     for (;;)
     664        1976 :     {
     665             :         /* Wait for, and collect, the next PGresult. */
     666             :         PGresult   *result;
     667             : 
     668        4578 :         result = libpqrcv_PQgetResult(streamConn);
     669        4578 :         if (result == NULL)
     670        1976 :             break;              /* query is complete, or failure */
     671             : 
     672             :         /*
     673             :          * Emulate PQexec()'s behavior of returning the last result when there
     674             :          * are many.  We are fine with returning just the last error message.
     675             :          */
     676        2602 :         PQclear(lastResult);
     677        2602 :         lastResult = result;
     678             : 
     679        5204 :         if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
     680        5034 :             PQresultStatus(lastResult) == PGRES_COPY_OUT ||
     681        4408 :             PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
     682        1976 :             PQstatus(streamConn) == CONNECTION_BAD)
     683             :             break;
     684             :     }
     685             : 
     686        2602 :     return lastResult;
     687             : }
     688             : 
     689             : /*
     690             :  * Perform the equivalent of PQgetResult(), but watch for interrupts.
                     :  *
                     :  * While PQisBusy() reports that a result is not yet available, waits on the
                     :  * process latch and the connection socket, runs ProcessWalRcvInterrupts()
                     :  * when the latch is set, and feeds incoming data to libpq.  Returns NULL if
                     :  * PQconsumeInput() fails; otherwise returns whatever PQgetResult() returns.
     691             :  */
     692             : static PGresult *
     693        5584 : libpqrcv_PQgetResult(PGconn *streamConn)
     694             : {
     695             :     /*
     696             :      * Collect data until PQgetResult is ready to get the result without
     697             :      * blocking.
     698             :      */
     699        8206 :     while (PQisBusy(streamConn))
     700             :     {
     701             :         int         rc;
     702             : 
     703             :         /*
     704             :          * We don't need to break down the sleep into smaller increments,
     705             :          * since we'll get interrupted by signals and can handle any
     706             :          * interrupts here.
     707             :          */
     708        2668 :         rc = WaitLatchOrSocket(MyLatch,
     709             :                                WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
     710             :                                WL_LATCH_SET,
     711             :                                PQsocket(streamConn),
     712             :                                0,
     713             :                                WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
     714             : 
     715             :         /* Interrupted? */
     716        2668 :         if (rc & WL_LATCH_SET)
     717             :         {
     718           2 :             ResetLatch(MyLatch);
     719           2 :             ProcessWalRcvInterrupts();
     720             :         }
     721             : 
     722             :         /* Consume whatever data is available from the socket */
     723        2668 :         if (PQconsumeInput(streamConn) == 0)
     724             :         {
     725             :             /* trouble; return NULL and let the caller inspect the connection */
     726          46 :             return NULL;
     727             :         }
     728             :     }
     729             : 
     730             :     /* Now we can collect and return the next PGresult */
     731        5538 :     return PQgetResult(streamConn);
     732             : }
     733             : 
     734             : /*
     735             :  * Disconnect connection to primary, if any.
                     :  *
                     :  * Closes the libpq connection, releases a still-held receive buffer, and
                     :  * pfree's conn itself, so the caller must not use conn afterwards.
     736             :  */
     737             : static void
     738         584 : libpqrcv_disconnect(WalReceiverConn *conn)
     739             : {
     740         584 :     PQfinish(conn->streamConn);
     741         584 :     if (conn->recvBuf != NULL)
     742          10 :         PQfreemem(conn->recvBuf);
     743         584 :     pfree(conn);
     744         584 : }
     745             : 
     746             : /*
     747             :  * Receive a message available from XLOG stream.
     748             :  *
     749             :  * Returns:
     750             :  *
     751             :  *   If data was received, returns the length of the data. *buffer is set to
     752             :  *   point to a buffer holding the received message. The buffer is only valid
     753             :  *   until the next libpqrcv_* call.
     754             :  *
     755             :  *   If no data was available immediately, returns 0, and *wait_fd is set to a
     756             :  *   socket descriptor which can be waited on before trying again.
     757             :  *
     758             :  *   -1 if the server ended the COPY.
     759             :  *
     760             :  * ereports on error.
     761             :  */
     762             : static int
     763      351058 : libpqrcv_receive(WalReceiverConn *conn, char **buffer,
     764             :                  pgsocket *wait_fd)
     765             : {
     766             :     int         rawlen;
     767             : 
     768      351058 :     if (conn->recvBuf != NULL)
     769      288130 :         PQfreemem(conn->recvBuf);
     770      351058 :     conn->recvBuf = NULL;
     771             : 
     772             :     /* Try to receive a CopyData message */
     773      351058 :     rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
     774      351058 :     if (rawlen == 0)
     775             :     {
     776             :         /* Try consuming some data. */
     777      184612 :         if (PQconsumeInput(conn->streamConn) == 0)
     778          58 :             ereport(ERROR,
     779             :                     (errcode(ERRCODE_CONNECTION_FAILURE),
     780             :                      errmsg("could not receive data from WAL stream: %s",
     781             :                             pchomp(PQerrorMessage(conn->streamConn)))));
     782             : 
     783             :         /* Now that we've consumed some input, try again */
     784      184554 :         rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
     785      184554 :         if (rawlen == 0)
     786             :         {
     787             :             /* Tell caller to try again when our socket is ready. */
     788       62616 :             *wait_fd = PQsocket(conn->streamConn);
     789       62616 :             return 0;
     790             :         }
     791             :     }
     792      288384 :     if (rawlen == -1)           /* end-of-streaming or error */
     793             :     {
     794             :         PGresult   *res;
     795             : 
     796         244 :         res = libpqrcv_PQgetResult(conn->streamConn);
     797         244 :         if (PQresultStatus(res) == PGRES_COMMAND_OK)
     798             :         {
     799         216 :             PQclear(res);
     800             : 
     801             :             /* Verify that there are no more results. */
     802         216 :             res = libpqrcv_PQgetResult(conn->streamConn);
     803         216 :             if (res != NULL)
     804             :             {
     805           0 :                 PQclear(res);
     806             : 
     807             :                 /*
     808             :                  * If the other side closed the connection in an orderly way
     809             :                  * (otherwise we'd have seen an error, or PGRES_COPY_IN), don't
     810             :                  * report an error here, but let callers deal with it.
     811             :                  */
     812           0 :                 if (PQstatus(conn->streamConn) == CONNECTION_BAD)
     813           0 :                     return -1;
     814             : 
     815           0 :                 ereport(ERROR,
     816             :                         (errcode(ERRCODE_PROTOCOL_VIOLATION),
     817             :                          errmsg("unexpected result after CommandComplete: %s",
     818             :                                 pchomp(PQerrorMessage(conn->streamConn)))));
     819             :             }
     820             : 
     821         216 :             return -1;
     822             :         }
     823          28 :         else if (PQresultStatus(res) == PGRES_COPY_IN)
     824             :         {
     825          20 :             PQclear(res);
     826          20 :             return -1;
     827             :         }
     828             :         else
     829             :         {
     830           8 :             PQclear(res);
     831           8 :             ereport(ERROR,
     832             :                     (errcode(ERRCODE_PROTOCOL_VIOLATION),
     833             :                      errmsg("could not receive data from WAL stream: %s",
     834             :                             pchomp(PQerrorMessage(conn->streamConn)))));
     835             :         }
     836             :     }
     837      288140 :     if (rawlen < -1)
     838           0 :         ereport(ERROR,
     839             :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
     840             :                  errmsg("could not receive data from WAL stream: %s",
     841             :                         pchomp(PQerrorMessage(conn->streamConn)))));
     842             : 
     843             :     /* Return received messages to caller */
     844      288140 :     *buffer = conn->recvBuf;
     845      288140 :     return rawlen;
     846             : }
     847             : 
     848             : /*
     849             :  * Send a message to XLOG stream.
     850             :  *
                     :  * Queues nbytes of buffer with PQputCopyData() and then calls PQflush();
                     :  * any nonzero PQflush() result is treated as a failure.
                     :  *
     851             :  * ereports on error.
     852             :  */
     853             : static void
     854       42078 : libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
     855             : {
     856       84156 :     if (PQputCopyData(conn->streamConn, buffer, nbytes) <= 0 ||
     857       42078 :         PQflush(conn->streamConn))
     858           0 :         ereport(ERROR,
     859             :                 (errcode(ERRCODE_CONNECTION_FAILURE),
     860             :                  errmsg("could not send data to WAL stream: %s",
     861             :                         pchomp(PQerrorMessage(conn->streamConn)))));
     862       42078 : }
     863             : 
     864             : /*
     865             :  * Create a new replication slot.
     866             :  * Returns the name of the exported snapshot for logical slot or NULL for
     867             :  * physical slot.
                     :  *
                     :  * The command uses the parenthesized option list when the remote server
                     :  * version is 150000 or later and the older keyword form otherwise.  If lsn
                     :  * is not NULL, *lsn is parsed from result field 1.  The returned snapshot
                     :  * name is a pstrdup'd copy of field 2, or NULL when that field is null.
                     :  * Raises ERROR if the command does not return PGRES_TUPLES_OK.
                     :  *
                     :  * NOTE(review): the row and field counts of the result are not validated
                     :  * before fields 1 and 2 of row 0 are read.
     868             :  */
     869             : static char *
     870         252 : libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
     871             :                      bool temporary, bool two_phase, CRSSnapshotAction snapshot_action,
     872             :                      XLogRecPtr *lsn)
     873             : {
     874             :     PGresult   *res;
     875             :     StringInfoData cmd;
     876             :     char       *snapshot;
     877             :     int         use_new_options_syntax;
     878             : 
     879         252 :     use_new_options_syntax = (PQserverVersion(conn->streamConn) >= 150000);
     880             : 
     881         252 :     initStringInfo(&cmd);
     882             : 
     883         252 :     appendStringInfo(&cmd, "CREATE_REPLICATION_SLOT \"%s\"", slotname);
     884             : 
     885         252 :     if (temporary)
     886           0 :         appendStringInfoString(&cmd, " TEMPORARY");
     887             : 
     888         252 :     if (conn->logical)
     889             :     {
     890         252 :         appendStringInfoString(&cmd, " LOGICAL pgoutput ");
     891         252 :         if (use_new_options_syntax)
     892         252 :             appendStringInfoChar(&cmd, '(');
     893         252 :         if (two_phase)
     894             :         {
     895           2 :             appendStringInfoString(&cmd, "TWO_PHASE");
     896           2 :             if (use_new_options_syntax)
     897           2 :                 appendStringInfoString(&cmd, ", ");
     898             :             else
     899           0 :                 appendStringInfoChar(&cmd, ' ');
     900             :         }
     901             : 
     902         252 :         if (use_new_options_syntax)
     903             :         {
     904         252 :             switch (snapshot_action)
     905             :             {
     906           0 :                 case CRS_EXPORT_SNAPSHOT:
     907           0 :                     appendStringInfoString(&cmd, "SNAPSHOT 'export'");
     908           0 :                     break;
     909          82 :                 case CRS_NOEXPORT_SNAPSHOT:
     910          82 :                     appendStringInfoString(&cmd, "SNAPSHOT 'nothing'");
     911          82 :                     break;
     912         170 :                 case CRS_USE_SNAPSHOT:
     913         170 :                     appendStringInfoString(&cmd, "SNAPSHOT 'use'");
     914         170 :                     break;
     915             :             }
     916         252 :         }
     917             :         else
     918             :         {
     919           0 :             switch (snapshot_action)
     920             :             {
     921           0 :                 case CRS_EXPORT_SNAPSHOT:
     922           0 :                     appendStringInfoString(&cmd, "EXPORT_SNAPSHOT");
     923           0 :                     break;
     924           0 :                 case CRS_NOEXPORT_SNAPSHOT:
     925           0 :                     appendStringInfoString(&cmd, "NOEXPORT_SNAPSHOT");
     926           0 :                     break;
     927           0 :                 case CRS_USE_SNAPSHOT:
     928           0 :                     appendStringInfoString(&cmd, "USE_SNAPSHOT");
     929           0 :                     break;
     930             :             }
     931         252 :         }
     932             : 
     933         252 :         if (use_new_options_syntax)
     934         252 :             appendStringInfoChar(&cmd, ')');
     935             :     }
     936             :     else
     937             :     {
     938           0 :         if (use_new_options_syntax)
     939           0 :             appendStringInfoString(&cmd, " PHYSICAL (RESERVE_WAL)");
     940             :         else
     941           0 :             appendStringInfoString(&cmd, " PHYSICAL RESERVE_WAL");
     942             :     }
     943             : 
     944         252 :     res = libpqrcv_PQexec(conn->streamConn, cmd.data);
     945         252 :     pfree(cmd.data);
     946             : 
     947         252 :     if (PQresultStatus(res) != PGRES_TUPLES_OK)
     948             :     {
     949           0 :         PQclear(res);
     950           0 :         ereport(ERROR,
     951             :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
     952             :                  errmsg("could not create replication slot \"%s\": %s",
     953             :                         slotname, pchomp(PQerrorMessage(conn->streamConn)))));
     954             :     }
     955             : 
     956         252 :     if (lsn)
     957         170 :         *lsn = DatumGetLSN(DirectFunctionCall1Coll(pg_lsn_in, InvalidOid,
     958             :                                                    CStringGetDatum(PQgetvalue(res, 0, 1))));
     959             : 
     960         252 :     if (!PQgetisnull(res, 0, 2))
     961           0 :         snapshot = pstrdup(PQgetvalue(res, 0, 2));
     962             :     else
     963         252 :         snapshot = NULL;
     964             : 
     965         252 :     PQclear(res);
     966             : 
     967         252 :     return snapshot;
     968             : }
     969             : 
     970             : /*
     971             :  * Return the PID of the remote backend process, as reported by PQbackendPID().
     972             :  */
     973             : static pid_t
     974           0 : libpqrcv_get_backend_pid(WalReceiverConn *conn)
     975             : {
     976           0 :     return PQbackendPID(conn->streamConn);
     977             : }
     978             : 
     979             : /*
     980             :  * Convert tuple query result to tuplestore.
                     :  *
                     :  * Raises ERROR unless pgres has exactly nRetTypes fields.  Otherwise sets
                     :  * walres->tuplestore and walres->tupledesc (column names come from pgres,
                     :  * column types from retTypes) and appends one tuple per result row, built
                     :  * from the text values with SQL NULLs passed as NULL pointers.
                     :  *
                     :  * NOTE(review): cstrs is a fixed MaxTupleAttributeNumber-element stack
                     :  * array and nfields is not checked against that bound here -- confirm that
                     :  * callers never pass a larger nRetTypes.
     981             :  */
     982             : static void
     983         448 : libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres,
     984             :                        const int nRetTypes, const Oid *retTypes)
     985             : {
     986             :     int         tupn;
     987             :     int         coln;
     988         448 :     int         nfields = PQnfields(pgres);
     989             :     HeapTuple   tuple;
     990             :     AttInMetadata *attinmeta;
     991             :     MemoryContext rowcontext;
     992             :     MemoryContext oldcontext;
     993             : 
     994             :     /* Make sure we got the expected number of fields. */
     995         448 :     if (nfields != nRetTypes)
     996           0 :         ereport(ERROR,
     997             :                 (errcode(ERRCODE_PROTOCOL_VIOLATION),
     998             :                  errmsg("invalid query response"),
     999             :                  errdetail("Expected %d fields, got %d fields.",
    1000             :                            nRetTypes, nfields)));
    1001             : 
    1002         448 :     walres->tuplestore = tuplestore_begin_heap(true, false, work_mem);
    1003             : 
    1004             :     /* Create tuple descriptor corresponding to expected result. */
    1005         448 :     walres->tupledesc = CreateTemplateTupleDesc(nRetTypes);
    1006        1684 :     for (coln = 0; coln < nRetTypes; coln++)
    1007        1236 :         TupleDescInitEntry(walres->tupledesc, (AttrNumber) coln + 1,
    1008        1236 :                            PQfname(pgres, coln), retTypes[coln], -1, 0);
    1009         448 :     attinmeta = TupleDescGetAttInMetadata(walres->tupledesc);
    1010             : 
    1011             :     /* No point in doing more here if there were no tuples returned. */
    1012         448 :     if (PQntuples(pgres) == 0)
    1013           2 :         return;
    1014             : 
    1015             :     /* Create temporary context for local allocations. */
    1016         446 :     rowcontext = AllocSetContextCreate(CurrentMemoryContext,
    1017             :                                        "libpqrcv query result context",
    1018             :                                        ALLOCSET_DEFAULT_SIZES);
    1019             : 
    1020             :     /* Process returned rows. */
    1021        1124 :     for (tupn = 0; tupn < PQntuples(pgres); tupn++)
    1022             :     {
    1023             :         char       *cstrs[MaxTupleAttributeNumber];
    1024             : 
    1025         678 :         ProcessWalRcvInterrupts();
    1026             : 
    1027             :         /* Do the allocations in temporary context. */
    1028         678 :         oldcontext = MemoryContextSwitchTo(rowcontext);
    1029             : 
    1030             :         /*
    1031             :          * Fill cstrs with null-terminated strings of column values; SQL NULLs
                     :          * are represented by NULL pointers.
    1032             :          */
    1033        2486 :         for (coln = 0; coln < nfields; coln++)
    1034             :         {
    1035        1808 :             if (PQgetisnull(pgres, tupn, coln))
    1036          60 :                 cstrs[coln] = NULL;
    1037             :             else
    1038        1748 :                 cstrs[coln] = PQgetvalue(pgres, tupn, coln);
    1039             :         }
    1040             : 
    1041             :         /* Convert row to a tuple, and add it to the tuplestore */
    1042         678 :         tuple = BuildTupleFromCStrings(attinmeta, cstrs);
    1043         678 :         tuplestore_puttuple(walres->tuplestore, tuple);
    1044             : 
    1045             :         /* Clean up: leave the per-row context and release what it allocated */
    1046         678 :         MemoryContextSwitchTo(oldcontext);
    1047         678 :         MemoryContextReset(rowcontext);
    1048             :     }
    1049             : 
    1050         446 :     MemoryContextDelete(rowcontext);
    1051             : }
    1052             : 
    1053             : /*
    1054             :  * Public interface for sending generic queries (and commands).
    1055             :  *
    1056             :  * This can only be called from a process connected to a database.
                     :  *
                     :  * Returns a palloc0'd WalRcvExecResult whose status is derived from the
                     :  * PGresult status.  For tuple results the tuplestore and tupledesc are
                     :  * filled in via libpqrcv_processTuples(); for error statuses err is set,
                     :  * plus sqlstate when the result carries a PG_DIAG_SQLSTATE field.
    1057             :  */
    1058             : static WalRcvExecResult *
    1059        1152 : libpqrcv_exec(WalReceiverConn *conn, const char *query,
    1060             :               const int nRetTypes, const Oid *retTypes)
    1061             : {
    1062        1152 :     PGresult   *pgres = NULL;
    1063        1152 :     WalRcvExecResult *walres = palloc0(sizeof(WalRcvExecResult));
    1064             :     char       *diag_sqlstate;
    1065             : 
    1066        1152 :     if (MyDatabaseId == InvalidOid)
    1067           0 :         ereport(ERROR,
    1068             :                 (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
    1069             :                  errmsg("the query interface requires a database connection")));
    1070             : 
    1071        1152 :     pgres = libpqrcv_PQexec(conn->streamConn, query);
    1072             : 
    1073        1152 :     switch (PQresultStatus(pgres))
    1074             :     {
    1075         448 :         case PGRES_SINGLE_TUPLE:
    1076             :         case PGRES_TUPLES_OK:
    1077         448 :             walres->status = WALRCV_OK_TUPLES;
    1078         448 :             libpqrcv_processTuples(pgres, walres, nRetTypes, retTypes);
    1079         448 :             break;
    1080             : 
    1081           0 :         case PGRES_COPY_IN:
    1082           0 :             walres->status = WALRCV_OK_COPY_IN;
    1083           0 :             break;
    1084             : 
    1085         170 :         case PGRES_COPY_OUT:
    1086         170 :             walres->status = WALRCV_OK_COPY_OUT;
    1087         170 :             break;
    1088             : 
    1089           0 :         case PGRES_COPY_BOTH:
    1090           0 :             walres->status = WALRCV_OK_COPY_BOTH;
    1091           0 :             break;
    1092             : 
    1093         534 :         case PGRES_COMMAND_OK:
    1094         534 :             walres->status = WALRCV_OK_COMMAND;
    1095         534 :             break;
    1096             : 
    1097             :             /* An empty query is considered an error. */
    1098           0 :         case PGRES_EMPTY_QUERY:
    1099           0 :             walres->status = WALRCV_ERROR;
    1100           0 :             walres->err = _("empty query");
    1101           0 :             break;
    1102             : 
    1103           0 :         case PGRES_PIPELINE_SYNC:
    1104             :         case PGRES_PIPELINE_ABORTED:
    1105           0 :             walres->status = WALRCV_ERROR;
    1106           0 :             walres->err = _("unexpected pipeline mode");
    1107           0 :             break;
    1108             : 
    1109           0 :         case PGRES_NONFATAL_ERROR:
    1110             :         case PGRES_FATAL_ERROR:
    1111             :         case PGRES_BAD_RESPONSE:
    1112           0 :             walres->status = WALRCV_ERROR;
    1113           0 :             walres->err = pchomp(PQerrorMessage(conn->streamConn));
    1114           0 :             diag_sqlstate = PQresultErrorField(pgres, PG_DIAG_SQLSTATE);
    1115           0 :             if (diag_sqlstate)
    1116           0 :                 walres->sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
    1117             :                                                  diag_sqlstate[1],
    1118             :                                                  diag_sqlstate[2],
    1119             :                                                  diag_sqlstate[3],
    1120             :                                                  diag_sqlstate[4]);
    1121           0 :             break;
    1122             :     }
    1123             : 
    1124        1152 :     PQclear(pgres);
    1125             : 
    1126        1152 :     return walres;
    1127             : }
    1128             : 
    1129             : /*
    1130             :  * Given a List of strings, return it as a single comma-separated
    1131             :  * string, quoting identifiers as needed.
    1132             :  *
    1133             :  * This is essentially the reverse of SplitIdentifierString.
    1134             :  *
    1135             :  * The result is palloc'd and the caller should pfree it.  Returns NULL
                     :  * (after releasing the partial result) if PQescapeIdentifier() fails.
    1136             :  */
    1137             : static char *
    1138         282 : stringlist_to_identifierstr(PGconn *conn, List *strings)
    1139             : {
    1140             :     ListCell   *lc;
    1141             :     StringInfoData res;
    1142         282 :     bool        first = true;
    1143             : 
    1144         282 :     initStringInfo(&res);
    1145             : 
    1146         600 :     foreach(lc, strings)
    1147             :     {
    1148         318 :         char       *val = strVal(lfirst(lc));
    1149             :         char       *val_escaped;
    1150             : 
    1151         318 :         if (first)
    1152         282 :             first = false;
    1153             :         else
    1154          36 :             appendStringInfoChar(&res, ',');
    1155             : 
    1156         318 :         val_escaped = PQescapeIdentifier(conn, val, strlen(val));
    1157         318 :         if (!val_escaped)
    1158             :         {
    1159           0 :             pfree(res.data);    /* palloc'd by initStringInfo(), so not free() */
    1160           0 :             return NULL;
    1161             :         }
    1162         318 :         appendStringInfoString(&res, val_escaped);
    1163         318 :         PQfreemem(val_escaped);
    1164             :     }
    1165             : 
    1166         282 :     return res.data;
    1167             : }

Generated by: LCOV version 1.14