LCOV - code coverage report
Current view: top level - contrib/dblink - dblink.c (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 967 1113 86.9 %
Date: 2025-07-27 11:17:30 Functions: 78 80 97.5 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :  * dblink.c
       3             :  *
       4             :  * Functions returning results from a remote database
       5             :  *
       6             :  * Joe Conway <mail@joeconway.com>
       7             :  * And contributors:
       8             :  * Darko Prenosil <Darko.Prenosil@finteh.hr>
       9             :  * Shridhar Daithankar <shridhar_daithankar@persistent.co.in>
      10             :  *
      11             :  * contrib/dblink/dblink.c
      12             :  * Copyright (c) 2001-2025, PostgreSQL Global Development Group
      13             :  * ALL RIGHTS RESERVED;
      14             :  *
      15             :  * Permission to use, copy, modify, and distribute this software and its
      16             :  * documentation for any purpose, without fee, and without a written agreement
      17             :  * is hereby granted, provided that the above copyright notice and this
      18             :  * paragraph and the following two paragraphs appear in all copies.
      19             :  *
      20             :  * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
      21             :  * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
      22             :  * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
      23             :  * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
      24             :  * POSSIBILITY OF SUCH DAMAGE.
      25             :  *
      26             :  * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
      27             :  * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
      28             :  * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
      29             :  * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
      30             :  * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
      31             :  *
      32             :  */
      33             : #include "postgres.h"
      34             : 
      35             : #include <limits.h>
      36             : 
      37             : #include "access/htup_details.h"
      38             : #include "access/relation.h"
      39             : #include "access/reloptions.h"
      40             : #include "access/table.h"
      41             : #include "catalog/namespace.h"
      42             : #include "catalog/pg_foreign_data_wrapper.h"
      43             : #include "catalog/pg_foreign_server.h"
      44             : #include "catalog/pg_type.h"
      45             : #include "catalog/pg_user_mapping.h"
      46             : #include "commands/defrem.h"
      47             : #include "common/base64.h"
      48             : #include "executor/spi.h"
      49             : #include "foreign/foreign.h"
      50             : #include "funcapi.h"
      51             : #include "lib/stringinfo.h"
      52             : #include "libpq-fe.h"
      53             : #include "libpq/libpq-be.h"
      54             : #include "libpq/libpq-be-fe-helpers.h"
      55             : #include "mb/pg_wchar.h"
      56             : #include "miscadmin.h"
      57             : #include "parser/scansup.h"
      58             : #include "utils/acl.h"
      59             : #include "utils/builtins.h"
      60             : #include "utils/fmgroids.h"
      61             : #include "utils/guc.h"
      62             : #include "utils/lsyscache.h"
      63             : #include "utils/memutils.h"
      64             : #include "utils/rel.h"
      65             : #include "utils/varlena.h"
      66             : #include "utils/wait_event.h"
      67             : 
      68          32 : PG_MODULE_MAGIC_EXT(
      69             :                     .name = "dblink",
      70             :                     .version = PG_VERSION
      71             : );
      72             : 
      73             : typedef struct remoteConn
      74             : {
      75             :     PGconn     *conn;           /* Hold the remote connection */
      76             :     int         openCursorCount;    /* The number of open cursors */
      77             :     bool        newXactForCursor;   /* Opened a transaction for a cursor */
      78             : } remoteConn;
      79             : 
      80             : typedef struct storeInfo
      81             : {
      82             :     FunctionCallInfo fcinfo;
      83             :     Tuplestorestate *tuplestore;
      84             :     AttInMetadata *attinmeta;
      85             :     MemoryContext tmpcontext;
      86             :     char      **cstrs;
      87             :     /* temp storage for results to avoid leaks on exception */
      88             :     PGresult   *last_res;
      89             :     PGresult   *cur_res;
      90             : } storeInfo;
      91             : 
      92             : /*
      93             :  * Internal declarations
      94             :  */
      95             : static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
      96             : static void prepTuplestoreResult(FunctionCallInfo fcinfo);
      97             : static void materializeResult(FunctionCallInfo fcinfo, PGconn *conn,
      98             :                               PGresult *res);
      99             : static void materializeQueryResult(FunctionCallInfo fcinfo,
     100             :                                    PGconn *conn,
     101             :                                    const char *conname,
     102             :                                    const char *sql,
     103             :                                    bool fail);
     104             : static PGresult *storeQueryResult(storeInfo *sinfo, PGconn *conn, const char *sql);
     105             : static void storeRow(storeInfo *sinfo, PGresult *res, bool first);
     106             : static remoteConn *getConnectionByName(const char *name);
     107             : static HTAB *createConnHash(void);
     108             : static remoteConn *createNewConnection(const char *name);
     109             : static void deleteConnection(const char *name);
     110             : static char **get_pkey_attnames(Relation rel, int16 *indnkeyatts);
     111             : static char **get_text_array_contents(ArrayType *array, int *numitems);
     112             : static char *get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
     113             : static char *get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals);
     114             : static char *get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
     115             : static char *quote_ident_cstr(char *rawstr);
     116             : static int  get_attnum_pk_pos(int *pkattnums, int pknumatts, int key);
     117             : static HeapTuple get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals);
     118             : static Relation get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode);
     119             : static char *generate_relation_name(Relation rel);
     120             : static void dblink_connstr_check(const char *connstr);
     121             : static bool dblink_connstr_has_pw(const char *connstr);
     122             : static void dblink_security_check(PGconn *conn, const char *connname,
     123             :                                   const char *connstr);
     124             : static void dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
     125             :                              bool fail, const char *fmt,...) pg_attribute_printf(5, 6);
     126             : static char *get_connect_string(const char *servername);
     127             : static char *escape_param_str(const char *str);
     128             : static void validate_pkattnums(Relation rel,
     129             :                                int2vector *pkattnums_arg, int32 pknumatts_arg,
     130             :                                int **pkattnums, int *pknumatts);
     131             : static bool is_valid_dblink_option(const PQconninfoOption *options,
     132             :                                    const char *option, Oid context);
     133             : static int  applyRemoteGucs(PGconn *conn);
     134             : static void restoreLocalGucs(int nestlevel);
     135             : static bool UseScramPassthrough(ForeignServer *foreign_server, UserMapping *user);
     136             : static void appendSCRAMKeysInfo(StringInfo buf);
     137             : static bool is_valid_dblink_fdw_option(const PQconninfoOption *options, const char *option,
     138             :                                        Oid context);
     139             : static bool dblink_connstr_has_required_scram_options(const char *connstr);
     140             : 
     141             : /* Global */
     142             : static remoteConn *pconn = NULL;
     143             : static HTAB *remoteConnHash = NULL;
     144             : 
     145             : /* custom wait event values, retrieved from shared memory */
     146             : static uint32 dblink_we_connect = 0;
     147             : static uint32 dblink_we_get_conn = 0;
     148             : static uint32 dblink_we_get_result = 0;
     149             : 
     150             : /*
     151             :  *  Following is hash that holds multiple remote connections.
     152             :  *  Calling convention of each dblink function changes to accept
     153             :  *  connection name as the first parameter. The connection hash is
     154             :  *  much like ecpg e.g. a mapping between a name and a PGconn object.
     155             :  *
     156             :  *  To avoid potentially leaking a PGconn object in case of out-of-memory
     157             :  *  errors, we first create the hash entry, then open the PGconn.
     158             :  *  Hence, a hash entry whose rconn.conn pointer is NULL must be
     159             :  *  understood as a leftover from a failed create; it should be ignored
     160             :  *  by lookup operations, and silently replaced by create operations.
     161             :  */
     162             : 
     163             : typedef struct remoteConnHashEnt
     164             : {
     165             :     char        name[NAMEDATALEN];
     166             :     remoteConn  rconn;
     167             : } remoteConnHashEnt;
     168             : 
     169             : /* initial number of connection hashes */
     170             : #define NUMCONN 16
     171             : 
     172             : pg_noreturn static void
     173           0 : dblink_res_internalerror(PGconn *conn, PGresult *res, const char *p2)
     174             : {
     175           0 :     char       *msg = pchomp(PQerrorMessage(conn));
     176             : 
     177           0 :     PQclear(res);
     178           0 :     elog(ERROR, "%s: %s", p2, msg);
     179             : }
     180             : 
     181             : pg_noreturn static void
     182           6 : dblink_conn_not_avail(const char *conname)
     183             : {
     184           6 :     if (conname)
     185           2 :         ereport(ERROR,
     186             :                 (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
     187             :                  errmsg("connection \"%s\" not available", conname)));
     188             :     else
     189           4 :         ereport(ERROR,
     190             :                 (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
     191             :                  errmsg("connection not available")));
     192             : }
     193             : 
     194             : static void
     195          72 : dblink_get_conn(char *conname_or_str,
     196             :                 PGconn *volatile *conn_p, char **conname_p, volatile bool *freeconn_p)
     197             : {
     198          72 :     remoteConn *rconn = getConnectionByName(conname_or_str);
     199             :     PGconn     *conn;
     200             :     char       *conname;
     201             :     bool        freeconn;
     202             : 
     203          72 :     if (rconn)
     204             :     {
     205          54 :         conn = rconn->conn;
     206          54 :         conname = conname_or_str;
     207          54 :         freeconn = false;
     208             :     }
     209             :     else
     210             :     {
     211             :         const char *connstr;
     212             : 
     213          18 :         connstr = get_connect_string(conname_or_str);
     214          18 :         if (connstr == NULL)
     215          12 :             connstr = conname_or_str;
     216          18 :         dblink_connstr_check(connstr);
     217             : 
     218             :         /* first time, allocate or get the custom wait event */
     219          16 :         if (dblink_we_get_conn == 0)
     220           8 :             dblink_we_get_conn = WaitEventExtensionNew("DblinkGetConnect");
     221             : 
     222             :         /* OK to make connection */
     223          16 :         conn = libpqsrv_connect(connstr, dblink_we_get_conn);
     224             : 
     225          16 :         if (PQstatus(conn) == CONNECTION_BAD)
     226             :         {
     227           4 :             char       *msg = pchomp(PQerrorMessage(conn));
     228             : 
     229           4 :             libpqsrv_disconnect(conn);
     230           4 :             ereport(ERROR,
     231             :                     (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
     232             :                      errmsg("could not establish connection"),
     233             :                      errdetail_internal("%s", msg)));
     234             :         }
     235             : 
     236          12 :         PQsetNoticeReceiver(conn, libpqsrv_notice_receiver,
     237             :                             "received message via remote connection");
     238             : 
     239          12 :         dblink_security_check(conn, NULL, connstr);
     240          12 :         if (PQclientEncoding(conn) != GetDatabaseEncoding())
     241           0 :             PQsetClientEncoding(conn, GetDatabaseEncodingName());
     242          12 :         freeconn = true;
     243          12 :         conname = NULL;
     244             :     }
     245             : 
     246          66 :     *conn_p = conn;
     247          66 :     *conname_p = conname;
     248          66 :     *freeconn_p = freeconn;
     249          66 : }
     250             : 
     251             : static PGconn *
     252          34 : dblink_get_named_conn(const char *conname)
     253             : {
     254          34 :     remoteConn *rconn = getConnectionByName(conname);
     255             : 
     256          34 :     if (rconn)
     257          34 :         return rconn->conn;
     258             : 
     259           0 :     dblink_conn_not_avail(conname);
     260             :     return NULL;                /* keep compiler quiet */
     261             : }
     262             : 
     263             : static void
     264         246 : dblink_init(void)
     265             : {
     266         246 :     if (!pconn)
     267             :     {
     268          12 :         if (dblink_we_get_result == 0)
     269          12 :             dblink_we_get_result = WaitEventExtensionNew("DblinkGetResult");
     270             : 
     271          12 :         pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn));
     272          12 :         pconn->conn = NULL;
     273          12 :         pconn->openCursorCount = 0;
     274          12 :         pconn->newXactForCursor = false;
     275             :     }
     276         246 : }
     277             : 
     278             : /*
     279             :  * Create a persistent connection to another database
     280             :  */
     281          26 : PG_FUNCTION_INFO_V1(dblink_connect);
     282             : Datum
     283          32 : dblink_connect(PG_FUNCTION_ARGS)
     284             : {
     285          32 :     char       *conname_or_str = NULL;
     286          32 :     char       *connstr = NULL;
     287          32 :     char       *connname = NULL;
     288             :     char       *msg;
     289          32 :     PGconn     *conn = NULL;
     290          32 :     remoteConn *rconn = NULL;
     291             : 
     292          32 :     dblink_init();
     293             : 
     294          32 :     if (PG_NARGS() == 2)
     295             :     {
     296          22 :         conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(1));
     297          22 :         connname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     298             :     }
     299          10 :     else if (PG_NARGS() == 1)
     300          10 :         conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0));
     301             : 
     302             :     /* first check for valid foreign data server */
     303          32 :     connstr = get_connect_string(conname_or_str);
     304          32 :     if (connstr == NULL)
     305          28 :         connstr = conname_or_str;
     306             : 
     307             :     /* check password in connection string if not superuser */
     308          32 :     dblink_connstr_check(connstr);
     309             : 
     310             :     /* first time, allocate or get the custom wait event */
     311          30 :     if (dblink_we_connect == 0)
     312           4 :         dblink_we_connect = WaitEventExtensionNew("DblinkConnect");
     313             : 
     314             :     /* if we need a hashtable entry, make that first, since it might fail */
     315          30 :     if (connname)
     316             :     {
     317          20 :         rconn = createNewConnection(connname);
     318             :         Assert(rconn->conn == NULL);
     319             :     }
     320             : 
     321             :     /* OK to make connection */
     322          28 :     conn = libpqsrv_connect(connstr, dblink_we_connect);
     323             : 
     324          28 :     if (PQstatus(conn) == CONNECTION_BAD)
     325             :     {
     326           0 :         msg = pchomp(PQerrorMessage(conn));
     327           0 :         libpqsrv_disconnect(conn);
     328           0 :         if (connname)
     329           0 :             deleteConnection(connname);
     330             : 
     331           0 :         ereport(ERROR,
     332             :                 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
     333             :                  errmsg("could not establish connection"),
     334             :                  errdetail_internal("%s", msg)));
     335             :     }
     336             : 
     337          28 :     PQsetNoticeReceiver(conn, libpqsrv_notice_receiver,
     338             :                         "received message via remote connection");
     339             : 
     340             :     /* check password actually used if not superuser */
     341          28 :     dblink_security_check(conn, connname, connstr);
     342             : 
     343             :     /* attempt to set client encoding to match server encoding, if needed */
     344          28 :     if (PQclientEncoding(conn) != GetDatabaseEncoding())
     345           0 :         PQsetClientEncoding(conn, GetDatabaseEncodingName());
     346             : 
     347             :     /* all OK, save away the conn */
     348          28 :     if (connname)
     349             :     {
     350          18 :         rconn->conn = conn;
     351             :     }
     352             :     else
     353             :     {
     354          10 :         if (pconn->conn)
     355           2 :             libpqsrv_disconnect(pconn->conn);
     356          10 :         pconn->conn = conn;
     357             :     }
     358             : 
     359          28 :     PG_RETURN_TEXT_P(cstring_to_text("OK"));
     360             : }
     361             : 
     362             : /*
     363             :  * Clear a persistent connection to another database
     364             :  */
     365          16 : PG_FUNCTION_INFO_V1(dblink_disconnect);
     366             : Datum
     367          26 : dblink_disconnect(PG_FUNCTION_ARGS)
     368             : {
     369          26 :     char       *conname = NULL;
     370          26 :     remoteConn *rconn = NULL;
     371          26 :     PGconn     *conn = NULL;
     372             : 
     373          26 :     dblink_init();
     374             : 
     375          26 :     if (PG_NARGS() == 1)
     376             :     {
     377          18 :         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     378          18 :         rconn = getConnectionByName(conname);
     379          18 :         if (rconn)
     380          16 :             conn = rconn->conn;
     381             :     }
     382             :     else
     383           8 :         conn = pconn->conn;
     384             : 
     385          26 :     if (!conn)
     386           2 :         dblink_conn_not_avail(conname);
     387             : 
     388          24 :     libpqsrv_disconnect(conn);
     389          24 :     if (rconn)
     390          16 :         deleteConnection(conname);
     391             :     else
     392           8 :         pconn->conn = NULL;
     393             : 
     394          24 :     PG_RETURN_TEXT_P(cstring_to_text("OK"));
     395             : }
     396             : 
     397             : /*
     398             :  * opens a cursor using a persistent connection
     399             :  */
     400          26 : PG_FUNCTION_INFO_V1(dblink_open);
     401             : Datum
     402          18 : dblink_open(PG_FUNCTION_ARGS)
     403             : {
     404          18 :     PGresult   *res = NULL;
     405             :     PGconn     *conn;
     406          18 :     char       *curname = NULL;
     407          18 :     char       *sql = NULL;
     408          18 :     char       *conname = NULL;
     409             :     StringInfoData buf;
     410          18 :     remoteConn *rconn = NULL;
     411          18 :     bool        fail = true;    /* default to backward compatible behavior */
     412             : 
     413          18 :     dblink_init();
     414          18 :     initStringInfo(&buf);
     415             : 
     416          18 :     if (PG_NARGS() == 2)
     417             :     {
     418             :         /* text,text */
     419           4 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     420           4 :         sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
     421           4 :         rconn = pconn;
     422             :     }
     423          14 :     else if (PG_NARGS() == 3)
     424             :     {
     425             :         /* might be text,text,text or text,text,bool */
     426          12 :         if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
     427             :         {
     428           2 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     429           2 :             sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
     430           2 :             fail = PG_GETARG_BOOL(2);
     431           2 :             rconn = pconn;
     432             :         }
     433             :         else
     434             :         {
     435          10 :             conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     436          10 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
     437          10 :             sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
     438          10 :             rconn = getConnectionByName(conname);
     439             :         }
     440             :     }
     441           2 :     else if (PG_NARGS() == 4)
     442             :     {
     443             :         /* text,text,text,bool */
     444           2 :         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     445           2 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
     446           2 :         sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
     447           2 :         fail = PG_GETARG_BOOL(3);
     448           2 :         rconn = getConnectionByName(conname);
     449             :     }
     450             : 
     451          18 :     if (!rconn || !rconn->conn)
     452           0 :         dblink_conn_not_avail(conname);
     453             : 
     454          18 :     conn = rconn->conn;
     455             : 
     456             :     /* If we are not in a transaction, start one */
     457          18 :     if (PQtransactionStatus(conn) == PQTRANS_IDLE)
     458             :     {
     459          14 :         res = libpqsrv_exec(conn, "BEGIN", dblink_we_get_result);
     460          14 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
     461           0 :             dblink_res_internalerror(conn, res, "begin error");
     462          14 :         PQclear(res);
     463          14 :         rconn->newXactForCursor = true;
     464             : 
     465             :         /*
     466             :          * Since transaction state was IDLE, we force cursor count to
     467             :          * initially be 0. This is needed as a previous ABORT might have wiped
     468             :          * out our transaction without maintaining the cursor count for us.
     469             :          */
     470          14 :         rconn->openCursorCount = 0;
     471             :     }
     472             : 
     473             :     /* if we started a transaction, increment cursor count */
     474          18 :     if (rconn->newXactForCursor)
     475          18 :         (rconn->openCursorCount)++;
     476             : 
     477          18 :     appendStringInfo(&buf, "DECLARE %s CURSOR FOR %s", curname, sql);
     478          18 :     res = libpqsrv_exec(conn, buf.data, dblink_we_get_result);
     479          18 :     if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
     480             :     {
     481           4 :         dblink_res_error(conn, conname, res, fail,
     482             :                          "while opening cursor \"%s\"", curname);
     483           4 :         PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
     484             :     }
     485             : 
     486          14 :     PQclear(res);
     487          14 :     PG_RETURN_TEXT_P(cstring_to_text("OK"));
     488             : }
     489             : 
     490             : /*
     491             :  * closes a cursor
     492             :  */
     493          20 : PG_FUNCTION_INFO_V1(dblink_close);
     494             : Datum
     495          10 : dblink_close(PG_FUNCTION_ARGS)
     496             : {
     497             :     PGconn     *conn;
     498          10 :     PGresult   *res = NULL;
     499          10 :     char       *curname = NULL;
     500          10 :     char       *conname = NULL;
     501             :     StringInfoData buf;
     502          10 :     remoteConn *rconn = NULL;
     503          10 :     bool        fail = true;    /* default to backward compatible behavior */
     504             : 
     505          10 :     dblink_init();
     506          10 :     initStringInfo(&buf);
     507             : 
     508          10 :     if (PG_NARGS() == 1)
     509             :     {
     510             :         /* text */
     511           0 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     512           0 :         rconn = pconn;
     513             :     }
     514          10 :     else if (PG_NARGS() == 2)
     515             :     {
     516             :         /* might be text,text or text,bool */
     517          10 :         if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
     518             :         {
     519           4 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     520           4 :             fail = PG_GETARG_BOOL(1);
     521           4 :             rconn = pconn;
     522             :         }
     523             :         else
     524             :         {
     525           6 :             conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     526           6 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
     527           6 :             rconn = getConnectionByName(conname);
     528             :         }
     529             :     }
     530          10 :     if (PG_NARGS() == 3)
     531             :     {
     532             :         /* text,text,bool */
     533           0 :         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     534           0 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
     535           0 :         fail = PG_GETARG_BOOL(2);
     536           0 :         rconn = getConnectionByName(conname);
     537             :     }
     538             : 
     539          10 :     if (!rconn || !rconn->conn)
     540           0 :         dblink_conn_not_avail(conname);
     541             : 
     542          10 :     conn = rconn->conn;
     543             : 
     544          10 :     appendStringInfo(&buf, "CLOSE %s", curname);
     545             : 
     546             :     /* close the cursor */
     547          10 :     res = libpqsrv_exec(conn, buf.data, dblink_we_get_result);
     548          10 :     if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
     549             :     {
     550           2 :         dblink_res_error(conn, conname, res, fail,
     551             :                          "while closing cursor \"%s\"", curname);
     552           2 :         PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
     553             :     }
     554             : 
     555           8 :     PQclear(res);
     556             : 
     557             :     /* if we started a transaction, decrement cursor count */
     558           8 :     if (rconn->newXactForCursor)
     559             :     {
     560           8 :         (rconn->openCursorCount)--;
     561             : 
     562             :         /* if count is zero, commit the transaction */
     563           8 :         if (rconn->openCursorCount == 0)
     564             :         {
     565           4 :             rconn->newXactForCursor = false;
     566             : 
     567           4 :             res = libpqsrv_exec(conn, "COMMIT", dblink_we_get_result);
     568           4 :             if (PQresultStatus(res) != PGRES_COMMAND_OK)
     569           0 :                 dblink_res_internalerror(conn, res, "commit error");
     570           4 :             PQclear(res);
     571             :         }
     572             :     }
     573             : 
     574           8 :     PG_RETURN_TEXT_P(cstring_to_text("OK"));
     575             : }
     576             : 
     577             : /*
     578             :  * Fetch results from an open cursor
     579             :  */
     580          26 : PG_FUNCTION_INFO_V1(dblink_fetch);
     581             : Datum
     582          26 : dblink_fetch(PG_FUNCTION_ARGS)
     583             : {
     584          26 :     PGresult   *res = NULL;
     585          26 :     char       *conname = NULL;
     586          26 :     remoteConn *rconn = NULL;
     587          26 :     PGconn     *conn = NULL;
     588             :     StringInfoData buf;
     589          26 :     char       *curname = NULL;
     590          26 :     int         howmany = 0;
     591          26 :     bool        fail = true;    /* default to backward compatible */
     592             : 
     593          26 :     prepTuplestoreResult(fcinfo);
     594             : 
     595          26 :     dblink_init();
     596             : 
     597          26 :     if (PG_NARGS() == 4)
     598             :     {
     599             :         /* text,text,int,bool */
     600           2 :         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     601           2 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
     602           2 :         howmany = PG_GETARG_INT32(2);
     603           2 :         fail = PG_GETARG_BOOL(3);
     604             : 
     605           2 :         rconn = getConnectionByName(conname);
     606           2 :         if (rconn)
     607           2 :             conn = rconn->conn;
     608             :     }
     609          24 :     else if (PG_NARGS() == 3)
     610             :     {
     611             :         /* text,text,int or text,int,bool */
     612          16 :         if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
     613             :         {
     614           4 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     615           4 :             howmany = PG_GETARG_INT32(1);
     616           4 :             fail = PG_GETARG_BOOL(2);
     617           4 :             conn = pconn->conn;
     618             :         }
     619             :         else
     620             :         {
     621          12 :             conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     622          12 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
     623          12 :             howmany = PG_GETARG_INT32(2);
     624             : 
     625          12 :             rconn = getConnectionByName(conname);
     626          12 :             if (rconn)
     627          12 :                 conn = rconn->conn;
     628             :         }
     629             :     }
     630           8 :     else if (PG_NARGS() == 2)
     631             :     {
     632             :         /* text,int */
     633           8 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     634           8 :         howmany = PG_GETARG_INT32(1);
     635           8 :         conn = pconn->conn;
     636             :     }
     637             : 
     638          26 :     if (!conn)
     639           0 :         dblink_conn_not_avail(conname);
     640             : 
     641          26 :     initStringInfo(&buf);
     642          26 :     appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
     643             : 
     644             :     /*
     645             :      * Try to execute the query.  Note that since libpq uses malloc, the
     646             :      * PGresult will be long-lived even though we are still in a short-lived
     647             :      * memory context.
     648             :      */
     649          26 :     res = libpqsrv_exec(conn, buf.data, dblink_we_get_result);
     650          52 :     if (!res ||
     651          52 :         (PQresultStatus(res) != PGRES_COMMAND_OK &&
     652          26 :          PQresultStatus(res) != PGRES_TUPLES_OK))
     653             :     {
     654          10 :         dblink_res_error(conn, conname, res, fail,
     655             :                          "while fetching from cursor \"%s\"", curname);
     656           6 :         return (Datum) 0;
     657             :     }
     658          16 :     else if (PQresultStatus(res) == PGRES_COMMAND_OK)
     659             :     {
     660             :         /* cursor does not exist - closed already or bad name */
     661           0 :         PQclear(res);
     662           0 :         ereport(ERROR,
     663             :                 (errcode(ERRCODE_INVALID_CURSOR_NAME),
     664             :                  errmsg("cursor \"%s\" does not exist", curname)));
     665             :     }
     666             : 
     667          16 :     materializeResult(fcinfo, conn, res);
     668          14 :     return (Datum) 0;
     669             : }
     670             : 
     671             : /*
     672             :  * Note: this is the new preferred version of dblink
     673             :  */
     674          36 : PG_FUNCTION_INFO_V1(dblink_record);
     675             : Datum
     676          56 : dblink_record(PG_FUNCTION_ARGS)
     677             : {
     678          56 :     return dblink_record_internal(fcinfo, false);
     679             : }
     680             : 
     681           8 : PG_FUNCTION_INFO_V1(dblink_send_query);
     682             : Datum
     683          12 : dblink_send_query(PG_FUNCTION_ARGS)
     684             : {
     685             :     PGconn     *conn;
     686             :     char       *sql;
     687             :     int         retval;
     688             : 
     689          12 :     if (PG_NARGS() == 2)
     690             :     {
     691          12 :         conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
     692          12 :         sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
     693             :     }
     694             :     else
     695             :         /* shouldn't happen */
     696           0 :         elog(ERROR, "wrong number of arguments");
     697             : 
     698             :     /* async query send */
     699          12 :     retval = PQsendQuery(conn, sql);
     700          12 :     if (retval != 1)
     701           0 :         elog(NOTICE, "could not send query: %s", pchomp(PQerrorMessage(conn)));
     702             : 
     703          12 :     PG_RETURN_INT32(retval);
     704             : }
     705             : 
     706          12 : PG_FUNCTION_INFO_V1(dblink_get_result);
     707             : Datum
     708          16 : dblink_get_result(PG_FUNCTION_ARGS)
     709             : {
     710          16 :     return dblink_record_internal(fcinfo, true);
     711             : }
     712             : 
     713             : static Datum
     714          72 : dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
     715             : {
     716          72 :     PGconn     *volatile conn = NULL;
     717          72 :     volatile bool freeconn = false;
     718             : 
     719          72 :     prepTuplestoreResult(fcinfo);
     720             : 
     721          72 :     dblink_init();
     722             : 
     723          72 :     PG_TRY();
     724             :     {
     725          72 :         char       *sql = NULL;
     726          72 :         char       *conname = NULL;
     727          72 :         bool        fail = true;    /* default to backward compatible */
     728             : 
     729          72 :         if (!is_async)
     730             :         {
     731          56 :             if (PG_NARGS() == 3)
     732             :             {
     733             :                 /* text,text,bool */
     734           2 :                 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     735           2 :                 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
     736           2 :                 fail = PG_GETARG_BOOL(2);
     737           2 :                 dblink_get_conn(conname, &conn, &conname, &freeconn);
     738             :             }
     739          54 :             else if (PG_NARGS() == 2)
     740             :             {
     741             :                 /* text,text or text,bool */
     742          40 :                 if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
     743             :                 {
     744           2 :                     sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
     745           2 :                     fail = PG_GETARG_BOOL(1);
     746           2 :                     conn = pconn->conn;
     747             :                 }
     748             :                 else
     749             :                 {
     750          38 :                     conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     751          38 :                     sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
     752          38 :                     dblink_get_conn(conname, &conn, &conname, &freeconn);
     753             :                 }
     754             :             }
     755          14 :             else if (PG_NARGS() == 1)
     756             :             {
     757             :                 /* text */
     758          14 :                 conn = pconn->conn;
     759          14 :                 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
     760             :             }
     761             :             else
     762             :                 /* shouldn't happen */
     763           0 :                 elog(ERROR, "wrong number of arguments");
     764             :         }
     765             :         else                    /* is_async */
     766             :         {
     767             :             /* get async result */
     768          16 :             conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     769             : 
     770          16 :             if (PG_NARGS() == 2)
     771             :             {
     772             :                 /* text,bool */
     773           0 :                 fail = PG_GETARG_BOOL(1);
     774           0 :                 conn = dblink_get_named_conn(conname);
     775             :             }
     776          16 :             else if (PG_NARGS() == 1)
     777             :             {
     778             :                 /* text */
     779          16 :                 conn = dblink_get_named_conn(conname);
     780             :             }
     781             :             else
     782             :                 /* shouldn't happen */
     783           0 :                 elog(ERROR, "wrong number of arguments");
     784             :         }
     785             : 
     786          66 :         if (!conn)
     787           4 :             dblink_conn_not_avail(conname);
     788             : 
     789          62 :         if (!is_async)
     790             :         {
     791             :             /* synchronous query, use efficient tuple collection method */
     792          46 :             materializeQueryResult(fcinfo, conn, conname, sql, fail);
     793             :         }
     794             :         else
     795             :         {
     796             :             /* async result retrieval, do it the old way */
     797          16 :             PGresult   *res = libpqsrv_get_result(conn, dblink_we_get_result);
     798             : 
     799             :             /* NULL means we're all done with the async results */
     800          16 :             if (res)
     801             :             {
     802          10 :                 if (PQresultStatus(res) != PGRES_COMMAND_OK &&
     803          10 :                     PQresultStatus(res) != PGRES_TUPLES_OK)
     804             :                 {
     805           0 :                     dblink_res_error(conn, conname, res, fail,
     806             :                                      "while executing query");
     807             :                     /* if fail isn't set, we'll return an empty query result */
     808             :                 }
     809             :                 else
     810             :                 {
     811          10 :                     materializeResult(fcinfo, conn, res);
     812             :                 }
     813             :             }
     814             :         }
     815             :     }
     816          10 :     PG_FINALLY();
     817             :     {
     818             :         /* if needed, close the connection to the database */
     819          72 :         if (freeconn)
     820          10 :             libpqsrv_disconnect(conn);
     821             :     }
     822          72 :     PG_END_TRY();
     823             : 
     824          62 :     return (Datum) 0;
     825             : }
     826             : 
     827             : /*
     828             :  * Verify function caller can handle a tuplestore result, and set up for that.
     829             :  *
     830             :  * Note: if the caller returns without actually creating a tuplestore, the
     831             :  * executor will treat the function result as an empty set.
     832             :  */
     833             : static void
     834          98 : prepTuplestoreResult(FunctionCallInfo fcinfo)
     835             : {
     836          98 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
     837             : 
     838             :     /* check to see if query supports us returning a tuplestore */
     839          98 :     if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
     840           0 :         ereport(ERROR,
     841             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     842             :                  errmsg("set-valued function called in context that cannot accept a set")));
     843          98 :     if (!(rsinfo->allowedModes & SFRM_Materialize))
     844           0 :         ereport(ERROR,
     845             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     846             :                  errmsg("materialize mode required, but it is not allowed in this context")));
     847             : 
     848             :     /* let the executor know we're sending back a tuplestore */
     849          98 :     rsinfo->returnMode = SFRM_Materialize;
     850             : 
     851             :     /* caller must fill these to return a non-empty result */
     852          98 :     rsinfo->setResult = NULL;
     853          98 :     rsinfo->setDesc = NULL;
     854          98 : }
     855             : 
     856             : /*
     857             :  * Copy the contents of the PGresult into a tuplestore to be returned
     858             :  * as the result of the current function.
     859             :  * The PGresult will be released in this function.
     860             :  */
     861             : static void
     862          26 : materializeResult(FunctionCallInfo fcinfo, PGconn *conn, PGresult *res)
     863             : {
     864          26 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
     865             :     TupleDesc   tupdesc;
     866             :     bool        is_sql_cmd;
     867             :     int         ntuples;
     868             :     int         nfields;
     869             : 
     870             :     /* prepTuplestoreResult must have been called previously */
     871             :     Assert(rsinfo->returnMode == SFRM_Materialize);
     872             : 
     873          26 :     if (PQresultStatus(res) == PGRES_COMMAND_OK)
     874             :     {
     875           0 :         is_sql_cmd = true;
     876             : 
     877             :         /*
     878             :          * need a tuple descriptor representing one TEXT column to return the
     879             :          * command status string as our result tuple
     880             :          */
     881           0 :         tupdesc = CreateTemplateTupleDesc(1);
     882           0 :         TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
     883             :                            TEXTOID, -1, 0);
     884           0 :         ntuples = 1;
     885           0 :         nfields = 1;
     886             :     }
     887             :     else
     888             :     {
     889             :         Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
     890             : 
     891          26 :         is_sql_cmd = false;
     892             : 
     893             :         /* get a tuple descriptor for our result type */
     894          26 :         switch (get_call_result_type(fcinfo, NULL, &tupdesc))
     895             :         {
     896          26 :             case TYPEFUNC_COMPOSITE:
     897             :                 /* success */
     898          26 :                 break;
     899           0 :             case TYPEFUNC_RECORD:
     900             :                 /* failed to determine actual type of RECORD */
     901           0 :                 ereport(ERROR,
     902             :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     903             :                          errmsg("function returning record called in context "
     904             :                                 "that cannot accept type record")));
     905             :                 break;
     906           0 :             default:
     907             :                 /* result type isn't composite */
     908           0 :                 elog(ERROR, "return type must be a row type");
     909             :                 break;
     910             :         }
     911             : 
     912             :         /* make sure we have a persistent copy of the tupdesc */
     913          26 :         tupdesc = CreateTupleDescCopy(tupdesc);
     914          26 :         ntuples = PQntuples(res);
     915          26 :         nfields = PQnfields(res);
     916             :     }
     917             : 
     918             :     /*
     919             :      * check result and tuple descriptor have the same number of columns
     920             :      */
     921          26 :     if (nfields != tupdesc->natts)
     922           0 :         ereport(ERROR,
     923             :                 (errcode(ERRCODE_DATATYPE_MISMATCH),
     924             :                  errmsg("remote query result rowtype does not match "
     925             :                         "the specified FROM clause rowtype")));
     926             : 
     927          26 :     if (ntuples > 0)
     928             :     {
     929             :         AttInMetadata *attinmeta;
     930          26 :         int         nestlevel = -1;
     931             :         Tuplestorestate *tupstore;
     932             :         MemoryContext oldcontext;
     933             :         int         row;
     934             :         char      **values;
     935             : 
     936          26 :         attinmeta = TupleDescGetAttInMetadata(tupdesc);
     937             : 
     938             :         /* Set GUCs to ensure we read GUC-sensitive data types correctly */
     939          26 :         if (!is_sql_cmd)
     940          26 :             nestlevel = applyRemoteGucs(conn);
     941             : 
     942          26 :         oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
     943          26 :         tupstore = tuplestore_begin_heap(true, false, work_mem);
     944          26 :         rsinfo->setResult = tupstore;
     945          26 :         rsinfo->setDesc = tupdesc;
     946          26 :         MemoryContextSwitchTo(oldcontext);
     947             : 
     948          26 :         values = palloc_array(char *, nfields);
     949             : 
     950             :         /* put all tuples into the tuplestore */
     951          98 :         for (row = 0; row < ntuples; row++)
     952             :         {
     953             :             HeapTuple   tuple;
     954             : 
     955          74 :             if (!is_sql_cmd)
     956             :             {
     957             :                 int         i;
     958             : 
     959         276 :                 for (i = 0; i < nfields; i++)
     960             :                 {
     961         202 :                     if (PQgetisnull(res, row, i))
     962           0 :                         values[i] = NULL;
     963             :                     else
     964         202 :                         values[i] = PQgetvalue(res, row, i);
     965             :                 }
     966             :             }
     967             :             else
     968             :             {
     969           0 :                 values[0] = PQcmdStatus(res);
     970             :             }
     971             : 
     972             :             /* build the tuple and put it into the tuplestore. */
     973          74 :             tuple = BuildTupleFromCStrings(attinmeta, values);
     974          72 :             tuplestore_puttuple(tupstore, tuple);
     975             :         }
     976             : 
     977             :         /* clean up GUC settings, if we changed any */
     978          24 :         restoreLocalGucs(nestlevel);
     979             :     }
     980             : 
     981          24 :     PQclear(res);
     982          24 : }
     983             : 
     984             : /*
     985             :  * Execute the given SQL command and store its results into a tuplestore
     986             :  * to be returned as the result of the current function.
     987             :  *
     988             :  * This is equivalent to PQexec followed by materializeResult, but we make
     989             :  * use of libpq's single-row mode to avoid accumulating the whole result
     990             :  * inside libpq before it gets transferred to the tuplestore.
     991             :  */
     992             : static void
     993          46 : materializeQueryResult(FunctionCallInfo fcinfo,
     994             :                        PGconn *conn,
     995             :                        const char *conname,
     996             :                        const char *sql,
     997             :                        bool fail)
     998             : {
     999          46 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    1000             : 
    1001             :     /* prepTuplestoreResult must have been called previously */
    1002             :     Assert(rsinfo->returnMode == SFRM_Materialize);
    1003             : 
    1004             :     /* Use a PG_TRY block to ensure we pump libpq dry of results */
    1005          46 :     PG_TRY();
    1006             :     {
    1007          46 :         storeInfo   sinfo = {0};
    1008             :         PGresult   *res;
    1009             : 
    1010          46 :         sinfo.fcinfo = fcinfo;
    1011             :         /* Create short-lived memory context for data conversions */
    1012          46 :         sinfo.tmpcontext = AllocSetContextCreate(CurrentMemoryContext,
    1013             :                                                  "dblink temporary context",
    1014             :                                                  ALLOCSET_DEFAULT_SIZES);
    1015             : 
    1016             :         /* execute query, collecting any tuples into the tuplestore */
    1017          46 :         res = storeQueryResult(&sinfo, conn, sql);
    1018             : 
    1019          46 :         if (!res ||
    1020          46 :             (PQresultStatus(res) != PGRES_COMMAND_OK &&
    1021          46 :              PQresultStatus(res) != PGRES_TUPLES_OK))
    1022             :         {
    1023           4 :             dblink_res_error(conn, conname, res, fail,
    1024             :                              "while executing query");
    1025             :             /* if fail isn't set, we'll return an empty query result */
    1026             :         }
    1027          42 :         else if (PQresultStatus(res) == PGRES_COMMAND_OK)
    1028             :         {
    1029             :             /*
    1030             :              * storeRow didn't get called, so we need to convert the command
    1031             :              * status string to a tuple manually
    1032             :              */
    1033             :             TupleDesc   tupdesc;
    1034             :             AttInMetadata *attinmeta;
    1035             :             Tuplestorestate *tupstore;
    1036             :             HeapTuple   tuple;
    1037             :             char       *values[1];
    1038             :             MemoryContext oldcontext;
    1039             : 
    1040             :             /*
    1041             :              * need a tuple descriptor representing one TEXT column to return
    1042             :              * the command status string as our result tuple
    1043             :              */
    1044           0 :             tupdesc = CreateTemplateTupleDesc(1);
    1045           0 :             TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
    1046             :                                TEXTOID, -1, 0);
    1047           0 :             attinmeta = TupleDescGetAttInMetadata(tupdesc);
    1048             : 
    1049           0 :             oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
    1050           0 :             tupstore = tuplestore_begin_heap(true, false, work_mem);
    1051           0 :             rsinfo->setResult = tupstore;
    1052           0 :             rsinfo->setDesc = tupdesc;
    1053           0 :             MemoryContextSwitchTo(oldcontext);
    1054             : 
    1055           0 :             values[0] = PQcmdStatus(res);
    1056             : 
    1057             :             /* build the tuple and put it into the tuplestore. */
    1058           0 :             tuple = BuildTupleFromCStrings(attinmeta, values);
    1059           0 :             tuplestore_puttuple(tupstore, tuple);
    1060             : 
    1061           0 :             PQclear(res);
    1062             :         }
    1063             :         else
    1064             :         {
    1065             :             Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
    1066             :             /* storeRow should have created a tuplestore */
    1067             :             Assert(rsinfo->setResult != NULL);
    1068             : 
    1069          42 :             PQclear(res);
    1070             :         }
    1071             : 
    1072             :         /* clean up data conversion short-lived memory context */
    1073          46 :         if (sinfo.tmpcontext != NULL)
    1074          46 :             MemoryContextDelete(sinfo.tmpcontext);
    1075             : 
    1076          46 :         PQclear(sinfo.last_res);
    1077          46 :         PQclear(sinfo.cur_res);
    1078             :     }
    1079           0 :     PG_CATCH();
    1080             :     {
    1081             :         PGresult   *res;
    1082             : 
    1083             :         /* be sure to clear out any pending data in libpq */
    1084           0 :         while ((res = libpqsrv_get_result(conn, dblink_we_get_result)) !=
    1085             :                NULL)
    1086           0 :             PQclear(res);
    1087           0 :         PG_RE_THROW();
    1088             :     }
    1089          46 :     PG_END_TRY();
    1090          46 : }
    1091             : 
    1092             : /*
    1093             :  * Execute query, and send any result rows to sinfo->tuplestore.
    1094             :  */
    1095             : static PGresult *
    1096          46 : storeQueryResult(storeInfo *sinfo, PGconn *conn, const char *sql)
    1097             : {
    1098          46 :     bool        first = true;
    1099          46 :     int         nestlevel = -1;
    1100             :     PGresult   *res;
    1101             : 
    1102          46 :     if (!PQsendQuery(conn, sql))
    1103           0 :         elog(ERROR, "could not send query: %s", pchomp(PQerrorMessage(conn)));
    1104             : 
    1105          46 :     if (!PQsetSingleRowMode(conn))  /* shouldn't fail */
    1106           0 :         elog(ERROR, "failed to set single-row mode for dblink query");
    1107             : 
    1108             :     for (;;)
    1109             :     {
    1110         396 :         CHECK_FOR_INTERRUPTS();
    1111             : 
    1112         396 :         sinfo->cur_res = libpqsrv_get_result(conn, dblink_we_get_result);
    1113         396 :         if (!sinfo->cur_res)
    1114          46 :             break;
    1115             : 
    1116         350 :         if (PQresultStatus(sinfo->cur_res) == PGRES_SINGLE_TUPLE)
    1117             :         {
    1118             :             /* got one row from possibly-bigger resultset */
    1119             : 
    1120             :             /*
    1121             :              * Set GUCs to ensure we read GUC-sensitive data types correctly.
    1122             :              * We shouldn't do this until we have a row in hand, to ensure
    1123             :              * libpq has seen any earlier ParameterStatus protocol messages.
    1124             :              */
    1125         304 :             if (first && nestlevel < 0)
    1126          42 :                 nestlevel = applyRemoteGucs(conn);
    1127             : 
    1128         304 :             storeRow(sinfo, sinfo->cur_res, first);
    1129             : 
    1130         304 :             PQclear(sinfo->cur_res);
    1131         304 :             sinfo->cur_res = NULL;
    1132         304 :             first = false;
    1133             :         }
    1134             :         else
    1135             :         {
    1136             :             /* if empty resultset, fill tuplestore header */
    1137          46 :             if (first && PQresultStatus(sinfo->cur_res) == PGRES_TUPLES_OK)
    1138           0 :                 storeRow(sinfo, sinfo->cur_res, first);
    1139             : 
    1140             :             /* store completed result at last_res */
    1141          46 :             PQclear(sinfo->last_res);
    1142          46 :             sinfo->last_res = sinfo->cur_res;
    1143          46 :             sinfo->cur_res = NULL;
    1144          46 :             first = true;
    1145             :         }
    1146             :     }
    1147             : 
    1148             :     /* clean up GUC settings, if we changed any */
    1149          46 :     restoreLocalGucs(nestlevel);
    1150             : 
    1151             :     /* return last_res */
    1152          46 :     res = sinfo->last_res;
    1153          46 :     sinfo->last_res = NULL;
    1154          46 :     return res;
    1155             : }
    1156             : 
    1157             : /*
    1158             :  * Send single row to sinfo->tuplestore.
    1159             :  *
    1160             :  * If "first" is true, create the tuplestore using PGresult's metadata
    1161             :  * (in this case the PGresult might contain either zero or one row).
    1162             :  */
    1163             : static void
    1164         304 : storeRow(storeInfo *sinfo, PGresult *res, bool first)
    1165             : {
    1166         304 :     int         nfields = PQnfields(res);
    1167             :     HeapTuple   tuple;
    1168             :     int         i;
    1169             :     MemoryContext oldcontext;
    1170             : 
    1171         304 :     if (first)
    1172             :     {
    1173             :         /* Prepare for new result set */
    1174          42 :         ReturnSetInfo *rsinfo = (ReturnSetInfo *) sinfo->fcinfo->resultinfo;
    1175             :         TupleDesc   tupdesc;
    1176             : 
    1177             :         /*
    1178             :          * It's possible to get more than one result set if the query string
    1179             :          * contained multiple SQL commands.  In that case, we follow PQexec's
    1180             :          * traditional behavior of throwing away all but the last result.
    1181             :          */
    1182          42 :         if (sinfo->tuplestore)
    1183           0 :             tuplestore_end(sinfo->tuplestore);
    1184          42 :         sinfo->tuplestore = NULL;
    1185             : 
    1186             :         /* get a tuple descriptor for our result type */
    1187          42 :         switch (get_call_result_type(sinfo->fcinfo, NULL, &tupdesc))
    1188             :         {
    1189          42 :             case TYPEFUNC_COMPOSITE:
    1190             :                 /* success */
    1191          42 :                 break;
    1192           0 :             case TYPEFUNC_RECORD:
    1193             :                 /* failed to determine actual type of RECORD */
    1194           0 :                 ereport(ERROR,
    1195             :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1196             :                          errmsg("function returning record called in context "
    1197             :                                 "that cannot accept type record")));
    1198             :                 break;
    1199           0 :             default:
    1200             :                 /* result type isn't composite */
    1201           0 :                 elog(ERROR, "return type must be a row type");
    1202             :                 break;
    1203             :         }
    1204             : 
    1205             :         /* make sure we have a persistent copy of the tupdesc */
    1206          42 :         tupdesc = CreateTupleDescCopy(tupdesc);
    1207             : 
    1208             :         /* check result and tuple descriptor have the same number of columns */
    1209          42 :         if (nfields != tupdesc->natts)
    1210           0 :             ereport(ERROR,
    1211             :                     (errcode(ERRCODE_DATATYPE_MISMATCH),
    1212             :                      errmsg("remote query result rowtype does not match "
    1213             :                             "the specified FROM clause rowtype")));
    1214             : 
    1215             :         /* Prepare attinmeta for later data conversions */
    1216          42 :         sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
    1217             : 
    1218             :         /* Create a new, empty tuplestore */
    1219          42 :         oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
    1220          42 :         sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
    1221          42 :         rsinfo->setResult = sinfo->tuplestore;
    1222          42 :         rsinfo->setDesc = tupdesc;
    1223          42 :         MemoryContextSwitchTo(oldcontext);
    1224             : 
    1225             :         /* Done if empty resultset */
    1226          42 :         if (PQntuples(res) == 0)
    1227           0 :             return;
    1228             : 
    1229             :         /*
    1230             :          * Set up sufficiently-wide string pointers array; this won't change
    1231             :          * in size so it's easy to preallocate.
    1232             :          */
    1233          42 :         if (sinfo->cstrs)
    1234           0 :             pfree(sinfo->cstrs);
    1235          42 :         sinfo->cstrs = palloc_array(char *, nfields);
    1236             :     }
    1237             : 
    1238             :     /* Should have a single-row result if we get here */
    1239             :     Assert(PQntuples(res) == 1);
    1240             : 
    1241             :     /*
    1242             :      * Do the following work in a temp context that we reset after each tuple.
    1243             :      * This cleans up not only the data we have direct access to, but any
    1244             :      * cruft the I/O functions might leak.
    1245             :      */
    1246         304 :     oldcontext = MemoryContextSwitchTo(sinfo->tmpcontext);
    1247             : 
    1248             :     /*
    1249             :      * Fill cstrs with null-terminated strings of column values.
    1250             :      */
    1251        1140 :     for (i = 0; i < nfields; i++)
    1252             :     {
    1253         836 :         if (PQgetisnull(res, 0, i))
    1254           0 :             sinfo->cstrs[i] = NULL;
    1255             :         else
    1256         836 :             sinfo->cstrs[i] = PQgetvalue(res, 0, i);
    1257             :     }
    1258             : 
    1259             :     /* Convert row to a tuple, and add it to the tuplestore */
    1260         304 :     tuple = BuildTupleFromCStrings(sinfo->attinmeta, sinfo->cstrs);
    1261             : 
    1262         304 :     tuplestore_puttuple(sinfo->tuplestore, tuple);
    1263             : 
    1264             :     /* Clean up */
    1265         304 :     MemoryContextSwitchTo(oldcontext);
    1266         304 :     MemoryContextReset(sinfo->tmpcontext);
    1267             : }
    1268             : 
    1269             : /*
    1270             :  * List all open dblink connections by name.
    1271             :  * Returns an array of all connection names.
    1272             :  * Takes no params
    1273             :  */
    1274           6 : PG_FUNCTION_INFO_V1(dblink_get_connections);
    1275             : Datum
    1276           2 : dblink_get_connections(PG_FUNCTION_ARGS)
    1277             : {
    1278             :     HASH_SEQ_STATUS status;
    1279             :     remoteConnHashEnt *hentry;
    1280           2 :     ArrayBuildState *astate = NULL;
    1281             : 
    1282           2 :     if (remoteConnHash)
    1283             :     {
    1284           2 :         hash_seq_init(&status, remoteConnHash);
    1285           8 :         while ((hentry = (remoteConnHashEnt *) hash_seq_search(&status)) != NULL)
    1286             :         {
    1287             :             /* ignore it if it's not an open connection */
    1288           6 :             if (hentry->rconn.conn == NULL)
    1289           0 :                 continue;
    1290             :             /* stash away current value */
    1291           6 :             astate = accumArrayResult(astate,
    1292           6 :                                       CStringGetTextDatum(hentry->name),
    1293             :                                       false, TEXTOID, CurrentMemoryContext);
    1294             :         }
    1295             :     }
    1296             : 
    1297           2 :     if (astate)
    1298           2 :         PG_RETURN_DATUM(makeArrayResult(astate,
    1299             :                                         CurrentMemoryContext));
    1300             :     else
    1301           0 :         PG_RETURN_NULL();
    1302             : }
    1303             : 
    1304             : /*
    1305             :  * Checks if a given remote connection is busy
    1306             :  *
    1307             :  * Returns 1 if the connection is busy, 0 otherwise
    1308             :  * Params:
    1309             :  *  text connection_name - name of the connection to check
    1310             :  *
    1311             :  */
    1312           6 : PG_FUNCTION_INFO_V1(dblink_is_busy);
    1313             : Datum
    1314           2 : dblink_is_busy(PG_FUNCTION_ARGS)
    1315             : {
    1316             :     PGconn     *conn;
    1317             : 
    1318           2 :     dblink_init();
    1319           2 :     conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
    1320             : 
    1321           2 :     PQconsumeInput(conn);
    1322           2 :     PG_RETURN_INT32(PQisBusy(conn));
    1323             : }
    1324             : 
    1325             : /*
    1326             :  * Cancels a running request on a connection
    1327             :  *
    1328             :  * Returns text:
    1329             :  *  "OK" if the cancel request has been sent correctly,
    1330             :  *      an error message otherwise
    1331             :  *
    1332             :  * Params:
    1333             :  *  text connection_name - name of the connection to check
    1334             :  *
    1335             :  */
    1336           6 : PG_FUNCTION_INFO_V1(dblink_cancel_query);
    1337             : Datum
    1338           2 : dblink_cancel_query(PG_FUNCTION_ARGS)
    1339             : {
    1340             :     PGconn     *conn;
    1341             :     const char *msg;
    1342             :     TimestampTz endtime;
    1343             : 
    1344           2 :     dblink_init();
    1345           2 :     conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
    1346           2 :     endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
    1347             :                                           30000);
    1348           2 :     msg = libpqsrv_cancel(conn, endtime);
    1349           2 :     if (msg == NULL)
    1350           2 :         msg = "OK";
    1351             : 
    1352           2 :     PG_RETURN_TEXT_P(cstring_to_text(msg));
    1353             : }
    1354             : 
    1355             : 
    1356             : /*
    1357             :  * Get error message from a connection
    1358             :  *
    1359             :  * Returns text:
    1360             :  *  "OK" if no error, an error message otherwise
    1361             :  *
    1362             :  * Params:
    1363             :  *  text connection_name - name of the connection to check
    1364             :  *
    1365             :  */
    1366           6 : PG_FUNCTION_INFO_V1(dblink_error_message);
    1367             : Datum
    1368           2 : dblink_error_message(PG_FUNCTION_ARGS)
    1369             : {
    1370             :     char       *msg;
    1371             :     PGconn     *conn;
    1372             : 
    1373           2 :     dblink_init();
    1374           2 :     conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
    1375             : 
    1376           2 :     msg = PQerrorMessage(conn);
    1377           2 :     if (msg == NULL || msg[0] == '\0')
    1378           2 :         PG_RETURN_TEXT_P(cstring_to_text("OK"));
    1379             :     else
    1380           0 :         PG_RETURN_TEXT_P(cstring_to_text(pchomp(msg)));
    1381             : }
    1382             : 
    1383             : /*
    1384             :  * Execute an SQL non-SELECT command
    1385             :  */
    1386          26 : PG_FUNCTION_INFO_V1(dblink_exec);
    1387             : Datum
    1388          52 : dblink_exec(PG_FUNCTION_ARGS)
    1389             : {
    1390          52 :     text       *volatile sql_cmd_status = NULL;
    1391          52 :     PGconn     *volatile conn = NULL;
    1392          52 :     volatile bool freeconn = false;
    1393             : 
    1394          52 :     dblink_init();
    1395             : 
    1396          52 :     PG_TRY();
    1397             :     {
    1398          52 :         PGresult   *res = NULL;
    1399          52 :         char       *sql = NULL;
    1400          52 :         char       *conname = NULL;
    1401          52 :         bool        fail = true;    /* default to backward compatible behavior */
    1402             : 
    1403          52 :         if (PG_NARGS() == 3)
    1404             :         {
    1405             :             /* must be text,text,bool */
    1406           0 :             conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
    1407           0 :             sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
    1408           0 :             fail = PG_GETARG_BOOL(2);
    1409           0 :             dblink_get_conn(conname, &conn, &conname, &freeconn);
    1410             :         }
    1411          52 :         else if (PG_NARGS() == 2)
    1412             :         {
    1413             :             /* might be text,text or text,bool */
    1414          34 :             if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
    1415             :             {
    1416           2 :                 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
    1417           2 :                 fail = PG_GETARG_BOOL(1);
    1418           2 :                 conn = pconn->conn;
    1419             :             }
    1420             :             else
    1421             :             {
    1422          32 :                 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
    1423          32 :                 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
    1424          32 :                 dblink_get_conn(conname, &conn, &conname, &freeconn);
    1425             :             }
    1426             :         }
    1427          18 :         else if (PG_NARGS() == 1)
    1428             :         {
    1429             :             /* must be single text argument */
    1430          18 :             conn = pconn->conn;
    1431          18 :             sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
    1432             :         }
    1433             :         else
    1434             :             /* shouldn't happen */
    1435           0 :             elog(ERROR, "wrong number of arguments");
    1436             : 
    1437          52 :         if (!conn)
    1438           0 :             dblink_conn_not_avail(conname);
    1439             : 
    1440          52 :         res = libpqsrv_exec(conn, sql, dblink_we_get_result);
    1441          52 :         if (!res ||
    1442          52 :             (PQresultStatus(res) != PGRES_COMMAND_OK &&
    1443           4 :              PQresultStatus(res) != PGRES_TUPLES_OK))
    1444             :         {
    1445           4 :             dblink_res_error(conn, conname, res, fail,
    1446             :                              "while executing command");
    1447             : 
    1448             :             /*
    1449             :              * and save a copy of the command status string to return as our
    1450             :              * result tuple
    1451             :              */
    1452           2 :             sql_cmd_status = cstring_to_text("ERROR");
    1453             :         }
    1454          48 :         else if (PQresultStatus(res) == PGRES_COMMAND_OK)
    1455             :         {
    1456             :             /*
    1457             :              * and save a copy of the command status string to return as our
    1458             :              * result tuple
    1459             :              */
    1460          48 :             sql_cmd_status = cstring_to_text(PQcmdStatus(res));
    1461          48 :             PQclear(res);
    1462             :         }
    1463             :         else
    1464             :         {
    1465           0 :             PQclear(res);
    1466           0 :             ereport(ERROR,
    1467             :                     (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
    1468             :                      errmsg("statement returning results not allowed")));
    1469             :         }
    1470             :     }
    1471           2 :     PG_FINALLY();
    1472             :     {
    1473             :         /* if needed, close the connection to the database */
    1474          52 :         if (freeconn)
    1475           2 :             libpqsrv_disconnect(conn);
    1476             :     }
    1477          52 :     PG_END_TRY();
    1478             : 
    1479          50 :     PG_RETURN_TEXT_P(sql_cmd_status);
    1480             : }
    1481             : 
    1482             : 
    1483             : /*
    1484             :  * dblink_get_pkey
    1485             :  *
    1486             :  * Return list of primary key fields for the supplied relation,
    1487             :  * or NULL if none exists.
    1488             :  */
    1489           6 : PG_FUNCTION_INFO_V1(dblink_get_pkey);
    1490             : Datum
    1491          18 : dblink_get_pkey(PG_FUNCTION_ARGS)
    1492             : {
    1493             :     int16       indnkeyatts;
    1494             :     char      **results;
    1495             :     FuncCallContext *funcctx;
    1496             :     int32       call_cntr;
    1497             :     int32       max_calls;
    1498             :     AttInMetadata *attinmeta;
    1499             :     MemoryContext oldcontext;
    1500             : 
    1501             :     /* stuff done only on the first call of the function */
    1502          18 :     if (SRF_IS_FIRSTCALL())
    1503             :     {
    1504             :         Relation    rel;
    1505             :         TupleDesc   tupdesc;
    1506             : 
    1507             :         /* create a function context for cross-call persistence */
    1508           6 :         funcctx = SRF_FIRSTCALL_INIT();
    1509             : 
    1510             :         /*
    1511             :          * switch to memory context appropriate for multiple function calls
    1512             :          */
    1513           6 :         oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
    1514             : 
    1515             :         /* open target relation */
    1516           6 :         rel = get_rel_from_relname(PG_GETARG_TEXT_PP(0), AccessShareLock, ACL_SELECT);
    1517             : 
    1518             :         /* get the array of attnums */
    1519           6 :         results = get_pkey_attnames(rel, &indnkeyatts);
    1520             : 
    1521           6 :         relation_close(rel, AccessShareLock);
    1522             : 
    1523             :         /*
    1524             :          * need a tuple descriptor representing one INT and one TEXT column
    1525             :          */
    1526           6 :         tupdesc = CreateTemplateTupleDesc(2);
    1527           6 :         TupleDescInitEntry(tupdesc, (AttrNumber) 1, "position",
    1528             :                            INT4OID, -1, 0);
    1529           6 :         TupleDescInitEntry(tupdesc, (AttrNumber) 2, "colname",
    1530             :                            TEXTOID, -1, 0);
    1531             : 
    1532             :         /*
    1533             :          * Generate attribute metadata needed later to produce tuples from raw
    1534             :          * C strings
    1535             :          */
    1536           6 :         attinmeta = TupleDescGetAttInMetadata(tupdesc);
    1537           6 :         funcctx->attinmeta = attinmeta;
    1538             : 
    1539           6 :         if ((results != NULL) && (indnkeyatts > 0))
    1540             :         {
    1541           6 :             funcctx->max_calls = indnkeyatts;
    1542             : 
    1543             :             /* got results, keep track of them */
    1544           6 :             funcctx->user_fctx = results;
    1545             :         }
    1546             :         else
    1547             :         {
    1548             :             /* fast track when no results */
    1549           0 :             MemoryContextSwitchTo(oldcontext);
    1550           0 :             SRF_RETURN_DONE(funcctx);
    1551             :         }
    1552             : 
    1553           6 :         MemoryContextSwitchTo(oldcontext);
    1554             :     }
    1555             : 
    1556             :     /* stuff done on every call of the function */
    1557          18 :     funcctx = SRF_PERCALL_SETUP();
    1558             : 
    1559             :     /*
    1560             :      * initialize per-call variables
    1561             :      */
    1562          18 :     call_cntr = funcctx->call_cntr;
    1563          18 :     max_calls = funcctx->max_calls;
    1564             : 
    1565          18 :     results = (char **) funcctx->user_fctx;
    1566          18 :     attinmeta = funcctx->attinmeta;
    1567             : 
    1568          18 :     if (call_cntr < max_calls)   /* do when there is more left to send */
    1569             :     {
    1570             :         char      **values;
    1571             :         HeapTuple   tuple;
    1572             :         Datum       result;
    1573             : 
    1574          12 :         values = palloc_array(char *, 2);
    1575          12 :         values[0] = psprintf("%d", call_cntr + 1);
    1576          12 :         values[1] = results[call_cntr];
    1577             : 
    1578             :         /* build the tuple */
    1579          12 :         tuple = BuildTupleFromCStrings(attinmeta, values);
    1580             : 
    1581             :         /* make the tuple into a datum */
    1582          12 :         result = HeapTupleGetDatum(tuple);
    1583             : 
    1584          12 :         SRF_RETURN_NEXT(funcctx, result);
    1585             :     }
    1586             :     else
    1587             :     {
    1588             :         /* do when there is no more left */
    1589           6 :         SRF_RETURN_DONE(funcctx);
    1590             :     }
    1591             : }
    1592             : 
    1593             : 
    1594             : /*
    1595             :  * dblink_build_sql_insert
    1596             :  *
    1597             :  * Used to generate an SQL insert statement
    1598             :  * based on an existing tuple in a local relation.
    1599             :  * This is useful for selectively replicating data
    1600             :  * to another server via dblink.
    1601             :  *
    1602             :  * API:
    1603             :  * <relname> - name of local table of interest
    1604             :  * <pkattnums> - an int2vector of attnums which will be used
    1605             :  * to identify the local tuple of interest
    1606             :  * <pknumatts> - number of attnums in pkattnums
    1607             :  * <src_pkattvals_arry> - text array of key values which will be used
    1608             :  * to identify the local tuple of interest
    1609             :  * <tgt_pkattvals_arry> - text array of key values which will be used
    1610             :  * to build the string for execution remotely. These are substituted
    1611             :  * for their counterparts in src_pkattvals_arry
    1612             :  */
    1613           8 : PG_FUNCTION_INFO_V1(dblink_build_sql_insert);
    1614             : Datum
    1615          12 : dblink_build_sql_insert(PG_FUNCTION_ARGS)
    1616             : {
    1617          12 :     text       *relname_text = PG_GETARG_TEXT_PP(0);
    1618          12 :     int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
    1619          12 :     int32       pknumatts_arg = PG_GETARG_INT32(2);
    1620          12 :     ArrayType  *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
    1621          12 :     ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
    1622             :     Relation    rel;
    1623             :     int        *pkattnums;
    1624             :     int         pknumatts;
    1625             :     char      **src_pkattvals;
    1626             :     char      **tgt_pkattvals;
    1627             :     int         src_nitems;
    1628             :     int         tgt_nitems;
    1629             :     char       *sql;
    1630             : 
    1631             :     /*
    1632             :      * Open target relation.
    1633             :      */
    1634          12 :     rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
    1635             : 
    1636             :     /*
    1637             :      * Process pkattnums argument.
    1638             :      */
    1639          12 :     validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
    1640             :                        &pkattnums, &pknumatts);
    1641             : 
    1642             :     /*
    1643             :      * Source array is made up of key values that will be used to locate the
    1644             :      * tuple of interest from the local system.
    1645             :      */
    1646           8 :     src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
    1647             : 
    1648             :     /*
    1649             :      * There should be one source array key value for each key attnum
    1650             :      */
    1651           8 :     if (src_nitems != pknumatts)
    1652           0 :         ereport(ERROR,
    1653             :                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
    1654             :                  errmsg("source key array length must match number of key attributes")));
    1655             : 
    1656             :     /*
    1657             :      * Target array is made up of key values that will be used to build the
    1658             :      * SQL string for use on the remote system.
    1659             :      */
    1660           8 :     tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
    1661             : 
    1662             :     /*
    1663             :      * There should be one target array key value for each key attnum
    1664             :      */
    1665           8 :     if (tgt_nitems != pknumatts)
    1666           0 :         ereport(ERROR,
    1667             :                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
    1668             :                  errmsg("target key array length must match number of key attributes")));
    1669             : 
    1670             :     /*
    1671             :      * Prep work is finally done. Go get the SQL string.
    1672             :      */
    1673           8 :     sql = get_sql_insert(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
    1674             : 
    1675             :     /*
    1676             :      * Now we can close the relation.
    1677             :      */
    1678           8 :     relation_close(rel, AccessShareLock);
    1679             : 
    1680             :     /*
    1681             :      * And send it
    1682             :      */
    1683           8 :     PG_RETURN_TEXT_P(cstring_to_text(sql));
    1684             : }
    1685             : 
    1686             : 
    1687             : /*
    1688             :  * dblink_build_sql_delete
    1689             :  *
    1690             :  * Used to generate an SQL delete statement.
    1691             :  * This is useful for selectively replicating a
    1692             :  * delete to another server via dblink.
    1693             :  *
    1694             :  * API:
    1695             :  * <relname> - name of remote table of interest
    1696             :  * <pkattnums> - an int2vector of attnums which will be used
    1697             :  * to identify the remote tuple of interest
    1698             :  * <pknumatts> - number of attnums in pkattnums
    1699             :  * <tgt_pkattvals_arry> - text array of key values which will be used
    1700             :  * to build the string for execution remotely.
    1701             :  */
    1702           8 : PG_FUNCTION_INFO_V1(dblink_build_sql_delete);
    1703             : Datum
    1704          12 : dblink_build_sql_delete(PG_FUNCTION_ARGS)
    1705             : {
    1706          12 :     text       *relname_text = PG_GETARG_TEXT_PP(0);
    1707          12 :     int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
    1708          12 :     int32       pknumatts_arg = PG_GETARG_INT32(2);
    1709          12 :     ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
    1710             :     Relation    rel;
    1711             :     int        *pkattnums;
    1712             :     int         pknumatts;
    1713             :     char      **tgt_pkattvals;
    1714             :     int         tgt_nitems;
    1715             :     char       *sql;
    1716             : 
    1717             :     /*
    1718             :      * Open target relation.
    1719             :      */
    1720          12 :     rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
    1721             : 
    1722             :     /*
    1723             :      * Process pkattnums argument.
    1724             :      */
    1725          12 :     validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
    1726             :                        &pkattnums, &pknumatts);
    1727             : 
    1728             :     /*
    1729             :      * Target array is made up of key values that will be used to build the
    1730             :      * SQL string for use on the remote system.
    1731             :      */
    1732           8 :     tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
    1733             : 
    1734             :     /*
    1735             :      * There should be one target array key value for each key attnum
    1736             :      */
    1737           8 :     if (tgt_nitems != pknumatts)
    1738           0 :         ereport(ERROR,
    1739             :                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
    1740             :                  errmsg("target key array length must match number of key attributes")));
    1741             : 
    1742             :     /*
    1743             :      * Prep work is finally done. Go get the SQL string.
    1744             :      */
    1745           8 :     sql = get_sql_delete(rel, pkattnums, pknumatts, tgt_pkattvals);
    1746             : 
    1747             :     /*
    1748             :      * Now we can close the relation.
    1749             :      */
    1750           8 :     relation_close(rel, AccessShareLock);
    1751             : 
    1752             :     /*
    1753             :      * And send it
    1754             :      */
    1755           8 :     PG_RETURN_TEXT_P(cstring_to_text(sql));
    1756             : }
    1757             : 
    1758             : 
    1759             : /*
    1760             :  * dblink_build_sql_update
    1761             :  *
    1762             :  * Used to generate an SQL update statement
    1763             :  * based on an existing tuple in a local relation.
    1764             :  * This is useful for selectively replicating data
    1765             :  * to another server via dblink.
    1766             :  *
    1767             :  * API:
    1768             :  * <relname> - name of local table of interest
    1769             :  * <pkattnums> - an int2vector of attnums which will be used
    1770             :  * to identify the local tuple of interest
    1771             :  * <pknumatts> - number of attnums in pkattnums
    1772             :  * <src_pkattvals_arry> - text array of key values which will be used
    1773             :  * to identify the local tuple of interest
    1774             :  * <tgt_pkattvals_arry> - text array of key values which will be used
    1775             :  * to build the string for execution remotely. These are substituted
    1776             :  * for their counterparts in src_pkattvals_arry
    1777             :  */
    1778           8 : PG_FUNCTION_INFO_V1(dblink_build_sql_update);
    1779             : Datum
    1780          12 : dblink_build_sql_update(PG_FUNCTION_ARGS)
    1781             : {
    1782          12 :     text       *relname_text = PG_GETARG_TEXT_PP(0);
    1783          12 :     int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
    1784          12 :     int32       pknumatts_arg = PG_GETARG_INT32(2);
    1785          12 :     ArrayType  *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
    1786          12 :     ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
    1787             :     Relation    rel;
    1788             :     int        *pkattnums;
    1789             :     int         pknumatts;
    1790             :     char      **src_pkattvals;
    1791             :     char      **tgt_pkattvals;
    1792             :     int         src_nitems;
    1793             :     int         tgt_nitems;
    1794             :     char       *sql;
    1795             : 
    1796             :     /*
    1797             :      * Open target relation.
    1798             :      */
    1799          12 :     rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
    1800             : 
    1801             :     /*
    1802             :      * Process pkattnums argument.
    1803             :      */
    1804          12 :     validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
    1805             :                        &pkattnums, &pknumatts);
    1806             : 
    1807             :     /*
    1808             :      * Source array is made up of key values that will be used to locate the
    1809             :      * tuple of interest from the local system.
    1810             :      */
    1811           8 :     src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
    1812             : 
    1813             :     /*
    1814             :      * There should be one source array key value for each key attnum
    1815             :      */
    1816           8 :     if (src_nitems != pknumatts)
    1817           0 :         ereport(ERROR,
    1818             :                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
    1819             :                  errmsg("source key array length must match number of key attributes")));
    1820             : 
    1821             :     /*
    1822             :      * Target array is made up of key values that will be used to build the
    1823             :      * SQL string for use on the remote system.
    1824             :      */
    1825           8 :     tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
    1826             : 
    1827             :     /*
    1828             :      * There should be one target array key value for each key attnum
    1829             :      */
    1830           8 :     if (tgt_nitems != pknumatts)
    1831           0 :         ereport(ERROR,
    1832             :                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
    1833             :                  errmsg("target key array length must match number of key attributes")));
    1834             : 
    1835             :     /*
    1836             :      * Prep work is finally done. Go get the SQL string.
    1837             :      */
    1838           8 :     sql = get_sql_update(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
    1839             : 
    1840             :     /*
    1841             :      * Now we can close the relation.
    1842             :      */
    1843           8 :     relation_close(rel, AccessShareLock);
    1844             : 
    1845             :     /*
    1846             :      * And send it
    1847             :      */
    1848           8 :     PG_RETURN_TEXT_P(cstring_to_text(sql));
    1849             : }
    1850             : 
    1851             : /*
    1852             :  * dblink_current_query
    1853             :  * return the current query string
    1854             :  * to allow its use in (among other things)
    1855             :  * rewrite rules
    1856             :  */
    1857           4 : PG_FUNCTION_INFO_V1(dblink_current_query);
    1858             : Datum
    1859           0 : dblink_current_query(PG_FUNCTION_ARGS)
    1860             : {
    1861             :     /* This is now just an alias for the built-in function current_query() */
    1862           0 :     PG_RETURN_DATUM(current_query(fcinfo));
    1863             : }
    1864             : 
    1865             : /*
    1866             :  * Retrieve async notifications for a connection.
    1867             :  *
    1868             :  * Returns a setof record of notifications, or an empty set if none received.
    1869             :  * Can optionally take a named connection as parameter, but uses the unnamed
    1870             :  * connection per default.
    1871             :  *
    1872             :  */
    1873             : #define DBLINK_NOTIFY_COLS      3
    1874             : 
    1875          10 : PG_FUNCTION_INFO_V1(dblink_get_notify);
    1876             : Datum
    1877           4 : dblink_get_notify(PG_FUNCTION_ARGS)
    1878             : {
    1879             :     PGconn     *conn;
    1880             :     PGnotify   *notify;
    1881           4 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    1882             : 
    1883           4 :     dblink_init();
    1884           4 :     if (PG_NARGS() == 1)
    1885           0 :         conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
    1886             :     else
    1887           4 :         conn = pconn->conn;
    1888             : 
    1889           4 :     InitMaterializedSRF(fcinfo, 0);
    1890             : 
    1891           4 :     PQconsumeInput(conn);
    1892           8 :     while ((notify = PQnotifies(conn)) != NULL)
    1893             :     {
    1894             :         Datum       values[DBLINK_NOTIFY_COLS];
    1895             :         bool        nulls[DBLINK_NOTIFY_COLS];
    1896             : 
    1897           4 :         memset(values, 0, sizeof(values));
    1898           4 :         memset(nulls, 0, sizeof(nulls));
    1899             : 
    1900           4 :         if (notify->relname != NULL)
    1901           4 :             values[0] = CStringGetTextDatum(notify->relname);
    1902             :         else
    1903           0 :             nulls[0] = true;
    1904             : 
    1905           4 :         values[1] = Int32GetDatum(notify->be_pid);
    1906             : 
    1907           4 :         if (notify->extra != NULL)
    1908           4 :             values[2] = CStringGetTextDatum(notify->extra);
    1909             :         else
    1910           0 :             nulls[2] = true;
    1911             : 
    1912           4 :         tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
    1913             : 
    1914           4 :         PQfreemem(notify);
    1915           4 :         PQconsumeInput(conn);
    1916             :     }
    1917             : 
    1918           4 :     return (Datum) 0;
    1919             : }
    1920             : 
    1921             : /*
    1922             :  * Validate the options given to a dblink foreign server or user mapping.
    1923             :  * Raise an error if any option is invalid.
    1924             :  *
    1925             :  * We just check the names of options here, so semantic errors in options,
    1926             :  * such as invalid numeric format, will be detected at the attempt to connect.
    1927             :  */
    1928          28 : PG_FUNCTION_INFO_V1(dblink_fdw_validator);
    1929             : Datum
    1930          38 : dblink_fdw_validator(PG_FUNCTION_ARGS)
    1931             : {
    1932          38 :     List       *options_list = untransformRelOptions(PG_GETARG_DATUM(0));
    1933          38 :     Oid         context = PG_GETARG_OID(1);
    1934             :     ListCell   *cell;
    1935             : 
    1936             :     static const PQconninfoOption *options = NULL;
    1937             : 
    1938             :     /*
    1939             :      * Get list of valid libpq options.
    1940             :      *
    1941             :      * To avoid unnecessary work, we get the list once and use it throughout
    1942             :      * the lifetime of this backend process.  We don't need to care about
    1943             :      * memory context issues, because PQconndefaults allocates with malloc.
    1944             :      */
    1945          38 :     if (!options)
    1946             :     {
    1947          24 :         options = PQconndefaults();
    1948          24 :         if (!options)           /* assume reason for failure is OOM */
    1949           0 :             ereport(ERROR,
    1950             :                     (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
    1951             :                      errmsg("out of memory"),
    1952             :                      errdetail("Could not get libpq's default connection options.")));
    1953             :     }
    1954             : 
    1955             :     /* Validate each supplied option. */
    1956         100 :     foreach(cell, options_list)
    1957             :     {
    1958          78 :         DefElem    *def = (DefElem *) lfirst(cell);
    1959             : 
    1960          78 :         if (!is_valid_dblink_fdw_option(options, def->defname, context))
    1961             :         {
    1962             :             /*
    1963             :              * Unknown option, or invalid option for the context specified, so
    1964             :              * complain about it.  Provide a hint with a valid option that
    1965             :              * looks similar, if there is one.
    1966             :              */
    1967             :             const PQconninfoOption *opt;
    1968             :             const char *closest_match;
    1969             :             ClosestMatchState match_state;
    1970          16 :             bool        has_valid_options = false;
    1971             : 
    1972          16 :             initClosestMatch(&match_state, def->defname, 4);
    1973         832 :             for (opt = options; opt->keyword; opt++)
    1974             :             {
    1975         816 :                 if (is_valid_dblink_option(options, opt->keyword, context))
    1976             :                 {
    1977         186 :                     has_valid_options = true;
    1978         186 :                     updateClosestMatch(&match_state, opt->keyword);
    1979             :                 }
    1980             :             }
    1981             : 
    1982          16 :             closest_match = getClosestMatch(&match_state);
    1983          16 :             ereport(ERROR,
    1984             :                     (errcode(ERRCODE_FDW_OPTION_NAME_NOT_FOUND),
    1985             :                      errmsg("invalid option \"%s\"", def->defname),
    1986             :                      has_valid_options ? closest_match ?
    1987             :                      errhint("Perhaps you meant the option \"%s\".",
    1988             :                              closest_match) : 0 :
    1989             :                      errhint("There are no valid options in this context.")));
    1990             :         }
    1991             :     }
    1992             : 
    1993          22 :     PG_RETURN_VOID();
    1994             : }
    1995             : 
    1996             : 
    1997             : /*************************************************************
    1998             :  * internal functions
    1999             :  */
    2000             : 
    2001             : 
    2002             : /*
    2003             :  * get_pkey_attnames
    2004             :  *
    2005             :  * Get the primary key attnames for the given relation.
    2006             :  * Return NULL, and set indnkeyatts = 0, if no primary key exists.
    2007             :  */
    2008             : static char **
    2009           6 : get_pkey_attnames(Relation rel, int16 *indnkeyatts)
    2010             : {
    2011             :     Relation    indexRelation;
    2012             :     ScanKeyData skey;
    2013             :     SysScanDesc scan;
    2014             :     HeapTuple   indexTuple;
    2015             :     int         i;
    2016           6 :     char      **result = NULL;
    2017             :     TupleDesc   tupdesc;
    2018             : 
    2019             :     /* initialize indnkeyatts to 0 in case no primary key exists */
    2020           6 :     *indnkeyatts = 0;
    2021             : 
    2022           6 :     tupdesc = rel->rd_att;
    2023             : 
    2024             :     /* Prepare to scan pg_index for entries having indrelid = this rel. */
    2025           6 :     indexRelation = table_open(IndexRelationId, AccessShareLock);
    2026           6 :     ScanKeyInit(&skey,
    2027             :                 Anum_pg_index_indrelid,
    2028             :                 BTEqualStrategyNumber, F_OIDEQ,
    2029             :                 ObjectIdGetDatum(RelationGetRelid(rel)));
    2030             : 
    2031           6 :     scan = systable_beginscan(indexRelation, IndexIndrelidIndexId, true,
    2032             :                               NULL, 1, &skey);
    2033             : 
    2034           6 :     while (HeapTupleIsValid(indexTuple = systable_getnext(scan)))
    2035             :     {
    2036           6 :         Form_pg_index index = (Form_pg_index) GETSTRUCT(indexTuple);
    2037             : 
    2038             :         /* we're only interested if it is the primary key */
    2039           6 :         if (index->indisprimary)
    2040             :         {
    2041           6 :             *indnkeyatts = index->indnkeyatts;
    2042           6 :             if (*indnkeyatts > 0)
    2043             :             {
    2044           6 :                 result = palloc_array(char *, *indnkeyatts);
    2045             : 
    2046          18 :                 for (i = 0; i < *indnkeyatts; i++)
    2047          12 :                     result[i] = SPI_fname(tupdesc, index->indkey.values[i]);
    2048             :             }
    2049           6 :             break;
    2050             :         }
    2051             :     }
    2052             : 
    2053           6 :     systable_endscan(scan);
    2054           6 :     table_close(indexRelation, AccessShareLock);
    2055             : 
    2056           6 :     return result;
    2057             : }
    2058             : 
    2059             : /*
    2060             :  * Deconstruct a text[] into C-strings (note any NULL elements will be
    2061             :  * returned as NULL pointers)
    2062             :  */
    2063             : static char **
    2064          40 : get_text_array_contents(ArrayType *array, int *numitems)
    2065             : {
    2066          40 :     int         ndim = ARR_NDIM(array);
    2067          40 :     int        *dims = ARR_DIMS(array);
    2068             :     int         nitems;
    2069             :     int16       typlen;
    2070             :     bool        typbyval;
    2071             :     char        typalign;
    2072             :     char      **values;
    2073             :     char       *ptr;
    2074             :     bits8      *bitmap;
    2075             :     int         bitmask;
    2076             :     int         i;
    2077             : 
    2078             :     Assert(ARR_ELEMTYPE(array) == TEXTOID);
    2079             : 
    2080          40 :     *numitems = nitems = ArrayGetNItems(ndim, dims);
    2081             : 
    2082          40 :     get_typlenbyvalalign(ARR_ELEMTYPE(array),
    2083             :                          &typlen, &typbyval, &typalign);
    2084             : 
    2085          40 :     values = palloc_array(char *, nitems);
    2086             : 
    2087          40 :     ptr = ARR_DATA_PTR(array);
    2088          40 :     bitmap = ARR_NULLBITMAP(array);
    2089          40 :     bitmask = 1;
    2090             : 
    2091         110 :     for (i = 0; i < nitems; i++)
    2092             :     {
    2093          70 :         if (bitmap && (*bitmap & bitmask) == 0)
    2094             :         {
    2095           0 :             values[i] = NULL;
    2096             :         }
    2097             :         else
    2098             :         {
    2099          70 :             values[i] = TextDatumGetCString(PointerGetDatum(ptr));
    2100          70 :             ptr = att_addlength_pointer(ptr, typlen, ptr);
    2101          70 :             ptr = (char *) att_align_nominal(ptr, typalign);
    2102             :         }
    2103             : 
    2104             :         /* advance bitmap pointer if any */
    2105          70 :         if (bitmap)
    2106             :         {
    2107           0 :             bitmask <<= 1;
    2108           0 :             if (bitmask == 0x100)
    2109             :             {
    2110           0 :                 bitmap++;
    2111           0 :                 bitmask = 1;
    2112             :             }
    2113             :         }
    2114             :     }
    2115             : 
    2116          40 :     return values;
    2117             : }
    2118             : 
    2119             : static char *
    2120           8 : get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
    2121             : {
    2122             :     char       *relname;
    2123             :     HeapTuple   tuple;
    2124             :     TupleDesc   tupdesc;
    2125             :     int         natts;
    2126             :     StringInfoData buf;
    2127             :     char       *val;
    2128             :     int         key;
    2129             :     int         i;
    2130             :     bool        needComma;
    2131             : 
    2132           8 :     initStringInfo(&buf);
    2133             : 
    2134             :     /* get relation name including any needed schema prefix and quoting */
    2135           8 :     relname = generate_relation_name(rel);
    2136             : 
    2137           8 :     tupdesc = rel->rd_att;
    2138           8 :     natts = tupdesc->natts;
    2139             : 
    2140           8 :     tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
    2141           8 :     if (!tuple)
    2142           0 :         ereport(ERROR,
    2143             :                 (errcode(ERRCODE_CARDINALITY_VIOLATION),
    2144             :                  errmsg("source row not found")));
    2145             : 
    2146           8 :     appendStringInfo(&buf, "INSERT INTO %s(", relname);
    2147             : 
    2148           8 :     needComma = false;
    2149          38 :     for (i = 0; i < natts; i++)
    2150             :     {
    2151          30 :         Form_pg_attribute att = TupleDescAttr(tupdesc, i);
    2152             : 
    2153          30 :         if (att->attisdropped)
    2154           4 :             continue;
    2155             : 
    2156          26 :         if (needComma)
    2157          18 :             appendStringInfoChar(&buf, ',');
    2158             : 
    2159          26 :         appendStringInfoString(&buf,
    2160          26 :                                quote_ident_cstr(NameStr(att->attname)));
    2161          26 :         needComma = true;
    2162             :     }
    2163             : 
    2164           8 :     appendStringInfoString(&buf, ") VALUES(");
    2165             : 
    2166             :     /*
    2167             :      * Note: i is physical column number (counting from 0).
    2168             :      */
    2169           8 :     needComma = false;
    2170          38 :     for (i = 0; i < natts; i++)
    2171             :     {
    2172          30 :         if (TupleDescAttr(tupdesc, i)->attisdropped)
    2173           4 :             continue;
    2174             : 
    2175          26 :         if (needComma)
    2176          18 :             appendStringInfoChar(&buf, ',');
    2177             : 
    2178          26 :         key = get_attnum_pk_pos(pkattnums, pknumatts, i);
    2179             : 
    2180          26 :         if (key >= 0)
    2181          14 :             val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
    2182             :         else
    2183          12 :             val = SPI_getvalue(tuple, tupdesc, i + 1);
    2184             : 
    2185          26 :         if (val != NULL)
    2186             :         {
    2187          26 :             appendStringInfoString(&buf, quote_literal_cstr(val));
    2188          26 :             pfree(val);
    2189             :         }
    2190             :         else
    2191           0 :             appendStringInfoString(&buf, "NULL");
    2192          26 :         needComma = true;
    2193             :     }
    2194           8 :     appendStringInfoChar(&buf, ')');
    2195             : 
    2196           8 :     return buf.data;
    2197             : }
    2198             : 
    2199             : static char *
    2200           8 : get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals)
    2201             : {
    2202             :     char       *relname;
    2203             :     TupleDesc   tupdesc;
    2204             :     StringInfoData buf;
    2205             :     int         i;
    2206             : 
    2207           8 :     initStringInfo(&buf);
    2208             : 
    2209             :     /* get relation name including any needed schema prefix and quoting */
    2210           8 :     relname = generate_relation_name(rel);
    2211             : 
    2212           8 :     tupdesc = rel->rd_att;
    2213             : 
    2214           8 :     appendStringInfo(&buf, "DELETE FROM %s WHERE ", relname);
    2215          22 :     for (i = 0; i < pknumatts; i++)
    2216             :     {
    2217          14 :         int         pkattnum = pkattnums[i];
    2218          14 :         Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
    2219             : 
    2220          14 :         if (i > 0)
    2221           6 :             appendStringInfoString(&buf, " AND ");
    2222             : 
    2223          14 :         appendStringInfoString(&buf,
    2224          14 :                                quote_ident_cstr(NameStr(attr->attname)));
    2225             : 
    2226          14 :         if (tgt_pkattvals[i] != NULL)
    2227          14 :             appendStringInfo(&buf, " = %s",
    2228          14 :                              quote_literal_cstr(tgt_pkattvals[i]));
    2229             :         else
    2230           0 :             appendStringInfoString(&buf, " IS NULL");
    2231             :     }
    2232             : 
    2233           8 :     return buf.data;
    2234             : }
    2235             : 
    2236             : static char *
    2237           8 : get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
    2238             : {
    2239             :     char       *relname;
    2240             :     HeapTuple   tuple;
    2241             :     TupleDesc   tupdesc;
    2242             :     int         natts;
    2243             :     StringInfoData buf;
    2244             :     char       *val;
    2245             :     int         key;
    2246             :     int         i;
    2247             :     bool        needComma;
    2248             : 
    2249           8 :     initStringInfo(&buf);
    2250             : 
    2251             :     /* get relation name including any needed schema prefix and quoting */
    2252           8 :     relname = generate_relation_name(rel);
    2253             : 
    2254           8 :     tupdesc = rel->rd_att;
    2255           8 :     natts = tupdesc->natts;
    2256             : 
    2257           8 :     tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
    2258           8 :     if (!tuple)
    2259           0 :         ereport(ERROR,
    2260             :                 (errcode(ERRCODE_CARDINALITY_VIOLATION),
    2261             :                  errmsg("source row not found")));
    2262             : 
    2263           8 :     appendStringInfo(&buf, "UPDATE %s SET ", relname);
    2264             : 
    2265             :     /*
    2266             :      * Note: i is physical column number (counting from 0).
    2267             :      */
    2268           8 :     needComma = false;
    2269          38 :     for (i = 0; i < natts; i++)
    2270             :     {
    2271          30 :         Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
    2272             : 
    2273          30 :         if (attr->attisdropped)
    2274           4 :             continue;
    2275             : 
    2276          26 :         if (needComma)
    2277          18 :             appendStringInfoString(&buf, ", ");
    2278             : 
    2279          26 :         appendStringInfo(&buf, "%s = ",
    2280          26 :                          quote_ident_cstr(NameStr(attr->attname)));
    2281             : 
    2282          26 :         key = get_attnum_pk_pos(pkattnums, pknumatts, i);
    2283             : 
    2284          26 :         if (key >= 0)
    2285          14 :             val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
    2286             :         else
    2287          12 :             val = SPI_getvalue(tuple, tupdesc, i + 1);
    2288             : 
    2289          26 :         if (val != NULL)
    2290             :         {
    2291          26 :             appendStringInfoString(&buf, quote_literal_cstr(val));
    2292          26 :             pfree(val);
    2293             :         }
    2294             :         else
    2295           0 :             appendStringInfoString(&buf, "NULL");
    2296          26 :         needComma = true;
    2297             :     }
    2298             : 
    2299           8 :     appendStringInfoString(&buf, " WHERE ");
    2300             : 
    2301          22 :     for (i = 0; i < pknumatts; i++)
    2302             :     {
    2303          14 :         int         pkattnum = pkattnums[i];
    2304          14 :         Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
    2305             : 
    2306          14 :         if (i > 0)
    2307           6 :             appendStringInfoString(&buf, " AND ");
    2308             : 
    2309          14 :         appendStringInfoString(&buf,
    2310          14 :                                quote_ident_cstr(NameStr(attr->attname)));
    2311             : 
    2312          14 :         val = tgt_pkattvals[i];
    2313             : 
    2314          14 :         if (val != NULL)
    2315          14 :             appendStringInfo(&buf, " = %s", quote_literal_cstr(val));
    2316             :         else
    2317           0 :             appendStringInfoString(&buf, " IS NULL");
    2318             :     }
    2319             : 
    2320           8 :     return buf.data;
    2321             : }
    2322             : 
    2323             : /*
    2324             :  * Return a properly quoted identifier.
    2325             :  * Uses quote_ident in quote.c
    2326             :  */
    2327             : static char *
    2328         160 : quote_ident_cstr(char *rawstr)
    2329             : {
    2330             :     text       *rawstr_text;
    2331             :     text       *result_text;
    2332             :     char       *result;
    2333             : 
    2334         160 :     rawstr_text = cstring_to_text(rawstr);
    2335         160 :     result_text = DatumGetTextPP(DirectFunctionCall1(quote_ident,
    2336             :                                                      PointerGetDatum(rawstr_text)));
    2337         160 :     result = text_to_cstring(result_text);
    2338             : 
    2339         160 :     return result;
    2340             : }
    2341             : 
    2342             : static int
    2343          52 : get_attnum_pk_pos(int *pkattnums, int pknumatts, int key)
    2344             : {
    2345             :     int         i;
    2346             : 
    2347             :     /*
    2348             :      * Not likely a long list anyway, so just scan for the value
    2349             :      */
    2350         100 :     for (i = 0; i < pknumatts; i++)
    2351          76 :         if (key == pkattnums[i])
    2352          28 :             return i;
    2353             : 
    2354          24 :     return -1;
    2355             : }
    2356             : 
    2357             : static HeapTuple
    2358          16 : get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals)
    2359             : {
    2360             :     char       *relname;
    2361             :     TupleDesc   tupdesc;
    2362             :     int         natts;
    2363             :     StringInfoData buf;
    2364             :     int         ret;
    2365             :     HeapTuple   tuple;
    2366             :     int         i;
    2367             : 
    2368             :     /*
    2369             :      * Connect to SPI manager
    2370             :      */
    2371          16 :     SPI_connect();
    2372             : 
    2373          16 :     initStringInfo(&buf);
    2374             : 
    2375             :     /* get relation name including any needed schema prefix and quoting */
    2376          16 :     relname = generate_relation_name(rel);
    2377             : 
    2378          16 :     tupdesc = rel->rd_att;
    2379          16 :     natts = tupdesc->natts;
    2380             : 
    2381             :     /*
    2382             :      * Build sql statement to look up tuple of interest, ie, the one matching
    2383             :      * src_pkattvals.  We used to use "SELECT *" here, but it's simpler to
    2384             :      * generate a result tuple that matches the table's physical structure,
    2385             :      * with NULLs for any dropped columns.  Otherwise we have to deal with two
    2386             :      * different tupdescs and everything's very confusing.
    2387             :      */
    2388          16 :     appendStringInfoString(&buf, "SELECT ");
    2389             : 
    2390          76 :     for (i = 0; i < natts; i++)
    2391             :     {
    2392          60 :         Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
    2393             : 
    2394          60 :         if (i > 0)
    2395          44 :             appendStringInfoString(&buf, ", ");
    2396             : 
    2397          60 :         if (attr->attisdropped)
    2398           8 :             appendStringInfoString(&buf, "NULL");
    2399             :         else
    2400          52 :             appendStringInfoString(&buf,
    2401          52 :                                    quote_ident_cstr(NameStr(attr->attname)));
    2402             :     }
    2403             : 
    2404          16 :     appendStringInfo(&buf, " FROM %s WHERE ", relname);
    2405             : 
    2406          44 :     for (i = 0; i < pknumatts; i++)
    2407             :     {
    2408          28 :         int         pkattnum = pkattnums[i];
    2409          28 :         Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
    2410             : 
    2411          28 :         if (i > 0)
    2412          12 :             appendStringInfoString(&buf, " AND ");
    2413             : 
    2414          28 :         appendStringInfoString(&buf,
    2415          28 :                                quote_ident_cstr(NameStr(attr->attname)));
    2416             : 
    2417          28 :         if (src_pkattvals[i] != NULL)
    2418          28 :             appendStringInfo(&buf, " = %s",
    2419          28 :                              quote_literal_cstr(src_pkattvals[i]));
    2420             :         else
    2421           0 :             appendStringInfoString(&buf, " IS NULL");
    2422             :     }
    2423             : 
    2424             :     /*
    2425             :      * Retrieve the desired tuple
    2426             :      */
    2427          16 :     ret = SPI_exec(buf.data, 0);
    2428          16 :     pfree(buf.data);
    2429             : 
    2430             :     /*
    2431             :      * Only allow one qualifying tuple
    2432             :      */
    2433          16 :     if ((ret == SPI_OK_SELECT) && (SPI_processed > 1))
    2434           0 :         ereport(ERROR,
    2435             :                 (errcode(ERRCODE_CARDINALITY_VIOLATION),
    2436             :                  errmsg("source criteria matched more than one record")));
    2437             : 
    2438          16 :     else if (ret == SPI_OK_SELECT && SPI_processed == 1)
    2439             :     {
    2440          16 :         SPITupleTable *tuptable = SPI_tuptable;
    2441             : 
    2442          16 :         tuple = SPI_copytuple(tuptable->vals[0]);
    2443          16 :         SPI_finish();
    2444             : 
    2445          16 :         return tuple;
    2446             :     }
    2447             :     else
    2448             :     {
    2449             :         /*
    2450             :          * no qualifying tuples
    2451             :          */
    2452           0 :         SPI_finish();
    2453             : 
    2454           0 :         return NULL;
    2455             :     }
    2456             : 
    2457             :     /*
    2458             :      * never reached, but keep compiler quiet
    2459             :      */
    2460             :     return NULL;
    2461             : }
    2462             : 
    2463             : /*
    2464             :  * Open the relation named by relname_text, acquire specified type of lock,
    2465             :  * verify we have specified permissions.
    2466             :  * Caller must close rel when done with it.
    2467             :  */
    2468             : static Relation
    2469          42 : get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode)
    2470             : {
    2471             :     RangeVar   *relvar;
    2472             :     Relation    rel;
    2473             :     AclResult   aclresult;
    2474             : 
    2475          42 :     relvar = makeRangeVarFromNameList(textToQualifiedNameList(relname_text));
    2476          42 :     rel = table_openrv(relvar, lockmode);
    2477             : 
    2478          42 :     aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
    2479             :                                   aclmode);
    2480          42 :     if (aclresult != ACLCHECK_OK)
    2481           0 :         aclcheck_error(aclresult, get_relkind_objtype(rel->rd_rel->relkind),
    2482           0 :                        RelationGetRelationName(rel));
    2483             : 
    2484          42 :     return rel;
    2485             : }
    2486             : 
    2487             : /*
    2488             :  * generate_relation_name - copied from ruleutils.c
    2489             :  *      Compute the name to display for a relation
    2490             :  *
    2491             :  * The result includes all necessary quoting and schema-prefixing.
    2492             :  */
    2493             : static char *
    2494          40 : generate_relation_name(Relation rel)
    2495             : {
    2496             :     char       *nspname;
    2497             :     char       *result;
    2498             : 
    2499             :     /* Qualify the name if not visible in search path */
    2500          40 :     if (RelationIsVisible(RelationGetRelid(rel)))
    2501          30 :         nspname = NULL;
    2502             :     else
    2503          10 :         nspname = get_namespace_name(rel->rd_rel->relnamespace);
    2504             : 
    2505          40 :     result = quote_qualified_identifier(nspname, RelationGetRelationName(rel));
    2506             : 
    2507          40 :     return result;
    2508             : }
    2509             : 
    2510             : 
    2511             : static remoteConn *
    2512         156 : getConnectionByName(const char *name)
    2513             : {
    2514             :     remoteConnHashEnt *hentry;
    2515             :     char       *key;
    2516             : 
    2517         156 :     if (!remoteConnHash)
    2518          10 :         remoteConnHash = createConnHash();
    2519             : 
    2520         156 :     key = pstrdup(name);
    2521         156 :     truncate_identifier(key, strlen(key), false);
    2522         156 :     hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
    2523             :                                                key, HASH_FIND, NULL);
    2524             : 
    2525         156 :     if (hentry && hentry->rconn.conn != NULL)
    2526         136 :         return &hentry->rconn;
    2527             : 
    2528          20 :     return NULL;
    2529             : }
    2530             : 
    2531             : static HTAB *
    2532          12 : createConnHash(void)
    2533             : {
    2534             :     HASHCTL     ctl;
    2535             : 
    2536          12 :     ctl.keysize = NAMEDATALEN;
    2537          12 :     ctl.entrysize = sizeof(remoteConnHashEnt);
    2538             : 
    2539          12 :     return hash_create("Remote Con hash", NUMCONN, &ctl,
    2540             :                        HASH_ELEM | HASH_STRINGS);
    2541             : }
    2542             : 
    2543             : static remoteConn *
    2544          20 : createNewConnection(const char *name)
    2545             : {
    2546             :     remoteConnHashEnt *hentry;
    2547             :     bool        found;
    2548             :     char       *key;
    2549             : 
    2550          20 :     if (!remoteConnHash)
    2551           2 :         remoteConnHash = createConnHash();
    2552             : 
    2553          20 :     key = pstrdup(name);
    2554          20 :     truncate_identifier(key, strlen(key), true);
    2555          20 :     hentry = (remoteConnHashEnt *) hash_search(remoteConnHash, key,
    2556             :                                                HASH_ENTER, &found);
    2557             : 
    2558          20 :     if (found && hentry->rconn.conn != NULL)
    2559           2 :         ereport(ERROR,
    2560             :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
    2561             :                  errmsg("duplicate connection name")));
    2562             : 
    2563             :     /* New, or reusable, so initialize the rconn struct to zeroes */
    2564          18 :     memset(&hentry->rconn, 0, sizeof(remoteConn));
    2565             : 
    2566          18 :     return &hentry->rconn;
    2567             : }
    2568             : 
    2569             : static void
    2570          16 : deleteConnection(const char *name)
    2571             : {
    2572             :     remoteConnHashEnt *hentry;
    2573             :     bool        found;
    2574             :     char       *key;
    2575             : 
    2576          16 :     if (!remoteConnHash)
    2577           0 :         remoteConnHash = createConnHash();
    2578             : 
    2579          16 :     key = pstrdup(name);
    2580          16 :     truncate_identifier(key, strlen(key), false);
    2581          16 :     hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
    2582             :                                                key, HASH_REMOVE, &found);
    2583             : 
    2584          16 :     if (!hentry)
    2585           0 :         ereport(ERROR,
    2586             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    2587             :                  errmsg("undefined connection name")));
    2588          16 : }
    2589             : 
    2590             :  /*
    2591             :   * Ensure that require_auth and SCRAM keys are correctly set on connstr.
    2592             :   * SCRAM keys used to pass-through are coming from the initial connection
    2593             :   * from the client with the server.
    2594             :   *
    2595             :   * All required SCRAM options are set by dblink, so we just need to ensure
    2596             :   * that these options are not overwritten by the user.
    2597             :   *
    2598             :   * See appendSCRAMKeysInfo and its usage for more.
    2599             :   */
    2600             : bool
    2601          10 : dblink_connstr_has_required_scram_options(const char *connstr)
    2602             : {
    2603             :     PQconninfoOption *options;
    2604          10 :     bool        has_scram_server_key = false;
    2605          10 :     bool        has_scram_client_key = false;
    2606          10 :     bool        has_require_auth = false;
    2607          10 :     bool        has_scram_keys = false;
    2608             : 
    2609          10 :     options = PQconninfoParse(connstr, NULL);
    2610          10 :     if (options)
    2611             :     {
    2612             :         /*
    2613             :          * Continue iterating even if we found the keys that we need to
    2614             :          * validate to make sure that there is no other declaration of these
    2615             :          * keys that can overwrite the first.
    2616             :          */
    2617         520 :         for (PQconninfoOption *option = options; option->keyword != NULL; option++)
    2618             :         {
    2619         510 :             if (strcmp(option->keyword, "require_auth") == 0)
    2620             :             {
    2621          10 :                 if (option->val != NULL && strcmp(option->val, "scram-sha-256") == 0)
    2622           8 :                     has_require_auth = true;
    2623             :                 else
    2624           2 :                     has_require_auth = false;
    2625             :             }
    2626             : 
    2627         510 :             if (strcmp(option->keyword, "scram_client_key") == 0)
    2628             :             {
    2629          10 :                 if (option->val != NULL && option->val[0] != '\0')
    2630          10 :                     has_scram_client_key = true;
    2631             :                 else
    2632           0 :                     has_scram_client_key = false;
    2633             :             }
    2634             : 
    2635         510 :             if (strcmp(option->keyword, "scram_server_key") == 0)
    2636             :             {
    2637          10 :                 if (option->val != NULL && option->val[0] != '\0')
    2638          10 :                     has_scram_server_key = true;
    2639             :                 else
    2640           0 :                     has_scram_server_key = false;
    2641             :             }
    2642             :         }
    2643          10 :         PQconninfoFree(options);
    2644             :     }
    2645             : 
    2646          10 :     has_scram_keys = has_scram_client_key && has_scram_server_key && MyProcPort->has_scram_keys;
    2647             : 
    2648          10 :     return (has_scram_keys && has_require_auth);
    2649             : }
    2650             : 
    2651             : /*
    2652             :  * We need to make sure that the connection made used credentials
    2653             :  * which were provided by the user, so check what credentials were
    2654             :  * used to connect and then make sure that they came from the user.
    2655             :  *
    2656             :  * On failure, we close "conn" and also delete the hashtable entry
    2657             :  * identified by "connname" (if that's not NULL).
    2658             :  */
    2659             : static void
    2660          40 : dblink_security_check(PGconn *conn, const char *connname, const char *connstr)
    2661             : {
    2662             :     /* Superuser bypasses security check */
    2663          40 :     if (superuser())
    2664          36 :         return;
    2665             : 
    2666             :     /* If password was used to connect, make sure it was one provided */
    2667           4 :     if (PQconnectionUsedPassword(conn) && dblink_connstr_has_pw(connstr))
    2668           0 :         return;
    2669             : 
    2670             :     /*
    2671             :      * Password was not used to connect, check if SCRAM pass-through is in
    2672             :      * use.
    2673             :      *
    2674             :      * If dblink_connstr_has_required_scram_options is true we assume that
    2675             :      * UseScramPassthrough is also true because the required SCRAM keys are
    2676             :      * only added if UseScramPassthrough is set, and the user is not allowed
    2677             :      * to add the SCRAM keys on fdw and user mapping options.
    2678             :      */
    2679           4 :     if (MyProcPort->has_scram_keys && dblink_connstr_has_required_scram_options(connstr))
    2680           4 :         return;
    2681             : 
    2682             : #ifdef ENABLE_GSS
    2683             :     /* If GSSAPI creds used to connect, make sure it was one delegated */
    2684             :     if (PQconnectionUsedGSSAPI(conn) && be_gssapi_get_delegation(MyProcPort))
    2685             :         return;
    2686             : #endif
    2687             : 
    2688             :     /* Otherwise, fail out */
    2689           0 :     libpqsrv_disconnect(conn);
    2690           0 :     if (connname)
    2691           0 :         deleteConnection(connname);
    2692             : 
    2693           0 :     ereport(ERROR,
    2694             :             (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
    2695             :              errmsg("password or GSSAPI delegated credentials required"),
    2696             :              errdetail("Non-superusers may only connect using credentials they provide, eg: password in connection string or delegated GSSAPI credentials"),
    2697             :              errhint("Ensure provided credentials match target server's authentication method.")));
    2698             : }
    2699             : 
    2700             : /*
    2701             :  * Function to check if the connection string includes an explicit
    2702             :  * password, needed to ensure that non-superuser password-based auth
    2703             :  * is using a provided password and not one picked up from the
    2704             :  * environment.
    2705             :  */
    2706             : static bool
    2707          12 : dblink_connstr_has_pw(const char *connstr)
    2708             : {
    2709             :     PQconninfoOption *options;
    2710             :     PQconninfoOption *option;
    2711          12 :     bool        connstr_gives_password = false;
    2712             : 
    2713          12 :     options = PQconninfoParse(connstr, NULL);
    2714          12 :     if (options)
    2715             :     {
    2716         624 :         for (option = options; option->keyword != NULL; option++)
    2717             :         {
    2718         612 :             if (strcmp(option->keyword, "password") == 0)
    2719             :             {
    2720          12 :                 if (option->val != NULL && option->val[0] != '\0')
    2721             :                 {
    2722           0 :                     connstr_gives_password = true;
    2723           0 :                     break;
    2724             :                 }
    2725             :             }
    2726             :         }
    2727          12 :         PQconninfoFree(options);
    2728             :     }
    2729             : 
    2730          12 :     return connstr_gives_password;
    2731             : }
    2732             : 
    2733             : /*
    2734             :  * For non-superusers, insist that the connstr specify a password, except if
    2735             :  * GSSAPI credentials have been delegated (and we check that they are used for
    2736             :  * the connection in dblink_security_check later) or if SCRAM pass-through is
    2737             :  * being used.  This prevents a password or GSSAPI credentials from being
    2738             :  * picked up from .pgpass, a service file, the environment, etc.  We don't want
    2739             :  * the postgres user's passwords or Kerberos credentials to be accessible to
    2740             :  * non-superusers. In case of SCRAM pass-through insist that the connstr
    2741             :  * has the required SCRAM pass-through options.
    2742             :  */
    2743             : static void
    2744          50 : dblink_connstr_check(const char *connstr)
    2745             : {
    2746          50 :     if (superuser())
    2747          42 :         return;
    2748             : 
    2749           8 :     if (dblink_connstr_has_pw(connstr))
    2750           0 :         return;
    2751             : 
    2752           8 :     if (MyProcPort->has_scram_keys && dblink_connstr_has_required_scram_options(connstr))
    2753           4 :         return;
    2754             : 
    2755             : #ifdef ENABLE_GSS
    2756             :     if (be_gssapi_get_delegation(MyProcPort))
    2757             :         return;
    2758             : #endif
    2759             : 
    2760           4 :     ereport(ERROR,
    2761             :             (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
    2762             :              errmsg("password or GSSAPI delegated credentials required"),
    2763             :              errdetail("Non-superusers must provide a password in the connection string or send delegated GSSAPI credentials.")));
    2764             : }
    2765             : 
    2766             : /*
    2767             :  * Report an error received from the remote server
    2768             :  *
    2769             :  * res: the received error result
    2770             :  * fail: true for ERROR ereport, false for NOTICE
    2771             :  * fmt and following args: sprintf-style format and values for errcontext;
    2772             :  * the resulting string should be worded like "while <some action>"
    2773             :  *
    2774             :  * If "res" is not NULL, it'll be PQclear'ed here (unless we throw error,
    2775             :  * in which case memory context cleanup will clear it eventually).
    2776             :  */
    2777             : static void
    2778          24 : dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
    2779             :                  bool fail, const char *fmt,...)
    2780             : {
    2781             :     int         level;
    2782          24 :     char       *pg_diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
    2783          24 :     char       *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
    2784          24 :     char       *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
    2785          24 :     char       *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
    2786          24 :     char       *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
    2787             :     int         sqlstate;
    2788             :     va_list     ap;
    2789             :     char        dblink_context_msg[512];
    2790             : 
    2791          24 :     if (fail)
    2792           6 :         level = ERROR;
    2793             :     else
    2794          18 :         level = NOTICE;
    2795             : 
    2796          24 :     if (pg_diag_sqlstate)
    2797          24 :         sqlstate = MAKE_SQLSTATE(pg_diag_sqlstate[0],
    2798             :                                  pg_diag_sqlstate[1],
    2799             :                                  pg_diag_sqlstate[2],
    2800             :                                  pg_diag_sqlstate[3],
    2801             :                                  pg_diag_sqlstate[4]);
    2802             :     else
    2803           0 :         sqlstate = ERRCODE_CONNECTION_FAILURE;
    2804             : 
    2805             :     /*
    2806             :      * If we don't get a message from the PGresult, try the PGconn.  This is
    2807             :      * needed because for connection-level failures, PQgetResult may just
    2808             :      * return NULL, not a PGresult at all.
    2809             :      */
    2810          24 :     if (message_primary == NULL)
    2811           0 :         message_primary = pchomp(PQerrorMessage(conn));
    2812             : 
    2813             :     /*
    2814             :      * Format the basic errcontext string.  Below, we'll add on something
    2815             :      * about the connection name.  That's a violation of the translatability
    2816             :      * guidelines about constructing error messages out of parts, but since
    2817             :      * there's no translation support for dblink, there's no need to worry
    2818             :      * about that (yet).
    2819             :      */
    2820          24 :     va_start(ap, fmt);
    2821          24 :     vsnprintf(dblink_context_msg, sizeof(dblink_context_msg), fmt, ap);
    2822          24 :     va_end(ap);
    2823             : 
    2824          24 :     ereport(level,
    2825             :             (errcode(sqlstate),
    2826             :              (message_primary != NULL && message_primary[0] != '\0') ?
    2827             :              errmsg_internal("%s", message_primary) :
    2828             :              errmsg("could not obtain message string for remote error"),
    2829             :              message_detail ? errdetail_internal("%s", message_detail) : 0,
    2830             :              message_hint ? errhint("%s", message_hint) : 0,
    2831             :              message_context ? (errcontext("%s", message_context)) : 0,
    2832             :              conname ?
    2833             :              (errcontext("%s on dblink connection named \"%s\"",
    2834             :                          dblink_context_msg, conname)) :
    2835             :              (errcontext("%s on unnamed dblink connection",
    2836             :                          dblink_context_msg))));
    2837          18 :     PQclear(res);
    2838          18 : }
    2839             : 
    2840             : /*
    2841             :  * Obtain connection string for a foreign server
    2842             :  */
    2843             : static char *
    2844          50 : get_connect_string(const char *servername)
    2845             : {
    2846          50 :     ForeignServer *foreign_server = NULL;
    2847             :     UserMapping *user_mapping;
    2848             :     ListCell   *cell;
    2849             :     StringInfoData buf;
    2850             :     ForeignDataWrapper *fdw;
    2851             :     AclResult   aclresult;
    2852             :     char       *srvname;
    2853             : 
    2854             :     static const PQconninfoOption *options = NULL;
    2855             : 
    2856          50 :     initStringInfo(&buf);
    2857             : 
    2858             :     /*
    2859             :      * Get list of valid libpq options.
    2860             :      *
    2861             :      * To avoid unnecessary work, we get the list once and use it throughout
    2862             :      * the lifetime of this backend process.  We don't need to care about
    2863             :      * memory context issues, because PQconndefaults allocates with malloc.
    2864             :      */
    2865          50 :     if (!options)
    2866             :     {
    2867          12 :         options = PQconndefaults();
    2868          12 :         if (!options)           /* assume reason for failure is OOM */
    2869           0 :             ereport(ERROR,
    2870             :                     (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
    2871             :                      errmsg("out of memory"),
    2872             :                      errdetail("Could not get libpq's default connection options.")));
    2873             :     }
    2874             : 
    2875             :     /* first gather the server connstr options */
    2876          50 :     srvname = pstrdup(servername);
    2877          50 :     truncate_identifier(srvname, strlen(srvname), false);
    2878          50 :     foreign_server = GetForeignServerByName(srvname, true);
    2879             : 
    2880          50 :     if (foreign_server)
    2881             :     {
    2882          10 :         Oid         serverid = foreign_server->serverid;
    2883          10 :         Oid         fdwid = foreign_server->fdwid;
    2884          10 :         Oid         userid = GetUserId();
    2885             : 
    2886          10 :         user_mapping = GetUserMapping(userid, serverid);
    2887          10 :         fdw = GetForeignDataWrapper(fdwid);
    2888             : 
    2889             :         /* Check permissions, user must have usage on the server. */
    2890          10 :         aclresult = object_aclcheck(ForeignServerRelationId, serverid, userid, ACL_USAGE);
    2891          10 :         if (aclresult != ACLCHECK_OK)
    2892           0 :             aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER, foreign_server->servername);
    2893             : 
    2894             :         /*
    2895             :          * First append hardcoded options needed for SCRAM pass-through, so if
    2896             :          * the user overwrites these options we can ereport on
    2897             :          * dblink_connstr_check and dblink_security_check.
    2898             :          */
    2899          10 :         if (MyProcPort->has_scram_keys && UseScramPassthrough(foreign_server, user_mapping))
    2900           6 :             appendSCRAMKeysInfo(&buf);
    2901             : 
    2902          10 :         foreach(cell, fdw->options)
    2903             :         {
    2904           0 :             DefElem    *def = lfirst(cell);
    2905             : 
    2906           0 :             if (is_valid_dblink_option(options, def->defname, ForeignDataWrapperRelationId))
    2907           0 :                 appendStringInfo(&buf, "%s='%s' ", def->defname,
    2908           0 :                                  escape_param_str(strVal(def->arg)));
    2909             :         }
    2910             : 
    2911          44 :         foreach(cell, foreign_server->options)
    2912             :         {
    2913          34 :             DefElem    *def = lfirst(cell);
    2914             : 
    2915          34 :             if (is_valid_dblink_option(options, def->defname, ForeignServerRelationId))
    2916          28 :                 appendStringInfo(&buf, "%s='%s' ", def->defname,
    2917          28 :                                  escape_param_str(strVal(def->arg)));
    2918             :         }
    2919             : 
    2920          20 :         foreach(cell, user_mapping->options)
    2921             :         {
    2922             : 
    2923          10 :             DefElem    *def = lfirst(cell);
    2924             : 
    2925          10 :             if (is_valid_dblink_option(options, def->defname, UserMappingRelationId))
    2926          10 :                 appendStringInfo(&buf, "%s='%s' ", def->defname,
    2927          10 :                                  escape_param_str(strVal(def->arg)));
    2928             :         }
    2929             : 
    2930          10 :         return buf.data;
    2931             :     }
    2932             :     else
    2933          40 :         return NULL;
    2934             : }
    2935             : 
    2936             : /*
    2937             :  * Escaping libpq connect parameter strings.
    2938             :  *
    2939             :  * Replaces "'" with "\'" and "\" with "\\".
    2940             :  */
    2941             : static char *
    2942          38 : escape_param_str(const char *str)
    2943             : {
    2944             :     const char *cp;
    2945             :     StringInfoData buf;
    2946             : 
    2947          38 :     initStringInfo(&buf);
    2948             : 
    2949         344 :     for (cp = str; *cp; cp++)
    2950             :     {
    2951         306 :         if (*cp == '\\' || *cp == '\'')
    2952           0 :             appendStringInfoChar(&buf, '\\');
    2953         306 :         appendStringInfoChar(&buf, *cp);
    2954             :     }
    2955             : 
    2956          38 :     return buf.data;
    2957             : }
    2958             : 
    2959             : /*
    2960             :  * Validate the PK-attnums argument for dblink_build_sql_insert() and related
    2961             :  * functions, and translate to the internal representation.
    2962             :  *
    2963             :  * The user supplies an int2vector of 1-based logical attnums, plus a count
    2964             :  * argument (the need for the separate count argument is historical, but we
    2965             :  * still check it).  We check that each attnum corresponds to a valid,
    2966             :  * non-dropped attribute of the rel.  We do *not* prevent attnums from being
    2967             :  * listed twice, though the actual use-case for such things is dubious.
    2968             :  * Note that before Postgres 9.0, the user's attnums were interpreted as
    2969             :  * physical not logical column numbers; this was changed for future-proofing.
    2970             :  *
    2971             :  * The internal representation is a palloc'd int array of 0-based physical
    2972             :  * attnums.
    2973             :  */
    2974             : static void
    2975          36 : validate_pkattnums(Relation rel,
    2976             :                    int2vector *pkattnums_arg, int32 pknumatts_arg,
    2977             :                    int **pkattnums, int *pknumatts)
    2978             : {
    2979          36 :     TupleDesc   tupdesc = rel->rd_att;
    2980          36 :     int         natts = tupdesc->natts;
    2981             :     int         i;
    2982             : 
    2983             :     /* Don't take more array elements than there are */
    2984          36 :     pknumatts_arg = Min(pknumatts_arg, pkattnums_arg->dim1);
    2985             : 
    2986             :     /* Must have at least one pk attnum selected */
    2987          36 :     if (pknumatts_arg <= 0)
    2988           0 :         ereport(ERROR,
    2989             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    2990             :                  errmsg("number of key attributes must be > 0")));
    2991             : 
    2992             :     /* Allocate output array */
    2993          36 :     *pkattnums = palloc_array(int, pknumatts_arg);
    2994          36 :     *pknumatts = pknumatts_arg;
    2995             : 
    2996             :     /* Validate attnums and convert to internal form */
    2997         114 :     for (i = 0; i < pknumatts_arg; i++)
    2998             :     {
    2999          90 :         int         pkattnum = pkattnums_arg->values[i];
    3000             :         int         lnum;
    3001             :         int         j;
    3002             : 
    3003             :         /* Can throw error immediately if out of range */
    3004          90 :         if (pkattnum <= 0 || pkattnum > natts)
    3005          12 :             ereport(ERROR,
    3006             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    3007             :                      errmsg("invalid attribute number %d", pkattnum)));
    3008             : 
    3009             :         /* Identify which physical column has this logical number */
    3010          78 :         lnum = 0;
    3011         138 :         for (j = 0; j < natts; j++)
    3012             :         {
    3013             :             /* dropped columns don't count */
    3014         138 :             if (TupleDescAttr(tupdesc, j)->attisdropped)
    3015           6 :                 continue;
    3016             : 
    3017         132 :             if (++lnum == pkattnum)
    3018          78 :                 break;
    3019             :         }
    3020             : 
    3021          78 :         if (j < natts)
    3022          78 :             (*pkattnums)[i] = j;
    3023             :         else
    3024           0 :             ereport(ERROR,
    3025             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    3026             :                      errmsg("invalid attribute number %d", pkattnum)));
    3027             :     }
    3028          24 : }
    3029             : 
    3030             : /*
    3031             :  * Check if the specified connection option is valid.
    3032             :  *
    3033             :  * We basically allow whatever libpq thinks is an option, with these
    3034             :  * restrictions:
    3035             :  *      debug options: disallowed
    3036             :  *      "client_encoding": disallowed
    3037             :  *      "user": valid only in USER MAPPING options
    3038             :  *      secure options (eg password): valid only in USER MAPPING options
    3039             :  *      others: valid only in FOREIGN SERVER options
    3040             :  *
    3041             :  * We disallow client_encoding because it would be overridden anyway via
    3042             :  * PQclientEncoding; allowing it to be specified would merely promote
    3043             :  * confusion.
    3044             :  */
    3045             : static bool
    3046         930 : is_valid_dblink_option(const PQconninfoOption *options, const char *option,
    3047             :                        Oid context)
    3048             : {
    3049             :     const PQconninfoOption *opt;
    3050             : 
    3051             :     /* Look up the option in libpq result */
    3052       23090 :     for (opt = options; opt->keyword; opt++)
    3053             :     {
    3054       23080 :         if (strcmp(opt->keyword, option) == 0)
    3055         920 :             break;
    3056             :     }
    3057         930 :     if (opt->keyword == NULL)
    3058          10 :         return false;
    3059             : 
    3060             :     /* Disallow debug options (particularly "replication") */
    3061         920 :     if (strchr(opt->dispchar, 'D'))
    3062          68 :         return false;
    3063             : 
    3064             :     /* Disallow "client_encoding" */
    3065         852 :     if (strcmp(opt->keyword, "client_encoding") == 0)
    3066          16 :         return false;
    3067             : 
    3068             :     /*
    3069             :      * Disallow OAuth options for now, since the builtin flow communicates on
    3070             :      * stderr by default and can't cache tokens yet.
    3071             :      */
    3072         836 :     if (strncmp(opt->keyword, "oauth_", strlen("oauth_")) == 0)
    3073          72 :         return false;
    3074             : 
    3075             :     /*
    3076             :      * If the option is "user" or marked secure, it should be specified only
    3077             :      * in USER MAPPING.  Others should be specified only in SERVER.
    3078             :      */
    3079         764 :     if (strcmp(opt->keyword, "user") == 0 || strchr(opt->dispchar, '*'))
    3080             :     {
    3081          74 :         if (context != UserMappingRelationId)
    3082          18 :             return false;
    3083             :     }
    3084             :     else
    3085             :     {
    3086         690 :         if (context != ForeignServerRelationId)
    3087         468 :             return false;
    3088             :     }
    3089             : 
    3090         278 :     return true;
    3091             : }
    3092             : 
    3093             : /*
    3094             :  * Same as is_valid_dblink_option but also check for only dblink_fdw specific
    3095             :  * options.
    3096             :  */
    3097             : static bool
    3098          78 : is_valid_dblink_fdw_option(const PQconninfoOption *options, const char *option,
    3099             :                            Oid context)
    3100             : {
    3101          78 :     if (strcmp(option, "use_scram_passthrough") == 0)
    3102           8 :         return true;
    3103             : 
    3104          70 :     return is_valid_dblink_option(options, option, context);
    3105             : }
    3106             : 
    3107             : /*
    3108             :  * Copy the remote session's values of GUCs that affect datatype I/O
    3109             :  * and apply them locally in a new GUC nesting level.  Returns the new
    3110             :  * nestlevel (which is needed by restoreLocalGucs to undo the settings),
    3111             :  * or -1 if no new nestlevel was needed.
    3112             :  *
    3113             :  * We use the equivalent of a function SET option to allow the settings to
    3114             :  * persist only until the caller calls restoreLocalGucs.  If an error is
    3115             :  * thrown in between, guc.c will take care of undoing the settings.
    3116             :  */
    3117             : static int
    3118          68 : applyRemoteGucs(PGconn *conn)
    3119             : {
    3120             :     static const char *const GUCsAffectingIO[] = {
    3121             :         "DateStyle",
    3122             :         "IntervalStyle"
    3123             :     };
    3124             : 
    3125          68 :     int         nestlevel = -1;
    3126             :     int         i;
    3127             : 
    3128         204 :     for (i = 0; i < lengthof(GUCsAffectingIO); i++)
    3129             :     {
    3130         136 :         const char *gucName = GUCsAffectingIO[i];
    3131         136 :         const char *remoteVal = PQparameterStatus(conn, gucName);
    3132             :         const char *localVal;
    3133             : 
    3134             :         /*
    3135             :          * If the remote server is pre-8.4, it won't have IntervalStyle, but
    3136             :          * that's okay because its output format won't be ambiguous.  So just
    3137             :          * skip the GUC if we don't get a value for it.  (We might eventually
    3138             :          * need more complicated logic with remote-version checks here.)
    3139             :          */
    3140         136 :         if (remoteVal == NULL)
    3141           0 :             continue;
    3142             : 
    3143             :         /*
    3144             :          * Avoid GUC-setting overhead if the remote and local GUCs already
    3145             :          * have the same value.
    3146             :          */
    3147         136 :         localVal = GetConfigOption(gucName, false, false);
    3148             :         Assert(localVal != NULL);
    3149             : 
    3150         136 :         if (strcmp(remoteVal, localVal) == 0)
    3151         102 :             continue;
    3152             : 
    3153             :         /* Create new GUC nest level if we didn't already */
    3154          34 :         if (nestlevel < 0)
    3155          18 :             nestlevel = NewGUCNestLevel();
    3156             : 
    3157             :         /* Apply the option (this will throw error on failure) */
    3158          34 :         (void) set_config_option(gucName, remoteVal,
    3159             :                                  PGC_USERSET, PGC_S_SESSION,
    3160             :                                  GUC_ACTION_SAVE, true, 0, false);
    3161             :     }
    3162             : 
    3163          68 :     return nestlevel;
    3164             : }
    3165             : 
    3166             : /*
    3167             :  * Restore local GUCs after they have been overlaid with remote settings.
    3168             :  */
    3169             : static void
    3170          70 : restoreLocalGucs(int nestlevel)
    3171             : {
    3172             :     /* Do nothing if no new nestlevel was created */
    3173          70 :     if (nestlevel > 0)
    3174          16 :         AtEOXact_GUC(true, nestlevel);
    3175          70 : }
    3176             : 
    3177             : /*
    3178             :  * Append SCRAM client key and server key information from the global
    3179             :  * MyProcPort into the given StringInfo buffer.
    3180             :  */
    3181             : static void
    3182           6 : appendSCRAMKeysInfo(StringInfo buf)
    3183             : {
    3184             :     int         len;
    3185             :     int         encoded_len;
    3186             :     char       *client_key;
    3187             :     char       *server_key;
    3188             : 
    3189           6 :     len = pg_b64_enc_len(sizeof(MyProcPort->scram_ClientKey));
    3190             :     /* don't forget the zero-terminator */
    3191           6 :     client_key = palloc0(len + 1);
    3192           6 :     encoded_len = pg_b64_encode(MyProcPort->scram_ClientKey,
    3193             :                                 sizeof(MyProcPort->scram_ClientKey),
    3194             :                                 client_key, len);
    3195           6 :     if (encoded_len < 0)
    3196           0 :         elog(ERROR, "could not encode SCRAM client key");
    3197             : 
    3198           6 :     len = pg_b64_enc_len(sizeof(MyProcPort->scram_ServerKey));
    3199             :     /* don't forget the zero-terminator */
    3200           6 :     server_key = palloc0(len + 1);
    3201           6 :     encoded_len = pg_b64_encode(MyProcPort->scram_ServerKey,
    3202             :                                 sizeof(MyProcPort->scram_ServerKey),
    3203             :                                 server_key, len);
    3204           6 :     if (encoded_len < 0)
    3205           0 :         elog(ERROR, "could not encode SCRAM server key");
    3206             : 
    3207           6 :     appendStringInfo(buf, "scram_client_key='%s' ", client_key);
    3208           6 :     appendStringInfo(buf, "scram_server_key='%s' ", server_key);
    3209           6 :     appendStringInfoString(buf, "require_auth='scram-sha-256' ");
    3210             : 
    3211           6 :     pfree(client_key);
    3212           6 :     pfree(server_key);
    3213           6 : }
    3214             : 
    3215             : 
    3216             : static bool
    3217           6 : UseScramPassthrough(ForeignServer *foreign_server, UserMapping *user)
    3218             : {
    3219             :     ListCell   *cell;
    3220             : 
    3221          24 :     foreach(cell, foreign_server->options)
    3222             :     {
    3223          24 :         DefElem    *def = lfirst(cell);
    3224             : 
    3225          24 :         if (strcmp(def->defname, "use_scram_passthrough") == 0)
    3226           6 :             return defGetBoolean(def);
    3227             :     }
    3228             : 
    3229           0 :     foreach(cell, user->options)
    3230             :     {
    3231           0 :         DefElem    *def = (DefElem *) lfirst(cell);
    3232             : 
    3233           0 :         if (strcmp(def->defname, "use_scram_passthrough") == 0)
    3234           0 :             return defGetBoolean(def);
    3235             :     }
    3236             : 
    3237           0 :     return false;
    3238             : }

Generated by: LCOV version 1.16