LCOV - code coverage report
Current view: top level - contrib/dblink - dblink.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 988 1137 86.9 %
Date: 2025-04-24 12:15:10 Functions: 79 81 97.5 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :  * dblink.c
       3             :  *
       4             :  * Functions returning results from a remote database
       5             :  *
       6             :  * Joe Conway <mail@joeconway.com>
       7             :  * And contributors:
       8             :  * Darko Prenosil <Darko.Prenosil@finteh.hr>
       9             :  * Shridhar Daithankar <shridhar_daithankar@persistent.co.in>
      10             :  *
      11             :  * contrib/dblink/dblink.c
      12             :  * Copyright (c) 2001-2025, PostgreSQL Global Development Group
      13             :  * ALL RIGHTS RESERVED;
      14             :  *
      15             :  * Permission to use, copy, modify, and distribute this software and its
      16             :  * documentation for any purpose, without fee, and without a written agreement
      17             :  * is hereby granted, provided that the above copyright notice and this
      18             :  * paragraph and the following two paragraphs appear in all copies.
      19             :  *
      20             :  * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
      21             :  * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
      22             :  * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
      23             :  * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
      24             :  * POSSIBILITY OF SUCH DAMAGE.
      25             :  *
      26             :  * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
      27             :  * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
      28             :  * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
      29             :  * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
      30             :  * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
      31             :  *
      32             :  */
      33             : #include "postgres.h"
      34             : 
      35             : #include <limits.h>
      36             : 
      37             : #include "access/htup_details.h"
      38             : #include "access/relation.h"
      39             : #include "access/reloptions.h"
      40             : #include "access/table.h"
      41             : #include "catalog/namespace.h"
      42             : #include "catalog/pg_foreign_data_wrapper.h"
      43             : #include "catalog/pg_foreign_server.h"
      44             : #include "catalog/pg_type.h"
      45             : #include "catalog/pg_user_mapping.h"
      46             : #include "commands/defrem.h"
      47             : #include "common/base64.h"
      48             : #include "executor/spi.h"
      49             : #include "foreign/foreign.h"
      50             : #include "funcapi.h"
      51             : #include "lib/stringinfo.h"
      52             : #include "libpq-fe.h"
      53             : #include "libpq/libpq-be.h"
      54             : #include "libpq/libpq-be-fe-helpers.h"
      55             : #include "mb/pg_wchar.h"
      56             : #include "miscadmin.h"
      57             : #include "parser/scansup.h"
      58             : #include "utils/acl.h"
      59             : #include "utils/builtins.h"
      60             : #include "utils/fmgroids.h"
      61             : #include "utils/guc.h"
      62             : #include "utils/lsyscache.h"
      63             : #include "utils/memutils.h"
      64             : #include "utils/rel.h"
      65             : #include "utils/varlena.h"
      66             : #include "utils/wait_event.h"
      67             : 
      68          34 : PG_MODULE_MAGIC_EXT(
      69             :                     .name = "dblink",
      70             :                     .version = PG_VERSION
      71             : );
      72             : 
      73             : typedef struct remoteConn
      74             : {
      75             :     PGconn     *conn;           /* Hold the remote connection */
      76             :     int         openCursorCount;    /* The number of open cursors */
      77             :     bool        newXactForCursor;   /* Opened a transaction for a cursor */
      78             : } remoteConn;
      79             : 
      80             : typedef struct storeInfo
      81             : {
      82             :     FunctionCallInfo fcinfo;
      83             :     Tuplestorestate *tuplestore;
      84             :     AttInMetadata *attinmeta;
      85             :     MemoryContext tmpcontext;
      86             :     char      **cstrs;
      87             :     /* temp storage for results to avoid leaks on exception */
      88             :     PGresult   *last_res;
      89             :     PGresult   *cur_res;
      90             : } storeInfo;
      91             : 
      92             : /*
      93             :  * Internal declarations
      94             :  */
      95             : static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
      96             : static void prepTuplestoreResult(FunctionCallInfo fcinfo);
      97             : static void materializeResult(FunctionCallInfo fcinfo, PGconn *conn,
      98             :                               PGresult *res);
      99             : static void materializeQueryResult(FunctionCallInfo fcinfo,
     100             :                                    PGconn *conn,
     101             :                                    const char *conname,
     102             :                                    const char *sql,
     103             :                                    bool fail);
     104             : static PGresult *storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql);
     105             : static void storeRow(volatile storeInfo *sinfo, PGresult *res, bool first);
     106             : static remoteConn *getConnectionByName(const char *name);
     107             : static HTAB *createConnHash(void);
     108             : static void createNewConnection(const char *name, remoteConn *rconn);
     109             : static void deleteConnection(const char *name);
     110             : static char **get_pkey_attnames(Relation rel, int16 *indnkeyatts);
     111             : static char **get_text_array_contents(ArrayType *array, int *numitems);
     112             : static char *get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
     113             : static char *get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals);
     114             : static char *get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
     115             : static char *quote_ident_cstr(char *rawstr);
     116             : static int  get_attnum_pk_pos(int *pkattnums, int pknumatts, int key);
     117             : static HeapTuple get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals);
     118             : static Relation get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode);
     119             : static char *generate_relation_name(Relation rel);
     120             : static void dblink_connstr_check(const char *connstr);
     121             : static bool dblink_connstr_has_pw(const char *connstr);
     122             : static void dblink_security_check(PGconn *conn, remoteConn *rconn, const char *connstr);
     123             : static void dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
     124             :                              bool fail, const char *fmt,...) pg_attribute_printf(5, 6);
     125             : static char *get_connect_string(const char *servername);
     126             : static char *escape_param_str(const char *str);
     127             : static void validate_pkattnums(Relation rel,
     128             :                                int2vector *pkattnums_arg, int32 pknumatts_arg,
     129             :                                int **pkattnums, int *pknumatts);
     130             : static bool is_valid_dblink_option(const PQconninfoOption *options,
     131             :                                    const char *option, Oid context);
     132             : static int  applyRemoteGucs(PGconn *conn);
     133             : static void restoreLocalGucs(int nestlevel);
     134             : static bool UseScramPassthrough(ForeignServer *foreign_server, UserMapping *user);
     135             : static void appendSCRAMKeysInfo(StringInfo buf);
     136             : static bool is_valid_dblink_fdw_option(const PQconninfoOption *options, const char *option,
     137             :                                        Oid context);
     138             : static bool dblink_connstr_has_required_scram_options(const char *connstr);
     139             : 
     140             : /* Global */
     141             : static remoteConn *pconn = NULL;
     142             : static HTAB *remoteConnHash = NULL;
     143             : 
     144             : /* custom wait event values, retrieved from shared memory */
     145             : static uint32 dblink_we_connect = 0;
     146             : static uint32 dblink_we_get_conn = 0;
     147             : static uint32 dblink_we_get_result = 0;
     148             : 
     149             : /*
     150             :  *  Following is list that holds multiple remote connections.
     151             :  *  Calling convention of each dblink function changes to accept
     152             :  *  connection name as the first parameter. The connection list is
     153             :  *  much like ecpg e.g. a mapping between a name and a PGconn object.
     154             :  */
     155             : 
     156             : typedef struct remoteConnHashEnt
     157             : {
     158             :     char        name[NAMEDATALEN];
     159             :     remoteConn *rconn;
     160             : } remoteConnHashEnt;
     161             : 
     162             : /* initial number of connection hashes */
     163             : #define NUMCONN 16
     164             : 
     165             : static char *
     166          96 : xpstrdup(const char *in)
     167             : {
     168          96 :     if (in == NULL)
     169          72 :         return NULL;
     170          24 :     return pstrdup(in);
     171             : }
     172             : 
     173             : pg_noreturn static void
     174           0 : dblink_res_internalerror(PGconn *conn, PGresult *res, const char *p2)
     175             : {
     176           0 :     char       *msg = pchomp(PQerrorMessage(conn));
     177             : 
     178           0 :     PQclear(res);
     179           0 :     elog(ERROR, "%s: %s", p2, msg);
     180             : }
     181             : 
     182             : pg_noreturn static void
     183           6 : dblink_conn_not_avail(const char *conname)
     184             : {
     185           6 :     if (conname)
     186           2 :         ereport(ERROR,
     187             :                 (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
     188             :                  errmsg("connection \"%s\" not available", conname)));
     189             :     else
     190           4 :         ereport(ERROR,
     191             :                 (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
     192             :                  errmsg("connection not available")));
     193             : }
     194             : 
     195             : static void
     196          74 : dblink_get_conn(char *conname_or_str,
     197             :                 PGconn *volatile *conn_p, char **conname_p, volatile bool *freeconn_p)
     198             : {
     199          74 :     remoteConn *rconn = getConnectionByName(conname_or_str);
     200             :     PGconn     *conn;
     201             :     char       *conname;
     202             :     bool        freeconn;
     203             : 
     204          74 :     if (rconn)
     205             :     {
     206          54 :         conn = rconn->conn;
     207          54 :         conname = conname_or_str;
     208          54 :         freeconn = false;
     209             :     }
     210             :     else
     211             :     {
     212             :         const char *connstr;
     213             : 
     214          20 :         connstr = get_connect_string(conname_or_str);
     215          20 :         if (connstr == NULL)
     216          12 :             connstr = conname_or_str;
     217          20 :         dblink_connstr_check(connstr);
     218             : 
     219             :         /* first time, allocate or get the custom wait event */
     220          18 :         if (dblink_we_get_conn == 0)
     221          10 :             dblink_we_get_conn = WaitEventExtensionNew("DblinkGetConnect");
     222             : 
     223             :         /* OK to make connection */
     224          18 :         conn = libpqsrv_connect(connstr, dblink_we_get_conn);
     225             : 
     226          18 :         if (PQstatus(conn) == CONNECTION_BAD)
     227             :         {
     228           6 :             char       *msg = pchomp(PQerrorMessage(conn));
     229             : 
     230           6 :             libpqsrv_disconnect(conn);
     231           6 :             ereport(ERROR,
     232             :                     (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
     233             :                      errmsg("could not establish connection"),
     234             :                      errdetail_internal("%s", msg)));
     235             :         }
     236          12 :         dblink_security_check(conn, rconn, connstr);
     237          12 :         if (PQclientEncoding(conn) != GetDatabaseEncoding())
     238           0 :             PQsetClientEncoding(conn, GetDatabaseEncodingName());
     239          12 :         freeconn = true;
     240          12 :         conname = NULL;
     241             :     }
     242             : 
     243          66 :     *conn_p = conn;
     244          66 :     *conname_p = conname;
     245          66 :     *freeconn_p = freeconn;
     246          66 : }
     247             : 
     248             : static PGconn *
     249          34 : dblink_get_named_conn(const char *conname)
     250             : {
     251          34 :     remoteConn *rconn = getConnectionByName(conname);
     252             : 
     253          34 :     if (rconn)
     254          34 :         return rconn->conn;
     255             : 
     256           0 :     dblink_conn_not_avail(conname);
     257             :     return NULL;                /* keep compiler quiet */
     258             : }
     259             : 
     260             : static void
     261         248 : dblink_init(void)
     262             : {
     263         248 :     if (!pconn)
     264             :     {
     265          14 :         if (dblink_we_get_result == 0)
     266          14 :             dblink_we_get_result = WaitEventExtensionNew("DblinkGetResult");
     267             : 
     268          14 :         pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn));
     269          14 :         pconn->conn = NULL;
     270          14 :         pconn->openCursorCount = 0;
     271          14 :         pconn->newXactForCursor = false;
     272             :     }
     273         248 : }
     274             : 
     275             : /*
     276             :  * Create a persistent connection to another database
     277             :  */
     278          26 : PG_FUNCTION_INFO_V1(dblink_connect);
     279             : Datum
     280          32 : dblink_connect(PG_FUNCTION_ARGS)
     281             : {
     282          32 :     char       *conname_or_str = NULL;
     283          32 :     char       *connstr = NULL;
     284          32 :     char       *connname = NULL;
     285             :     char       *msg;
     286          32 :     PGconn     *conn = NULL;
     287          32 :     remoteConn *rconn = NULL;
     288             : 
     289          32 :     dblink_init();
     290             : 
     291          32 :     if (PG_NARGS() == 2)
     292             :     {
     293          22 :         conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(1));
     294          22 :         connname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     295             :     }
     296          10 :     else if (PG_NARGS() == 1)
     297          10 :         conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0));
     298             : 
     299          32 :     if (connname)
     300             :     {
     301          22 :         rconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext,
     302             :                                                   sizeof(remoteConn));
     303          22 :         rconn->conn = NULL;
     304          22 :         rconn->openCursorCount = 0;
     305          22 :         rconn->newXactForCursor = false;
     306             :     }
     307             : 
     308             :     /* first check for valid foreign data server */
     309          32 :     connstr = get_connect_string(conname_or_str);
     310          32 :     if (connstr == NULL)
     311          28 :         connstr = conname_or_str;
     312             : 
     313             :     /* check password in connection string if not superuser */
     314          32 :     dblink_connstr_check(connstr);
     315             : 
     316             :     /* first time, allocate or get the custom wait event */
     317          30 :     if (dblink_we_connect == 0)
     318           4 :         dblink_we_connect = WaitEventExtensionNew("DblinkConnect");
     319             : 
     320             :     /* OK to make connection */
     321          30 :     conn = libpqsrv_connect(connstr, dblink_we_connect);
     322             : 
     323          30 :     if (PQstatus(conn) == CONNECTION_BAD)
     324             :     {
     325           0 :         msg = pchomp(PQerrorMessage(conn));
     326           0 :         libpqsrv_disconnect(conn);
     327           0 :         if (rconn)
     328           0 :             pfree(rconn);
     329             : 
     330           0 :         ereport(ERROR,
     331             :                 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
     332             :                  errmsg("could not establish connection"),
     333             :                  errdetail_internal("%s", msg)));
     334             :     }
     335             : 
     336             :     /* check password actually used if not superuser */
     337          30 :     dblink_security_check(conn, rconn, connstr);
     338             : 
     339             :     /* attempt to set client encoding to match server encoding, if needed */
     340          30 :     if (PQclientEncoding(conn) != GetDatabaseEncoding())
     341           0 :         PQsetClientEncoding(conn, GetDatabaseEncodingName());
     342             : 
     343          30 :     if (connname)
     344             :     {
     345          20 :         rconn->conn = conn;
     346          20 :         createNewConnection(connname, rconn);
     347             :     }
     348             :     else
     349             :     {
     350          10 :         if (pconn->conn)
     351           2 :             libpqsrv_disconnect(pconn->conn);
     352          10 :         pconn->conn = conn;
     353             :     }
     354             : 
     355          28 :     PG_RETURN_TEXT_P(cstring_to_text("OK"));
     356             : }
     357             : 
     358             : /*
     359             :  * Clear a persistent connection to another database
     360             :  */
     361          16 : PG_FUNCTION_INFO_V1(dblink_disconnect);
     362             : Datum
     363          26 : dblink_disconnect(PG_FUNCTION_ARGS)
     364             : {
     365          26 :     char       *conname = NULL;
     366          26 :     remoteConn *rconn = NULL;
     367          26 :     PGconn     *conn = NULL;
     368             : 
     369          26 :     dblink_init();
     370             : 
     371          26 :     if (PG_NARGS() == 1)
     372             :     {
     373          18 :         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     374          18 :         rconn = getConnectionByName(conname);
     375          18 :         if (rconn)
     376          16 :             conn = rconn->conn;
     377             :     }
     378             :     else
     379           8 :         conn = pconn->conn;
     380             : 
     381          26 :     if (!conn)
     382           2 :         dblink_conn_not_avail(conname);
     383             : 
     384          24 :     libpqsrv_disconnect(conn);
     385          24 :     if (rconn)
     386             :     {
     387          16 :         deleteConnection(conname);
     388          16 :         pfree(rconn);
     389             :     }
     390             :     else
     391           8 :         pconn->conn = NULL;
     392             : 
     393          24 :     PG_RETURN_TEXT_P(cstring_to_text("OK"));
     394             : }
     395             : 
     396             : /*
     397             :  * opens a cursor using a persistent connection
     398             :  */
     399          26 : PG_FUNCTION_INFO_V1(dblink_open);
     400             : Datum
     401          18 : dblink_open(PG_FUNCTION_ARGS)
     402             : {
     403          18 :     PGresult   *res = NULL;
     404             :     PGconn     *conn;
     405          18 :     char       *curname = NULL;
     406          18 :     char       *sql = NULL;
     407          18 :     char       *conname = NULL;
     408             :     StringInfoData buf;
     409          18 :     remoteConn *rconn = NULL;
     410          18 :     bool        fail = true;    /* default to backward compatible behavior */
     411             : 
     412          18 :     dblink_init();
     413          18 :     initStringInfo(&buf);
     414             : 
     415          18 :     if (PG_NARGS() == 2)
     416             :     {
     417             :         /* text,text */
     418           4 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     419           4 :         sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
     420           4 :         rconn = pconn;
     421             :     }
     422          14 :     else if (PG_NARGS() == 3)
     423             :     {
     424             :         /* might be text,text,text or text,text,bool */
     425          12 :         if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
     426             :         {
     427           2 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     428           2 :             sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
     429           2 :             fail = PG_GETARG_BOOL(2);
     430           2 :             rconn = pconn;
     431             :         }
     432             :         else
     433             :         {
     434          10 :             conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     435          10 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
     436          10 :             sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
     437          10 :             rconn = getConnectionByName(conname);
     438             :         }
     439             :     }
     440           2 :     else if (PG_NARGS() == 4)
     441             :     {
     442             :         /* text,text,text,bool */
     443           2 :         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     444           2 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
     445           2 :         sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
     446           2 :         fail = PG_GETARG_BOOL(3);
     447           2 :         rconn = getConnectionByName(conname);
     448             :     }
     449             : 
     450          18 :     if (!rconn || !rconn->conn)
     451           0 :         dblink_conn_not_avail(conname);
     452             : 
     453          18 :     conn = rconn->conn;
     454             : 
     455             :     /* If we are not in a transaction, start one */
     456          18 :     if (PQtransactionStatus(conn) == PQTRANS_IDLE)
     457             :     {
     458          14 :         res = libpqsrv_exec(conn, "BEGIN", dblink_we_get_result);
     459          14 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
     460           0 :             dblink_res_internalerror(conn, res, "begin error");
     461          14 :         PQclear(res);
     462          14 :         rconn->newXactForCursor = true;
     463             : 
     464             :         /*
     465             :          * Since transaction state was IDLE, we force cursor count to
     466             :          * initially be 0. This is needed as a previous ABORT might have wiped
     467             :          * out our transaction without maintaining the cursor count for us.
     468             :          */
     469          14 :         rconn->openCursorCount = 0;
     470             :     }
     471             : 
     472             :     /* if we started a transaction, increment cursor count */
     473          18 :     if (rconn->newXactForCursor)
     474          18 :         (rconn->openCursorCount)++;
     475             : 
     476          18 :     appendStringInfo(&buf, "DECLARE %s CURSOR FOR %s", curname, sql);
     477          18 :     res = libpqsrv_exec(conn, buf.data, dblink_we_get_result);
     478          18 :     if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
     479             :     {
     480           4 :         dblink_res_error(conn, conname, res, fail,
     481             :                          "while opening cursor \"%s\"", curname);
     482           4 :         PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
     483             :     }
     484             : 
     485          14 :     PQclear(res);
     486          14 :     PG_RETURN_TEXT_P(cstring_to_text("OK"));
     487             : }
     488             : 
     489             : /*
     490             :  * closes a cursor
     491             :  */
     492          20 : PG_FUNCTION_INFO_V1(dblink_close);
     493             : Datum
     494          10 : dblink_close(PG_FUNCTION_ARGS)
     495             : {
     496             :     PGconn     *conn;
     497          10 :     PGresult   *res = NULL;
     498          10 :     char       *curname = NULL;
     499          10 :     char       *conname = NULL;
     500             :     StringInfoData buf;
     501          10 :     remoteConn *rconn = NULL;
     502          10 :     bool        fail = true;    /* default to backward compatible behavior */
     503             : 
     504          10 :     dblink_init();
     505          10 :     initStringInfo(&buf);
     506             : 
     507          10 :     if (PG_NARGS() == 1)
     508             :     {
     509             :         /* text */
     510           0 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     511           0 :         rconn = pconn;
     512             :     }
     513          10 :     else if (PG_NARGS() == 2)
     514             :     {
     515             :         /* might be text,text or text,bool */
     516          10 :         if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
     517             :         {
     518           4 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     519           4 :             fail = PG_GETARG_BOOL(1);
     520           4 :             rconn = pconn;
     521             :         }
     522             :         else
     523             :         {
     524           6 :             conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     525           6 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
     526           6 :             rconn = getConnectionByName(conname);
     527             :         }
     528             :     }
     529          10 :     if (PG_NARGS() == 3)
     530             :     {
     531             :         /* text,text,bool */
     532           0 :         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     533           0 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
     534           0 :         fail = PG_GETARG_BOOL(2);
     535           0 :         rconn = getConnectionByName(conname);
     536             :     }
     537             : 
     538          10 :     if (!rconn || !rconn->conn)
     539           0 :         dblink_conn_not_avail(conname);
     540             : 
     541          10 :     conn = rconn->conn;
     542             : 
     543          10 :     appendStringInfo(&buf, "CLOSE %s", curname);
     544             : 
     545             :     /* close the cursor */
     546          10 :     res = libpqsrv_exec(conn, buf.data, dblink_we_get_result);
     547          10 :     if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
     548             :     {
     549           2 :         dblink_res_error(conn, conname, res, fail,
     550             :                          "while closing cursor \"%s\"", curname);
     551           2 :         PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
     552             :     }
     553             : 
     554           8 :     PQclear(res);
     555             : 
     556             :     /* if we started a transaction, decrement cursor count */
     557           8 :     if (rconn->newXactForCursor)
     558             :     {
     559           8 :         (rconn->openCursorCount)--;
     560             : 
     561             :         /* if count is zero, commit the transaction */
     562           8 :         if (rconn->openCursorCount == 0)
     563             :         {
     564           4 :             rconn->newXactForCursor = false;
     565             : 
     566           4 :             res = libpqsrv_exec(conn, "COMMIT", dblink_we_get_result);
     567           4 :             if (PQresultStatus(res) != PGRES_COMMAND_OK)
     568           0 :                 dblink_res_internalerror(conn, res, "commit error");
     569           4 :             PQclear(res);
     570             :         }
     571             :     }
     572             : 
     573           8 :     PG_RETURN_TEXT_P(cstring_to_text("OK"));
     574             : }
     575             : 
     576             : /*
     577             :  * Fetch results from an open cursor
     578             :  */
     579          26 : PG_FUNCTION_INFO_V1(dblink_fetch);
     580             : Datum
     581          26 : dblink_fetch(PG_FUNCTION_ARGS)
     582             : {
     583          26 :     PGresult   *res = NULL;
     584          26 :     char       *conname = NULL;
     585          26 :     remoteConn *rconn = NULL;
     586          26 :     PGconn     *conn = NULL;
     587             :     StringInfoData buf;
     588          26 :     char       *curname = NULL;
     589          26 :     int         howmany = 0;
     590          26 :     bool        fail = true;    /* default to backward compatible */
     591             : 
     592          26 :     prepTuplestoreResult(fcinfo);
     593             : 
     594          26 :     dblink_init();
     595             : 
     596          26 :     if (PG_NARGS() == 4)
     597             :     {
     598             :         /* text,text,int,bool */
     599           2 :         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     600           2 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
     601           2 :         howmany = PG_GETARG_INT32(2);
     602           2 :         fail = PG_GETARG_BOOL(3);
     603             : 
     604           2 :         rconn = getConnectionByName(conname);
     605           2 :         if (rconn)
     606           2 :             conn = rconn->conn;
     607             :     }
     608          24 :     else if (PG_NARGS() == 3)
     609             :     {
     610             :         /* text,text,int or text,int,bool */
     611          16 :         if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
     612             :         {
     613           4 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     614           4 :             howmany = PG_GETARG_INT32(1);
     615           4 :             fail = PG_GETARG_BOOL(2);
     616           4 :             conn = pconn->conn;
     617             :         }
     618             :         else
     619             :         {
     620          12 :             conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     621          12 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
     622          12 :             howmany = PG_GETARG_INT32(2);
     623             : 
     624          12 :             rconn = getConnectionByName(conname);
     625          12 :             if (rconn)
     626          12 :                 conn = rconn->conn;
     627             :         }
     628             :     }
     629           8 :     else if (PG_NARGS() == 2)
     630             :     {
     631             :         /* text,int */
     632           8 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     633           8 :         howmany = PG_GETARG_INT32(1);
     634           8 :         conn = pconn->conn;
     635             :     }
     636             : 
     637          26 :     if (!conn)
     638           0 :         dblink_conn_not_avail(conname);
     639             : 
     640          26 :     initStringInfo(&buf);
     641          26 :     appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
     642             : 
     643             :     /*
     644             :      * Try to execute the query.  Note that since libpq uses malloc, the
     645             :      * PGresult will be long-lived even though we are still in a short-lived
     646             :      * memory context.
     647             :      */
     648          26 :     res = libpqsrv_exec(conn, buf.data, dblink_we_get_result);
     649          52 :     if (!res ||
     650          52 :         (PQresultStatus(res) != PGRES_COMMAND_OK &&
     651          26 :          PQresultStatus(res) != PGRES_TUPLES_OK))
     652             :     {
     653          10 :         dblink_res_error(conn, conname, res, fail,
     654             :                          "while fetching from cursor \"%s\"", curname);
     655           6 :         return (Datum) 0;
     656             :     }
     657          16 :     else if (PQresultStatus(res) == PGRES_COMMAND_OK)
     658             :     {
     659             :         /* cursor does not exist - closed already or bad name */
     660           0 :         PQclear(res);
     661           0 :         ereport(ERROR,
     662             :                 (errcode(ERRCODE_INVALID_CURSOR_NAME),
     663             :                  errmsg("cursor \"%s\" does not exist", curname)));
     664             :     }
     665             : 
     666          16 :     materializeResult(fcinfo, conn, res);
     667          14 :     return (Datum) 0;
     668             : }
     669             : 
     670             : /*
     671             :  * Note: this is the new preferred version of dblink
     672             :  */
     673          38 : PG_FUNCTION_INFO_V1(dblink_record);
     674             : Datum
     675          58 : dblink_record(PG_FUNCTION_ARGS)
     676             : {
     677          58 :     return dblink_record_internal(fcinfo, false);
     678             : }
     679             : 
     680           8 : PG_FUNCTION_INFO_V1(dblink_send_query);
     681             : Datum
     682          12 : dblink_send_query(PG_FUNCTION_ARGS)
     683             : {
     684             :     PGconn     *conn;
     685             :     char       *sql;
     686             :     int         retval;
     687             : 
     688          12 :     if (PG_NARGS() == 2)
     689             :     {
     690          12 :         conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
     691          12 :         sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
     692             :     }
     693             :     else
     694             :         /* shouldn't happen */
     695           0 :         elog(ERROR, "wrong number of arguments");
     696             : 
     697             :     /* async query send */
     698          12 :     retval = PQsendQuery(conn, sql);
     699          12 :     if (retval != 1)
     700           0 :         elog(NOTICE, "could not send query: %s", pchomp(PQerrorMessage(conn)));
     701             : 
     702          12 :     PG_RETURN_INT32(retval);
     703             : }
     704             : 
     705          12 : PG_FUNCTION_INFO_V1(dblink_get_result);
     706             : Datum
     707          16 : dblink_get_result(PG_FUNCTION_ARGS)
     708             : {
     709          16 :     return dblink_record_internal(fcinfo, true);
     710             : }
     711             : 
     712             : static Datum
     713          74 : dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
     714             : {
     715          74 :     PGconn     *volatile conn = NULL;
     716          74 :     volatile bool freeconn = false;
     717             : 
     718          74 :     prepTuplestoreResult(fcinfo);
     719             : 
     720          74 :     dblink_init();
     721             : 
     722          74 :     PG_TRY();
     723             :     {
     724          74 :         char       *sql = NULL;
     725          74 :         char       *conname = NULL;
     726          74 :         bool        fail = true;    /* default to backward compatible */
     727             : 
     728          74 :         if (!is_async)
     729             :         {
     730          58 :             if (PG_NARGS() == 3)
     731             :             {
     732             :                 /* text,text,bool */
     733           2 :                 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     734           2 :                 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
     735           2 :                 fail = PG_GETARG_BOOL(2);
     736           2 :                 dblink_get_conn(conname, &conn, &conname, &freeconn);
     737             :             }
     738          56 :             else if (PG_NARGS() == 2)
     739             :             {
     740             :                 /* text,text or text,bool */
     741          42 :                 if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
     742             :                 {
     743           2 :                     sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
     744           2 :                     fail = PG_GETARG_BOOL(1);
     745           2 :                     conn = pconn->conn;
     746             :                 }
     747             :                 else
     748             :                 {
     749          40 :                     conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     750          40 :                     sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
     751          40 :                     dblink_get_conn(conname, &conn, &conname, &freeconn);
     752             :                 }
     753             :             }
     754          14 :             else if (PG_NARGS() == 1)
     755             :             {
     756             :                 /* text */
     757          14 :                 conn = pconn->conn;
     758          14 :                 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
     759             :             }
     760             :             else
     761             :                 /* shouldn't happen */
     762           0 :                 elog(ERROR, "wrong number of arguments");
     763             :         }
     764             :         else                    /* is_async */
     765             :         {
     766             :             /* get async result */
     767          16 :             conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     768             : 
     769          16 :             if (PG_NARGS() == 2)
     770             :             {
     771             :                 /* text,bool */
     772           0 :                 fail = PG_GETARG_BOOL(1);
     773           0 :                 conn = dblink_get_named_conn(conname);
     774             :             }
     775          16 :             else if (PG_NARGS() == 1)
     776             :             {
     777             :                 /* text */
     778          16 :                 conn = dblink_get_named_conn(conname);
     779             :             }
     780             :             else
     781             :                 /* shouldn't happen */
     782           0 :                 elog(ERROR, "wrong number of arguments");
     783             :         }
     784             : 
     785          66 :         if (!conn)
     786           4 :             dblink_conn_not_avail(conname);
     787             : 
     788          62 :         if (!is_async)
     789             :         {
     790             :             /* synchronous query, use efficient tuple collection method */
     791          46 :             materializeQueryResult(fcinfo, conn, conname, sql, fail);
     792             :         }
     793             :         else
     794             :         {
     795             :             /* async result retrieval, do it the old way */
     796          16 :             PGresult   *res = libpqsrv_get_result(conn, dblink_we_get_result);
     797             : 
     798             :             /* NULL means we're all done with the async results */
     799          16 :             if (res)
     800             :             {
     801          10 :                 if (PQresultStatus(res) != PGRES_COMMAND_OK &&
     802          10 :                     PQresultStatus(res) != PGRES_TUPLES_OK)
     803             :                 {
     804           0 :                     dblink_res_error(conn, conname, res, fail,
     805             :                                      "while executing query");
     806             :                     /* if fail isn't set, we'll return an empty query result */
     807             :                 }
     808             :                 else
     809             :                 {
     810          10 :                     materializeResult(fcinfo, conn, res);
     811             :                 }
     812             :             }
     813             :         }
     814             :     }
     815          12 :     PG_FINALLY();
     816             :     {
     817             :         /* if needed, close the connection to the database */
     818          74 :         if (freeconn)
     819          10 :             libpqsrv_disconnect(conn);
     820             :     }
     821          74 :     PG_END_TRY();
     822             : 
     823          62 :     return (Datum) 0;
     824             : }
     825             : 
     826             : /*
     827             :  * Verify function caller can handle a tuplestore result, and set up for that.
     828             :  *
     829             :  * Note: if the caller returns without actually creating a tuplestore, the
     830             :  * executor will treat the function result as an empty set.
     831             :  */
     832             : static void
     833         100 : prepTuplestoreResult(FunctionCallInfo fcinfo)
     834             : {
     835         100 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
     836             : 
     837             :     /* check to see if query supports us returning a tuplestore */
     838         100 :     if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
     839           0 :         ereport(ERROR,
     840             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     841             :                  errmsg("set-valued function called in context that cannot accept a set")));
     842         100 :     if (!(rsinfo->allowedModes & SFRM_Materialize))
     843           0 :         ereport(ERROR,
     844             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     845             :                  errmsg("materialize mode required, but it is not allowed in this context")));
     846             : 
     847             :     /* let the executor know we're sending back a tuplestore */
     848         100 :     rsinfo->returnMode = SFRM_Materialize;
     849             : 
     850             :     /* caller must fill these to return a non-empty result */
     851         100 :     rsinfo->setResult = NULL;
     852         100 :     rsinfo->setDesc = NULL;
     853         100 : }
     854             : 
     855             : /*
     856             :  * Copy the contents of the PGresult into a tuplestore to be returned
     857             :  * as the result of the current function.
     858             :  * The PGresult will be released in this function.
     859             :  */
     860             : static void
     861          26 : materializeResult(FunctionCallInfo fcinfo, PGconn *conn, PGresult *res)
     862             : {
     863          26 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
     864             : 
     865             :     /* prepTuplestoreResult must have been called previously */
     866             :     Assert(rsinfo->returnMode == SFRM_Materialize);
     867             : 
     868          26 :     PG_TRY();
     869             :     {
     870             :         TupleDesc   tupdesc;
     871             :         bool        is_sql_cmd;
     872             :         int         ntuples;
     873             :         int         nfields;
     874             : 
     875          26 :         if (PQresultStatus(res) == PGRES_COMMAND_OK)
     876             :         {
     877           0 :             is_sql_cmd = true;
     878             : 
     879             :             /*
     880             :              * need a tuple descriptor representing one TEXT column to return
     881             :              * the command status string as our result tuple
     882             :              */
     883           0 :             tupdesc = CreateTemplateTupleDesc(1);
     884           0 :             TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
     885             :                                TEXTOID, -1, 0);
     886           0 :             ntuples = 1;
     887           0 :             nfields = 1;
     888             :         }
     889             :         else
     890             :         {
     891             :             Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
     892             : 
     893          26 :             is_sql_cmd = false;
     894             : 
     895             :             /* get a tuple descriptor for our result type */
     896          26 :             switch (get_call_result_type(fcinfo, NULL, &tupdesc))
     897             :             {
     898          26 :                 case TYPEFUNC_COMPOSITE:
     899             :                     /* success */
     900          26 :                     break;
     901           0 :                 case TYPEFUNC_RECORD:
     902             :                     /* failed to determine actual type of RECORD */
     903           0 :                     ereport(ERROR,
     904             :                             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     905             :                              errmsg("function returning record called in context "
     906             :                                     "that cannot accept type record")));
     907             :                     break;
     908           0 :                 default:
     909             :                     /* result type isn't composite */
     910           0 :                     elog(ERROR, "return type must be a row type");
     911             :                     break;
     912             :             }
     913             : 
     914             :             /* make sure we have a persistent copy of the tupdesc */
     915          26 :             tupdesc = CreateTupleDescCopy(tupdesc);
     916          26 :             ntuples = PQntuples(res);
     917          26 :             nfields = PQnfields(res);
     918             :         }
     919             : 
     920             :         /*
     921             :          * check result and tuple descriptor have the same number of columns
     922             :          */
     923          26 :         if (nfields != tupdesc->natts)
     924           0 :             ereport(ERROR,
     925             :                     (errcode(ERRCODE_DATATYPE_MISMATCH),
     926             :                      errmsg("remote query result rowtype does not match "
     927             :                             "the specified FROM clause rowtype")));
     928             : 
     929          26 :         if (ntuples > 0)
     930             :         {
     931             :             AttInMetadata *attinmeta;
     932          26 :             int         nestlevel = -1;
     933             :             Tuplestorestate *tupstore;
     934             :             MemoryContext oldcontext;
     935             :             int         row;
     936             :             char      **values;
     937             : 
     938          26 :             attinmeta = TupleDescGetAttInMetadata(tupdesc);
     939             : 
     940             :             /* Set GUCs to ensure we read GUC-sensitive data types correctly */
     941          26 :             if (!is_sql_cmd)
     942          26 :                 nestlevel = applyRemoteGucs(conn);
     943             : 
     944          26 :             oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
     945          26 :             tupstore = tuplestore_begin_heap(true, false, work_mem);
     946          26 :             rsinfo->setResult = tupstore;
     947          26 :             rsinfo->setDesc = tupdesc;
     948          26 :             MemoryContextSwitchTo(oldcontext);
     949             : 
     950          26 :             values = palloc_array(char *, nfields);
     951             : 
     952             :             /* put all tuples into the tuplestore */
     953          98 :             for (row = 0; row < ntuples; row++)
     954             :             {
     955             :                 HeapTuple   tuple;
     956             : 
     957          74 :                 if (!is_sql_cmd)
     958             :                 {
     959             :                     int         i;
     960             : 
     961         276 :                     for (i = 0; i < nfields; i++)
     962             :                     {
     963         202 :                         if (PQgetisnull(res, row, i))
     964           0 :                             values[i] = NULL;
     965             :                         else
     966         202 :                             values[i] = PQgetvalue(res, row, i);
     967             :                     }
     968             :                 }
     969             :                 else
     970             :                 {
     971           0 :                     values[0] = PQcmdStatus(res);
     972             :                 }
     973             : 
     974             :                 /* build the tuple and put it into the tuplestore. */
     975          74 :                 tuple = BuildTupleFromCStrings(attinmeta, values);
     976          72 :                 tuplestore_puttuple(tupstore, tuple);
     977             :             }
     978             : 
     979             :             /* clean up GUC settings, if we changed any */
     980          24 :             restoreLocalGucs(nestlevel);
     981             :         }
     982             :     }
     983           2 :     PG_FINALLY();
     984             :     {
     985             :         /* be sure to release the libpq result */
     986          26 :         PQclear(res);
     987             :     }
     988          26 :     PG_END_TRY();
     989          24 : }
     990             : 
     991             : /*
     992             :  * Execute the given SQL command and store its results into a tuplestore
     993             :  * to be returned as the result of the current function.
     994             :  *
     995             :  * This is equivalent to PQexec followed by materializeResult, but we make
     996             :  * use of libpq's single-row mode to avoid accumulating the whole result
     997             :  * inside libpq before it gets transferred to the tuplestore.
     998             :  */
     999             : static void
    1000          46 : materializeQueryResult(FunctionCallInfo fcinfo,
    1001             :                        PGconn *conn,
    1002             :                        const char *conname,
    1003             :                        const char *sql,
    1004             :                        bool fail)
    1005             : {
    1006          46 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    1007          46 :     PGresult   *volatile res = NULL;
    1008          46 :     volatile storeInfo sinfo = {0};
    1009             : 
    1010             :     /* prepTuplestoreResult must have been called previously */
    1011             :     Assert(rsinfo->returnMode == SFRM_Materialize);
    1012             : 
    1013          46 :     sinfo.fcinfo = fcinfo;
    1014             : 
    1015          46 :     PG_TRY();
    1016             :     {
    1017             :         /* Create short-lived memory context for data conversions */
    1018          46 :         sinfo.tmpcontext = AllocSetContextCreate(CurrentMemoryContext,
    1019             :                                                  "dblink temporary context",
    1020             :                                                  ALLOCSET_DEFAULT_SIZES);
    1021             : 
    1022             :         /* execute query, collecting any tuples into the tuplestore */
    1023          46 :         res = storeQueryResult(&sinfo, conn, sql);
    1024             : 
    1025          46 :         if (!res ||
    1026          46 :             (PQresultStatus(res) != PGRES_COMMAND_OK &&
    1027          46 :              PQresultStatus(res) != PGRES_TUPLES_OK))
    1028           4 :         {
    1029             :             /*
    1030             :              * dblink_res_error will clear the passed PGresult, so we need
    1031             :              * this ugly dance to avoid doing so twice during error exit
    1032             :              */
    1033           4 :             PGresult   *res1 = res;
    1034             : 
    1035           4 :             res = NULL;
    1036           4 :             dblink_res_error(conn, conname, res1, fail,
    1037             :                              "while executing query");
    1038             :             /* if fail isn't set, we'll return an empty query result */
    1039             :         }
    1040          42 :         else if (PQresultStatus(res) == PGRES_COMMAND_OK)
    1041             :         {
    1042             :             /*
    1043             :              * storeRow didn't get called, so we need to convert the command
    1044             :              * status string to a tuple manually
    1045             :              */
    1046             :             TupleDesc   tupdesc;
    1047             :             AttInMetadata *attinmeta;
    1048             :             Tuplestorestate *tupstore;
    1049             :             HeapTuple   tuple;
    1050             :             char       *values[1];
    1051             :             MemoryContext oldcontext;
    1052             : 
    1053             :             /*
    1054             :              * need a tuple descriptor representing one TEXT column to return
    1055             :              * the command status string as our result tuple
    1056             :              */
    1057           0 :             tupdesc = CreateTemplateTupleDesc(1);
    1058           0 :             TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
    1059             :                                TEXTOID, -1, 0);
    1060           0 :             attinmeta = TupleDescGetAttInMetadata(tupdesc);
    1061             : 
    1062           0 :             oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
    1063           0 :             tupstore = tuplestore_begin_heap(true, false, work_mem);
    1064           0 :             rsinfo->setResult = tupstore;
    1065           0 :             rsinfo->setDesc = tupdesc;
    1066           0 :             MemoryContextSwitchTo(oldcontext);
    1067             : 
    1068           0 :             values[0] = PQcmdStatus(res);
    1069             : 
    1070             :             /* build the tuple and put it into the tuplestore. */
    1071           0 :             tuple = BuildTupleFromCStrings(attinmeta, values);
    1072           0 :             tuplestore_puttuple(tupstore, tuple);
    1073             : 
    1074           0 :             PQclear(res);
    1075           0 :             res = NULL;
    1076             :         }
    1077             :         else
    1078             :         {
    1079             :             Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
    1080             :             /* storeRow should have created a tuplestore */
    1081             :             Assert(rsinfo->setResult != NULL);
    1082             : 
    1083          42 :             PQclear(res);
    1084          42 :             res = NULL;
    1085             :         }
    1086             : 
    1087             :         /* clean up data conversion short-lived memory context */
    1088          46 :         if (sinfo.tmpcontext != NULL)
    1089          46 :             MemoryContextDelete(sinfo.tmpcontext);
    1090          46 :         sinfo.tmpcontext = NULL;
    1091             : 
    1092          46 :         PQclear(sinfo.last_res);
    1093          46 :         sinfo.last_res = NULL;
    1094          46 :         PQclear(sinfo.cur_res);
    1095          46 :         sinfo.cur_res = NULL;
    1096             :     }
    1097           0 :     PG_CATCH();
    1098             :     {
    1099             :         /* be sure to release any libpq result we collected */
    1100           0 :         PQclear(res);
    1101           0 :         PQclear(sinfo.last_res);
    1102           0 :         PQclear(sinfo.cur_res);
    1103             :         /* and clear out any pending data in libpq */
    1104           0 :         while ((res = libpqsrv_get_result(conn, dblink_we_get_result)) !=
    1105             :                NULL)
    1106           0 :             PQclear(res);
    1107           0 :         PG_RE_THROW();
    1108             :     }
    1109          46 :     PG_END_TRY();
    1110          46 : }
    1111             : 
    1112             : /*
    1113             :  * Execute query, and send any result rows to sinfo->tuplestore.
    1114             :  */
    1115             : static PGresult *
    1116          46 : storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql)
    1117             : {
    1118          46 :     bool        first = true;
    1119          46 :     int         nestlevel = -1;
    1120             :     PGresult   *res;
    1121             : 
    1122          46 :     if (!PQsendQuery(conn, sql))
    1123           0 :         elog(ERROR, "could not send query: %s", pchomp(PQerrorMessage(conn)));
    1124             : 
    1125          46 :     if (!PQsetSingleRowMode(conn))  /* shouldn't fail */
    1126           0 :         elog(ERROR, "failed to set single-row mode for dblink query");
    1127             : 
    1128             :     for (;;)
    1129             :     {
    1130         396 :         CHECK_FOR_INTERRUPTS();
    1131             : 
    1132         396 :         sinfo->cur_res = libpqsrv_get_result(conn, dblink_we_get_result);
    1133         396 :         if (!sinfo->cur_res)
    1134          46 :             break;
    1135             : 
    1136         350 :         if (PQresultStatus(sinfo->cur_res) == PGRES_SINGLE_TUPLE)
    1137             :         {
    1138             :             /* got one row from possibly-bigger resultset */
    1139             : 
    1140             :             /*
    1141             :              * Set GUCs to ensure we read GUC-sensitive data types correctly.
    1142             :              * We shouldn't do this until we have a row in hand, to ensure
    1143             :              * libpq has seen any earlier ParameterStatus protocol messages.
    1144             :              */
    1145         304 :             if (first && nestlevel < 0)
    1146          42 :                 nestlevel = applyRemoteGucs(conn);
    1147             : 
    1148         304 :             storeRow(sinfo, sinfo->cur_res, first);
    1149             : 
    1150         304 :             PQclear(sinfo->cur_res);
    1151         304 :             sinfo->cur_res = NULL;
    1152         304 :             first = false;
    1153             :         }
    1154             :         else
    1155             :         {
    1156             :             /* if empty resultset, fill tuplestore header */
    1157          46 :             if (first && PQresultStatus(sinfo->cur_res) == PGRES_TUPLES_OK)
    1158           0 :                 storeRow(sinfo, sinfo->cur_res, first);
    1159             : 
    1160             :             /* store completed result at last_res */
    1161          46 :             PQclear(sinfo->last_res);
    1162          46 :             sinfo->last_res = sinfo->cur_res;
    1163          46 :             sinfo->cur_res = NULL;
    1164          46 :             first = true;
    1165             :         }
    1166             :     }
    1167             : 
    1168             :     /* clean up GUC settings, if we changed any */
    1169          46 :     restoreLocalGucs(nestlevel);
    1170             : 
    1171             :     /* return last_res */
    1172          46 :     res = sinfo->last_res;
    1173          46 :     sinfo->last_res = NULL;
    1174          46 :     return res;
    1175             : }
    1176             : 
    1177             : /*
    1178             :  * Send single row to sinfo->tuplestore.
    1179             :  *
    1180             :  * If "first" is true, create the tuplestore using PGresult's metadata
    1181             :  * (in this case the PGresult might contain either zero or one row).
    1182             :  */
    1183             : static void
    1184         304 : storeRow(volatile storeInfo *sinfo, PGresult *res, bool first)
    1185             : {
    1186         304 :     int         nfields = PQnfields(res);
    1187             :     HeapTuple   tuple;
    1188             :     int         i;
    1189             :     MemoryContext oldcontext;
    1190             : 
    1191         304 :     if (first)
    1192             :     {
    1193             :         /* Prepare for new result set */
    1194          42 :         ReturnSetInfo *rsinfo = (ReturnSetInfo *) sinfo->fcinfo->resultinfo;
    1195             :         TupleDesc   tupdesc;
    1196             : 
    1197             :         /*
    1198             :          * It's possible to get more than one result set if the query string
    1199             :          * contained multiple SQL commands.  In that case, we follow PQexec's
    1200             :          * traditional behavior of throwing away all but the last result.
    1201             :          */
    1202          42 :         if (sinfo->tuplestore)
    1203           0 :             tuplestore_end(sinfo->tuplestore);
    1204          42 :         sinfo->tuplestore = NULL;
    1205             : 
    1206             :         /* get a tuple descriptor for our result type */
    1207          42 :         switch (get_call_result_type(sinfo->fcinfo, NULL, &tupdesc))
    1208             :         {
    1209          42 :             case TYPEFUNC_COMPOSITE:
    1210             :                 /* success */
    1211          42 :                 break;
    1212           0 :             case TYPEFUNC_RECORD:
    1213             :                 /* failed to determine actual type of RECORD */
    1214           0 :                 ereport(ERROR,
    1215             :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1216             :                          errmsg("function returning record called in context "
    1217             :                                 "that cannot accept type record")));
    1218             :                 break;
    1219           0 :             default:
    1220             :                 /* result type isn't composite */
    1221           0 :                 elog(ERROR, "return type must be a row type");
    1222             :                 break;
    1223             :         }
    1224             : 
    1225             :         /* make sure we have a persistent copy of the tupdesc */
    1226          42 :         tupdesc = CreateTupleDescCopy(tupdesc);
    1227             : 
    1228             :         /* check result and tuple descriptor have the same number of columns */
    1229          42 :         if (nfields != tupdesc->natts)
    1230           0 :             ereport(ERROR,
    1231             :                     (errcode(ERRCODE_DATATYPE_MISMATCH),
    1232             :                      errmsg("remote query result rowtype does not match "
    1233             :                             "the specified FROM clause rowtype")));
    1234             : 
    1235             :         /* Prepare attinmeta for later data conversions */
    1236          42 :         sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
    1237             : 
    1238             :         /* Create a new, empty tuplestore */
    1239          42 :         oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
    1240          42 :         sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
    1241          42 :         rsinfo->setResult = sinfo->tuplestore;
    1242          42 :         rsinfo->setDesc = tupdesc;
    1243          42 :         MemoryContextSwitchTo(oldcontext);
    1244             : 
    1245             :         /* Done if empty resultset */
    1246          42 :         if (PQntuples(res) == 0)
    1247           0 :             return;
    1248             : 
    1249             :         /*
    1250             :          * Set up sufficiently-wide string pointers array; this won't change
    1251             :          * in size so it's easy to preallocate.
    1252             :          */
    1253          42 :         if (sinfo->cstrs)
    1254           0 :             pfree(sinfo->cstrs);
    1255          42 :         sinfo->cstrs = palloc_array(char *, nfields);
    1256             :     }
    1257             : 
    1258             :     /* Should have a single-row result if we get here */
    1259             :     Assert(PQntuples(res) == 1);
    1260             : 
    1261             :     /*
    1262             :      * Do the following work in a temp context that we reset after each tuple.
    1263             :      * This cleans up not only the data we have direct access to, but any
    1264             :      * cruft the I/O functions might leak.
    1265             :      */
    1266         304 :     oldcontext = MemoryContextSwitchTo(sinfo->tmpcontext);
    1267             : 
    1268             :     /*
    1269             :      * Fill cstrs with null-terminated strings of column values.
    1270             :      */
    1271        1140 :     for (i = 0; i < nfields; i++)
    1272             :     {
    1273         836 :         if (PQgetisnull(res, 0, i))
    1274           0 :             sinfo->cstrs[i] = NULL;
    1275             :         else
    1276         836 :             sinfo->cstrs[i] = PQgetvalue(res, 0, i);
    1277             :     }
    1278             : 
    1279             :     /* Convert row to a tuple, and add it to the tuplestore */
    1280         304 :     tuple = BuildTupleFromCStrings(sinfo->attinmeta, sinfo->cstrs);
    1281             : 
    1282         304 :     tuplestore_puttuple(sinfo->tuplestore, tuple);
    1283             : 
    1284             :     /* Clean up */
    1285         304 :     MemoryContextSwitchTo(oldcontext);
    1286         304 :     MemoryContextReset(sinfo->tmpcontext);
    1287             : }
    1288             : 
    1289             : /*
    1290             :  * List all open dblink connections by name.
    1291             :  * Returns a text array of all connection names, or NULL if there are
    1292             :  * none.  Takes no parameters.
    1293             :  */
    1294           6 : PG_FUNCTION_INFO_V1(dblink_get_connections);
    1295             : Datum
    1296           2 : dblink_get_connections(PG_FUNCTION_ARGS)
    1297             : {
    1298             :     HASH_SEQ_STATUS status;
    1299             :     remoteConnHashEnt *hentry;
    1300           2 :     ArrayBuildState *astate = NULL;
    1301             : 
    1302           2 :     if (remoteConnHash)
    1303             :     {
    1304           2 :         hash_seq_init(&status, remoteConnHash);
    1305           8 :         while ((hentry = (remoteConnHashEnt *) hash_seq_search(&status)) != NULL)
    1306             :         {
    1307             :             /* stash away current value */
    1308           6 :             astate = accumArrayResult(astate,
    1309           6 :                                       CStringGetTextDatum(hentry->name),
    1310             :                                       false, TEXTOID, CurrentMemoryContext);
    1311             :         }
    1312             :     }
    1313             : 
    1314           2 :     if (astate)
    1315           2 :         PG_RETURN_DATUM(makeArrayResult(astate,
    1316             :                                         CurrentMemoryContext));
    1317             :     else
    1318           0 :         PG_RETURN_NULL();
    1319             : }
    1320             : 
    1321             : /*
    1322             :  * Checks if a given remote connection is busy
    1323             :  *
    1324             :  * Returns 1 if the connection is busy, 0 otherwise
    1325             :  * Params:
    1326             :  *  text connection_name - name of the connection to check
    1327             :  *
    1328             :  */
    1329           6 : PG_FUNCTION_INFO_V1(dblink_is_busy);
    1330             : Datum
    1331           2 : dblink_is_busy(PG_FUNCTION_ARGS)
    1332             : {
    1333             :     PGconn     *conn;
    1334             : 
    1335           2 :     dblink_init();
    1336           2 :     conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
    1337             : 
    1338           2 :     PQconsumeInput(conn);
    1339           2 :     PG_RETURN_INT32(PQisBusy(conn));
    1340             : }
    1341             : 
    1342             : /*
    1343             :  * Cancels a running request on a connection
    1344             :  *
    1345             :  * Returns text:
    1346             :  *  "OK" if the cancel request has been sent correctly,
    1347             :  *      an error message otherwise
    1348             :  *
    1349             :  * Params:
    1350             :  *  text connection_name - name of the connection to cancel the query on
    1351             :  *
    1352             :  */
    1353           6 : PG_FUNCTION_INFO_V1(dblink_cancel_query);
    1354             : Datum
    1355           2 : dblink_cancel_query(PG_FUNCTION_ARGS)
    1356             : {
    1357             :     PGconn     *conn;
    1358             :     const char *msg;
    1359             :     TimestampTz endtime;
    1360             : 
    1361           2 :     dblink_init();
    1362           2 :     conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
    1363           2 :     endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
    1364             :                                           30000);
    1365           2 :     msg = libpqsrv_cancel(conn, endtime);
    1366           2 :     if (msg == NULL)
    1367           2 :         msg = "OK";
    1368             : 
    1369           2 :     PG_RETURN_TEXT_P(cstring_to_text(msg));
    1370             : }
    1371             : 
    1372             : 
    1373             : /*
    1374             :  * Get error message from a connection
    1375             :  *
    1376             :  * Returns text:
    1377             :  *  "OK" if no error, an error message otherwise
    1378             :  *
    1379             :  * Params:
    1380             :  *  text connection_name - name of the connection to check
    1381             :  *
    1382             :  */
    1383           6 : PG_FUNCTION_INFO_V1(dblink_error_message);
    1384             : Datum
    1385           2 : dblink_error_message(PG_FUNCTION_ARGS)
    1386             : {
    1387             :     char       *msg;
    1388             :     PGconn     *conn;
    1389             : 
    1390           2 :     dblink_init();
    1391           2 :     conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
    1392             : 
    1393           2 :     msg = PQerrorMessage(conn);
    1394           2 :     if (msg == NULL || msg[0] == '\0')
    1395           2 :         PG_RETURN_TEXT_P(cstring_to_text("OK"));
    1396             :     else
    1397           0 :         PG_RETURN_TEXT_P(cstring_to_text(pchomp(msg)));
    1398             : }
    1399             : 
    1400             : /*
    1401             :  * Execute an SQL non-SELECT command
    1402             :  */
    1403          26 : PG_FUNCTION_INFO_V1(dblink_exec);
    1404             : Datum
    1405          52 : dblink_exec(PG_FUNCTION_ARGS)
    1406             : {
    1407          52 :     text       *volatile sql_cmd_status = NULL;
    1408          52 :     PGconn     *volatile conn = NULL;
    1409          52 :     volatile bool freeconn = false;
    1410             : 
    1411          52 :     dblink_init();
    1412             : 
    1413          52 :     PG_TRY();
    1414             :     {
    1415          52 :         PGresult   *res = NULL;
    1416          52 :         char       *sql = NULL;
    1417          52 :         char       *conname = NULL;
    1418          52 :         bool        fail = true;    /* default to backward compatible behavior */
    1419             : 
    1420          52 :         if (PG_NARGS() == 3)
    1421             :         {
    1422             :             /* must be text,text,bool */
    1423           0 :             conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
    1424           0 :             sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
    1425           0 :             fail = PG_GETARG_BOOL(2);
    1426           0 :             dblink_get_conn(conname, &conn, &conname, &freeconn);
    1427             :         }
    1428          52 :         else if (PG_NARGS() == 2)
    1429             :         {
    1430             :             /* might be text,text or text,bool */
    1431          34 :             if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
    1432             :             {
    1433           2 :                 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
    1434           2 :                 fail = PG_GETARG_BOOL(1);
    1435           2 :                 conn = pconn->conn;
    1436             :             }
    1437             :             else
    1438             :             {
    1439          32 :                 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
    1440          32 :                 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
    1441          32 :                 dblink_get_conn(conname, &conn, &conname, &freeconn);
    1442             :             }
    1443             :         }
    1444          18 :         else if (PG_NARGS() == 1)
    1445             :         {
    1446             :             /* must be single text argument */
    1447          18 :             conn = pconn->conn;
    1448          18 :             sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
    1449             :         }
    1450             :         else
    1451             :             /* shouldn't happen */
    1452           0 :             elog(ERROR, "wrong number of arguments");
    1453             : 
    1454          52 :         if (!conn)
    1455           0 :             dblink_conn_not_avail(conname);
    1456             : 
    1457          52 :         res = libpqsrv_exec(conn, sql, dblink_we_get_result);
    1458          52 :         if (!res ||
    1459          52 :             (PQresultStatus(res) != PGRES_COMMAND_OK &&
    1460           4 :              PQresultStatus(res) != PGRES_TUPLES_OK))
    1461             :         {
    1462           4 :             dblink_res_error(conn, conname, res, fail,
    1463             :                              "while executing command");
    1464             : 
    1465             :             /*
    1466             :              * dblink_res_error did not raise an error, so report the
    1467             :              * failure by returning the text "ERROR" as our result
    1468             :              */
    1469           2 :             sql_cmd_status = cstring_to_text("ERROR");
    1470             :         }
    1471          48 :         else if (PQresultStatus(res) == PGRES_COMMAND_OK)
    1472             :         {
    1473             :             /*
    1474             :              * save a copy of the command status string to return as our
    1475             :              * text result
    1476             :              */
    1477          48 :             sql_cmd_status = cstring_to_text(PQcmdStatus(res));
    1478          48 :             PQclear(res);
    1479             :         }
    1480             :         else
    1481             :         {
    1482           0 :             PQclear(res);
    1483           0 :             ereport(ERROR,
    1484             :                     (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
    1485             :                      errmsg("statement returning results not allowed")));
    1486             :         }
    1487             :     }
    1488           2 :     PG_FINALLY();
    1489             :     {
    1490             :         /* if needed, close the connection to the database */
    1491          52 :         if (freeconn)
    1492           2 :             libpqsrv_disconnect(conn);
    1493             :     }
    1494          52 :     PG_END_TRY();
    1495             : 
    1496          50 :     PG_RETURN_TEXT_P(sql_cmd_status);
    1497             : }
    1498             : 
    1499             : 
    1500             : /*
    1501             :  * dblink_get_pkey
    1502             :  *
    1503             :  * Return the primary key fields of the supplied relation as a set of
    1504             :  * (position, colname) rows; the set is empty if no key fields are found.
    1505             :  */
    1506           6 : PG_FUNCTION_INFO_V1(dblink_get_pkey);
    1507             : Datum
    1508          18 : dblink_get_pkey(PG_FUNCTION_ARGS)
    1509             : {
    1510             :     int16       indnkeyatts;
    1511             :     char      **results;
    1512             :     FuncCallContext *funcctx;
    1513             :     int32       call_cntr;
    1514             :     int32       max_calls;
    1515             :     AttInMetadata *attinmeta;
    1516             :     MemoryContext oldcontext;
    1517             : 
    1518             :     /* stuff done only on the first call of the function */
    1519          18 :     if (SRF_IS_FIRSTCALL())
    1520             :     {
    1521             :         Relation    rel;
    1522             :         TupleDesc   tupdesc;
    1523             : 
    1524             :         /* create a function context for cross-call persistence */
    1525           6 :         funcctx = SRF_FIRSTCALL_INIT();
    1526             : 
    1527             :         /*
    1528             :          * switch to memory context appropriate for multiple function calls
    1529             :          */
    1530           6 :         oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
    1531             : 
    1532             :         /* open target relation */
    1533           6 :         rel = get_rel_from_relname(PG_GETARG_TEXT_PP(0), AccessShareLock, ACL_SELECT);
    1534             : 
    1535             :         /* get the array of primary key attribute names */
    1536           6 :         results = get_pkey_attnames(rel, &indnkeyatts);
    1537             : 
    1538           6 :         relation_close(rel, AccessShareLock);
    1539             : 
    1540             :         /*
    1541             :          * need a tuple descriptor representing one INT and one TEXT column
    1542             :          */
    1543           6 :         tupdesc = CreateTemplateTupleDesc(2);
    1544           6 :         TupleDescInitEntry(tupdesc, (AttrNumber) 1, "position",
    1545             :                            INT4OID, -1, 0);
    1546           6 :         TupleDescInitEntry(tupdesc, (AttrNumber) 2, "colname",
    1547             :                            TEXTOID, -1, 0);
    1548             : 
    1549             :         /*
    1550             :          * Generate attribute metadata needed later to produce tuples from raw
    1551             :          * C strings
    1552             :          */
    1553           6 :         attinmeta = TupleDescGetAttInMetadata(tupdesc);
    1554           6 :         funcctx->attinmeta = attinmeta;
    1555             : 
    1556           6 :         if ((results != NULL) && (indnkeyatts > 0))
    1557             :         {
    1558           6 :             funcctx->max_calls = indnkeyatts;
    1559             : 
    1560             :             /* got results, keep track of them */
    1561           6 :             funcctx->user_fctx = results;
    1562             :         }
    1563             :         else
    1564             :         {
    1565             :             /* fast track when no results */
    1566           0 :             MemoryContextSwitchTo(oldcontext);
    1567           0 :             SRF_RETURN_DONE(funcctx);
    1568             :         }
    1569             : 
    1570           6 :         MemoryContextSwitchTo(oldcontext);
    1571             :     }
    1572             : 
    1573             :     /* stuff done on every call of the function */
    1574          18 :     funcctx = SRF_PERCALL_SETUP();
    1575             : 
    1576             :     /*
    1577             :      * initialize per-call variables
    1578             :      */
    1579          18 :     call_cntr = funcctx->call_cntr;
    1580          18 :     max_calls = funcctx->max_calls;
    1581             : 
    1582          18 :     results = (char **) funcctx->user_fctx;
    1583          18 :     attinmeta = funcctx->attinmeta;
    1584             : 
    1585          18 :     if (call_cntr < max_calls)   /* do when there is more left to send */
    1586             :     {
    1587             :         char      **values;
    1588             :         HeapTuple   tuple;
    1589             :         Datum       result;
    1590             : 
    1591          12 :         values = palloc_array(char *, 2);
    1592          12 :         values[0] = psprintf("%d", call_cntr + 1);
    1593          12 :         values[1] = results[call_cntr];
    1594             : 
    1595             :         /* build the tuple */
    1596          12 :         tuple = BuildTupleFromCStrings(attinmeta, values);
    1597             : 
    1598             :         /* make the tuple into a datum */
    1599          12 :         result = HeapTupleGetDatum(tuple);
    1600             : 
    1601          12 :         SRF_RETURN_NEXT(funcctx, result);
    1602             :     }
    1603             :     else
    1604             :     {
    1605             :         /* do when there is no more left */
    1606           6 :         SRF_RETURN_DONE(funcctx);
    1607             :     }
    1608             : }
    1609             : 
    1610             : 
    1611             : /*
    1612             :  * dblink_build_sql_insert
    1613             :  *
    1614             :  * Used to generate an SQL insert statement
    1615             :  * based on an existing tuple in a local relation.
    1616             :  * This is useful for selectively replicating data
    1617             :  * to another server via dblink.
    1618             :  *
    1619             :  * API:
    1620             :  * <relname> - name of local table of interest
    1621             :  * <pkattnums> - an int2vector of attnums which will be used
    1622             :  * to identify the local tuple of interest
    1623             :  * <pknumatts> - number of attnums in pkattnums
    1624             :  * <src_pkattvals_arry> - text array of key values which will be used
    1625             :  * to identify the local tuple of interest
    1626             :  * <tgt_pkattvals_arry> - text array of key values which will be used
    1627             :  * to build the string for execution remotely. These are substituted
    1628             :  * for their counterparts in src_pkattvals_arry
    1629             :  */
    1630           8 : PG_FUNCTION_INFO_V1(dblink_build_sql_insert);
    1631             : Datum
    1632          12 : dblink_build_sql_insert(PG_FUNCTION_ARGS)
    1633             : {
    1634          12 :     text       *relname_text = PG_GETARG_TEXT_PP(0);
    1635          12 :     int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
    1636          12 :     int32       pknumatts_arg = PG_GETARG_INT32(2);
    1637          12 :     ArrayType  *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
    1638          12 :     ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
    1639             :     Relation    rel;
    1640             :     int        *pkattnums;
    1641             :     int         pknumatts;
    1642             :     char      **src_pkattvals;
    1643             :     char      **tgt_pkattvals;
    1644             :     int         src_nitems;
    1645             :     int         tgt_nitems;
    1646             :     char       *sql;
    1647             : 
    1648             :     /*
    1649             :      * Open target relation.
    1650             :      */
    1651          12 :     rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
    1652             : 
    1653             :     /*
    1654             :      * Process pkattnums argument.
    1655             :      */
    1656          12 :     validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
    1657             :                        &pkattnums, &pknumatts);
    1658             : 
    1659             :     /*
    1660             :      * Source array is made up of key values that will be used to locate the
    1661             :      * tuple of interest from the local system.
    1662             :      */
    1663           8 :     src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
    1664             : 
    1665             :     /*
    1666             :      * There should be one source array key value for each key attnum
    1667             :      */
    1668           8 :     if (src_nitems != pknumatts)
    1669           0 :         ereport(ERROR,
    1670             :                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
    1671             :                  errmsg("source key array length must match number of key attributes")));
    1672             : 
    1673             :     /*
    1674             :      * Target array is made up of key values that will be used to build the
    1675             :      * SQL string for use on the remote system.
    1676             :      */
    1677           8 :     tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
    1678             : 
    1679             :     /*
    1680             :      * There should be one target array key value for each key attnum
    1681             :      */
    1682           8 :     if (tgt_nitems != pknumatts)
    1683           0 :         ereport(ERROR,
    1684             :                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
    1685             :                  errmsg("target key array length must match number of key attributes")));
    1686             : 
    1687             :     /*
    1688             :      * Prep work is finally done. Go get the SQL string.
    1689             :      */
    1690           8 :     sql = get_sql_insert(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
    1691             : 
    1692             :     /*
    1693             :      * Now we can close the relation.
    1694             :      */
    1695           8 :     relation_close(rel, AccessShareLock);
    1696             : 
    1697             :     /*
    1698             :      * And send it
    1699             :      */
    1700           8 :     PG_RETURN_TEXT_P(cstring_to_text(sql));
    1701             : }
    1702             : 
    1703             : 
    1704             : /*
    1705             :  * dblink_build_sql_delete
    1706             :  *
    1707             :  * Used to generate an SQL delete statement.
    1708             :  * This is useful for selectively replicating a
    1709             :  * delete to another server via dblink.
    1710             :  *
    1711             :  * API:
    1712             :  * <relname> - name of remote table of interest
    1713             :  * <pkattnums> - an int2vector of attnums which will be used
    1714             :  * to identify the remote tuple of interest
    1715             :  * <pknumatts> - number of attnums in pkattnums
    1716             :  * <tgt_pkattvals_arry> - text array of key values which will be used
    1717             :  * to build the string for execution remotely.
    1718             :  */
    1719           8 : PG_FUNCTION_INFO_V1(dblink_build_sql_delete);
    1720             : Datum
    1721          12 : dblink_build_sql_delete(PG_FUNCTION_ARGS)
    1722             : {
    1723          12 :     text       *relname_text = PG_GETARG_TEXT_PP(0);
    1724          12 :     int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
    1725          12 :     int32       pknumatts_arg = PG_GETARG_INT32(2);
    1726          12 :     ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
    1727             :     Relation    rel;
    1728             :     int        *pkattnums;
    1729             :     int         pknumatts;
    1730             :     char      **tgt_pkattvals;
    1731             :     int         tgt_nitems;
    1732             :     char       *sql;
    1733             : 
    1734             :     /*
    1735             :      * Open target relation.
    1736             :      */
    1737          12 :     rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
    1738             : 
    1739             :     /*
    1740             :      * Process pkattnums argument.
    1741             :      */
    1742          12 :     validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
    1743             :                        &pkattnums, &pknumatts);
    1744             : 
    1745             :     /*
    1746             :      * Target array is made up of key values that will be used to build the
    1747             :      * SQL string for use on the remote system.
    1748             :      */
    1749           8 :     tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
    1750             : 
    1751             :     /*
    1752             :      * There should be one target array key value for each key attnum
    1753             :      */
    1754           8 :     if (tgt_nitems != pknumatts)
    1755           0 :         ereport(ERROR,
    1756             :                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
    1757             :                  errmsg("target key array length must match number of key attributes")));
    1758             : 
    1759             :     /*
    1760             :      * Prep work is finally done. Go get the SQL string.
    1761             :      */
    1762           8 :     sql = get_sql_delete(rel, pkattnums, pknumatts, tgt_pkattvals);
    1763             : 
    1764             :     /*
    1765             :      * Now we can close the relation.
    1766             :      */
    1767           8 :     relation_close(rel, AccessShareLock);
    1768             : 
    1769             :     /*
    1770             :      * And send it
    1771             :      */
    1772           8 :     PG_RETURN_TEXT_P(cstring_to_text(sql));
    1773             : }
    1774             : 
    1775             : 
    1776             : /*
    1777             :  * dblink_build_sql_update
    1778             :  *
    1779             :  * Generate an SQL UPDATE statement from an existing tuple in a local
    1780             :  * relation and return it as text.  Both key arrays must hold exactly one
    1781             :  * element per validated key attribute, else an error is raised.  This is
    1782             :  * useful for selectively replicating data to another server via dblink.
    1783             :  *
    1784             :  * API:
    1785             :  * <relname> - name of local table of interest
    1786             :  * <pkattnums> - an int2vector of attnums which will be used
    1787             :  * to identify the local tuple of interest
    1788             :  * <pknumatts> - number of attnums in pkattnums
    1789             :  * <src_pkattvals_arry> - text array of key values which will be used
    1790             :  * to identify the local tuple of interest
    1791             :  * <tgt_pkattvals_arry> - text array of key values which will be used
    1792             :  * to build the string for execution remotely.  These are substituted
    1793             :  * for their counterparts in src_pkattvals_arry.
    1794             :  */
    1795           8 : PG_FUNCTION_INFO_V1(dblink_build_sql_update);
    1796             : Datum
    1797          12 : dblink_build_sql_update(PG_FUNCTION_ARGS)
    1798             : {
    1799          12 :     text       *relname_text = PG_GETARG_TEXT_PP(0);
    1800          12 :     int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
    1801          12 :     int32       pknumatts_arg = PG_GETARG_INT32(2);
    1802          12 :     ArrayType  *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
    1803          12 :     ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
    1804             :     Relation    rel;
    1805             :     int        *pkattnums;  /* validated key columns, filled by validate_pkattnums */
    1806             :     int         pknumatts;  /* validated key count, filled by validate_pkattnums */
    1807             :     char      **src_pkattvals;
    1808             :     char      **tgt_pkattvals;
    1809             :     int         src_nitems;
    1810             :     int         tgt_nitems;
    1811             :     char       *sql;
    1812             : 
    1813             :     /*
    1814             :      * Open the local relation (AccessShareLock, ACL_SELECT requested).
    1815             :      */
    1816          12 :     rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
    1817             : 
    1818             :     /*
    1819             :      * Process pkattnums argument; outputs land in pkattnums/pknumatts.
    1820             :      */
    1821          12 :     validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
    1822             :                        &pkattnums, &pknumatts);
    1823             : 
    1824             :     /*
    1825             :      * Source array is made up of key values that will be used to locate the
    1826             :      * tuple of interest on the local system.
    1827             :      */
    1828           8 :     src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
    1829             : 
    1830             :     /*
    1831             :      * There must be exactly one source array key value per key attnum.
    1832             :      */
    1833           8 :     if (src_nitems != pknumatts)
    1834           0 :         ereport(ERROR,
    1835             :                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
    1836             :                  errmsg("source key array length must match number of key attributes")));
    1837             : 
    1838             :     /*
    1839             :      * Target array is made up of key values that will be used to build the
    1840             :      * SQL string for use on the remote system.
    1841             :      */
    1842           8 :     tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
    1843             : 
    1844             :     /*
    1845             :      * There must be exactly one target array key value per key attnum.
    1846             :      */
    1847           8 :     if (tgt_nitems != pknumatts)
    1848           0 :         ereport(ERROR,
    1849             :                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
    1850             :                  errmsg("target key array length must match number of key attributes")));
    1851             : 
    1852             :     /*
    1853             :      * Prep work is finally done.  Go build the UPDATE string.
    1854             :      */
    1855           8 :     sql = get_sql_update(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
    1856             : 
    1857             :     /*
    1858             :      * Now we can close the relation, releasing the lock taken above.
    1859             :      */
    1860           8 :     relation_close(rel, AccessShareLock);
    1861             : 
    1862             :     /*
    1863             :      * Convert the C string to text and return it.
    1864             :      */
    1865           8 :     PG_RETURN_TEXT_P(cstring_to_text(sql));
    1866             : }
    1867             : 
    1868             : /*
    1869             :  * dblink_current_query
    1870             :  * Return the current query string,
    1871             :  * to allow its use in (among other things)
    1872             :  * rewrite rules.
    1873             :  */
    1874           4 : PG_FUNCTION_INFO_V1(dblink_current_query);
    1875             : Datum
    1876           0 : dblink_current_query(PG_FUNCTION_ARGS)
    1877             : {
    1878             :     /* This is now just an alias: fcinfo is forwarded to current_query() */
    1879           0 :     PG_RETURN_DATUM(current_query(fcinfo));
    1880             : }
    1881             : 
    1882             : /*
    1883             :  * Retrieve async notifications for a connection.
    1884             :  *
    1885             :  * Returns a setof record of notifications, or an empty set if none received.
    1886             :  * Can optionally take a named connection as parameter, but uses the unnamed
    1887             :  * connection by default.
    1888             :  * Each row is (relname, be_pid, extra); a NULL relname/extra becomes SQL NULL.
    1889             :  */
    1890             : #define DBLINK_NOTIFY_COLS      3
    1891             : 
    1892          10 : PG_FUNCTION_INFO_V1(dblink_get_notify);
    1893             : Datum
    1894           4 : dblink_get_notify(PG_FUNCTION_ARGS)
    1895             : {
    1896             :     PGconn     *conn;
    1897             :     PGnotify   *notify;
    1898           4 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    1899             : 
    1900           4 :     dblink_init();
    1901           4 :     if (PG_NARGS() == 1)    /* one argument: the connection name */
    1902           0 :         conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
    1903             :     else
    1904           4 :         conn = pconn->conn; /* no argument: use the unnamed connection */
    1905             : 
    1906           4 :     InitMaterializedSRF(fcinfo, 0); /* rows are appended to rsinfo->setResult below */
    1907             : 
    1908           4 :     PQconsumeInput(conn);   /* return value is not checked here */
    1909           8 :     while ((notify = PQnotifies(conn)) != NULL)
    1910             :     {
    1911             :         Datum       values[DBLINK_NOTIFY_COLS];
    1912             :         bool        nulls[DBLINK_NOTIFY_COLS];
    1913             : 
    1914           4 :         memset(values, 0, sizeof(values));
    1915           4 :         memset(nulls, 0, sizeof(nulls));
    1916             : 
    1917           4 :         if (notify->relname != NULL)
    1918           4 :             values[0] = CStringGetTextDatum(notify->relname);
    1919             :         else
    1920           0 :             nulls[0] = true;
    1921             : 
    1922           4 :         values[1] = Int32GetDatum(notify->be_pid);
    1923             : 
    1924           4 :         if (notify->extra != NULL)
    1925           4 :             values[2] = CStringGetTextDatum(notify->extra);
    1926             :         else
    1927           0 :             nulls[2] = true;
    1928             : 
    1929           4 :         tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
    1930             : 
    1931           4 :         PQfreemem(notify);  /* release this notification before fetching the next */
    1932           4 :         PQconsumeInput(conn);   /* pick up further input before the next PQnotifies() */
    1933             :     }
    1934             : 
    1935           4 :     return (Datum) 0;   /* all rows were already stored via tuplestore_putvalues */
    1936             : }
    1937             : 
    1938             : /*
    1939             :  * Validate the options given to a dblink foreign server or user mapping.
    1940             :  * Raise an error if any option is invalid.
    1941             :  *
    1942             :  * We just check the names of options here, so semantic errors in options,
    1943             :  * such as invalid numeric format, are only detected when connecting.
    1944             :  */
    1945          28 : PG_FUNCTION_INFO_V1(dblink_fdw_validator);
    1946             : Datum
    1947          30 : dblink_fdw_validator(PG_FUNCTION_ARGS)
    1948             : {
    1949          30 :     List       *options_list = untransformRelOptions(PG_GETARG_DATUM(0));
    1950          30 :     Oid         context = PG_GETARG_OID(1);
    1951             :     ListCell   *cell;
    1952             : 
    1953             :     static const PQconninfoOption *options = NULL;  /* cached across calls */
    1954             : 
    1955             :     /*
    1956             :      * Get list of valid libpq options.
    1957             :      *
    1958             :      * To avoid unnecessary work, we get the list once and use it throughout
    1959             :      * the lifetime of this backend process.  We don't need to care about
    1960             :      * memory context issues, because PQconndefaults allocates with malloc.
    1961             :      */
    1962          30 :     if (!options)
    1963             :     {
    1964          24 :         options = PQconndefaults();
    1965          24 :         if (!options)           /* assume reason for failure is OOM */
    1966           0 :             ereport(ERROR,
    1967             :                     (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
    1968             :                      errmsg("out of memory"),
    1969             :                      errdetail("Could not get libpq's default connection options.")));
    1970             :     }
    1971             : 
    1972             :     /* Validate each supplied option; the first invalid one raises an error. */
    1973          80 :     foreach(cell, options_list)
    1974             :     {
    1975          58 :         DefElem    *def = (DefElem *) lfirst(cell);
    1976             : 
    1977          58 :         if (!is_valid_dblink_fdw_option(options, def->defname, context))
    1978             :         {
    1979             :             /*
    1980             :              * Unknown option, or invalid option for the context specified, so
    1981             :              * complain about it.  Provide a hint with a valid option that
    1982             :              * looks similar, if there is one.
    1983             :              */
    1984             :             const PQconninfoOption *opt;
    1985             :             const char *closest_match;
    1986             :             ClosestMatchState match_state;
    1987           8 :             bool        has_valid_options = false;
    1988             : 
    1989           8 :             initClosestMatch(&match_state, def->defname, 4);    /* NOTE(review): 4 is presumably the max match distance - confirm */
    1990         408 :             for (opt = options; opt->keyword; opt++)    /* list ends at a NULL keyword */
    1991             :             {
    1992         400 :                 if (is_valid_dblink_option(options, opt->keyword, context))
    1993             :                 {
    1994          18 :                     has_valid_options = true;
    1995          18 :                     updateClosestMatch(&match_state, opt->keyword);
    1996             :                 }
    1997             :             }
    1998             : 
    1999           8 :             closest_match = getClosestMatch(&match_state);
    2000           8 :             ereport(ERROR,
    2001             :                     (errcode(ERRCODE_FDW_OPTION_NAME_NOT_FOUND),
    2002             :                      errmsg("invalid option \"%s\"", def->defname),
    2003             :                      has_valid_options ? closest_match ?    /* valid options exist: hint only if one is close */
    2004             :                      errhint("Perhaps you meant the option \"%s\".",
    2005             :                              closest_match) : 0 :   /* 0 = no hint when nothing is close */
    2006             :                      errhint("There are no valid options in this context.")));
    2007             :         }
    2008             :     }
    2009             : 
    2010          22 :     PG_RETURN_VOID();
    2011             : }
    2012             : 
    2013             : 
    2014             : /*************************************************************
    2015             :  * internal functions
    2016             :  */
    2017             : 
    2018             : 
    2019             : /*
    2020             :  * get_pkey_attnames
    2021             :  *
    2022             :  * Get the primary key attnames for the given relation as a palloc'd array,
    2023             :  * storing the count in *indnkeyatts.  Return NULL, and set it to 0, if no primary key exists.
    2024             :  */
    2025             : static char **
    2026           6 : get_pkey_attnames(Relation rel, int16 *indnkeyatts)
    2027             : {
    2028             :     Relation    indexRelation;
    2029             :     ScanKeyData skey;
    2030             :     SysScanDesc scan;
    2031             :     HeapTuple   indexTuple;
    2032             :     int         i;
    2033           6 :     char      **result = NULL;
    2034             :     TupleDesc   tupdesc;
    2035             : 
    2036             :     /* initialize *indnkeyatts to 0 in case no primary key exists */
    2037           6 :     *indnkeyatts = 0;
    2038             : 
    2039           6 :     tupdesc = rel->rd_att;
    2040             : 
    2041             :     /* Prepare to scan pg_index for entries having indrelid = this rel. */
    2042           6 :     indexRelation = table_open(IndexRelationId, AccessShareLock);
    2043           6 :     ScanKeyInit(&skey,
    2044             :                 Anum_pg_index_indrelid,
    2045             :                 BTEqualStrategyNumber, F_OIDEQ,
    2046             :                 ObjectIdGetDatum(RelationGetRelid(rel)));
    2047             : 
    2048           6 :     scan = systable_beginscan(indexRelation, IndexIndrelidIndexId, true,
    2049             :                               NULL, 1, &skey);
    2050             : 
    2051           6 :     while (HeapTupleIsValid(indexTuple = systable_getnext(scan)))
    2052             :     {
    2053           6 :         Form_pg_index index = (Form_pg_index) GETSTRUCT(indexTuple);
    2054             : 
    2055             :         /* we're only interested if it is the primary key */
    2056           6 :         if (index->indisprimary)
    2057             :         {
    2058           6 :             *indnkeyatts = index->indnkeyatts;
    2059           6 :             if (*indnkeyatts > 0)
    2060             :             {
    2061           6 :                 result = palloc_array(char *, *indnkeyatts);
    2062             : 
    2063          18 :                 for (i = 0; i < *indnkeyatts; i++)
    2064          12 :                     result[i] = SPI_fname(tupdesc, index->indkey.values[i]);    /* name of the i-th key column */
    2065             :             }
    2066           6 :             break;  /* stop at the first primary-key index found */
    2067             :         }
    2068             :     }
    2069             : 
    2070           6 :     systable_endscan(scan);
    2071           6 :     table_close(indexRelation, AccessShareLock);
    2072             : 
    2073           6 :     return result;
    2074             : }
    2075             : 
    2076             : /*
    2077             :  * Deconstruct a text[] into a palloc'd array of C-strings, storing the element
    2078             :  * count in *numitems (note any NULL elements are returned as NULL pointers)
    2079             :  */
    2080             : static char **
    2081          40 : get_text_array_contents(ArrayType *array, int *numitems)
    2082             : {
    2083          40 :     int         ndim = ARR_NDIM(array);
    2084          40 :     int        *dims = ARR_DIMS(array);
    2085             :     int         nitems;
    2086             :     int16       typlen;
    2087             :     bool        typbyval;
    2088             :     char        typalign;
    2089             :     char      **values;
    2090             :     char       *ptr;    /* walks the array's data area */
    2091             :     bits8      *bitmap; /* current null-bitmap byte, or NULL if no bitmap */
    2092             :     int         bitmask;    /* bit within *bitmap for the current element */
    2093             :     int         i;
    2094             : 
    2095             :     Assert(ARR_ELEMTYPE(array) == TEXTOID);
    2096             : 
    2097          40 :     *numitems = nitems = ArrayGetNItems(ndim, dims);
    2098             : 
    2099          40 :     get_typlenbyvalalign(ARR_ELEMTYPE(array),
    2100             :                          &typlen, &typbyval, &typalign);
    2101             : 
    2102          40 :     values = palloc_array(char *, nitems);
    2103             : 
    2104          40 :     ptr = ARR_DATA_PTR(array);
    2105          40 :     bitmap = ARR_NULLBITMAP(array);
    2106          40 :     bitmask = 1;
    2107             : 
    2108         110 :     for (i = 0; i < nitems; i++)
    2109             :     {
    2110          70 :         if (bitmap && (*bitmap & bitmask) == 0) /* a clear bit marks a NULL element */
    2111             :         {
    2112           0 :             values[i] = NULL;   /* ptr is not advanced for a NULL element */
    2113             :         }
    2114             :         else
    2115             :         {
    2116          70 :             values[i] = TextDatumGetCString(PointerGetDatum(ptr));
    2117          70 :             ptr = att_addlength_pointer(ptr, typlen, ptr);  /* step past this element */
    2118          70 :             ptr = (char *) att_align_nominal(ptr, typalign);    /* realign for the next one */
    2119             :         }
    2120             : 
    2121             :         /* advance bitmap pointer if any; move to the next byte after 8 bits */
    2122          70 :         if (bitmap)
    2123             :         {
    2124           0 :             bitmask <<= 1;
    2125           0 :             if (bitmask == 0x100)
    2126             :             {
    2127           0 :                 bitmap++;
    2128           0 :                 bitmask = 1;
    2129             :             }
    2130             :         }
    2131             :     }
    2132             : 
    2133          40 :     return values;
    2134             : }
    2135             : /* Build "INSERT INTO rel(cols) VALUES(...)" from the local row matching src_pkattvals, with key columns taken from tgt_pkattvals. */
    2136             : static char *
    2137           8 : get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
    2138             : {
    2139             :     char       *relname;
    2140             :     HeapTuple   tuple;
    2141             :     TupleDesc   tupdesc;
    2142             :     int         natts;
    2143             :     StringInfoData buf;
    2144             :     char       *val;
    2145             :     int         key;
    2146             :     int         i;
    2147             :     bool        needComma;
    2148             : 
    2149           8 :     initStringInfo(&buf);
    2150             : 
    2151             :     /* get relation name including any needed schema prefix and quoting */
    2152           8 :     relname = generate_relation_name(rel);
    2153             : 
    2154           8 :     tupdesc = rel->rd_att;
    2155           8 :     natts = tupdesc->natts;
    2156             : 
    2157           8 :     tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
    2158           8 :     if (!tuple)
    2159           0 :         ereport(ERROR,
    2160             :                 (errcode(ERRCODE_CARDINALITY_VIOLATION),
    2161             :                  errmsg("source row not found")));
    2162             : 
    2163           8 :     appendStringInfo(&buf, "INSERT INTO %s(", relname);
    2164             : 
    2165           8 :     needComma = false;
    2166          38 :     for (i = 0; i < natts; i++) /* first pass: the quoted column-name list */
    2167             :     {
    2168          30 :         Form_pg_attribute att = TupleDescAttr(tupdesc, i);
    2169             : 
    2170          30 :         if (att->attisdropped)
    2171           4 :             continue;   /* dropped columns are left out of the statement */
    2172             : 
    2173          26 :         if (needComma)
    2174          18 :             appendStringInfoChar(&buf, ',');
    2175             : 
    2176          26 :         appendStringInfoString(&buf,
    2177          26 :                                quote_ident_cstr(NameStr(att->attname)));
    2178          26 :         needComma = true;
    2179             :     }
    2180             : 
    2181           8 :     appendStringInfoString(&buf, ") VALUES(");
    2182             : 
    2183             :     /*
    2184             :      * Second pass: the values.  Note: i is physical column number (from 0).
    2185             :      */
    2186           8 :     needComma = false;
    2187          38 :     for (i = 0; i < natts; i++)
    2188             :     {
    2189          30 :         if (TupleDescAttr(tupdesc, i)->attisdropped)
    2190           4 :             continue;
    2191             : 
    2192          26 :         if (needComma)
    2193          18 :             appendStringInfoChar(&buf, ',');
    2194             : 
    2195          26 :         key = get_attnum_pk_pos(pkattnums, pknumatts, i);   /* -1 if column i is not a key column */
    2196             : 
    2197          26 :         if (key >= 0)
    2198          14 :             val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;  /* copy, since val is pfree'd below */
    2199             :         else
    2200          12 :             val = SPI_getvalue(tuple, tupdesc, i + 1);  /* i + 1: this call takes a 1-based column number */
    2201             : 
    2202          26 :         if (val != NULL)
    2203             :         {
    2204          26 :             appendStringInfoString(&buf, quote_literal_cstr(val));
    2205          26 :             pfree(val);
    2206             :         }
    2207             :         else
    2208           0 :             appendStringInfoString(&buf, "NULL");
    2209          26 :         needComma = true;
    2210             :     }
    2211           8 :     appendStringInfoChar(&buf, ')');
    2212             : 
    2213           8 :     return buf.data;
    2214             : }
    2215             : /* Build "DELETE FROM rel WHERE key = value AND ..." using tgt_pkattvals for the key columns listed in pkattnums. */
    2216             : static char *
    2217           8 : get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals)
    2218             : {
    2219             :     char       *relname;
    2220             :     TupleDesc   tupdesc;
    2221             :     StringInfoData buf;
    2222             :     int         i;
    2223             : 
    2224           8 :     initStringInfo(&buf);
    2225             : 
    2226             :     /* get relation name including any needed schema prefix and quoting */
    2227           8 :     relname = generate_relation_name(rel);
    2228             : 
    2229           8 :     tupdesc = rel->rd_att;
    2230             : 
    2231           8 :     appendStringInfo(&buf, "DELETE FROM %s WHERE ", relname);
    2232          22 :     for (i = 0; i < pknumatts; i++) /* one condition per key column */
    2233             :     {
    2234          14 :         int         pkattnum = pkattnums[i];    /* used directly as a TupleDescAttr index */
    2235          14 :         Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
    2236             : 
    2237          14 :         if (i > 0)
    2238           6 :             appendStringInfoString(&buf, " AND ");
    2239             : 
    2240          14 :         appendStringInfoString(&buf,
    2241          14 :                                quote_ident_cstr(NameStr(attr->attname)));
    2242             : 
    2243          14 :         if (tgt_pkattvals[i] != NULL)
    2244          14 :             appendStringInfo(&buf, " = %s",
    2245          14 :                              quote_literal_cstr(tgt_pkattvals[i]));
    2246             :         else
    2247           0 :             appendStringInfoString(&buf, " IS NULL");   /* a NULL key value is matched with IS NULL */
    2248             :     }
    2249             : 
    2250           8 :     return buf.data;
    2251             : }
    2252             : /* Build "UPDATE rel SET col = value, ... WHERE key = value AND ..." from the local row matching src_pkattvals; key values come from tgt_pkattvals. */
    2253             : static char *
    2254           8 : get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
    2255             : {
    2256             :     char       *relname;
    2257             :     HeapTuple   tuple;
    2258             :     TupleDesc   tupdesc;
    2259             :     int         natts;
    2260             :     StringInfoData buf;
    2261             :     char       *val;
    2262             :     int         key;
    2263             :     int         i;
    2264             :     bool        needComma;
    2265             : 
    2266           8 :     initStringInfo(&buf);
    2267             : 
    2268             :     /* get relation name including any needed schema prefix and quoting */
    2269           8 :     relname = generate_relation_name(rel);
    2270             : 
    2271           8 :     tupdesc = rel->rd_att;
    2272           8 :     natts = tupdesc->natts;
    2273             : 
    2274           8 :     tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
    2275           8 :     if (!tuple)
    2276           0 :         ereport(ERROR,
    2277             :                 (errcode(ERRCODE_CARDINALITY_VIOLATION),
    2278             :                  errmsg("source row not found")));
    2279             : 
    2280           8 :     appendStringInfo(&buf, "UPDATE %s SET ", relname);
    2281             : 
    2282             :     /*
    2283             :      * SET list.  Note: i is physical column number (counting from 0).
    2284             :      */
    2285           8 :     needComma = false;
    2286          38 :     for (i = 0; i < natts; i++)
    2287             :     {
    2288          30 :         Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
    2289             : 
    2290          30 :         if (attr->attisdropped)
    2291           4 :             continue;   /* dropped columns are left out of the statement */
    2292             : 
    2293          26 :         if (needComma)
    2294          18 :             appendStringInfoString(&buf, ", ");
    2295             : 
    2296          26 :         appendStringInfo(&buf, "%s = ",
    2297          26 :                          quote_ident_cstr(NameStr(attr->attname)));
    2298             : 
    2299          26 :         key = get_attnum_pk_pos(pkattnums, pknumatts, i);   /* -1 if column i is not a key column */
    2300             : 
    2301          26 :         if (key >= 0)
    2302          14 :             val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;  /* copy, since val is pfree'd below */
    2303             :         else
    2304          12 :             val = SPI_getvalue(tuple, tupdesc, i + 1);  /* i + 1: this call takes a 1-based column number */
    2305             : 
    2306          26 :         if (val != NULL)
    2307             :         {
    2308          26 :             appendStringInfoString(&buf, quote_literal_cstr(val));
    2309          26 :             pfree(val);
    2310             :         }
    2311             :         else
    2312           0 :             appendStringInfoString(&buf, "NULL");
    2313          26 :         needComma = true;
    2314             :     }
    2315             : 
    2316           8 :     appendStringInfoString(&buf, " WHERE ");
    2317             : 
    2318          22 :     for (i = 0; i < pknumatts; i++) /* one condition per key column */
    2319             :     {
    2320          14 :         int         pkattnum = pkattnums[i];    /* used directly as a TupleDescAttr index */
    2321          14 :         Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
    2322             : 
    2323          14 :         if (i > 0)
    2324           6 :             appendStringInfoString(&buf, " AND ");
    2325             : 
    2326          14 :         appendStringInfoString(&buf,
    2327          14 :                                quote_ident_cstr(NameStr(attr->attname)));
    2328             : 
    2329          14 :         val = tgt_pkattvals[i]; /* borrowed pointer: not pfree'd in this loop */
    2330             : 
    2331          14 :         if (val != NULL)
    2332          14 :             appendStringInfo(&buf, " = %s", quote_literal_cstr(val));
    2333             :         else
    2334           0 :             appendStringInfoString(&buf, " IS NULL");   /* a NULL key value is matched with IS NULL */
    2335             :     }
    2336             : 
    2337           8 :     return buf.data;
    2338             : }
    2339             : 
    2340             : /*
    2341             :  * Return a properly quoted identifier as a new C string.
    2342             :  * Converts rawstr to text, calls quote_ident (quote.c), converts back.
    2343             :  */
    2344             : static char *
    2345         160 : quote_ident_cstr(char *rawstr)
    2346             : {
    2347             :     text       *rawstr_text;
    2348             :     text       *result_text;
    2349             :     char       *result;
    2350             : 
    2351         160 :     rawstr_text = cstring_to_text(rawstr);
    2352         160 :     result_text = DatumGetTextPP(DirectFunctionCall1(quote_ident,
    2353             :                                                      PointerGetDatum(rawstr_text)));
    2354         160 :     result = text_to_cstring(result_text);
    2355             : 
    2356         160 :     return result;  /* the intermediate text values are not freed here */
    2357             : }
    2358             : /* Return the index within pkattnums[0..pknumatts-1] whose entry equals key, or -1 if there is none. */
    2359             : static int
    2360          52 : get_attnum_pk_pos(int *pkattnums, int pknumatts, int key)
    2361             : {
    2362             :     int         i;
    2363             : 
    2364             :     /*
    2365             :      * Not likely a long list anyway, so a linear scan for the value will do
    2366             :      */
    2367         100 :     for (i = 0; i < pknumatts; i++)
    2368          76 :         if (key == pkattnums[i])
    2369          28 :             return i;   /* first match wins */
    2370             : 
    2371          24 :     return -1;
    2372             : }
    2373             : 
    2374             : static HeapTuple
    2375          16 : get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals)
    2376             : {
    2377             :     char       *relname;
    2378             :     TupleDesc   tupdesc;
    2379             :     int         natts;
    2380             :     StringInfoData buf;
    2381             :     int         ret;
    2382             :     HeapTuple   tuple;
    2383             :     int         i;
    2384             : 
    2385             :     /*
    2386             :      * Connect to SPI manager
    2387             :      */
    2388          16 :     SPI_connect();
    2389             : 
    2390          16 :     initStringInfo(&buf);
    2391             : 
    2392             :     /* get relation name including any needed schema prefix and quoting */
    2393          16 :     relname = generate_relation_name(rel);
    2394             : 
    2395          16 :     tupdesc = rel->rd_att;
    2396          16 :     natts = tupdesc->natts;
    2397             : 
    2398             :     /*
    2399             :      * Build sql statement to look up tuple of interest, ie, the one matching
    2400             :      * src_pkattvals.  We used to use "SELECT *" here, but it's simpler to
    2401             :      * generate a result tuple that matches the table's physical structure,
    2402             :      * with NULLs for any dropped columns.  Otherwise we have to deal with two
    2403             :      * different tupdescs and everything's very confusing.
    2404             :      */
    2405          16 :     appendStringInfoString(&buf, "SELECT ");
    2406             : 
    2407          76 :     for (i = 0; i < natts; i++)
    2408             :     {
    2409          60 :         Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
    2410             : 
    2411          60 :         if (i > 0)
    2412          44 :             appendStringInfoString(&buf, ", ");
    2413             : 
    2414          60 :         if (attr->attisdropped)
    2415           8 :             appendStringInfoString(&buf, "NULL");
    2416             :         else
    2417          52 :             appendStringInfoString(&buf,
    2418          52 :                                    quote_ident_cstr(NameStr(attr->attname)));
    2419             :     }
    2420             : 
    2421          16 :     appendStringInfo(&buf, " FROM %s WHERE ", relname);
    2422             : 
    2423          44 :     for (i = 0; i < pknumatts; i++)
    2424             :     {
    2425          28 :         int         pkattnum = pkattnums[i];
    2426          28 :         Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
    2427             : 
    2428          28 :         if (i > 0)
    2429          12 :             appendStringInfoString(&buf, " AND ");
    2430             : 
    2431          28 :         appendStringInfoString(&buf,
    2432          28 :                                quote_ident_cstr(NameStr(attr->attname)));
    2433             : 
    2434          28 :         if (src_pkattvals[i] != NULL)
    2435          28 :             appendStringInfo(&buf, " = %s",
    2436          28 :                              quote_literal_cstr(src_pkattvals[i]));
    2437             :         else
    2438           0 :             appendStringInfoString(&buf, " IS NULL");
    2439             :     }
    2440             : 
    2441             :     /*
    2442             :      * Retrieve the desired tuple
    2443             :      */
    2444          16 :     ret = SPI_exec(buf.data, 0);
    2445          16 :     pfree(buf.data);
    2446             : 
    2447             :     /*
    2448             :      * Only allow one qualifying tuple
    2449             :      */
    2450          16 :     if ((ret == SPI_OK_SELECT) && (SPI_processed > 1))
    2451           0 :         ereport(ERROR,
    2452             :                 (errcode(ERRCODE_CARDINALITY_VIOLATION),
    2453             :                  errmsg("source criteria matched more than one record")));
    2454             : 
    2455          16 :     else if (ret == SPI_OK_SELECT && SPI_processed == 1)
    2456             :     {
    2457          16 :         SPITupleTable *tuptable = SPI_tuptable;
    2458             : 
    2459          16 :         tuple = SPI_copytuple(tuptable->vals[0]);
    2460          16 :         SPI_finish();
    2461             : 
    2462          16 :         return tuple;
    2463             :     }
    2464             :     else
    2465             :     {
    2466             :         /*
    2467             :          * no qualifying tuples
    2468             :          */
    2469           0 :         SPI_finish();
    2470             : 
    2471           0 :         return NULL;
    2472             :     }
    2473             : 
    2474             :     /*
    2475             :      * never reached, but keep compiler quiet
    2476             :      */
    2477             :     return NULL;
    2478             : }
    2479             : 
    2480             : /*
    2481             :  * Open the relation named by relname_text, acquire specified type of lock,
    2482             :  * verify we have specified permissions.
    2483             :  * Caller must close rel when done with it.
    2484             :  */
    2485             : static Relation
    2486          42 : get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode)
    2487             : {
    2488             :     RangeVar   *relvar;
    2489             :     Relation    rel;
    2490             :     AclResult   aclresult;
    2491             : 
    2492          42 :     relvar = makeRangeVarFromNameList(textToQualifiedNameList(relname_text));
    2493          42 :     rel = table_openrv(relvar, lockmode);
    2494             : 
    2495          42 :     aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
    2496             :                                   aclmode);
    2497          42 :     if (aclresult != ACLCHECK_OK)
    2498           0 :         aclcheck_error(aclresult, get_relkind_objtype(rel->rd_rel->relkind),
    2499           0 :                        RelationGetRelationName(rel));
    2500             : 
    2501          42 :     return rel;
    2502             : }
    2503             : 
    2504             : /*
    2505             :  * generate_relation_name - copied from ruleutils.c
    2506             :  *      Compute the name to display for a relation
    2507             :  *
    2508             :  * The result includes all necessary quoting and schema-prefixing.
    2509             :  */
    2510             : static char *
    2511          40 : generate_relation_name(Relation rel)
    2512             : {
    2513             :     char       *nspname;
    2514             :     char       *result;
    2515             : 
    2516             :     /* Qualify the name if not visible in search path */
    2517          40 :     if (RelationIsVisible(RelationGetRelid(rel)))
    2518          30 :         nspname = NULL;
    2519             :     else
    2520          10 :         nspname = get_namespace_name(rel->rd_rel->relnamespace);
    2521             : 
    2522          40 :     result = quote_qualified_identifier(nspname, RelationGetRelationName(rel));
    2523             : 
    2524          40 :     return result;
    2525             : }
    2526             : 
    2527             : 
    2528             : static remoteConn *
    2529         158 : getConnectionByName(const char *name)
    2530             : {
    2531             :     remoteConnHashEnt *hentry;
    2532             :     char       *key;
    2533             : 
    2534         158 :     if (!remoteConnHash)
    2535          12 :         remoteConnHash = createConnHash();
    2536             : 
    2537         158 :     key = pstrdup(name);
    2538         158 :     truncate_identifier(key, strlen(key), false);
    2539         158 :     hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
    2540             :                                                key, HASH_FIND, NULL);
    2541             : 
    2542         158 :     if (hentry)
    2543         136 :         return hentry->rconn;
    2544             : 
    2545          22 :     return NULL;
    2546             : }
    2547             : 
    2548             : static HTAB *
    2549          14 : createConnHash(void)
    2550             : {
    2551             :     HASHCTL     ctl;
    2552             : 
    2553          14 :     ctl.keysize = NAMEDATALEN;
    2554          14 :     ctl.entrysize = sizeof(remoteConnHashEnt);
    2555             : 
    2556          14 :     return hash_create("Remote Con hash", NUMCONN, &ctl,
    2557             :                        HASH_ELEM | HASH_STRINGS);
    2558             : }
    2559             : 
    2560             : static void
    2561          20 : createNewConnection(const char *name, remoteConn *rconn)
    2562             : {
    2563             :     remoteConnHashEnt *hentry;
    2564             :     bool        found;
    2565             :     char       *key;
    2566             : 
    2567          20 :     if (!remoteConnHash)
    2568           2 :         remoteConnHash = createConnHash();
    2569             : 
    2570          20 :     key = pstrdup(name);
    2571          20 :     truncate_identifier(key, strlen(key), true);
    2572          20 :     hentry = (remoteConnHashEnt *) hash_search(remoteConnHash, key,
    2573             :                                                HASH_ENTER, &found);
    2574             : 
    2575          20 :     if (found)
    2576             :     {
    2577           2 :         libpqsrv_disconnect(rconn->conn);
    2578           2 :         pfree(rconn);
    2579             : 
    2580           2 :         ereport(ERROR,
    2581             :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
    2582             :                  errmsg("duplicate connection name")));
    2583             :     }
    2584             : 
    2585          18 :     hentry->rconn = rconn;
    2586          18 : }
    2587             : 
    2588             : static void
    2589          16 : deleteConnection(const char *name)
    2590             : {
    2591             :     remoteConnHashEnt *hentry;
    2592             :     bool        found;
    2593             :     char       *key;
    2594             : 
    2595          16 :     if (!remoteConnHash)
    2596           0 :         remoteConnHash = createConnHash();
    2597             : 
    2598          16 :     key = pstrdup(name);
    2599          16 :     truncate_identifier(key, strlen(key), false);
    2600          16 :     hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
    2601             :                                                key, HASH_REMOVE, &found);
    2602             : 
    2603          16 :     if (!hentry)
    2604           0 :         ereport(ERROR,
    2605             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    2606             :                  errmsg("undefined connection name")));
    2607          16 : }
    2608             : 
    2609             :  /*
    2610             :   * Ensure that require_auth and SCRAM keys are correctly set on connstr.
    2611             :   * SCRAM keys used to pass-through are coming from the initial connection
    2612             :   * from the client with the server.
    2613             :   *
    2614             :   * All required SCRAM options are set by dblink, so we just need to ensure
    2615             :   * that these options are not overwritten by the user.
    2616             :   *
    2617             :   * See appendSCRAMKeysInfo and its usage for more.
    2618             :   */
    2619             : bool
    2620          12 : dblink_connstr_has_required_scram_options(const char *connstr)
    2621             : {
    2622             :     PQconninfoOption *options;
    2623          12 :     bool        has_scram_server_key = false;
    2624          12 :     bool        has_scram_client_key = false;
    2625          12 :     bool        has_require_auth = false;
    2626          12 :     bool        has_scram_keys = false;
    2627             : 
    2628          12 :     options = PQconninfoParse(connstr, NULL);
    2629          12 :     if (options)
    2630             :     {
    2631             :         /*
    2632             :          * Continue iterating even if we found the keys that we need to
    2633             :          * validate to make sure that there is no other declaration of these
    2634             :          * keys that can overwrite the first.
    2635             :          */
    2636         612 :         for (PQconninfoOption *option = options; option->keyword != NULL; option++)
    2637             :         {
    2638         600 :             if (strcmp(option->keyword, "require_auth") == 0)
    2639             :             {
    2640          12 :                 if (option->val != NULL && strcmp(option->val, "scram-sha-256") == 0)
    2641          10 :                     has_require_auth = true;
    2642             :                 else
    2643           2 :                     has_require_auth = false;
    2644             :             }
    2645             : 
    2646         600 :             if (strcmp(option->keyword, "scram_client_key") == 0)
    2647             :             {
    2648          12 :                 if (option->val != NULL && option->val[0] != '\0')
    2649          12 :                     has_scram_client_key = true;
    2650             :                 else
    2651           0 :                     has_scram_client_key = false;
    2652             :             }
    2653             : 
    2654         600 :             if (strcmp(option->keyword, "scram_server_key") == 0)
    2655             :             {
    2656          12 :                 if (option->val != NULL && option->val[0] != '\0')
    2657          12 :                     has_scram_server_key = true;
    2658             :                 else
    2659           0 :                     has_scram_server_key = false;
    2660             :             }
    2661             :         }
    2662          12 :         PQconninfoFree(options);
    2663             :     }
    2664             : 
    2665          12 :     has_scram_keys = has_scram_client_key && has_scram_server_key && MyProcPort->has_scram_keys;
    2666             : 
    2667          12 :     return (has_scram_keys && has_require_auth);
    2668             : }
    2669             : 
    2670             : /*
    2671             :  * We need to make sure that the connection made used credentials
    2672             :  * which were provided by the user, so check what credentials were
    2673             :  * used to connect and then make sure that they came from the user.
    2674             :  */
    2675             : static void
    2676          42 : dblink_security_check(PGconn *conn, remoteConn *rconn, const char *connstr)
    2677             : {
    2678             :     /* Superuser bypasses security check */
    2679          42 :     if (superuser())
    2680          38 :         return;
    2681             : 
    2682             :     /* If password was used to connect, make sure it was one provided */
    2683           4 :     if (PQconnectionUsedPassword(conn) && dblink_connstr_has_pw(connstr))
    2684           0 :         return;
    2685             : 
    2686             :     /*
    2687             :      * Password was not used to connect, check if SCRAM pass-through is in
    2688             :      * use.
    2689             :      *
    2690             :      * If dblink_connstr_has_required_scram_options is true we assume that
    2691             :      * UseScramPassthrough is also true because the required SCRAM keys are
    2692             :      * only added if UseScramPassthrough is set, and the user is not allowed
    2693             :      * to add the SCRAM keys on fdw and user mapping options.
    2694             :      */
    2695           4 :     if (MyProcPort->has_scram_keys && dblink_connstr_has_required_scram_options(connstr))
    2696           4 :         return;
    2697             : 
    2698             : #ifdef ENABLE_GSS
    2699             :     /* If GSSAPI creds used to connect, make sure it was one delegated */
    2700             :     if (PQconnectionUsedGSSAPI(conn) && be_gssapi_get_delegation(MyProcPort))
    2701             :         return;
    2702             : #endif
    2703             : 
    2704             :     /* Otherwise, fail out */
    2705           0 :     libpqsrv_disconnect(conn);
    2706           0 :     if (rconn)
    2707           0 :         pfree(rconn);
    2708             : 
    2709           0 :     ereport(ERROR,
    2710             :             (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
    2711             :              errmsg("password or GSSAPI delegated credentials required"),
    2712             :              errdetail("Non-superusers may only connect using credentials they provide, eg: password in connection string or delegated GSSAPI credentials"),
    2713             :              errhint("Ensure provided credentials match target server's authentication method.")));
    2714             : }
    2715             : 
    2716             : /*
    2717             :  * Function to check if the connection string includes an explicit
    2718             :  * password, needed to ensure that non-superuser password-based auth
    2719             :  * is using a provided password and not one picked up from the
    2720             :  * environment.
    2721             :  */
    2722             : static bool
    2723          14 : dblink_connstr_has_pw(const char *connstr)
    2724             : {
    2725             :     PQconninfoOption *options;
    2726             :     PQconninfoOption *option;
    2727          14 :     bool        connstr_gives_password = false;
    2728             : 
    2729          14 :     options = PQconninfoParse(connstr, NULL);
    2730          14 :     if (options)
    2731             :     {
    2732         714 :         for (option = options; option->keyword != NULL; option++)
    2733             :         {
    2734         700 :             if (strcmp(option->keyword, "password") == 0)
    2735             :             {
    2736          14 :                 if (option->val != NULL && option->val[0] != '\0')
    2737             :                 {
    2738           0 :                     connstr_gives_password = true;
    2739           0 :                     break;
    2740             :                 }
    2741             :             }
    2742             :         }
    2743          14 :         PQconninfoFree(options);
    2744             :     }
    2745             : 
    2746          14 :     return connstr_gives_password;
    2747             : }
    2748             : 
    2749             : /*
    2750             :  * For non-superusers, insist that the connstr specify a password, except if
    2751             :  * GSSAPI credentials have been delegated (and we check that they are used for
    2752             :  * the connection in dblink_security_check later) or if SCRAM pass-through is
    2753             :  * being used.  This prevents a password or GSSAPI credentials from being
    2754             :  * picked up from .pgpass, a service file, the environment, etc.  We don't want
    2755             :  * the postgres user's passwords or Kerberos credentials to be accessible to
    2756             :  * non-superusers. In case of SCRAM pass-through insist that the connstr
    2757             :  * has the required SCRAM pass-through options.
    2758             :  */
    2759             : static void
    2760          52 : dblink_connstr_check(const char *connstr)
    2761             : {
    2762          52 :     if (superuser())
    2763          42 :         return;
    2764             : 
    2765          10 :     if (dblink_connstr_has_pw(connstr))
    2766           0 :         return;
    2767             : 
    2768          10 :     if (MyProcPort->has_scram_keys && dblink_connstr_has_required_scram_options(connstr))
    2769           6 :         return;
    2770             : 
    2771             : #ifdef ENABLE_GSS
    2772             :     if (be_gssapi_get_delegation(MyProcPort))
    2773             :         return;
    2774             : #endif
    2775             : 
    2776           4 :     ereport(ERROR,
    2777             :             (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
    2778             :              errmsg("password or GSSAPI delegated credentials required"),
    2779             :              errdetail("Non-superusers must provide a password in the connection string or send delegated GSSAPI credentials.")));
    2780             : }
    2781             : 
    2782             : /*
    2783             :  * Report an error received from the remote server
    2784             :  *
    2785             :  * res: the received error result (will be freed)
    2786             :  * fail: true for ERROR ereport, false for NOTICE
    2787             :  * fmt and following args: sprintf-style format and values for errcontext;
    2788             :  * the resulting string should be worded like "while <some action>"
    2789             :  */
    2790             : static void
    2791          24 : dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
    2792             :                  bool fail, const char *fmt,...)
    2793             : {
    2794             :     int         level;
    2795          24 :     char       *pg_diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
    2796          24 :     char       *pg_diag_message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
    2797          24 :     char       *pg_diag_message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
    2798          24 :     char       *pg_diag_message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
    2799          24 :     char       *pg_diag_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
    2800             :     int         sqlstate;
    2801             :     char       *message_primary;
    2802             :     char       *message_detail;
    2803             :     char       *message_hint;
    2804             :     char       *message_context;
    2805             :     va_list     ap;
    2806             :     char        dblink_context_msg[512];
    2807             : 
    2808          24 :     if (fail)
    2809           6 :         level = ERROR;
    2810             :     else
    2811          18 :         level = NOTICE;
    2812             : 
    2813          24 :     if (pg_diag_sqlstate)
    2814          24 :         sqlstate = MAKE_SQLSTATE(pg_diag_sqlstate[0],
    2815             :                                  pg_diag_sqlstate[1],
    2816             :                                  pg_diag_sqlstate[2],
    2817             :                                  pg_diag_sqlstate[3],
    2818             :                                  pg_diag_sqlstate[4]);
    2819             :     else
    2820           0 :         sqlstate = ERRCODE_CONNECTION_FAILURE;
    2821             : 
    2822          24 :     message_primary = xpstrdup(pg_diag_message_primary);
    2823          24 :     message_detail = xpstrdup(pg_diag_message_detail);
    2824          24 :     message_hint = xpstrdup(pg_diag_message_hint);
    2825          24 :     message_context = xpstrdup(pg_diag_context);
    2826             : 
    2827             :     /*
    2828             :      * If we don't get a message from the PGresult, try the PGconn.  This is
    2829             :      * needed because for connection-level failures, PQgetResult may just
    2830             :      * return NULL, not a PGresult at all.
    2831             :      */
    2832          24 :     if (message_primary == NULL)
    2833           0 :         message_primary = pchomp(PQerrorMessage(conn));
    2834             : 
    2835             :     /*
    2836             :      * Now that we've copied all the data we need out of the PGresult, it's
    2837             :      * safe to free it.  We must do this to avoid PGresult leakage.  We're
    2838             :      * leaking all the strings too, but those are in palloc'd memory that will
    2839             :      * get cleaned up eventually.
    2840             :      */
    2841          24 :     PQclear(res);
    2842             : 
    2843             :     /*
    2844             :      * Format the basic errcontext string.  Below, we'll add on something
    2845             :      * about the connection name.  That's a violation of the translatability
    2846             :      * guidelines about constructing error messages out of parts, but since
    2847             :      * there's no translation support for dblink, there's no need to worry
    2848             :      * about that (yet).
    2849             :      */
    2850          24 :     va_start(ap, fmt);
    2851          24 :     vsnprintf(dblink_context_msg, sizeof(dblink_context_msg), fmt, ap);
    2852          24 :     va_end(ap);
    2853             : 
    2854          24 :     ereport(level,
    2855             :             (errcode(sqlstate),
    2856             :              (message_primary != NULL && message_primary[0] != '\0') ?
    2857             :              errmsg_internal("%s", message_primary) :
    2858             :              errmsg("could not obtain message string for remote error"),
    2859             :              message_detail ? errdetail_internal("%s", message_detail) : 0,
    2860             :              message_hint ? errhint("%s", message_hint) : 0,
    2861             :              message_context ? (errcontext("%s", message_context)) : 0,
    2862             :              conname ?
    2863             :              (errcontext("%s on dblink connection named \"%s\"",
    2864             :                          dblink_context_msg, conname)) :
    2865             :              (errcontext("%s on unnamed dblink connection",
    2866             :                          dblink_context_msg))));
    2867          18 : }
    2868             : 
    2869             : /*
    2870             :  * Obtain connection string for a foreign server
    2871             :  */
    2872             : static char *
    2873          52 : get_connect_string(const char *servername)
    2874             : {
    2875          52 :     ForeignServer *foreign_server = NULL;
    2876             :     UserMapping *user_mapping;
    2877             :     ListCell   *cell;
    2878             :     StringInfoData buf;
    2879             :     ForeignDataWrapper *fdw;
    2880             :     AclResult   aclresult;
    2881             :     char       *srvname;
    2882             : 
    2883             :     static const PQconninfoOption *options = NULL;
    2884             : 
    2885          52 :     initStringInfo(&buf);
    2886             : 
    2887             :     /*
    2888             :      * Get list of valid libpq options.
    2889             :      *
    2890             :      * To avoid unnecessary work, we get the list once and use it throughout
    2891             :      * the lifetime of this backend process.  We don't need to care about
    2892             :      * memory context issues, because PQconndefaults allocates with malloc.
    2893             :      */
    2894          52 :     if (!options)
    2895             :     {
    2896          14 :         options = PQconndefaults();
    2897          14 :         if (!options)           /* assume reason for failure is OOM */
    2898           0 :             ereport(ERROR,
    2899             :                     (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
    2900             :                      errmsg("out of memory"),
    2901             :                      errdetail("Could not get libpq's default connection options.")));
    2902             :     }
    2903             : 
    2904             :     /* first gather the server connstr options */
    2905          52 :     srvname = pstrdup(servername);
    2906          52 :     truncate_identifier(srvname, strlen(srvname), false);
    2907          52 :     foreign_server = GetForeignServerByName(srvname, true);
    2908             : 
    2909          52 :     if (foreign_server)
    2910             :     {
    2911          12 :         Oid         serverid = foreign_server->serverid;
    2912          12 :         Oid         fdwid = foreign_server->fdwid;
    2913          12 :         Oid         userid = GetUserId();
    2914             : 
    2915          12 :         user_mapping = GetUserMapping(userid, serverid);
    2916          12 :         fdw = GetForeignDataWrapper(fdwid);
    2917             : 
    2918             :         /* Check permissions, user must have usage on the server. */
    2919          12 :         aclresult = object_aclcheck(ForeignServerRelationId, serverid, userid, ACL_USAGE);
    2920          12 :         if (aclresult != ACLCHECK_OK)
    2921           0 :             aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER, foreign_server->servername);
    2922             : 
    2923             :         /*
    2924             :          * First append hardcoded options needed for SCRAM pass-through, so if
    2925             :          * the user overwrites these options we can ereport on
    2926             :          * dblink_connstr_check and dblink_security_check.
    2927             :          */
    2928          12 :         if (MyProcPort->has_scram_keys && UseScramPassthrough(foreign_server, user_mapping))
    2929           8 :             appendSCRAMKeysInfo(&buf);
    2930             : 
    2931          12 :         foreach(cell, fdw->options)
    2932             :         {
    2933           0 :             DefElem    *def = lfirst(cell);
    2934             : 
    2935           0 :             if (is_valid_dblink_option(options, def->defname, ForeignDataWrapperRelationId))
    2936           0 :                 appendStringInfo(&buf, "%s='%s' ", def->defname,
    2937           0 :                                  escape_param_str(strVal(def->arg)));
    2938             :         }
    2939             : 
    2940          54 :         foreach(cell, foreign_server->options)
    2941             :         {
    2942          42 :             DefElem    *def = lfirst(cell);
    2943             : 
    2944          42 :             if (is_valid_dblink_option(options, def->defname, ForeignServerRelationId))
    2945          34 :                 appendStringInfo(&buf, "%s='%s' ", def->defname,
    2946          34 :                                  escape_param_str(strVal(def->arg)));
    2947             :         }
    2948             : 
    2949          24 :         foreach(cell, user_mapping->options)
    2950             :         {
    2951             : 
    2952          12 :             DefElem    *def = lfirst(cell);
    2953             : 
    2954          12 :             if (is_valid_dblink_option(options, def->defname, UserMappingRelationId))
    2955          12 :                 appendStringInfo(&buf, "%s='%s' ", def->defname,
    2956          12 :                                  escape_param_str(strVal(def->arg)));
    2957             :         }
    2958             : 
    2959          12 :         return buf.data;
    2960             :     }
    2961             :     else
    2962          40 :         return NULL;
    2963             : }
    2964             : 
    2965             : /*
    2966             :  * Escaping libpq connect parameter strings.
    2967             :  *
    2968             :  * Replaces "'" with "\'" and "\" with "\\".
    2969             :  */
    2970             : static char *
    2971          46 : escape_param_str(const char *str)
    2972             : {
    2973             :     const char *cp;
    2974             :     StringInfoData buf;
    2975             : 
    2976          46 :     initStringInfo(&buf);
    2977             : 
    2978         410 :     for (cp = str; *cp; cp++)
    2979             :     {
    2980         364 :         if (*cp == '\\' || *cp == '\'')
    2981           0 :             appendStringInfoChar(&buf, '\\');
    2982         364 :         appendStringInfoChar(&buf, *cp);
    2983             :     }
    2984             : 
    2985          46 :     return buf.data;
    2986             : }
    2987             : 
    2988             : /*
    2989             :  * Validate the PK-attnums argument for dblink_build_sql_insert() and related
    2990             :  * functions, and translate to the internal representation.
    2991             :  *
    2992             :  * The user supplies an int2vector of 1-based logical attnums, plus a count
    2993             :  * argument (the need for the separate count argument is historical, but we
    2994             :  * still check it).  We check that each attnum corresponds to a valid,
    2995             :  * non-dropped attribute of the rel.  We do *not* prevent attnums from being
    2996             :  * listed twice, though the actual use-case for such things is dubious.
    2997             :  * Note that before Postgres 9.0, the user's attnums were interpreted as
    2998             :  * physical not logical column numbers; this was changed for future-proofing.
    2999             :  *
    3000             :  * The internal representation is a palloc'd int array of 0-based physical
    3001             :  * attnums.
                     :  *
                     :  * Arguments:
                     :  *  rel            relation whose tuple descriptor (rd_att) is checked against
                     :  *  pkattnums_arg  user-supplied vector of attnums
                     :  *  pknumatts_arg  user-supplied count; clamped to pkattnums_arg->dim1
                     :  *  pkattnums      output: receives the newly allocated array
                     :  *  pknumatts      output: receives the (clamped) number of entries
                     :  *
                     :  * An error with ERRCODE_INVALID_PARAMETER_VALUE is raised if the clamped
                     :  * count is <= 0, if an attnum is outside 1..natts, or if an attnum exceeds
                     :  * the number of non-dropped columns.
    3002             :  */
    3003             : static void
    3004          36 : validate_pkattnums(Relation rel,
    3005             :                    int2vector *pkattnums_arg, int32 pknumatts_arg,
    3006             :                    int **pkattnums, int *pknumatts)
    3007             : {
    3008          36 :     TupleDesc   tupdesc = rel->rd_att;
    3009          36 :     int         natts = tupdesc->natts;
    3010             :     int         i;
    3011             : 
    3012             :     /* Don't take more array elements than there are */
    3013          36 :     pknumatts_arg = Min(pknumatts_arg, pkattnums_arg->dim1);
    3014             : 
    3015             :     /* Must have at least one pk attnum selected */
    3016          36 :     if (pknumatts_arg <= 0)
    3017           0 :         ereport(ERROR,
    3018             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    3019             :                  errmsg("number of key attributes must be > 0")));
    3020             : 
    3021             :     /* Allocate output array */
    3022          36 :     *pkattnums = palloc_array(int, pknumatts_arg);
    3023          36 :     *pknumatts = pknumatts_arg;
    3024             : 
    3025             :     /* Validate attnums and convert to internal form */
    3026         114 :     for (i = 0; i < pknumatts_arg; i++)
    3027             :     {
    3028          90 :         int         pkattnum = pkattnums_arg->values[i];
    3029             :         int         lnum;
    3030             :         int         j;
    3031             : 
    3032             :         /* Can throw error immediately if out of range */
    3033          90 :         if (pkattnum <= 0 || pkattnum > natts)
    3034          12 :             ereport(ERROR,
    3035             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    3036             :                      errmsg("invalid attribute number %d", pkattnum)));
    3037             : 
    3038             :         /*
                     :          * Identify which physical column has this logical number: lnum
                     :          * counts the non-dropped columns seen so far, and j is the 0-based
                     :          * physical position at which the count reaches pkattnum.
                     :          */
    3039          78 :         lnum = 0;
    3040         138 :         for (j = 0; j < natts; j++)
    3041             :         {
    3042             :             /* dropped columns don't count */
    3043         138 :             if (TupleDescAttr(tupdesc, j)->attisdropped)
    3044           6 :                 continue;
    3045             : 
    3046         132 :             if (++lnum == pkattnum)
    3047          78 :                 break;
    3048             :         }
    3049             :         /* j == natts here means the rel has fewer than pkattnum live columns */
    3050          78 :         if (j < natts)
    3051          78 :             (*pkattnums)[i] = j;
    3052             :         else
    3053           0 :             ereport(ERROR,
    3054             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    3055             :                      errmsg("invalid attribute number %d", pkattnum)));
    3056             :     }
    3057          24 : }
    3058             : 
    3059             : /*
    3060             :  * Check if the specified connection option is valid.
    3061             :  *
    3062             :  * We basically allow whatever libpq thinks is an option, with these
    3063             :  * restrictions:
    3064             :  *      debug options: disallowed
    3065             :  *      "client_encoding": disallowed
    3066             :  *      "user": valid only in USER MAPPING options
    3067             :  *      secure options (eg password): valid only in USER MAPPING options
    3068             :  *      others: valid only in FOREIGN SERVER options
    3069             :  *
    3070             :  * We disallow client_encoding because it would be overridden anyway via
    3071             :  * PQclientEncoding; allowing it to be specified would merely promote
    3072             :  * confusion.
                     :  *
                     :  * Arguments:
                     :  *  options  array of libpq options, ended by an entry whose keyword is NULL
                     :  *  option   name of the option to check (compared exactly, via strcmp)
                     :  *  context  catalog OID the option is being set in; compared against
                     :  *           UserMappingRelationId and ForeignServerRelationId
                     :  *
                     :  * Returns true if the option is known and allowed in the given context,
                     :  * false otherwise (including when the name is not found in the array).
    3073             :  */
    3074             : static bool
    3075         504 : is_valid_dblink_option(const PQconninfoOption *options, const char *option,
    3076             :                        Oid context)
    3077             : {
    3078             :     const PQconninfoOption *opt;
    3079             : 
    3080             :     /* Look up the option in libpq result */
    3081       11668 :     for (opt = options; opt->keyword; opt++)
    3082             :     {
    3083       11656 :         if (strcmp(opt->keyword, option) == 0)
    3084         492 :             break;
    3085             :     }
    3086         504 :     if (opt->keyword == NULL)
    3087          12 :         return false;
    3088             : 
    3089             :     /*
                     :      * Disallow debug options, i.e. those whose dispchar contains 'D'
                     :      * (particularly "replication")
                     :      */
    3090         492 :     if (strchr(opt->dispchar, 'D'))
    3091          36 :         return false;
    3092             : 
    3093             :     /* Disallow "client_encoding" */
    3094         456 :     if (strcmp(opt->keyword, "client_encoding") == 0)
    3095           8 :         return false;
    3096             : 
    3097             :     /*
    3098             :      * If the option is "user" or marked secure (dispchar contains '*'), it
    3099             :      * should be specified only in USER MAPPING.  Others should be specified
                     :      * only in SERVER.
    3100             :      */
    3101         448 :     if (strcmp(opt->keyword, "user") == 0 || strchr(opt->dispchar, '*'))
    3102             :     {
    3103          48 :         if (context != UserMappingRelationId)
    3104           6 :             return false;
    3105             :     }
    3106             :     else
    3107             :     {
    3108         400 :         if (context != ForeignServerRelationId)
    3109         336 :             return false;
    3110             :     }
    3111             : 
    3112         106 :     return true;
    3113             : }
    3114             : 
    3115             : /*
    3116             :  * Same as is_valid_dblink_option, but additionally accept the options that
    3117             :  * are specific to dblink_fdw.
                     :  *
                     :  * The only such option is "use_scram_passthrough".  It is accepted before
                     :  * any other check is made, so it is valid regardless of context.  Every
                     :  * other option name is passed on to is_valid_dblink_option unchanged.
    3118             :  */
    3119             : static bool
    3120          58 : is_valid_dblink_fdw_option(const PQconninfoOption *options, const char *option,
    3121             :                            Oid context)
    3122             : {
    3123          58 :     if (strcmp(option, "use_scram_passthrough") == 0)
    3124           8 :         return true;
    3125             : 
    3126          50 :     return is_valid_dblink_option(options, option, context);
    3127             : }
    3128             : 
    3129             : /*
    3130             :  * Copy the remote session's values of GUCs that affect datatype I/O
    3131             :  * and apply them locally in a new GUC nesting level.  Returns the new
    3132             :  * nestlevel (which is needed by restoreLocalGucs to undo the settings),
    3133             :  * or -1 if no new nestlevel was needed.
    3134             :  *
    3135             :  * We use the equivalent of a function SET option to allow the settings to
    3136             :  * persist only until the caller calls restoreLocalGucs.  If an error is
    3137             :  * thrown in between, guc.c will take care of undoing the settings.
                     :  *
                     :  * Only the GUCs named in GUCsAffectingIO (DateStyle and IntervalStyle) are
                     :  * considered.  The remote value of each is obtained from conn with
                     :  * PQparameterStatus.  A GUC is skipped when no remote value is available
                     :  * or when the remote and local values compare equal, so the nesting level
                     :  * is created only on the first value that actually differs.
    3138             :  */
    3139             : static int
    3140          68 : applyRemoteGucs(PGconn *conn)
    3141             : {
    3142             :     static const char *const GUCsAffectingIO[] = {
    3143             :         "DateStyle",
    3144             :         "IntervalStyle"
    3145             :     };
    3146             : 
    3147          68 :     int         nestlevel = -1;
    3148             :     int         i;
    3149             : 
    3150         204 :     for (i = 0; i < lengthof(GUCsAffectingIO); i++)
    3151             :     {
    3152         136 :         const char *gucName = GUCsAffectingIO[i];
    3153         136 :         const char *remoteVal = PQparameterStatus(conn, gucName);
    3154             :         const char *localVal;
    3155             : 
    3156             :         /*
    3157             :          * If the remote server is pre-8.4, it won't have IntervalStyle, but
    3158             :          * that's okay because its output format won't be ambiguous.  So just
    3159             :          * skip the GUC if we don't get a value for it.  (We might eventually
    3160             :          * need more complicated logic with remote-version checks here.)
    3161             :          */
    3162         136 :         if (remoteVal == NULL)
    3163           0 :             continue;
    3164             : 
    3165             :         /*
    3166             :          * Avoid GUC-setting overhead if the remote and local GUCs already
    3167             :          * have the same value.
    3168             :          */
    3169         136 :         localVal = GetConfigOption(gucName, false, false);
    3170             :         Assert(localVal != NULL);
    3171             : 
    3172         136 :         if (strcmp(remoteVal, localVal) == 0)
    3173         102 :             continue;
    3174             : 
    3175             :         /* Create new GUC nest level if we didn't already */
    3176          34 :         if (nestlevel < 0)
    3177          18 :             nestlevel = NewGUCNestLevel();
    3178             : 
    3179             :         /* Apply the option (this will throw error on failure) */
    3180          34 :         (void) set_config_option(gucName, remoteVal,
    3181             :                                  PGC_USERSET, PGC_S_SESSION,
    3182             :                                  GUC_ACTION_SAVE, true, 0, false);
    3183             :     }
    3184             : 
    3185          68 :     return nestlevel;
    3186             : }
    3187             : 
    3188             : /*
    3189             :  * Restore local GUCs after they have been overlaid with remote settings.
                     :  *
                     :  * nestlevel is the value returned by applyRemoteGucs.  Any value that is
                     :  * not greater than zero, which includes the -1 that applyRemoteGucs returns
                     :  * when it created no nesting level, makes this function a no-op.
    3190             :  */
    3191             : static void
    3192          70 : restoreLocalGucs(int nestlevel)
    3193             : {
    3194             :     /* Do nothing if no new nestlevel was created */
    3195          70 :     if (nestlevel > 0)
    3196          16 :         AtEOXact_GUC(true, nestlevel);
    3197          70 : }
    3198             : 
    3199             : /*
    3200             :  * Append SCRAM client key and server key information from the global
    3201             :  * MyProcPort into the given StringInfo buffer.
                     :  *
                     :  * Each key is encoded with pg_b64_encode and appended as a quoted
                     :  * scram_client_key / scram_server_key setting, followed by a fixed
                     :  * require_auth setting of scram-sha-256.  Every setting that is appended
                     :  * ends with a space.
                     :  *
                     :  * An ERROR is raised if pg_b64_encode reports a negative length for either
                     :  * key.  The temporary buffers holding the encoded keys are freed before
                     :  * returning.
    3202             :  */
    3203             : static void
    3204           8 : appendSCRAMKeysInfo(StringInfo buf)
    3205             : {
    3206             :     int         len;
    3207             :     int         encoded_len;
    3208             :     char       *client_key;
    3209             :     char       *server_key;
    3210             : 
    3211           8 :     len = pg_b64_enc_len(sizeof(MyProcPort->scram_ClientKey));
    3212             :     /* don't forget the zero-terminator */
    3213           8 :     client_key = palloc0(len + 1);
    3214           8 :     encoded_len = pg_b64_encode((const char *) MyProcPort->scram_ClientKey,
    3215             :                                 sizeof(MyProcPort->scram_ClientKey),
    3216             :                                 client_key, len);
    3217           8 :     if (encoded_len < 0)
    3218           0 :         elog(ERROR, "could not encode SCRAM client key");
    3219             : 
    3220           8 :     len = pg_b64_enc_len(sizeof(MyProcPort->scram_ServerKey));
    3221             :     /* don't forget the zero-terminator */
    3222           8 :     server_key = palloc0(len + 1);
    3223           8 :     encoded_len = pg_b64_encode((const char *) MyProcPort->scram_ServerKey,
    3224             :                                 sizeof(MyProcPort->scram_ServerKey),
    3225             :                                 server_key, len);
    3226           8 :     if (encoded_len < 0)
    3227           0 :         elog(ERROR, "could not encode SCRAM server key");
    3228             : 
    3229           8 :     appendStringInfo(buf, "scram_client_key='%s' ", client_key);
    3230           8 :     appendStringInfo(buf, "scram_server_key='%s' ", server_key);
    3231           8 :     appendStringInfoString(buf, "require_auth='scram-sha-256' ");
    3232             : 
    3233           8 :     pfree(client_key);
    3234           8 :     pfree(server_key);
    3235           8 : }
    3236             : 
    3237             : /*
                     :  * Report whether the "use_scram_passthrough" option is enabled.
                     :  *
                     :  * The foreign server's options are searched first, then the user mapping's
                     :  * options.  The first entry named "use_scram_passthrough" decides the
                     :  * result, which is its value as interpreted by defGetBoolean; a setting on
                     :  * the server therefore takes precedence over one on the user mapping.
                     :  * Returns false if neither list contains the option.
                     :  */
    3238             : static bool
    3239           8 : UseScramPassthrough(ForeignServer *foreign_server, UserMapping *user)
    3240             : {
    3241             :     ListCell   *cell;
    3242             : 
    3243          32 :     foreach(cell, foreign_server->options)
    3244             :     {
    3245          32 :         DefElem    *def = lfirst(cell);
    3246             : 
    3247          32 :         if (strcmp(def->defname, "use_scram_passthrough") == 0)
    3248           8 :             return defGetBoolean(def);
    3249             :     }
    3250             : 
    3251           0 :     foreach(cell, user->options)
    3252             :     {
    3253           0 :         DefElem    *def = (DefElem *) lfirst(cell);
    3254             : 
    3255           0 :         if (strcmp(def->defname, "use_scram_passthrough") == 0)
    3256           0 :             return defGetBoolean(def);
    3257             :     }
    3258             : 
    3259           0 :     return false;
    3260             : }

Generated by: LCOV version 1.14