LCOV - code coverage report
Current view: top level - contrib/dblink - dblink.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 936 1077 86.9 %
Date: 2024-12-12 17:14:55 Functions: 75 77 97.4 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :  * dblink.c
       3             :  *
       4             :  * Functions returning results from a remote database
       5             :  *
       6             :  * Joe Conway <mail@joeconway.com>
       7             :  * And contributors:
       8             :  * Darko Prenosil <Darko.Prenosil@finteh.hr>
       9             :  * Shridhar Daithankar <shridhar_daithankar@persistent.co.in>
      10             :  *
      11             :  * contrib/dblink/dblink.c
      12             :  * Copyright (c) 2001-2024, PostgreSQL Global Development Group
      13             :  * ALL RIGHTS RESERVED;
      14             :  *
      15             :  * Permission to use, copy, modify, and distribute this software and its
      16             :  * documentation for any purpose, without fee, and without a written agreement
      17             :  * is hereby granted, provided that the above copyright notice and this
      18             :  * paragraph and the following two paragraphs appear in all copies.
      19             :  *
      20             :  * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
      21             :  * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
      22             :  * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
      23             :  * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
      24             :  * POSSIBILITY OF SUCH DAMAGE.
      25             :  *
      26             :  * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
      27             :  * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
      28             :  * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
      29             :  * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
      30             :  * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
      31             :  *
      32             :  */
      33             : #include "postgres.h"
      34             : 
      35             : #include <limits.h>
      36             : 
      37             : #include "access/htup_details.h"
      38             : #include "access/relation.h"
      39             : #include "access/reloptions.h"
      40             : #include "access/table.h"
      41             : #include "catalog/namespace.h"
      42             : #include "catalog/pg_foreign_data_wrapper.h"
      43             : #include "catalog/pg_foreign_server.h"
      44             : #include "catalog/pg_type.h"
      45             : #include "catalog/pg_user_mapping.h"
      46             : #include "executor/spi.h"
      47             : #include "foreign/foreign.h"
      48             : #include "funcapi.h"
      49             : #include "lib/stringinfo.h"
      50             : #include "libpq-fe.h"
      51             : #include "libpq/libpq-be.h"
      52             : #include "libpq/libpq-be-fe-helpers.h"
      53             : #include "mb/pg_wchar.h"
      54             : #include "miscadmin.h"
      55             : #include "parser/scansup.h"
      56             : #include "utils/acl.h"
      57             : #include "utils/builtins.h"
      58             : #include "utils/fmgroids.h"
      59             : #include "utils/guc.h"
      60             : #include "utils/lsyscache.h"
      61             : #include "utils/memutils.h"
      62             : #include "utils/rel.h"
      63             : #include "utils/varlena.h"
      64             : #include "utils/wait_event.h"
      65             : 
      66           6 : PG_MODULE_MAGIC;
      67             : 
      68             : typedef struct remoteConn
      69             : {
      70             :     PGconn     *conn;           /* Hold the remote connection */
      71             :     int         openCursorCount;    /* The number of open cursors */
      72             :     bool        newXactForCursor;   /* Opened a transaction for a cursor */
      73             : } remoteConn;
      74             : 
      75             : typedef struct storeInfo
      76             : {
      77             :     FunctionCallInfo fcinfo;
      78             :     Tuplestorestate *tuplestore;
      79             :     AttInMetadata *attinmeta;
      80             :     MemoryContext tmpcontext;
      81             :     char      **cstrs;
      82             :     /* temp storage for results to avoid leaks on exception */
      83             :     PGresult   *last_res;
      84             :     PGresult   *cur_res;
      85             : } storeInfo;
      86             : 
      87             : /*
      88             :  * Internal declarations
      89             :  */
      90             : static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
      91             : static void prepTuplestoreResult(FunctionCallInfo fcinfo);
      92             : static void materializeResult(FunctionCallInfo fcinfo, PGconn *conn,
      93             :                               PGresult *res);
      94             : static void materializeQueryResult(FunctionCallInfo fcinfo,
      95             :                                    PGconn *conn,
      96             :                                    const char *conname,
      97             :                                    const char *sql,
      98             :                                    bool fail);
      99             : static PGresult *storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql);
     100             : static void storeRow(volatile storeInfo *sinfo, PGresult *res, bool first);
     101             : static remoteConn *getConnectionByName(const char *name);
     102             : static HTAB *createConnHash(void);
     103             : static void createNewConnection(const char *name, remoteConn *rconn);
     104             : static void deleteConnection(const char *name);
     105             : static char **get_pkey_attnames(Relation rel, int16 *indnkeyatts);
     106             : static char **get_text_array_contents(ArrayType *array, int *numitems);
     107             : static char *get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
     108             : static char *get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals);
     109             : static char *get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
     110             : static char *quote_ident_cstr(char *rawstr);
     111             : static int  get_attnum_pk_pos(int *pkattnums, int pknumatts, int key);
     112             : static HeapTuple get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals);
     113             : static Relation get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode);
     114             : static char *generate_relation_name(Relation rel);
     115             : static void dblink_connstr_check(const char *connstr);
     116             : static bool dblink_connstr_has_pw(const char *connstr);
     117             : static void dblink_security_check(PGconn *conn, remoteConn *rconn, const char *connstr);
     118             : static void dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
     119             :                              bool fail, const char *fmt,...) pg_attribute_printf(5, 6);
     120             : static char *get_connect_string(const char *servername);
     121             : static char *escape_param_str(const char *str);
     122             : static void validate_pkattnums(Relation rel,
     123             :                                int2vector *pkattnums_arg, int32 pknumatts_arg,
     124             :                                int **pkattnums, int *pknumatts);
     125             : static bool is_valid_dblink_option(const PQconninfoOption *options,
     126             :                                    const char *option, Oid context);
     127             : static int  applyRemoteGucs(PGconn *conn);
     128             : static void restoreLocalGucs(int nestlevel);
     129             : 
     130             : /* Global */
     131             : static remoteConn *pconn = NULL;
     132             : static HTAB *remoteConnHash = NULL;
     133             : 
     134             : /* custom wait event values, retrieved from shared memory */
     135             : static uint32 dblink_we_connect = 0;
     136             : static uint32 dblink_we_get_conn = 0;
     137             : static uint32 dblink_we_get_result = 0;
     138             : 
     139             : /*
     140             :  *  Following is list that holds multiple remote connections.
     141             :  *  Calling convention of each dblink function changes to accept
     142             :  *  connection name as the first parameter. The connection list is
     143             :  *  much like ecpg e.g. a mapping between a name and a PGconn object.
     144             :  */
     145             : 
     146             : typedef struct remoteConnHashEnt
     147             : {
     148             :     char        name[NAMEDATALEN];
     149             :     remoteConn *rconn;
     150             : } remoteConnHashEnt;
     151             : 
     152             : /* initial number of connection hashes */
     153             : #define NUMCONN 16
     154             : 
     155             : static char *
     156          96 : xpstrdup(const char *in)
     157             : {
     158          96 :     if (in == NULL)
     159          72 :         return NULL;
     160          24 :     return pstrdup(in);
     161             : }
     162             : 
     163             : static void
     164             : pg_attribute_noreturn()
     165           0 : dblink_res_internalerror(PGconn *conn, PGresult *res, const char *p2)
     166             : {
     167           0 :     char       *msg = pchomp(PQerrorMessage(conn));
     168             : 
     169           0 :     PQclear(res);
     170           0 :     elog(ERROR, "%s: %s", p2, msg);
     171             : }
     172             : 
     173             : static void
     174             : pg_attribute_noreturn()
     175           6 : dblink_conn_not_avail(const char *conname)
     176             : {
     177           6 :     if (conname)
     178           2 :         ereport(ERROR,
     179             :                 (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
     180             :                  errmsg("connection \"%s\" not available", conname)));
     181             :     else
     182           4 :         ereport(ERROR,
     183             :                 (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
     184             :                  errmsg("connection not available")));
     185             : }
     186             : 
     187             : static void
     188          66 : dblink_get_conn(char *conname_or_str,
     189             :                 PGconn *volatile *conn_p, char **conname_p, volatile bool *freeconn_p)
     190             : {
     191          66 :     remoteConn *rconn = getConnectionByName(conname_or_str);
     192             :     PGconn     *conn;
     193             :     char       *conname;
     194             :     bool        freeconn;
     195             : 
     196          66 :     if (rconn)
     197             :     {
     198          54 :         conn = rconn->conn;
     199          54 :         conname = conname_or_str;
     200          54 :         freeconn = false;
     201             :     }
     202             :     else
     203             :     {
     204             :         const char *connstr;
     205             : 
     206          12 :         connstr = get_connect_string(conname_or_str);
     207          12 :         if (connstr == NULL)
     208          12 :             connstr = conname_or_str;
     209          12 :         dblink_connstr_check(connstr);
     210             : 
     211             :         /* first time, allocate or get the custom wait event */
     212          12 :         if (dblink_we_get_conn == 0)
     213           4 :             dblink_we_get_conn = WaitEventExtensionNew("DblinkGetConnect");
     214             : 
     215             :         /* OK to make connection */
     216          12 :         conn = libpqsrv_connect(connstr, dblink_we_get_conn);
     217             : 
     218          12 :         if (PQstatus(conn) == CONNECTION_BAD)
     219             :         {
     220           4 :             char       *msg = pchomp(PQerrorMessage(conn));
     221             : 
     222           4 :             libpqsrv_disconnect(conn);
     223           4 :             ereport(ERROR,
     224             :                     (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
     225             :                      errmsg("could not establish connection"),
     226             :                      errdetail_internal("%s", msg)));
     227             :         }
     228           8 :         dblink_security_check(conn, rconn, connstr);
     229           8 :         if (PQclientEncoding(conn) != GetDatabaseEncoding())
     230           0 :             PQsetClientEncoding(conn, GetDatabaseEncodingName());
     231           8 :         freeconn = true;
     232           8 :         conname = NULL;
     233             :     }
     234             : 
     235          62 :     *conn_p = conn;
     236          62 :     *conname_p = conname;
     237          62 :     *freeconn_p = freeconn;
     238          62 : }
     239             : 
     240             : static PGconn *
     241          34 : dblink_get_named_conn(const char *conname)
     242             : {
     243          34 :     remoteConn *rconn = getConnectionByName(conname);
     244             : 
     245          34 :     if (rconn)
     246          34 :         return rconn->conn;
     247             : 
     248           0 :     dblink_conn_not_avail(conname);
     249             :     return NULL;                /* keep compiler quiet */
     250             : }
     251             : 
     252             : static void
     253         240 : dblink_init(void)
     254             : {
     255         240 :     if (!pconn)
     256             :     {
     257           6 :         if (dblink_we_get_result == 0)
     258           6 :             dblink_we_get_result = WaitEventExtensionNew("DblinkGetResult");
     259             : 
     260           6 :         pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn));
     261           6 :         pconn->conn = NULL;
     262           6 :         pconn->openCursorCount = 0;
     263           6 :         pconn->newXactForCursor = false;
     264             :     }
     265         240 : }
     266             : 
     267             : /*
     268             :  * Create a persistent connection to another database
     269             :  */
     270          18 : PG_FUNCTION_INFO_V1(dblink_connect);
     271             : Datum
     272          32 : dblink_connect(PG_FUNCTION_ARGS)
     273             : {
     274          32 :     char       *conname_or_str = NULL;
     275          32 :     char       *connstr = NULL;
     276          32 :     char       *connname = NULL;
     277             :     char       *msg;
     278          32 :     PGconn     *conn = NULL;
     279          32 :     remoteConn *rconn = NULL;
     280             : 
     281          32 :     dblink_init();
     282             : 
     283          32 :     if (PG_NARGS() == 2)
     284             :     {
     285          22 :         conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(1));
     286          22 :         connname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     287             :     }
     288          10 :     else if (PG_NARGS() == 1)
     289          10 :         conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0));
     290             : 
     291          32 :     if (connname)
     292             :     {
     293          22 :         rconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext,
     294             :                                                   sizeof(remoteConn));
     295          22 :         rconn->conn = NULL;
     296          22 :         rconn->openCursorCount = 0;
     297          22 :         rconn->newXactForCursor = false;
     298             :     }
     299             : 
     300             :     /* first check for valid foreign data server */
     301          32 :     connstr = get_connect_string(conname_or_str);
     302          32 :     if (connstr == NULL)
     303          28 :         connstr = conname_or_str;
     304             : 
     305             :     /* check password in connection string if not superuser */
     306          32 :     dblink_connstr_check(connstr);
     307             : 
     308             :     /* first time, allocate or get the custom wait event */
     309          30 :     if (dblink_we_connect == 0)
     310           4 :         dblink_we_connect = WaitEventExtensionNew("DblinkConnect");
     311             : 
     312             :     /* OK to make connection */
     313          30 :     conn = libpqsrv_connect(connstr, dblink_we_connect);
     314             : 
     315          30 :     if (PQstatus(conn) == CONNECTION_BAD)
     316             :     {
     317           0 :         msg = pchomp(PQerrorMessage(conn));
     318           0 :         libpqsrv_disconnect(conn);
     319           0 :         if (rconn)
     320           0 :             pfree(rconn);
     321             : 
     322           0 :         ereport(ERROR,
     323             :                 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
     324             :                  errmsg("could not establish connection"),
     325             :                  errdetail_internal("%s", msg)));
     326             :     }
     327             : 
     328             :     /* check password actually used if not superuser */
     329          30 :     dblink_security_check(conn, rconn, connstr);
     330             : 
     331             :     /* attempt to set client encoding to match server encoding, if needed */
     332          30 :     if (PQclientEncoding(conn) != GetDatabaseEncoding())
     333           0 :         PQsetClientEncoding(conn, GetDatabaseEncodingName());
     334             : 
     335          30 :     if (connname)
     336             :     {
     337          20 :         rconn->conn = conn;
     338          20 :         createNewConnection(connname, rconn);
     339             :     }
     340             :     else
     341             :     {
     342          10 :         if (pconn->conn)
     343           2 :             libpqsrv_disconnect(pconn->conn);
     344          10 :         pconn->conn = conn;
     345             :     }
     346             : 
     347          28 :     PG_RETURN_TEXT_P(cstring_to_text("OK"));
     348             : }
     349             : 
     350             : /*
     351             :  * Clear a persistent connection to another database
     352             :  */
     353          12 : PG_FUNCTION_INFO_V1(dblink_disconnect);
     354             : Datum
     355          26 : dblink_disconnect(PG_FUNCTION_ARGS)
     356             : {
     357          26 :     char       *conname = NULL;
     358          26 :     remoteConn *rconn = NULL;
     359          26 :     PGconn     *conn = NULL;
     360             : 
     361          26 :     dblink_init();
     362             : 
     363          26 :     if (PG_NARGS() == 1)
     364             :     {
     365          18 :         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     366          18 :         rconn = getConnectionByName(conname);
     367          18 :         if (rconn)
     368          16 :             conn = rconn->conn;
     369             :     }
     370             :     else
     371           8 :         conn = pconn->conn;
     372             : 
     373          26 :     if (!conn)
     374           2 :         dblink_conn_not_avail(conname);
     375             : 
     376          24 :     libpqsrv_disconnect(conn);
     377          24 :     if (rconn)
     378             :     {
     379          16 :         deleteConnection(conname);
     380          16 :         pfree(rconn);
     381             :     }
     382             :     else
     383           8 :         pconn->conn = NULL;
     384             : 
     385          24 :     PG_RETURN_TEXT_P(cstring_to_text("OK"));
     386             : }
     387             : 
     388             : /*
     389             :  * opens a cursor using a persistent connection
     390             :  */
     391          18 : PG_FUNCTION_INFO_V1(dblink_open);
     392             : Datum
     393          18 : dblink_open(PG_FUNCTION_ARGS)
     394             : {
     395          18 :     PGresult   *res = NULL;
     396             :     PGconn     *conn;
     397          18 :     char       *curname = NULL;
     398          18 :     char       *sql = NULL;
     399          18 :     char       *conname = NULL;
     400             :     StringInfoData buf;
     401          18 :     remoteConn *rconn = NULL;
     402          18 :     bool        fail = true;    /* default to backward compatible behavior */
     403             : 
     404          18 :     dblink_init();
     405          18 :     initStringInfo(&buf);
     406             : 
     407          18 :     if (PG_NARGS() == 2)
     408             :     {
     409             :         /* text,text */
     410           4 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     411           4 :         sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
     412           4 :         rconn = pconn;
     413             :     }
     414          14 :     else if (PG_NARGS() == 3)
     415             :     {
     416             :         /* might be text,text,text or text,text,bool */
     417          12 :         if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
     418             :         {
     419           2 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     420           2 :             sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
     421           2 :             fail = PG_GETARG_BOOL(2);
     422           2 :             rconn = pconn;
     423             :         }
     424             :         else
     425             :         {
     426          10 :             conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     427          10 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
     428          10 :             sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
     429          10 :             rconn = getConnectionByName(conname);
     430             :         }
     431             :     }
     432           2 :     else if (PG_NARGS() == 4)
     433             :     {
     434             :         /* text,text,text,bool */
     435           2 :         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     436           2 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
     437           2 :         sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
     438           2 :         fail = PG_GETARG_BOOL(3);
     439           2 :         rconn = getConnectionByName(conname);
     440             :     }
     441             : 
     442          18 :     if (!rconn || !rconn->conn)
     443           0 :         dblink_conn_not_avail(conname);
     444             : 
     445          18 :     conn = rconn->conn;
     446             : 
     447             :     /* If we are not in a transaction, start one */
     448          18 :     if (PQtransactionStatus(conn) == PQTRANS_IDLE)
     449             :     {
     450          14 :         res = libpqsrv_exec(conn, "BEGIN", dblink_we_get_result);
     451          14 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
     452           0 :             dblink_res_internalerror(conn, res, "begin error");
     453          14 :         PQclear(res);
     454          14 :         rconn->newXactForCursor = true;
     455             : 
     456             :         /*
     457             :          * Since transaction state was IDLE, we force cursor count to
     458             :          * initially be 0. This is needed as a previous ABORT might have wiped
     459             :          * out our transaction without maintaining the cursor count for us.
     460             :          */
     461          14 :         rconn->openCursorCount = 0;
     462             :     }
     463             : 
     464             :     /* if we started a transaction, increment cursor count */
     465          18 :     if (rconn->newXactForCursor)
     466          18 :         (rconn->openCursorCount)++;
     467             : 
     468          18 :     appendStringInfo(&buf, "DECLARE %s CURSOR FOR %s", curname, sql);
     469          18 :     res = libpqsrv_exec(conn, buf.data, dblink_we_get_result);
     470          18 :     if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
     471             :     {
     472           4 :         dblink_res_error(conn, conname, res, fail,
     473             :                          "while opening cursor \"%s\"", curname);
     474           4 :         PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
     475             :     }
     476             : 
     477          14 :     PQclear(res);
     478          14 :     PG_RETURN_TEXT_P(cstring_to_text("OK"));
     479             : }
     480             : 
     481             : /*
     482             :  * closes a cursor
     483             :  */
     484          12 : PG_FUNCTION_INFO_V1(dblink_close);
     485             : Datum
     486          10 : dblink_close(PG_FUNCTION_ARGS)
     487             : {
     488             :     PGconn     *conn;
     489          10 :     PGresult   *res = NULL;
     490          10 :     char       *curname = NULL;
     491          10 :     char       *conname = NULL;
     492             :     StringInfoData buf;
     493          10 :     remoteConn *rconn = NULL;
     494          10 :     bool        fail = true;    /* default to backward compatible behavior */
     495             : 
     496          10 :     dblink_init();
     497          10 :     initStringInfo(&buf);
     498             : 
     499          10 :     if (PG_NARGS() == 1)
     500             :     {
     501             :         /* text */
     502           0 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     503           0 :         rconn = pconn;
     504             :     }
     505          10 :     else if (PG_NARGS() == 2)
     506             :     {
     507             :         /* might be text,text or text,bool */
     508          10 :         if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
     509             :         {
     510           4 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     511           4 :             fail = PG_GETARG_BOOL(1);
     512           4 :             rconn = pconn;
     513             :         }
     514             :         else
     515             :         {
     516           6 :             conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     517           6 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
     518           6 :             rconn = getConnectionByName(conname);
     519             :         }
     520             :     }
     521          10 :     if (PG_NARGS() == 3)
     522             :     {
     523             :         /* text,text,bool */
     524           0 :         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     525           0 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
     526           0 :         fail = PG_GETARG_BOOL(2);
     527           0 :         rconn = getConnectionByName(conname);
     528             :     }
     529             : 
     530          10 :     if (!rconn || !rconn->conn)
     531           0 :         dblink_conn_not_avail(conname);
     532             : 
     533          10 :     conn = rconn->conn;
     534             : 
     535          10 :     appendStringInfo(&buf, "CLOSE %s", curname);
     536             : 
     537             :     /* close the cursor */
     538          10 :     res = libpqsrv_exec(conn, buf.data, dblink_we_get_result);
     539          10 :     if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
     540             :     {
     541           2 :         dblink_res_error(conn, conname, res, fail,
     542             :                          "while closing cursor \"%s\"", curname);
     543           2 :         PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
     544             :     }
     545             : 
     546           8 :     PQclear(res);
     547             : 
     548             :     /* if we started a transaction, decrement cursor count */
     549           8 :     if (rconn->newXactForCursor)
     550             :     {
     551           8 :         (rconn->openCursorCount)--;
     552             : 
     553             :         /* if count is zero, commit the transaction */
     554           8 :         if (rconn->openCursorCount == 0)
     555             :         {
     556           4 :             rconn->newXactForCursor = false;
     557             : 
     558           4 :             res = libpqsrv_exec(conn, "COMMIT", dblink_we_get_result);
     559           4 :             if (PQresultStatus(res) != PGRES_COMMAND_OK)
     560           0 :                 dblink_res_internalerror(conn, res, "commit error");
     561           4 :             PQclear(res);
     562             :         }
     563             :     }
     564             : 
     565           8 :     PG_RETURN_TEXT_P(cstring_to_text("OK"));
     566             : }
     567             : 
     568             : /*
     569             :  * Fetch results from an open cursor
                        *
                        * Accepted argument lists, told apart by PG_NARGS() and, for the
                        * three-argument form, by whether the third argument is of type
                        * BOOLOID:
                        *   (conname text, curname text, howmany int4, fail bool)
                        *   (conname text, curname text, howmany int4)
                        *   (curname text, howmany int4, fail bool)
                        *   (curname text, howmany int4)
                        * Forms without a connection name use pconn->conn.
                        *
                        * Runs "FETCH <howmany> FROM <curname>" on the connection.  A
                        * PGRES_TUPLES_OK result is passed to materializeResult(); a
                        * PGRES_COMMAND_OK result raises ERRCODE_INVALID_CURSOR_NAME; anything
                        * else is handed to dblink_res_error() together with "fail", and if
                        * that call returns, no tuplestore is filled in.
     570             :  */
     571          18 : PG_FUNCTION_INFO_V1(dblink_fetch);
     572             : Datum
     573          26 : dblink_fetch(PG_FUNCTION_ARGS)
     574             : {
     575          26 :     PGresult   *res = NULL;
     576          26 :     char       *conname = NULL;
     577          26 :     remoteConn *rconn = NULL;
     578          26 :     PGconn     *conn = NULL;
     579             :     StringInfoData buf;
     580          26 :     char       *curname = NULL;
     581          26 :     int         howmany = 0;
     582          26 :     bool        fail = true;    /* default to backward compatible */
     583             : 
     584          26 :     prepTuplestoreResult(fcinfo);
     585             : 
     586          26 :     dblink_init();
     587             : 
     588          26 :     if (PG_NARGS() == 4)
     589             :     {
     590             :         /* text,text,int,bool */
     591           2 :         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     592           2 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
     593           2 :         howmany = PG_GETARG_INT32(2);
     594           2 :         fail = PG_GETARG_BOOL(3);
     595             : 
                               /* conn stays NULL if no connection of this name is found */
     596           2 :         rconn = getConnectionByName(conname);
     597           2 :         if (rconn)
     598           2 :             conn = rconn->conn;
     599             :     }
     600          24 :     else if (PG_NARGS() == 3)
     601             :     {
     602             :         /* text,text,int or text,int,bool */
     603          16 :         if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
     604             :         {
                                   /* no connection name given: use the unnamed connection */
     605           4 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     606           4 :             howmany = PG_GETARG_INT32(1);
     607           4 :             fail = PG_GETARG_BOOL(2);
     608           4 :             conn = pconn->conn;
     609             :         }
     610             :         else
     611             :         {
     612          12 :             conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     613          12 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
     614          12 :             howmany = PG_GETARG_INT32(2);
     615             : 
                                   /* conn stays NULL if no connection of this name is found */
     616          12 :             rconn = getConnectionByName(conname);
     617          12 :             if (rconn)
     618          12 :                 conn = rconn->conn;
     619             :         }
     620             :     }
     621           8 :     else if (PG_NARGS() == 2)
     622             :     {
     623             :         /* text,int */
     624           8 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     625           8 :         howmany = PG_GETARG_INT32(1);
     626           8 :         conn = pconn->conn;
     627             :     }
     628             : 
                           /* also reached with conn == NULL if PG_NARGS() was not 2, 3 or 4 */
     629          26 :     if (!conn)
     630           0 :         dblink_conn_not_avail(conname);
     631             : 
                           /* curname is inserted into the command text as-is, without quoting */
     632          26 :     initStringInfo(&buf);
     633          26 :     appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
     634             : 
     635             :     /*
     636             :      * Try to execute the query.  Note that since libpq uses malloc, the
     637             :      * PGresult will be long-lived even though we are still in a short-lived
     638             :      * memory context.
     639             :      */
     640          26 :     res = libpqsrv_exec(conn, buf.data, dblink_we_get_result);
     641          52 :     if (!res ||
     642          52 :         (PQresultStatus(res) != PGRES_COMMAND_OK &&
     643          26 :          PQresultStatus(res) != PGRES_TUPLES_OK))
     644             :     {
     645          10 :         dblink_res_error(conn, conname, res, fail,
     646             :                          "while fetching from cursor \"%s\"", curname);
                               /* reached only if dblink_res_error() returned; no tuplestore is set */
     647           6 :         return (Datum) 0;
     648             :     }
     649          16 :     else if (PQresultStatus(res) == PGRES_COMMAND_OK)
     650             :     {
     651             :         /* cursor does not exist - closed already or bad name */
     652           0 :         PQclear(res);
     653           0 :         ereport(ERROR,
     654             :                 (errcode(ERRCODE_INVALID_CURSOR_NAME),
     655             :                  errmsg("cursor \"%s\" does not exist", curname)));
     656             :     }
     657             : 
                           /* materializeResult() releases res (see its header comment) */
     658          16 :     materializeResult(fcinfo, conn, res);
     659          14 :     return (Datum) 0;
     660             : }
     661             : 
     662             : /*
     663             :  * Note: this is the new preferred version of dblink
                        *
                        * Thin SQL-callable wrapper: passes its call info to
                        * dblink_record_internal() with is_async = false, i.e. the query is
                        * executed synchronously within this call.
     664             :  */
     665          22 : PG_FUNCTION_INFO_V1(dblink_record);
     666             : Datum
     667          50 : dblink_record(PG_FUNCTION_ARGS)
     668             : {
     669          50 :     return dblink_record_internal(fcinfo, false);
     670             : }
     671             : 
                       /*
                        * Send a query asynchronously over a named connection
                        *
                        * Arguments: (conname text, sql text).  The connection is obtained with
                        * dblink_get_named_conn() and the query text is handed to PQsendQuery().
                        *
                        * Returns PQsendQuery()'s return value as int4.  When that value is not
                        * 1, a NOTICE carrying the connection's error message is emitted; no
                        * error is raised for that case here.
                        */
     672           6 : PG_FUNCTION_INFO_V1(dblink_send_query);
     673             : Datum
     674          12 : dblink_send_query(PG_FUNCTION_ARGS)
     675             : {
     676             :     PGconn     *conn;
     677             :     char       *sql;
     678             :     int         retval;
     679             : 
     680          12 :     if (PG_NARGS() == 2)
     681             :     {
     682          12 :         conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
     683          12 :         sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
     684             :     }
     685             :     else
     686             :         /* shouldn't happen */
     687           0 :         elog(ERROR, "wrong number of arguments");
     688             : 
     689             :     /* async query send */
     690          12 :     retval = PQsendQuery(conn, sql);
     691          12 :     if (retval != 1)
     692           0 :         elog(NOTICE, "could not send query: %s", pchomp(PQerrorMessage(conn)));
     693             : 
     694          12 :     PG_RETURN_INT32(retval);
     695             : }
     696             : 
                       /*
                        * Retrieve the result of an asynchronously sent query
                        *
                        * Thin SQL-callable wrapper: passes its call info to
                        * dblink_record_internal() with is_async = true.
                        */
     697           8 : PG_FUNCTION_INFO_V1(dblink_get_result);
     698             : Datum
     699          16 : dblink_get_result(PG_FUNCTION_ARGS)
     700             : {
     701          16 :     return dblink_record_internal(fcinfo, true);
     702             : }
     703             : 
                       /*
                        * Common implementation behind dblink_record() and dblink_get_result()
                        *
                        * is_async = false: execute a query now.  Arguments are
                        * (conname, sql, fail), (conname, sql), (sql, fail) or (sql); the two
                        * two-argument forms are told apart by whether the second argument is
                        * of type BOOLOID.  Forms without a connection name use pconn->conn,
                        * the others call dblink_get_conn().  The query is run through
                        * materializeQueryResult().
                        *
                        * is_async = true: collect one result of a previously sent query.
                        * Arguments are (conname, fail) or (conname); the connection comes from
                        * dblink_get_named_conn() and one result is read with
                        * libpqsrv_get_result().
                        *
                        * "fail" defaults to true and is passed on to the error-reporting calls.
                        * Always returns (Datum) 0 on normal exit; rows, if any, are delivered
                        * through the ReturnSetInfo prepared by prepTuplestoreResult().
                        */
     704             : static Datum
     705          66 : dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
     706             : {
                           /* volatile: both are assigned inside PG_TRY and read in PG_FINALLY */
     707          66 :     PGconn     *volatile conn = NULL;
     708          66 :     volatile bool freeconn = false;
     709             : 
     710          66 :     prepTuplestoreResult(fcinfo);
     711             : 
     712          66 :     dblink_init();
     713             : 
     714          66 :     PG_TRY();
     715             :     {
     716          66 :         char       *sql = NULL;
     717          66 :         char       *conname = NULL;
     718          66 :         bool        fail = true;    /* default to backward compatible */
     719             : 
     720          66 :         if (!is_async)
     721             :         {
     722          50 :             if (PG_NARGS() == 3)
     723             :             {
     724             :                 /* text,text,bool */
     725           2 :                 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     726           2 :                 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
     727           2 :                 fail = PG_GETARG_BOOL(2);
                                       /* conn, conname and freeconn are passed as outputs here */
     728           2 :                 dblink_get_conn(conname, &conn, &conname, &freeconn);
     729             :             }
     730          48 :             else if (PG_NARGS() == 2)
     731             :             {
     732             :                 /* text,text or text,bool */
     733          34 :                 if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
     734             :                 {
     735           2 :                     sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
     736           2 :                     fail = PG_GETARG_BOOL(1);
     737           2 :                     conn = pconn->conn;
     738             :                 }
     739             :                 else
     740             :                 {
     741          32 :                     conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     742          32 :                     sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
                                           /* conn, conname and freeconn are passed as outputs here */
     743          32 :                     dblink_get_conn(conname, &conn, &conname, &freeconn);
     744             :                 }
     745             :             }
     746          14 :             else if (PG_NARGS() == 1)
     747             :             {
     748             :                 /* text */
     749          14 :                 conn = pconn->conn;
     750          14 :                 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
     751             :             }
     752             :             else
     753             :                 /* shouldn't happen */
     754           0 :                 elog(ERROR, "wrong number of arguments");
     755             :         }
     756             :         else                    /* is_async */
     757             :         {
     758             :             /* get async result */
     759          16 :             conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     760             : 
     761          16 :             if (PG_NARGS() == 2)
     762             :             {
     763             :                 /* text,bool */
     764           0 :                 fail = PG_GETARG_BOOL(1);
     765           0 :                 conn = dblink_get_named_conn(conname);
     766             :             }
     767          16 :             else if (PG_NARGS() == 1)
     768             :             {
     769             :                 /* text */
     770          16 :                 conn = dblink_get_named_conn(conname);
     771             :             }
     772             :             else
     773             :                 /* shouldn't happen */
     774           0 :                 elog(ERROR, "wrong number of arguments");
     775             :         }
     776             : 
     777          62 :         if (!conn)
     778           4 :             dblink_conn_not_avail(conname);
     779             : 
     780          58 :         if (!is_async)
     781             :         {
     782             :             /* synchronous query, use efficient tuple collection method */
     783          42 :             materializeQueryResult(fcinfo, conn, conname, sql, fail);
     784             :         }
     785             :         else
     786             :         {
     787             :             /* async result retrieval, do it the old way */
     788          16 :             PGresult   *res = libpqsrv_get_result(conn, dblink_we_get_result);
     789             : 
     790             :             /* NULL means we're all done with the async results */
     791          16 :             if (res)
     792             :             {
     793          10 :                 if (PQresultStatus(res) != PGRES_COMMAND_OK &&
     794          10 :                     PQresultStatus(res) != PGRES_TUPLES_OK)
     795             :                 {
     796           0 :                     dblink_res_error(conn, conname, res, fail,
     797             :                                      "while executing query");
     798             :                     /* if fail isn't set, we'll return an empty query result */
     799             :                 }
     800             :                 else
     801             :                 {
                                           /* materializeResult() releases res */
     802          10 :                     materializeResult(fcinfo, conn, res);
     803             :                 }
     804             :             }
     805             :         }
     806             :     }
     807           8 :     PG_FINALLY();
     808             :     {
     809             :         /* if needed, close the connection to the database */
     810          66 :         if (freeconn)
     811           6 :             libpqsrv_disconnect(conn);
     812             :     }
     813          66 :     PG_END_TRY();
     814             : 
     815          58 :     return (Datum) 0;
     816             : }
     817             : 
     818             : /*
     819             :  * Verify function caller can handle a tuplestore result, and set up for that.
     820             :  *
     821             :  * Note: if the caller returns without actually creating a tuplestore, the
     822             :  * executor will treat the function result as an empty set.
                        *
                        * Raises ERRCODE_FEATURE_NOT_SUPPORTED if fcinfo->resultinfo is missing
                        * or is not a ReturnSetInfo, or if SFRM_Materialize is not among its
                        * allowedModes.  Otherwise sets returnMode to SFRM_Materialize and
                        * resets setResult and setDesc to NULL.
     823             :  */
     824             : static void
     825          92 : prepTuplestoreResult(FunctionCallInfo fcinfo)
     826             : {
     827          92 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
     828             : 
     829             :     /* check to see if query supports us returning a tuplestore */
     830          92 :     if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
     831           0 :         ereport(ERROR,
     832             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     833             :                  errmsg("set-valued function called in context that cannot accept a set")));
     834          92 :     if (!(rsinfo->allowedModes & SFRM_Materialize))
     835           0 :         ereport(ERROR,
     836             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     837             :                  errmsg("materialize mode required, but it is not allowed in this context")));
     838             : 
     839             :     /* let the executor know we're sending back a tuplestore */
     840          92 :     rsinfo->returnMode = SFRM_Materialize;
     841             : 
     842             :     /* caller must fill these to return a non-empty result */
     843          92 :     rsinfo->setResult = NULL;
     844          92 :     rsinfo->setDesc = NULL;
     845          92 : }
     846             : 
     847             : /*
     848             :  * Copy the contents of the PGresult into a tuplestore to be returned
     849             :  * as the result of the current function.
     850             :  * The PGresult will be released in this function.
                        *
                        * fcinfo: call info of the calling function; its ReturnSetInfo receives
                        *		setResult and setDesc.
                        * conn: connection passed to applyRemoteGucs() for row results.
                        * res: a PGRES_COMMAND_OK or PGRES_TUPLES_OK result.  It is PQclear'd
                        *		in PG_FINALLY, i.e. on both normal and error exit.
                        *
                        * A PGRES_COMMAND_OK result yields a single one-column TEXT row holding
                        * the command status string.  For PGRES_TUPLES_OK the row type comes
                        * from get_call_result_type() and must have as many columns as the
                        * result, else ERRCODE_DATATYPE_MISMATCH is raised.  If the result has
                        * no rows, no tuplestore is created.
     851             :  */
     852             : static void
     853          26 : materializeResult(FunctionCallInfo fcinfo, PGconn *conn, PGresult *res)
     854             : {
     855          26 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
     856             : 
     857             :     /* prepTuplestoreResult must have been called previously */
     858             :     Assert(rsinfo->returnMode == SFRM_Materialize);
     859             : 
     860          26 :     PG_TRY();
     861             :     {
     862             :         TupleDesc   tupdesc;
     863             :         bool        is_sql_cmd;
     864             :         int         ntuples;
     865             :         int         nfields;
     866             : 
     867          26 :         if (PQresultStatus(res) == PGRES_COMMAND_OK)
     868             :         {
     869           0 :             is_sql_cmd = true;
     870             : 
     871             :             /*
     872             :              * need a tuple descriptor representing one TEXT column to return
     873             :              * the command status string as our result tuple
     874             :              */
     875           0 :             tupdesc = CreateTemplateTupleDesc(1);
     876           0 :             TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
     877             :                                TEXTOID, -1, 0);
     878           0 :             ntuples = 1;
     879           0 :             nfields = 1;
     880             :         }
     881             :         else
     882             :         {
     883             :             Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
     884             : 
     885          26 :             is_sql_cmd = false;
     886             : 
     887             :             /* get a tuple descriptor for our result type */
     888          26 :             switch (get_call_result_type(fcinfo, NULL, &tupdesc))
     889             :             {
     890          26 :                 case TYPEFUNC_COMPOSITE:
     891             :                     /* success */
     892          26 :                     break;
     893           0 :                 case TYPEFUNC_RECORD:
     894             :                     /* failed to determine actual type of RECORD */
     895           0 :                     ereport(ERROR,
     896             :                             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     897             :                              errmsg("function returning record called in context "
     898             :                                     "that cannot accept type record")));
     899             :                     break;
     900           0 :                 default:
     901             :                     /* result type isn't composite */
     902           0 :                     elog(ERROR, "return type must be a row type");
     903             :                     break;
     904             :             }
     905             : 
     906             :             /* make sure we have a persistent copy of the tupdesc */
     907          26 :             tupdesc = CreateTupleDescCopy(tupdesc);
     908          26 :             ntuples = PQntuples(res);
     909          26 :             nfields = PQnfields(res);
     910             :         }
     911             : 
     912             :         /*
     913             :          * check result and tuple descriptor have the same number of columns
     914             :          */
     915          26 :         if (nfields != tupdesc->natts)
     916           0 :             ereport(ERROR,
     917             :                     (errcode(ERRCODE_DATATYPE_MISMATCH),
     918             :                      errmsg("remote query result rowtype does not match "
     919             :                             "the specified FROM clause rowtype")));
     920             : 
     921          26 :         if (ntuples > 0)
     922             :         {
     923             :             AttInMetadata *attinmeta;
                                   /* stays -1 when applyRemoteGucs() is not called below */
     924          26 :             int         nestlevel = -1;
     925             :             Tuplestorestate *tupstore;
     926             :             MemoryContext oldcontext;
     927             :             int         row;
     928             :             char      **values;
     929             : 
     930          26 :             attinmeta = TupleDescGetAttInMetadata(tupdesc);
     931             : 
     932             :             /* Set GUCs to ensure we read GUC-sensitive data types correctly */
     933          26 :             if (!is_sql_cmd)
     934          26 :                 nestlevel = applyRemoteGucs(conn);
     935             : 
                                   /* create the tuplestore while in the per-query memory context */
     936          26 :             oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
     937          26 :             tupstore = tuplestore_begin_heap(true, false, work_mem);
     938          26 :             rsinfo->setResult = tupstore;
     939          26 :             rsinfo->setDesc = tupdesc;
     940          26 :             MemoryContextSwitchTo(oldcontext);
     941             : 
                                   /* one slot per result column, reused for every row */
     942          26 :             values = palloc_array(char *, nfields);
     943             : 
     944             :             /* put all tuples into the tuplestore */
     945          98 :             for (row = 0; row < ntuples; row++)
     946             :             {
     947             :                 HeapTuple   tuple;
     948             : 
     949          74 :                 if (!is_sql_cmd)
     950             :                 {
     951             :                     int         i;
     952             : 
                                           /* NULL marks a null column; other entries point into res */
     953         276 :                     for (i = 0; i < nfields; i++)
     954             :                     {
     955         202 :                         if (PQgetisnull(res, row, i))
     956           0 :                             values[i] = NULL;
     957             :                         else
     958         202 :                             values[i] = PQgetvalue(res, row, i);
     959             :                     }
     960             :                 }
     961             :                 else
     962             :                 {
     963           0 :                     values[0] = PQcmdStatus(res);
     964             :                 }
     965             : 
     966             :                 /* build the tuple and put it into the tuplestore. */
     967          74 :                 tuple = BuildTupleFromCStrings(attinmeta, values);
     968          72 :                 tuplestore_puttuple(tupstore, tuple);
     969             :             }
     970             : 
     971             :             /* clean up GUC settings, if we changed any */
     972          24 :             restoreLocalGucs(nestlevel);
     973             :         }
     974             :     }
     975           2 :     PG_FINALLY();
     976             :     {
     977             :         /* be sure to release the libpq result */
     978          26 :         PQclear(res);
     979             :     }
     980          26 :     PG_END_TRY();
     981          24 : }
     982             : 
     983             : /*
     984             :  * Execute the given SQL command and store its results into a tuplestore
     985             :  * to be returned as the result of the current function.
     986             :  *
     987             :  * This is equivalent to PQexec followed by materializeResult, but we make
     988             :  * use of libpq's single-row mode to avoid accumulating the whole result
     989             :  * inside libpq before it gets transferred to the tuplestore.
                        *
                        * fcinfo: call info of the calling function; stored in the storeInfo
                        *		handed to storeQueryResult().
                        * conn: connection to run the command on.
                        * conname: connection name, passed through to dblink_res_error().
                        * sql: command text.
                        * fail: passed through to dblink_res_error().
                        *
                        * On an error thrown inside the PG_TRY block, every PGresult held here
                        * is cleared, remaining results are read off the connection and
                        * cleared, and the error is re-thrown.
     990             :  */
     991             : static void
     992          42 : materializeQueryResult(FunctionCallInfo fcinfo,
     993             :                        PGconn *conn,
     994             :                        const char *conname,
     995             :                        const char *sql,
     996             :                        bool fail)
     997             : {
     998          42 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
                           /* volatile: both are modified inside PG_TRY and read in PG_CATCH */
     999          42 :     PGresult   *volatile res = NULL;
    1000          42 :     volatile storeInfo sinfo = {0};
    1001             : 
    1002             :     /* prepTuplestoreResult must have been called previously */
    1003             :     Assert(rsinfo->returnMode == SFRM_Materialize);
    1004             : 
    1005          42 :     sinfo.fcinfo = fcinfo;
    1006             : 
    1007          42 :     PG_TRY();
    1008             :     {
    1009             :         /* Create short-lived memory context for data conversions */
    1010          42 :         sinfo.tmpcontext = AllocSetContextCreate(CurrentMemoryContext,
    1011             :                                                  "dblink temporary context",
    1012             :                                                  ALLOCSET_DEFAULT_SIZES);
    1013             : 
    1014             :         /* execute query, collecting any tuples into the tuplestore */
    1015          42 :         res = storeQueryResult(&sinfo, conn, sql);
    1016             : 
    1017          42 :         if (!res ||
    1018          42 :             (PQresultStatus(res) != PGRES_COMMAND_OK &&
    1019          42 :              PQresultStatus(res) != PGRES_TUPLES_OK))
    1020           4 :         {
    1021             :             /*
    1022             :              * dblink_res_error will clear the passed PGresult, so we need
    1023             :              * this ugly dance to avoid doing so twice during error exit
    1024             :              */
    1025           4 :             PGresult   *res1 = res;
    1026             : 
    1027           4 :             res = NULL;
    1028           4 :             dblink_res_error(conn, conname, res1, fail,
    1029             :                              "while executing query");
    1030             :             /* if fail isn't set, we'll return an empty query result */
    1031             :         }
    1032          38 :         else if (PQresultStatus(res) == PGRES_COMMAND_OK)
    1033             :         {
    1034             :             /*
    1035             :              * storeRow didn't get called, so we need to convert the command
    1036             :              * status string to a tuple manually
    1037             :              */
    1038             :             TupleDesc   tupdesc;
    1039             :             AttInMetadata *attinmeta;
    1040             :             Tuplestorestate *tupstore;
    1041             :             HeapTuple   tuple;
    1042             :             char       *values[1];
    1043             :             MemoryContext oldcontext;
    1044             : 
    1045             :             /*
    1046             :              * need a tuple descriptor representing one TEXT column to return
    1047             :              * the command status string as our result tuple
    1048             :              */
    1049           0 :             tupdesc = CreateTemplateTupleDesc(1);
    1050           0 :             TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
    1051             :                                TEXTOID, -1, 0);
    1052           0 :             attinmeta = TupleDescGetAttInMetadata(tupdesc);
    1053             : 
                                   /* create the tuplestore while in the per-query memory context */
    1054           0 :             oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
    1055           0 :             tupstore = tuplestore_begin_heap(true, false, work_mem);
    1056           0 :             rsinfo->setResult = tupstore;
    1057           0 :             rsinfo->setDesc = tupdesc;
    1058           0 :             MemoryContextSwitchTo(oldcontext);
    1059             : 
    1060           0 :             values[0] = PQcmdStatus(res);
    1061             : 
    1062             :             /* build the tuple and put it into the tuplestore. */
    1063           0 :             tuple = BuildTupleFromCStrings(attinmeta, values);
    1064           0 :             tuplestore_puttuple(tupstore, tuple);
    1065             : 
                                   /* reset to NULL so PG_CATCH cannot clear it a second time */
    1066           0 :             PQclear(res);
    1067           0 :             res = NULL;
    1068             :         }
    1069             :         else
    1070             :         {
    1071             :             Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
    1072             :             /* storeRow should have created a tuplestore */
    1073             :             Assert(rsinfo->setResult != NULL);
    1074             : 
    1075          38 :             PQclear(res);
    1076          38 :             res = NULL;
    1077             :         }
    1078             : 
    1079             :         /* clean up data conversion short-lived memory context */
    1080          42 :         if (sinfo.tmpcontext != NULL)
    1081          42 :             MemoryContextDelete(sinfo.tmpcontext);
    1082          42 :         sinfo.tmpcontext = NULL;
    1083             : 
                               /* release whatever results are still recorded in sinfo */
    1084          42 :         PQclear(sinfo.last_res);
    1085          42 :         sinfo.last_res = NULL;
    1086          42 :         PQclear(sinfo.cur_res);
    1087          42 :         sinfo.cur_res = NULL;
    1088             :     }
    1089           0 :     PG_CATCH();
    1090             :     {
    1091             :         /* be sure to release any libpq result we collected */
    1092           0 :         PQclear(res);
    1093           0 :         PQclear(sinfo.last_res);
    1094           0 :         PQclear(sinfo.cur_res);
    1095             :         /* and clear out any pending data in libpq */
    1096           0 :         while ((res = libpqsrv_get_result(conn, dblink_we_get_result)) !=
    1097             :                NULL)
    1098           0 :             PQclear(res);
    1099           0 :         PG_RE_THROW();
    1100             :     }
    1101          42 :     PG_END_TRY();
    1102          42 : }
    1103             : 
    1104             : /*
    1105             :  * Execute query, and send any result rows to sinfo->tuplestore.
                        *
                        * The query is sent with PQsendQuery() and the connection is put into
                        * single-row mode; results are then read one at a time until
                        * libpqsrv_get_result() returns NULL.  Each PGRES_SINGLE_TUPLE result is
                        * handed to storeRow() and cleared.  Any other result is kept in
                        * sinfo->last_res, after clearing the one kept there before.
                        *
                        * Returns the last result kept that way (NULL if there was none) and
                        * resets sinfo->last_res to NULL, leaving the caller with the only
                        * pointer to it.  The result currently being processed is tracked in
                        * sinfo->cur_res.
    1106             :  */
    1107             : static PGresult *
    1108          42 : storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql)
    1109             : {
                           /* true until a row of the current result set has been stored */
    1110          42 :     bool        first = true;
                           /* stays -1 unless applyRemoteGucs() is called below */
    1111          42 :     int         nestlevel = -1;
    1112             :     PGresult   *res;
    1113             : 
    1114          42 :     if (!PQsendQuery(conn, sql))
    1115           0 :         elog(ERROR, "could not send query: %s", pchomp(PQerrorMessage(conn)));
    1116             : 
    1117          42 :     if (!PQsetSingleRowMode(conn))  /* shouldn't fail */
    1118           0 :         elog(ERROR, "failed to set single-row mode for dblink query");
    1119             : 
    1120             :     for (;;)
    1121             :     {
    1122         348 :         CHECK_FOR_INTERRUPTS();
    1123             : 
                               /* NULL means there are no more results for this query */
    1124         348 :         sinfo->cur_res = libpqsrv_get_result(conn, dblink_we_get_result);
    1125         348 :         if (!sinfo->cur_res)
    1126          42 :             break;
    1127             : 
    1128         306 :         if (PQresultStatus(sinfo->cur_res) == PGRES_SINGLE_TUPLE)
    1129             :         {
    1130             :             /* got one row from possibly-bigger resultset */
    1131             : 
    1132             :             /*
    1133             :              * Set GUCs to ensure we read GUC-sensitive data types correctly.
    1134             :              * We shouldn't do this until we have a row in hand, to ensure
    1135             :              * libpq has seen any earlier ParameterStatus protocol messages.
    1136             :              */
    1137         264 :             if (first && nestlevel < 0)
    1138          38 :                 nestlevel = applyRemoteGucs(conn);
    1139             : 
    1140         264 :             storeRow(sinfo, sinfo->cur_res, first);
    1141             : 
    1142         264 :             PQclear(sinfo->cur_res);
    1143         264 :             sinfo->cur_res = NULL;
    1144         264 :             first = false;
    1145             :         }
    1146             :         else
    1147             :         {
    1148             :             /* if empty resultset, fill tuplestore header */
    1149          42 :             if (first && PQresultStatus(sinfo->cur_res) == PGRES_TUPLES_OK)
    1150           0 :                 storeRow(sinfo, sinfo->cur_res, first);
    1151             : 
    1152             :             /* store completed result at last_res */
    1153          42 :             PQclear(sinfo->last_res);
    1154          42 :             sinfo->last_res = sinfo->cur_res;
    1155          42 :             sinfo->cur_res = NULL;
                                   /* a following result set, if any, starts afresh */
    1156          42 :             first = true;
    1157             :         }
    1158             :     }
    1159             : 
    1160             :     /* clean up GUC settings, if we changed any */
    1161          42 :     restoreLocalGucs(nestlevel);
    1162             : 
    1163             :     /* return last_res */
    1164          42 :     res = sinfo->last_res;
    1165          42 :     sinfo->last_res = NULL;
    1166          42 :     return res;
    1167             : }
    1168             : 
    1169             : /*
    1170             :  * Convert one PGresult row to a tuple and append it to sinfo->tuplestore.
    1171             :  *
    1172             :  * If "first" is true, first set up a fresh tuplestore for the caller's result
    1173             :  * rowtype; res must match its column count and may hold zero or one row.
    1174             :  */
    1175             : static void
    1176         264 : storeRow(volatile storeInfo *sinfo, PGresult *res, bool first)
    1177             : {
    1178         264 :     int         nfields = PQnfields(res);
    1179             :     HeapTuple   tuple;
    1180             :     int         i;
    1181             :     MemoryContext oldcontext;
    1182             : 
    1183         264 :     if (first)
    1184             :     {
    1185             :         /* Prepare for new result set */
    1186          38 :         ReturnSetInfo *rsinfo = (ReturnSetInfo *) sinfo->fcinfo->resultinfo;
    1187             :         TupleDesc   tupdesc;
    1188             : 
    1189             :         /*
    1190             :          * It's possible to get more than one result set if the query string
    1191             :          * contained multiple SQL commands.  In that case, we follow PQexec's
    1192             :          * traditional behavior of throwing away all but the last result.
    1193             :          */
    1194          38 :         if (sinfo->tuplestore)
    1195           0 :             tuplestore_end(sinfo->tuplestore);
    1196          38 :         sinfo->tuplestore = NULL;
    1197             : 
    1198             :         /* get a tuple descriptor for our result type */
    1199          38 :         switch (get_call_result_type(sinfo->fcinfo, NULL, &tupdesc))
    1200             :         {
    1201          38 :             case TYPEFUNC_COMPOSITE:
    1202             :                 /* success */
    1203          38 :                 break;
    1204           0 :             case TYPEFUNC_RECORD:
    1205             :                 /* failed to determine actual type of RECORD */
    1206           0 :                 ereport(ERROR,
    1207             :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1208             :                          errmsg("function returning record called in context "
    1209             :                                 "that cannot accept type record")));
    1210             :                 break;
    1211           0 :             default:
    1212             :                 /* result type isn't composite */
    1213           0 :                 elog(ERROR, "return type must be a row type");
    1214             :                 break;
    1215             :         }
    1216             : 
    1217             :         /* make sure we have a persistent copy of the tupdesc */
    1218          38 :         tupdesc = CreateTupleDescCopy(tupdesc);
    1219             : 
    1220             :         /* check result and tuple descriptor have the same number of columns */
    1221          38 :         if (nfields != tupdesc->natts)
    1222           0 :             ereport(ERROR,
    1223             :                     (errcode(ERRCODE_DATATYPE_MISMATCH),
    1224             :                      errmsg("remote query result rowtype does not match "
    1225             :                             "the specified FROM clause rowtype")));
    1226             : 
    1227             :         /* Prepare attinmeta for later data conversions */
    1228          38 :         sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
    1229             : 
    1230             :         /* Create a new, empty tuplestore in per-query memory; give it to rsinfo */
    1231          38 :         oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
    1232          38 :         sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
    1233          38 :         rsinfo->setResult = sinfo->tuplestore;
    1234          38 :         rsinfo->setDesc = tupdesc;
    1235          38 :         MemoryContextSwitchTo(oldcontext);
    1236             : 
    1237             :         /* Done if empty resultset: the new tuplestore is left with no rows */
    1238          38 :         if (PQntuples(res) == 0)
    1239           0 :             return;
    1240             : 
    1241             :         /*
    1242             :          * Set up sufficiently-wide string pointers array; this won't change
    1243             :          * in size so it's easy to preallocate.
    1244             :          */
    1245          38 :         if (sinfo->cstrs)
    1246           0 :             pfree(sinfo->cstrs);
    1247          38 :         sinfo->cstrs = palloc_array(char *, nfields);
    1248             :     }
    1249             : 
    1250             :     /* Should have a single-row result if we get here */
    1251             :     Assert(PQntuples(res) == 1);
    1252             : 
    1253             :     /*
    1254             :      * Do the following work in a temp context that we reset after each tuple.
    1255             :      * This cleans up not only the data we have direct access to, but any
    1256             :      * cruft the I/O functions might leak.
    1257             :      */
    1258         264 :     oldcontext = MemoryContextSwitchTo(sinfo->tmpcontext);
    1259             : 
    1260             :     /*
    1261             :      * Point cstrs at row 0's column values (not copied); NULL marks a null.
    1262             :      */
    1263        1020 :     for (i = 0; i < nfields; i++)
    1264             :     {
    1265         756 :         if (PQgetisnull(res, 0, i))
    1266           0 :             sinfo->cstrs[i] = NULL;
    1267             :         else
    1268         756 :             sinfo->cstrs[i] = PQgetvalue(res, 0, i);
    1269             :     }
    1270             : 
    1271             :     /* Convert row to a tuple, and add it to the tuplestore */
    1272         264 :     tuple = BuildTupleFromCStrings(sinfo->attinmeta, sinfo->cstrs);
    1273             : 
    1274         264 :     tuplestore_puttuple(sinfo->tuplestore, tuple);
    1275             : 
    1276             :     /* Clean up: switch back to the caller's context, reset the temp one */
    1277         264 :     MemoryContextSwitchTo(oldcontext);
    1278         264 :     MemoryContextReset(sinfo->tmpcontext);
    1279             : }
    1280             : 
    1281             : /*
    1282             :  * List all open dblink connections held in remoteConnHash, by name.
    1283             :  * Returns a text array of the connection names, or NULL if there are none.
    1284             :  * Takes no params
    1285             :  */
    1286           4 : PG_FUNCTION_INFO_V1(dblink_get_connections);
    1287             : Datum
    1288           2 : dblink_get_connections(PG_FUNCTION_ARGS)
    1289             : {
    1290             :     HASH_SEQ_STATUS status;
    1291             :     remoteConnHashEnt *hentry;
    1292           2 :     ArrayBuildState *astate = NULL;
    1293             : 
    1294           2 :     if (remoteConnHash)
    1295             :     {
    1296           2 :         hash_seq_init(&status, remoteConnHash);
    1297           8 :         while ((hentry = (remoteConnHashEnt *) hash_seq_search(&status)) != NULL)
    1298             :         {
    1299             :             /* append the name of this entry to the array being built */
    1300           6 :             astate = accumArrayResult(astate,
    1301           6 :                                       CStringGetTextDatum(hentry->name),
    1302             :                                       false, TEXTOID, CurrentMemoryContext);
    1303             :         }
    1304             :     }
    1305             : 
    1306           2 :     if (astate)
    1307           2 :         PG_RETURN_DATUM(makeArrayResult(astate,
    1308             :                                         CurrentMemoryContext));
    1309             :     else
    1310           0 :         PG_RETURN_NULL();
    1311             : }
    1312             : 
    1313             : /*
    1314             :  * Checks if a given remote connection is busy
    1315             :  *
    1316             :  * Returns 1 if the connection is busy, 0 otherwise (this is the result of
    1317             :  * PQisBusy, obtained right after a PQconsumeInput call on the connection)
    1318             :  * Params:
    1319             :  *  text connection_name - name of the connection to check
    1320             :  */
    1321           4 : PG_FUNCTION_INFO_V1(dblink_is_busy);
    1322             : Datum
    1323           2 : dblink_is_busy(PG_FUNCTION_ARGS)
    1324             : {
    1325             :     PGconn     *conn;
    1326             : 
    1327           2 :     dblink_init();
    1328           2 :     conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
    1329             : 
    1330           2 :     PQconsumeInput(conn);
    1331           2 :     PG_RETURN_INT32(PQisBusy(conn));
    1332             : }
    1333             : 
    1334             : /*
    1335             :  * Cancels a running request on a connection
    1336             :  *
    1337             :  * Returns text:
    1338             :  *  "OK" if libpqsrv_cancel returned NULL (cancel request sent correctly),
    1339             :  *      the message it returned otherwise
    1340             :  *
    1341             :  * Params:
    1342             :  *  text connection_name - name of the connection whose request to cancel
    1343             :  * The end time passed to libpqsrv_cancel is 30000 ms from the current time.
    1344             :  */
    1345           4 : PG_FUNCTION_INFO_V1(dblink_cancel_query);
    1346             : Datum
    1347           2 : dblink_cancel_query(PG_FUNCTION_ARGS)
    1348             : {
    1349             :     PGconn     *conn;
    1350             :     const char *msg;
    1351             :     TimestampTz endtime;
    1352             : 
    1353           2 :     dblink_init();
    1354           2 :     conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
    1355           2 :     endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
    1356             :                                           30000);
    1357           2 :     msg = libpqsrv_cancel(conn, endtime);
    1358           2 :     if (msg == NULL)
    1359           2 :         msg = "OK";
    1360             : 
    1361           2 :     PG_RETURN_TEXT_P(cstring_to_text(msg));
    1362             : }
    1363             : 
    1364             : 
    1365             : /*
    1366             :  * Get error message from a connection
    1367             :  *
    1368             :  * Returns text:
    1369             :  *  "OK" if PQerrorMessage is NULL or empty, else that message via pchomp
    1370             :  *
    1371             :  * Params:
    1372             :  *  text connection_name - name of the connection to check
    1373             :  *
    1374             :  */
    1375           4 : PG_FUNCTION_INFO_V1(dblink_error_message);
    1376             : Datum
    1377           2 : dblink_error_message(PG_FUNCTION_ARGS)
    1378             : {
    1379             :     char       *msg;
    1380             :     PGconn     *conn;
    1381             : 
    1382           2 :     dblink_init();
    1383           2 :     conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
    1384             : 
    1385           2 :     msg = PQerrorMessage(conn);
    1386           2 :     if (msg == NULL || msg[0] == '\0')
    1387           2 :         PG_RETURN_TEXT_P(cstring_to_text("OK"));
    1388             :     else
    1389           0 :         PG_RETURN_TEXT_P(cstring_to_text(pchomp(msg)));
    1390             : }
    1391             : 
    1392             : /*
    1393             :  * Execute an SQL non-SELECT command, returning its command status as text
    1394             :  */
    1395          18 : PG_FUNCTION_INFO_V1(dblink_exec);
    1396             : Datum
    1397          52 : dblink_exec(PG_FUNCTION_ARGS)
    1398             : {
    1399          52 :     text       *volatile sql_cmd_status = NULL;
    1400          52 :     PGconn     *volatile conn = NULL;
    1401          52 :     volatile bool freeconn = false;
    1402             : 
    1403          52 :     dblink_init();
    1404             : 
    1405          52 :     PG_TRY();
    1406             :     {
    1407          52 :         PGresult   *res = NULL;
    1408          52 :         char       *sql = NULL;
    1409          52 :         char       *conname = NULL;
    1410          52 :         bool        fail = true;    /* default to backward compatible behavior */
    1411             : 
    1412          52 :         if (PG_NARGS() == 3)
    1413             :         {
    1414             :             /* must be text,text,bool */
    1415           0 :             conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
    1416           0 :             sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
    1417           0 :             fail = PG_GETARG_BOOL(2);
    1418           0 :             dblink_get_conn(conname, &conn, &conname, &freeconn);
    1419             :         }
    1420          52 :         else if (PG_NARGS() == 2)
    1421             :         {
    1422             :             /* might be text,text or text,bool */
    1423          34 :             if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
    1424             :             {
    1425           2 :                 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
    1426           2 :                 fail = PG_GETARG_BOOL(1);
    1427           2 :                 conn = pconn->conn;
    1428             :             }
    1429             :             else
    1430             :             {
    1431          32 :                 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
    1432          32 :                 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
    1433          32 :                 dblink_get_conn(conname, &conn, &conname, &freeconn);
    1434             :             }
    1435             :         }
    1436          18 :         else if (PG_NARGS() == 1)
    1437             :         {
    1438             :             /* must be single text argument */
    1439          18 :             conn = pconn->conn;
    1440          18 :             sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
    1441             :         }
    1442             :         else
    1443             :             /* shouldn't happen */
    1444           0 :             elog(ERROR, "wrong number of arguments");
    1445             : 
    1446          52 :         if (!conn)
    1447           0 :             dblink_conn_not_avail(conname);
    1448             : 
    1449          52 :         res = libpqsrv_exec(conn, sql, dblink_we_get_result);
    1450          52 :         if (!res ||
    1451          52 :             (PQresultStatus(res) != PGRES_COMMAND_OK &&
    1452           4 :              PQresultStatus(res) != PGRES_TUPLES_OK))
    1453             :         {
    1454           4 :             dblink_res_error(conn, conname, res, fail,
    1455             :                              "while executing command");
    1456             : 
    1457             :             /*
    1458             :              * if dblink_res_error returns (presumably only when fail is
    1459             :              * false), report the failure as the literal text "ERROR"
    1460             :              */
    1461           2 :             sql_cmd_status = cstring_to_text("ERROR");
    1462             :         }
    1463          48 :         else if (PQresultStatus(res) == PGRES_COMMAND_OK)
    1464             :         {
    1465             :             /*
    1466             :              * save a copy of the command status string to return as our
    1467             :              * result, since res is cleared right afterwards
    1468             :              */
    1469          48 :             sql_cmd_status = cstring_to_text(PQcmdStatus(res));
    1470          48 :             PQclear(res);
    1471             :         }
    1472             :         else
    1473             :         {
    1474           0 :             PQclear(res);
    1475           0 :             ereport(ERROR,
    1476             :                     (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
    1477             :                      errmsg("statement returning results not allowed")));
    1478             :         }
    1479             :     }
    1480           2 :     PG_FINALLY();
    1481             :     {
    1482             :         /* if needed, close the connection to the database */
    1483          52 :         if (freeconn)
    1484           2 :             libpqsrv_disconnect(conn);
    1485             :     }
    1486          52 :     PG_END_TRY();
    1487             : 
    1488          50 :     PG_RETURN_TEXT_P(sql_cmd_status);
    1489             : }
    1490             : 
    1491             : 
    1492             : /*
    1493             :  * dblink_get_pkey
    1494             :  *
    1495             :  * Return the primary key fields of the supplied relation as a set of
    1496             :  * (position, colname) rows; the set is empty if there are none.
    1497             :  */
    1498           4 : PG_FUNCTION_INFO_V1(dblink_get_pkey);
    1499             : Datum
    1500          18 : dblink_get_pkey(PG_FUNCTION_ARGS)
    1501             : {
    1502             :     int16       indnkeyatts;
    1503             :     char      **results;
    1504             :     FuncCallContext *funcctx;
    1505             :     int32       call_cntr;
    1506             :     int32       max_calls;
    1507             :     AttInMetadata *attinmeta;
    1508             :     MemoryContext oldcontext;
    1509             : 
    1510             :     /* stuff done only on the first call of the function */
    1511          18 :     if (SRF_IS_FIRSTCALL())
    1512             :     {
    1513             :         Relation    rel;
    1514             :         TupleDesc   tupdesc;
    1515             : 
    1516             :         /* create a function context for cross-call persistence */
    1517           6 :         funcctx = SRF_FIRSTCALL_INIT();
    1518             : 
    1519             :         /*
    1520             :          * switch to memory context appropriate for multiple function calls
    1521             :          */
    1522           6 :         oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
    1523             : 
    1524             :         /* open target relation */
    1525           6 :         rel = get_rel_from_relname(PG_GETARG_TEXT_PP(0), AccessShareLock, ACL_SELECT);
    1526             : 
    1527             :         /* get the array of key attribute names and the number of them */
    1528           6 :         results = get_pkey_attnames(rel, &indnkeyatts);
    1529             : 
    1530           6 :         relation_close(rel, AccessShareLock);
    1531             : 
    1532             :         /*
    1533             :          * need a tuple descriptor representing one INT and one TEXT column
    1534             :          */
    1535           6 :         tupdesc = CreateTemplateTupleDesc(2);
    1536           6 :         TupleDescInitEntry(tupdesc, (AttrNumber) 1, "position",
    1537             :                            INT4OID, -1, 0);
    1538           6 :         TupleDescInitEntry(tupdesc, (AttrNumber) 2, "colname",
    1539             :                            TEXTOID, -1, 0);
    1540             : 
    1541             :         /*
    1542             :          * Generate attribute metadata needed later to produce tuples from raw
    1543             :          * C strings
    1544             :          */
    1545           6 :         attinmeta = TupleDescGetAttInMetadata(tupdesc);
    1546           6 :         funcctx->attinmeta = attinmeta;
    1547             : 
    1548           6 :         if ((results != NULL) && (indnkeyatts > 0))
    1549             :         {
    1550           6 :             funcctx->max_calls = indnkeyatts;
    1551             : 
    1552             :             /* got results, keep track of them */
    1553           6 :             funcctx->user_fctx = results;
    1554             :         }
    1555             :         else
    1556             :         {
    1557             :             /* fast track when no results */
    1558           0 :             MemoryContextSwitchTo(oldcontext);
    1559           0 :             SRF_RETURN_DONE(funcctx);
    1560             :         }
    1561             : 
    1562           6 :         MemoryContextSwitchTo(oldcontext);
    1563             :     }
    1564             : 
    1565             :     /* stuff done on every call of the function */
    1566          18 :     funcctx = SRF_PERCALL_SETUP();
    1567             : 
    1568             :     /*
    1569             :      * initialize per-call variables
    1570             :      */
    1571          18 :     call_cntr = funcctx->call_cntr;
    1572          18 :     max_calls = funcctx->max_calls;
    1573             : 
    1574          18 :     results = (char **) funcctx->user_fctx;
    1575          18 :     attinmeta = funcctx->attinmeta;
    1576             : 
    1577          18 :     if (call_cntr < max_calls)   /* do when there is more left to send */
    1578             :     {
    1579             :         char      **values;
    1580             :         HeapTuple   tuple;
    1581             :         Datum       result;
    1582             : 
    1583          12 :         values = palloc_array(char *, 2);
    1584          12 :         values[0] = psprintf("%d", call_cntr + 1);
    1585          12 :         values[1] = results[call_cntr];
    1586             : 
    1587             :         /* build the tuple */
    1588          12 :         tuple = BuildTupleFromCStrings(attinmeta, values);
    1589             : 
    1590             :         /* make the tuple into a datum */
    1591          12 :         result = HeapTupleGetDatum(tuple);
    1592             : 
    1593          12 :         SRF_RETURN_NEXT(funcctx, result);
    1594             :     }
    1595             :     else
    1596             :     {
    1597             :         /* do when there is no more left */
    1598           6 :         SRF_RETURN_DONE(funcctx);
    1599             :     }
    1600             : }
    1601             : 
    1602             : 
    1603             : /*
    1604             :  * dblink_build_sql_insert
    1605             :  *
    1606             :  * Used to generate an SQL insert statement
    1607             :  * based on an existing tuple in a local relation.
    1608             :  * This is useful for selectively replicating data
    1609             :  * to another server via dblink.
    1610             :  *
    1611             :  * API:
    1612             :  * <relname> - name of local table of interest
    1613             :  * <pkattnums> - an int2vector of attnums which will be used
    1614             :  * to identify the local tuple of interest
    1615             :  * <pknumatts> - number of attnums in pkattnums
    1616             :  * <src_pkattvals_arry> - text array of key values which will be used
    1617             :  * to identify the local tuple of interest
    1618             :  * <tgt_pkattvals_arry> - text array of key values which will be used
    1619             :  * to build the string for execution remotely. These are substituted
    1620             :  * for their counterparts in src_pkattvals_arry
    1621             :  */
    1622           6 : PG_FUNCTION_INFO_V1(dblink_build_sql_insert);
    1623             : Datum
    1624          12 : dblink_build_sql_insert(PG_FUNCTION_ARGS)
    1625             : {
    1626          12 :     text       *relname_text = PG_GETARG_TEXT_PP(0);
    1627          12 :     int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
    1628          12 :     int32       pknumatts_arg = PG_GETARG_INT32(2);
    1629          12 :     ArrayType  *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
    1630          12 :     ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
    1631             :     Relation    rel;
    1632             :     int        *pkattnums;
    1633             :     int         pknumatts;
    1634             :     char      **src_pkattvals;
    1635             :     char      **tgt_pkattvals;
    1636             :     int         src_nitems;
    1637             :     int         tgt_nitems;
    1638             :     char       *sql;
    1639             : 
    1640             :     /*
    1641             :      * Open target relation.
    1642             :      */
    1643          12 :     rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
    1644             : 
    1645             :     /*
    1646             :      * Check pkattnums_arg/pknumatts_arg; outputs go to pkattnums, pknumatts.
    1647             :      */
    1648          12 :     validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
    1649             :                        &pkattnums, &pknumatts);
    1650             : 
    1651             :     /*
    1652             :      * Source array is made up of key values that will be used to locate the
    1653             :      * tuple of interest from the local system.
    1654             :      */
    1655           8 :     src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
    1656             : 
    1657             :     /*
    1658             :      * There should be one source array key value for each key attnum
    1659             :      */
    1660           8 :     if (src_nitems != pknumatts)
    1661           0 :         ereport(ERROR,
    1662             :                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
    1663             :                  errmsg("source key array length must match number of key attributes")));
    1664             : 
    1665             :     /*
    1666             :      * Target array is made up of key values that will be used to build the
    1667             :      * SQL string for use on the remote system.
    1668             :      */
    1669           8 :     tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
    1670             : 
    1671             :     /*
    1672             :      * There should be one target array key value for each key attnum
    1673             :      */
    1674           8 :     if (tgt_nitems != pknumatts)
    1675           0 :         ereport(ERROR,
    1676             :                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
    1677             :                  errmsg("target key array length must match number of key attributes")));
    1678             : 
    1679             :     /*
    1680             :      * Prep work is finally done. Go get the SQL string.
    1681             :      */
    1682           8 :     sql = get_sql_insert(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
    1683             : 
    1684             :     /*
    1685             :      * Now we can close the relation.
    1686             :      */
    1687           8 :     relation_close(rel, AccessShareLock);
    1688             : 
    1689             :     /*
    1690             :      * Return the generated SQL string to the caller as text
    1691             :      */
    1692           8 :     PG_RETURN_TEXT_P(cstring_to_text(sql));
    1693             : }
    1694             : 
    1695             : 
    1696             : /*
    1697             :  * dblink_build_sql_delete
    1698             :  *
    1699             :  * Used to generate an SQL delete statement.
    1700             :  * This is useful for selectively replicating a
    1701             :  * delete to another server via dblink.
    1702             :  *
    1703             :  * API:
    1704             :  * <relname> - name of remote table of interest (opened locally by name)
    1705             :  * <pkattnums> - an int2vector of attnums which will be used
    1706             :  * to identify the remote tuple of interest
    1707             :  * <pknumatts> - number of attnums in pkattnums
    1708             :  * <tgt_pkattvals_arry> - text array of key values which will be used
    1709             :  * to build the string for execution remotely.
    1710             :  */
    1711           6 : PG_FUNCTION_INFO_V1(dblink_build_sql_delete);
    1712             : Datum
    1713          12 : dblink_build_sql_delete(PG_FUNCTION_ARGS)
    1714             : {
    1715          12 :     text       *relname_text = PG_GETARG_TEXT_PP(0);
    1716          12 :     int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
    1717          12 :     int32       pknumatts_arg = PG_GETARG_INT32(2);
    1718          12 :     ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
    1719             :     Relation    rel;
    1720             :     int        *pkattnums;
    1721             :     int         pknumatts;
    1722             :     char      **tgt_pkattvals;
    1723             :     int         tgt_nitems;
    1724             :     char       *sql;
    1725             : 
    1726             :     /*
    1727             :      * Open target relation.
    1728             :      */
    1729          12 :     rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
    1730             : 
    1731             :     /*
    1732             :      * Check pkattnums_arg/pknumatts_arg; outputs go to pkattnums, pknumatts.
    1733             :      */
    1734          12 :     validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
    1735             :                        &pkattnums, &pknumatts);
    1736             : 
    1737             :     /*
    1738             :      * Target array is made up of key values that will be used to build the
    1739             :      * SQL string for use on the remote system.
    1740             :      */
    1741           8 :     tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
    1742             : 
    1743             :     /*
    1744             :      * There should be one target array key value for each key attnum
    1745             :      */
    1746           8 :     if (tgt_nitems != pknumatts)
    1747           0 :         ereport(ERROR,
    1748             :                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
    1749             :                  errmsg("target key array length must match number of key attributes")));
    1750             : 
    1751             :     /*
    1752             :      * Prep work is finally done. Go get the SQL string.
    1753             :      */
    1754           8 :     sql = get_sql_delete(rel, pkattnums, pknumatts, tgt_pkattvals);
    1755             : 
    1756             :     /*
    1757             :      * Now we can close the relation.
    1758             :      */
    1759           8 :     relation_close(rel, AccessShareLock);
    1760             : 
    1761             :     /*
    1762             :      * Return the generated SQL string to the caller as text
    1763             :      */
    1764           8 :     PG_RETURN_TEXT_P(cstring_to_text(sql));
    1765             : }
    1766             : 
    1767             : 
    1768             : /*
    1769             :  * dblink_build_sql_update
    1770             :  *
    1771             :  * Used to generate an SQL update statement
    1772             :  * based on an existing tuple in a local relation.
    1773             :  * This is useful for selectively replicating data
    1774             :  * to another server via dblink.
    1775             :  *
    1776             :  * API:
    1777             :  * <relname> - name of local table of interest
    1778             :  * <pkattnums> - an int2vector of attnums which will be used
    1779             :  * to identify the local tuple of interest
    1780             :  * <pknumatts> - number of attnums in pkattnums
    1781             :  * <src_pkattvals_arry> - text array of key values which will be used
    1782             :  * to identify the local tuple of interest
    1783             :  * <tgt_pkattvals_arry> - text array of key values which will be used
    1784             :  * to build the string for execution remotely. These are substituted
    1785             :  * for their counterparts in src_pkattvals_arry
                     :  *
                     :  * Returns the generated UPDATE statement as text.  An error
                     :  * (ERRCODE_ARRAY_SUBSCRIPT_ERROR) is raised when either key-value array
                     :  * does not hold exactly one element per key attribute.
                     :  *
                     :  * The relation is opened through get_rel_from_relname() with
                     :  * AccessShareLock and ACL_SELECT, and is closed again with the same lock
                     :  * mode once the SQL string has been built.
    1786             :  */
    1787           6 : PG_FUNCTION_INFO_V1(dblink_build_sql_update);
    1788             : Datum
    1789          12 : dblink_build_sql_update(PG_FUNCTION_ARGS)
    1790             : {
    1791          12 :     text       *relname_text = PG_GETARG_TEXT_PP(0);
    1792          12 :     int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
    1793          12 :     int32       pknumatts_arg = PG_GETARG_INT32(2);
    1794          12 :     ArrayType  *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
    1795          12 :     ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
    1796             :     Relation    rel;
    1797             :     int        *pkattnums;
    1798             :     int         pknumatts;
    1799             :     char      **src_pkattvals;
    1800             :     char      **tgt_pkattvals;
    1801             :     int         src_nitems;
    1802             :     int         tgt_nitems;
    1803             :     char       *sql;
    1804             : 
    1805             :     /*
    1806             :      * Open target relation.
    1807             :      */
    1808          12 :     rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
    1809             : 
    1810             :     /*
    1811             :      * Process pkattnums argument; the validated attnum array and its
                     :      * length come back in pkattnums / pknumatts.
    1812             :      */
    1813          12 :     validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
    1814             :                        &pkattnums, &pknumatts);
    1815             : 
    1816             :     /*
    1817             :      * Source array is made up of key values that will be used to locate the
    1818             :      * tuple of interest from the local system.
    1819             :      */
    1820           8 :     src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
    1821             : 
    1822             :     /*
    1823             :      * There should be one source array key value for each key attnum
    1824             :      */
    1825           8 :     if (src_nitems != pknumatts)
    1826           0 :         ereport(ERROR,
    1827             :                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
    1828             :                  errmsg("source key array length must match number of key attributes")));
    1829             : 
    1830             :     /*
    1831             :      * Target array is made up of key values that will be used to build the
    1832             :      * SQL string for use on the remote system.
    1833             :      */
    1834           8 :     tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
    1835             : 
    1836             :     /*
    1837             :      * There should be one target array key value for each key attnum
    1838             :      */
    1839           8 :     if (tgt_nitems != pknumatts)
    1840           0 :         ereport(ERROR,
    1841             :                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
    1842             :                  errmsg("target key array length must match number of key attributes")));
    1843             : 
    1844             :     /*
    1845             :      * Prep work is finally done. Go get the SQL string.
    1846             :      */
    1847           8 :     sql = get_sql_update(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
    1848             : 
    1849             :     /*
    1850             :      * Now we can close the relation.
    1851             :      */
    1852           8 :     relation_close(rel, AccessShareLock);
    1853             : 
    1854             :     /*
    1855             :      * Return the generated statement to the caller as a text datum.
    1856             :      */
    1857           8 :     PG_RETURN_TEXT_P(cstring_to_text(sql));
    1858             : }
    1859             : 
    1860             : /*
    1861             :  * dblink_current_query
    1862             :  * return the current query string
    1863             :  * to allow its use in (among other things)
    1864             :  * rewrite rules
                     :  *
                     :  * The call is forwarded unchanged (same fcinfo) to current_query(), and
                     :  * whatever datum that function produces is returned as-is.
    1865             :  */
    1866           2 : PG_FUNCTION_INFO_V1(dblink_current_query);
    1867             : Datum
    1868           0 : dblink_current_query(PG_FUNCTION_ARGS)
    1869             : {
    1870             :     /* This is now just an alias for the built-in function current_query() */
    1871           0 :     PG_RETURN_DATUM(current_query(fcinfo));
    1872             : }
    1873             : 
    1874             : /*
    1875             :  * Retrieve async notifications for a connection.
    1876             :  *
    1877             :  * Returns a setof record of notifications, or an empty set if none received.
    1878             :  * Can optionally take a named connection as parameter, but uses the unnamed
    1879             :  * connection per default.
    1880             :  *
                     :  * Each returned row has DBLINK_NOTIFY_COLS columns taken from the libpq
                     :  * PGnotify struct, in this order: relname (as text), be_pid (as a 32-bit
                     :  * integer) and extra (as text).  A NULL relname or extra pointer is
                     :  * returned as SQL NULL.
                     :  *
                     :  * Rows are written into the tuplestore set up by InitMaterializedSRF().
                     :  * PQconsumeInput() is called before the first PQnotifies() and again after
                     :  * each notification has been stored; its return value is not checked here.
    1881             :  */
    1882             : #define DBLINK_NOTIFY_COLS      3
    1883             : 
    1884           6 : PG_FUNCTION_INFO_V1(dblink_get_notify);
    1885             : Datum
    1886           4 : dblink_get_notify(PG_FUNCTION_ARGS)
    1887             : {
    1888             :     PGconn     *conn;
    1889             :     PGnotify   *notify;
    1890           4 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    1891             : 
    1892           4 :     dblink_init();
    1893           4 :     if (PG_NARGS() == 1)
    1894           0 :         conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
    1895             :     else
    1896           4 :         conn = pconn->conn;
    1897             : 
    1898           4 :     InitMaterializedSRF(fcinfo, 0);
    1899             : 
    1900           4 :     PQconsumeInput(conn);
    1901           8 :     while ((notify = PQnotifies(conn)) != NULL)
    1902             :     {
    1903             :         Datum       values[DBLINK_NOTIFY_COLS];
    1904             :         bool        nulls[DBLINK_NOTIFY_COLS];
    1905             : 
    1906           4 :         memset(values, 0, sizeof(values));
    1907           4 :         memset(nulls, 0, sizeof(nulls));
    1908             : 
    1909           4 :         if (notify->relname != NULL)
    1910           4 :             values[0] = CStringGetTextDatum(notify->relname);
    1911             :         else
    1912           0 :             nulls[0] = true;
    1913             : 
    1914           4 :         values[1] = Int32GetDatum(notify->be_pid);
    1915             : 
    1916           4 :         if (notify->extra != NULL)
    1917           4 :             values[2] = CStringGetTextDatum(notify->extra);
    1918             :         else
    1919           0 :             nulls[2] = true;
    1920             : 
    1921           4 :         tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
    1922             : 
    1923           4 :         PQfreemem(notify);
    1924           4 :         PQconsumeInput(conn);
    1925             :     }
    1926             : 
    1927           4 :     return (Datum) 0;
    1928             : }
    1929             : 
    1930             : /*
    1931             :  * Validate the options given to a dblink foreign server or user mapping.
    1932             :  * Raise an error if any option is invalid.
    1933             :  *
    1934             :  * We just check the names of options here, so semantic errors in options,
    1935             :  * such as invalid numeric format, will be detected at the attempt to connect.
                     :  *
                     :  * Argument 0 is the option datum, expanded into a list of DefElem with
                     :  * untransformRelOptions(); argument 1 is an OID handed to
                     :  * is_valid_dblink_option() as the context in which the options are used.
                     :  *
                     :  * On the first invalid option an ERRCODE_FDW_OPTION_NAME_NOT_FOUND error is
                     :  * raised.  Returns void when every option name is accepted.
    1936             :  */
    1937           6 : PG_FUNCTION_INFO_V1(dblink_fdw_validator);
    1938             : Datum
    1939          10 : dblink_fdw_validator(PG_FUNCTION_ARGS)
    1940             : {
    1941          10 :     List       *options_list = untransformRelOptions(PG_GETARG_DATUM(0));
    1942          10 :     Oid         context = PG_GETARG_OID(1);
    1943             :     ListCell   *cell;
    1944             : 
    1945             :     static const PQconninfoOption *options = NULL;
    1946             : 
    1947             :     /*
    1948             :      * Get list of valid libpq options.
    1949             :      *
    1950             :      * To avoid unnecessary work, we get the list once and use it throughout
    1951             :      * the lifetime of this backend process.  We don't need to care about
    1952             :      * memory context issues, because PQconndefaults allocates with malloc.
    1953             :      */
    1954          10 :     if (!options)
    1955             :     {
    1956           4 :         options = PQconndefaults();
    1957           4 :         if (!options)           /* assume reason for failure is OOM */
    1958           0 :             ereport(ERROR,
    1959             :                     (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
    1960             :                      errmsg("out of memory"),
    1961             :                      errdetail("Could not get libpq's default connection options.")));
    1962             :     }
    1963             : 
    1964             :     /* Validate each supplied option. */
    1965          16 :     foreach(cell, options_list)
    1966             :     {
    1967          10 :         DefElem    *def = (DefElem *) lfirst(cell);
    1968             : 
    1969          10 :         if (!is_valid_dblink_option(options, def->defname, context))
    1970             :         {
    1971             :             /*
    1972             :              * Unknown option, or invalid option for the context specified, so
    1973             :              * complain about it.  Provide a hint with a valid option that
    1974             :              * looks similar, if there is one.
                     :              *
                     :              * Candidates are the libpq keywords that are themselves valid in
                     :              * this context; each is fed to updateClosestMatch(), with 4 as
                     :              * the limit given to initClosestMatch().  If valid options exist
                     :              * but none is returned as a match, no hint is attached (the
                     :              * conditional below yields 0); if no option is valid here at
                     :              * all, the hint says so.
    1975             :              */
    1976             :             const PQconninfoOption *opt;
    1977             :             const char *closest_match;
    1978             :             ClosestMatchState match_state;
    1979           4 :             bool        has_valid_options = false;
    1980             : 
    1981           4 :             initClosestMatch(&match_state, def->defname, 4);
    1982         168 :             for (opt = options; opt->keyword; opt++)
    1983             :             {
    1984         164 :                 if (is_valid_dblink_option(options, opt->keyword, context))
    1985             :                 {
    1986           6 :                     has_valid_options = true;
    1987           6 :                     updateClosestMatch(&match_state, opt->keyword);
    1988             :                 }
    1989             :             }
    1990             : 
    1991           4 :             closest_match = getClosestMatch(&match_state);
    1992           4 :             ereport(ERROR,
    1993             :                     (errcode(ERRCODE_FDW_OPTION_NAME_NOT_FOUND),
    1994             :                      errmsg("invalid option \"%s\"", def->defname),
    1995             :                      has_valid_options ? closest_match ?
    1996             :                      errhint("Perhaps you meant the option \"%s\".",
    1997             :                              closest_match) : 0 :
    1998             :                      errhint("There are no valid options in this context.")));
    1999             :         }
    2000             :     }
    2001             : 
    2002           6 :     PG_RETURN_VOID();
    2003             : }
    2004             : 
    2005             : 
    2006             : /*************************************************************
    2007             :  * internal functions
    2008             :  */
    2009             : 
    2010             : 
    2011             : /*
    2012             :  * get_pkey_attnames
    2013             :  *
    2014             :  * Get the primary key attnames for the given relation.
    2015             :  * Return NULL, and set indnkeyatts = 0, if no primary key exists.
                     :  *
                     :  * pg_index is scanned for rows whose indrelid is this relation; the first
                     :  * row flagged indisprimary ends the scan.  *indnkeyatts is set from that
                     :  * row's indnkeyatts, and when it is positive the result is a palloc'd array
                     :  * with one SPI_fname() result per key column, in index column order.
    2016             :  */
    2017             : static char **
    2018           6 : get_pkey_attnames(Relation rel, int16 *indnkeyatts)
    2019             : {
    2020             :     Relation    indexRelation;
    2021             :     ScanKeyData skey;
    2022             :     SysScanDesc scan;
    2023             :     HeapTuple   indexTuple;
    2024             :     int         i;
    2025           6 :     char      **result = NULL;
    2026             :     TupleDesc   tupdesc;
    2027             : 
    2028             :     /* initialize indnkeyatts to 0 in case no primary key exists */
    2029           6 :     *indnkeyatts = 0;
    2030             : 
    2031           6 :     tupdesc = rel->rd_att;
    2032             : 
    2033             :     /* Prepare to scan pg_index for entries having indrelid = this rel. */
    2034           6 :     indexRelation = table_open(IndexRelationId, AccessShareLock);
    2035           6 :     ScanKeyInit(&skey,
    2036             :                 Anum_pg_index_indrelid,
    2037             :                 BTEqualStrategyNumber, F_OIDEQ,
    2038             :                 ObjectIdGetDatum(RelationGetRelid(rel)));
    2039             : 
    2040           6 :     scan = systable_beginscan(indexRelation, IndexIndrelidIndexId, true,
    2041             :                               NULL, 1, &skey);
    2042             : 
    2043           6 :     while (HeapTupleIsValid(indexTuple = systable_getnext(scan)))
    2044             :     {
    2045           6 :         Form_pg_index index = (Form_pg_index) GETSTRUCT(indexTuple);
    2046             : 
    2047             :         /* we're only interested if it is the primary key */
    2048           6 :         if (index->indisprimary)
    2049             :         {
    2050           6 :             *indnkeyatts = index->indnkeyatts;
    2051           6 :             if (*indnkeyatts > 0)
    2052             :             {
    2053           6 :                 result = palloc_array(char *, *indnkeyatts);
    2054             : 
    2055          18 :                 for (i = 0; i < *indnkeyatts; i++)
    2056          12 :                     result[i] = SPI_fname(tupdesc, index->indkey.values[i]);
    2057             :             }
    2058           6 :             break;
    2059             :         }
    2060             :     }
    2061             : 
    2062           6 :     systable_endscan(scan);
    2063           6 :     table_close(indexRelation, AccessShareLock);
    2064             : 
    2065           6 :     return result;
    2066             : }
    2067             : 
    2068             : /*
    2069             :  * Deconstruct a text[] into C-strings (note any NULL elements will be
    2070             :  * returned as NULL pointers)
                     :  *
                     :  * *numitems receives the total element count over all dimensions, as
                     :  * computed by ArrayGetNItems(); the returned palloc'd array has that many
                     :  * entries.  The element type is asserted to be TEXTOID.
                     :  *
                     :  * The data pointer is advanced only past non-NULL elements, whereas the
                     :  * null bitmap (when the array has one) is stepped one bit per element.
    2071             :  */
    2072             : static char **
    2073          40 : get_text_array_contents(ArrayType *array, int *numitems)
    2074             : {
    2075          40 :     int         ndim = ARR_NDIM(array);
    2076          40 :     int        *dims = ARR_DIMS(array);
    2077             :     int         nitems;
    2078             :     int16       typlen;
    2079             :     bool        typbyval;
    2080             :     char        typalign;
    2081             :     char      **values;
    2082             :     char       *ptr;
    2083             :     bits8      *bitmap;
    2084             :     int         bitmask;
    2085             :     int         i;
    2086             : 
    2087             :     Assert(ARR_ELEMTYPE(array) == TEXTOID);
    2088             : 
    2089          40 :     *numitems = nitems = ArrayGetNItems(ndim, dims);
    2090             : 
    2091          40 :     get_typlenbyvalalign(ARR_ELEMTYPE(array),
    2092             :                          &typlen, &typbyval, &typalign);
    2093             : 
    2094          40 :     values = palloc_array(char *, nitems);
    2095             : 
    2096          40 :     ptr = ARR_DATA_PTR(array);
    2097          40 :     bitmap = ARR_NULLBITMAP(array);
    2098          40 :     bitmask = 1;
    2099             : 
    2100         110 :     for (i = 0; i < nitems; i++)
    2101             :     {
    2102          70 :         if (bitmap && (*bitmap & bitmask) == 0)
    2103             :         {
    2104           0 :             values[i] = NULL;
    2105             :         }
    2106             :         else
    2107             :         {
    2108          70 :             values[i] = TextDatumGetCString(PointerGetDatum(ptr));
    2109          70 :             ptr = att_addlength_pointer(ptr, typlen, ptr);
    2110          70 :             ptr = (char *) att_align_nominal(ptr, typalign);
    2111             :         }
    2112             : 
    2113             :         /* advance bitmap pointer if any: next bit, or next byte after 8 bits */
    2114          70 :         if (bitmap)
    2115             :         {
    2116           0 :             bitmask <<= 1;
    2117           0 :             if (bitmask == 0x100)
    2118             :             {
    2119           0 :                 bitmap++;
    2120           0 :                 bitmask = 1;
    2121             :             }
    2122             :         }
    2123             :     }
    2124             : 
    2125          40 :     return values;
    2126             : }
    2127             : /*
                     :  * get_sql_insert
                     :  *
                     :  * Build an "INSERT INTO rel(cols) VALUES(vals)" statement from the local
                     :  * row that get_tuple_of_interest() finds for src_pkattvals.
                     :  *
                     :  * Dropped columns are left out of both lists.  For a column whose 0-based
                     :  * physical number appears in pkattnums, the value is taken from the
                     :  * matching tgt_pkattvals entry; every other column uses the fetched row's
                     :  * value from SPI_getvalue().  A NULL value is written as the keyword NULL,
                     :  * anything else goes through quote_literal_cstr().
                     :  *
                     :  * Raises ERRCODE_CARDINALITY_VIOLATION if no source row is found.
                     :  * Returns the data buffer of the StringInfo the statement was built in.
                     :  */
    2128             : static char *
    2129           8 : get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
    2130             : {
    2131             :     char       *relname;
    2132             :     HeapTuple   tuple;
    2133             :     TupleDesc   tupdesc;
    2134             :     int         natts;
    2135             :     StringInfoData buf;
    2136             :     char       *val;
    2137             :     int         key;
    2138             :     int         i;
    2139             :     bool        needComma;
    2140             : 
    2141           8 :     initStringInfo(&buf);
    2142             : 
    2143             :     /* get relation name including any needed schema prefix and quoting */
    2144           8 :     relname = generate_relation_name(rel);
    2145             : 
    2146           8 :     tupdesc = rel->rd_att;
    2147           8 :     natts = tupdesc->natts;
    2148             : 
    2149           8 :     tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
    2150           8 :     if (!tuple)
    2151           0 :         ereport(ERROR,
    2152             :                 (errcode(ERRCODE_CARDINALITY_VIOLATION),
    2153             :                  errmsg("source row not found")));
    2154             : 
    2155           8 :     appendStringInfo(&buf, "INSERT INTO %s(", relname);
    2156             : 
    2157           8 :     needComma = false;
    2158          38 :     for (i = 0; i < natts; i++)
    2159             :     {
    2160          30 :         Form_pg_attribute att = TupleDescAttr(tupdesc, i);
    2161             : 
    2162          30 :         if (att->attisdropped)
    2163           4 :             continue;
    2164             : 
    2165          26 :         if (needComma)
    2166          18 :             appendStringInfoChar(&buf, ',');
    2167             : 
    2168          26 :         appendStringInfoString(&buf,
    2169          26 :                                quote_ident_cstr(NameStr(att->attname)));
    2170          26 :         needComma = true;
    2171             :     }
    2172             : 
    2173           8 :     appendStringInfoString(&buf, ") VALUES(");
    2174             : 
    2175             :     /*
    2176             :      * Note: i is physical column number (counting from 0).
                     :      * SPI_getvalue() below is therefore called with i + 1.
    2177             :      */
    2178           8 :     needComma = false;
    2179          38 :     for (i = 0; i < natts; i++)
    2180             :     {
    2181          30 :         if (TupleDescAttr(tupdesc, i)->attisdropped)
    2182           4 :             continue;
    2183             : 
    2184          26 :         if (needComma)
    2185          18 :             appendStringInfoChar(&buf, ',');
    2186             : 
    2187          26 :         key = get_attnum_pk_pos(pkattnums, pknumatts, i);
    2188             : 
    2189          26 :         if (key >= 0)
    2190          14 :             val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
    2191             :         else
    2192          12 :             val = SPI_getvalue(tuple, tupdesc, i + 1);
    2193             : 
    2194          26 :         if (val != NULL)
    2195             :         {
    2196          26 :             appendStringInfoString(&buf, quote_literal_cstr(val));
    2197          26 :             pfree(val);
    2198             :         }
    2199             :         else
    2200           0 :             appendStringInfoString(&buf, "NULL");
    2201          26 :         needComma = true;
    2202             :     }
    2203           8 :     appendStringInfoChar(&buf, ')');
    2204             : 
    2205           8 :     return buf.data;
    2206             : }
    2207             : /*
                     :  * get_sql_delete
                     :  *
                     :  * Build a "DELETE FROM rel WHERE ..." statement with one condition per
                     :  * key attribute, joined by " AND ".  pkattnums[i] is used as a 0-based
                     :  * index into the relation's tuple descriptor to find the column name.
                     :  * A non-NULL tgt_pkattvals[i] yields "col = <quoted literal>", a NULL one
                     :  * yields "col IS NULL".
                     :  *
                     :  * Returns the data buffer of the StringInfo the statement was built in.
                     :  */
    2208             : static char *
    2209           8 : get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals)
    2210             : {
    2211             :     char       *relname;
    2212             :     TupleDesc   tupdesc;
    2213             :     StringInfoData buf;
    2214             :     int         i;
    2215             : 
    2216           8 :     initStringInfo(&buf);
    2217             : 
    2218             :     /* get relation name including any needed schema prefix and quoting */
    2219           8 :     relname = generate_relation_name(rel);
    2220             : 
    2221           8 :     tupdesc = rel->rd_att;
    2222             : 
    2223           8 :     appendStringInfo(&buf, "DELETE FROM %s WHERE ", relname);
    2224          22 :     for (i = 0; i < pknumatts; i++)
    2225             :     {
    2226          14 :         int         pkattnum = pkattnums[i];
    2227          14 :         Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
    2228             : 
    2229          14 :         if (i > 0)
    2230           6 :             appendStringInfoString(&buf, " AND ");
    2231             : 
    2232          14 :         appendStringInfoString(&buf,
    2233          14 :                                quote_ident_cstr(NameStr(attr->attname)));
    2234             : 
    2235          14 :         if (tgt_pkattvals[i] != NULL)
    2236          14 :             appendStringInfo(&buf, " = %s",
    2237          14 :                              quote_literal_cstr(tgt_pkattvals[i]));
    2238             :         else
    2239           0 :             appendStringInfoString(&buf, " IS NULL");
    2240             :     }
    2241             : 
    2242           8 :     return buf.data;
    2243             : }
    2244             : /*
                     :  * get_sql_update
                     :  *
                     :  * Build an "UPDATE rel SET col = val, ... WHERE ..." statement from the
                     :  * local row that get_tuple_of_interest() finds for src_pkattvals.
                     :  *
                     :  * The SET list covers every non-dropped column.  For a column whose 0-based
                     :  * physical number appears in pkattnums, the value is taken from the
                     :  * matching tgt_pkattvals entry; every other column uses the fetched row's
                     :  * value from SPI_getvalue().  A NULL value is written as the keyword NULL.
                     :  *
                     :  * The WHERE clause has one condition per key attribute, built from
                     :  * tgt_pkattvals: "col = <quoted literal>", or "col IS NULL" for a NULL
                     :  * entry.
                     :  *
                     :  * Raises ERRCODE_CARDINALITY_VIOLATION if no source row is found.
                     :  * Returns the data buffer of the StringInfo the statement was built in.
                     :  */
    2245             : static char *
    2246           8 : get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
    2247             : {
    2248             :     char       *relname;
    2249             :     HeapTuple   tuple;
    2250             :     TupleDesc   tupdesc;
    2251             :     int         natts;
    2252             :     StringInfoData buf;
    2253             :     char       *val;
    2254             :     int         key;
    2255             :     int         i;
    2256             :     bool        needComma;
    2257             : 
    2258           8 :     initStringInfo(&buf);
    2259             : 
    2260             :     /* get relation name including any needed schema prefix and quoting */
    2261           8 :     relname = generate_relation_name(rel);
    2262             : 
    2263           8 :     tupdesc = rel->rd_att;
    2264           8 :     natts = tupdesc->natts;
    2265             : 
    2266           8 :     tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
    2267           8 :     if (!tuple)
    2268           0 :         ereport(ERROR,
    2269             :                 (errcode(ERRCODE_CARDINALITY_VIOLATION),
    2270             :                  errmsg("source row not found")));
    2271             : 
    2272           8 :     appendStringInfo(&buf, "UPDATE %s SET ", relname);
    2273             : 
    2274             :     /*
    2275             :      * Note: i is physical column number (counting from 0).
                     :      * SPI_getvalue() below is therefore called with i + 1.
    2276             :      */
    2277           8 :     needComma = false;
    2278          38 :     for (i = 0; i < natts; i++)
    2279             :     {
    2280          30 :         Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
    2281             : 
    2282          30 :         if (attr->attisdropped)
    2283           4 :             continue;
    2284             : 
    2285          26 :         if (needComma)
    2286          18 :             appendStringInfoString(&buf, ", ");
    2287             : 
    2288          26 :         appendStringInfo(&buf, "%s = ",
    2289          26 :                          quote_ident_cstr(NameStr(attr->attname)));
    2290             : 
    2291          26 :         key = get_attnum_pk_pos(pkattnums, pknumatts, i);
    2292             : 
    2293          26 :         if (key >= 0)
    2294          14 :             val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
    2295             :         else
    2296          12 :             val = SPI_getvalue(tuple, tupdesc, i + 1);
    2297             : 
    2298          26 :         if (val != NULL)
    2299             :         {
    2300          26 :             appendStringInfoString(&buf, quote_literal_cstr(val));
    2301          26 :             pfree(val);
    2302             :         }
    2303             :         else
    2304           0 :             appendStringInfoString(&buf, "NULL");
    2305          26 :         needComma = true;
    2306             :     }
    2307             : 
    2308           8 :     appendStringInfoString(&buf, " WHERE ");
    2309             : 
    2310          22 :     for (i = 0; i < pknumatts; i++)
    2311             :     {
    2312          14 :         int         pkattnum = pkattnums[i];
    2313          14 :         Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
    2314             : 
    2315          14 :         if (i > 0)
    2316           6 :             appendStringInfoString(&buf, " AND ");
    2317             : 
    2318          14 :         appendStringInfoString(&buf,
    2319          14 :                                quote_ident_cstr(NameStr(attr->attname)));
    2320             : 
    2321          14 :         val = tgt_pkattvals[i];
    2322             : 
    2323          14 :         if (val != NULL)
    2324          14 :             appendStringInfo(&buf, " = %s", quote_literal_cstr(val));
    2325             :         else
    2326           0 :             appendStringInfoString(&buf, " IS NULL");
    2327             :     }
    2328             : 
    2329           8 :     return buf.data;
    2330             : }
    2331             : 
    2332             : /*
    2333             :  * Return a properly quoted identifier.
    2334             :  * Uses quote_ident in quote.c
                     :  *
                     :  * The C string is converted to text, passed to quote_ident() through
                     :  * DirectFunctionCall1, and the text result is converted back to a C string
                     :  * with text_to_cstring().
    2335             :  */
    2336             : static char *
    2337         160 : quote_ident_cstr(char *rawstr)
    2338             : {
    2339             :     text       *rawstr_text;
    2340             :     text       *result_text;
    2341             :     char       *result;
    2342             : 
    2343         160 :     rawstr_text = cstring_to_text(rawstr);
    2344         160 :     result_text = DatumGetTextPP(DirectFunctionCall1(quote_ident,
    2345             :                                                      PointerGetDatum(rawstr_text)));
    2346         160 :     result = text_to_cstring(result_text);
    2347             : 
    2348         160 :     return result;
    2349             : }
    2350             : /*
                     :  * get_attnum_pk_pos
                     :  *
                     :  * Return the index of the first entry in pkattnums[0 .. pknumatts-1] that
                     :  * equals key, or -1 if no entry matches.
                     :  */
    2351             : static int
    2352          52 : get_attnum_pk_pos(int *pkattnums, int pknumatts, int key)
    2353             : {
    2354             :     int         i;
    2355             : 
    2356             :     /*
    2357             :      * Not likely a long list anyway, so just scan for the value
    2358             :      */
    2359         100 :     for (i = 0; i < pknumatts; i++)
    2360          76 :         if (key == pkattnums[i])
    2361          28 :             return i;
    2362             : 
    2363          24 :     return -1;
    2364             : }
    2365             : 
    2366             : static HeapTuple
    2367          16 : get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals)
    2368             : {
    2369             :     char       *relname;
    2370             :     TupleDesc   tupdesc;
    2371             :     int         natts;
    2372             :     StringInfoData buf;
    2373             :     int         ret;
    2374             :     HeapTuple   tuple;
    2375             :     int         i;
    2376             : 
    2377             :     /*
    2378             :      * Connect to SPI manager
    2379             :      */
    2380          16 :     SPI_connect();
    2381             : 
    2382          16 :     initStringInfo(&buf);
    2383             : 
    2384             :     /* get relation name including any needed schema prefix and quoting */
    2385          16 :     relname = generate_relation_name(rel);
    2386             : 
    2387          16 :     tupdesc = rel->rd_att;
    2388          16 :     natts = tupdesc->natts;
    2389             : 
    2390             :     /*
    2391             :      * Build sql statement to look up tuple of interest, ie, the one matching
    2392             :      * src_pkattvals.  We used to use "SELECT *" here, but it's simpler to
    2393             :      * generate a result tuple that matches the table's physical structure,
    2394             :      * with NULLs for any dropped columns.  Otherwise we have to deal with two
    2395             :      * different tupdescs and everything's very confusing.
    2396             :      */
    2397          16 :     appendStringInfoString(&buf, "SELECT ");
    2398             : 
    2399          76 :     for (i = 0; i < natts; i++)
    2400             :     {
    2401          60 :         Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
    2402             : 
    2403          60 :         if (i > 0)
    2404          44 :             appendStringInfoString(&buf, ", ");
    2405             : 
    2406          60 :         if (attr->attisdropped)
    2407           8 :             appendStringInfoString(&buf, "NULL");
    2408             :         else
    2409          52 :             appendStringInfoString(&buf,
    2410          52 :                                    quote_ident_cstr(NameStr(attr->attname)));
    2411             :     }
    2412             : 
    2413          16 :     appendStringInfo(&buf, " FROM %s WHERE ", relname);
    2414             : 
    2415          44 :     for (i = 0; i < pknumatts; i++)
    2416             :     {
    2417          28 :         int         pkattnum = pkattnums[i];
    2418          28 :         Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
    2419             : 
    2420          28 :         if (i > 0)
    2421          12 :             appendStringInfoString(&buf, " AND ");
    2422             : 
    2423          28 :         appendStringInfoString(&buf,
    2424          28 :                                quote_ident_cstr(NameStr(attr->attname)));
    2425             : 
    2426          28 :         if (src_pkattvals[i] != NULL)
    2427          28 :             appendStringInfo(&buf, " = %s",
    2428          28 :                              quote_literal_cstr(src_pkattvals[i]));
    2429             :         else
    2430           0 :             appendStringInfoString(&buf, " IS NULL");
    2431             :     }
    2432             : 
    2433             :     /*
    2434             :      * Retrieve the desired tuple
    2435             :      */
    2436          16 :     ret = SPI_exec(buf.data, 0);
    2437          16 :     pfree(buf.data);
    2438             : 
    2439             :     /*
    2440             :      * Only allow one qualifying tuple
    2441             :      */
    2442          16 :     if ((ret == SPI_OK_SELECT) && (SPI_processed > 1))
    2443           0 :         ereport(ERROR,
    2444             :                 (errcode(ERRCODE_CARDINALITY_VIOLATION),
    2445             :                  errmsg("source criteria matched more than one record")));
    2446             : 
    2447          16 :     else if (ret == SPI_OK_SELECT && SPI_processed == 1)
    2448             :     {
    2449          16 :         SPITupleTable *tuptable = SPI_tuptable;
    2450             : 
    2451          16 :         tuple = SPI_copytuple(tuptable->vals[0]);
    2452          16 :         SPI_finish();
    2453             : 
    2454          16 :         return tuple;
    2455             :     }
    2456             :     else
    2457             :     {
    2458             :         /*
    2459             :          * no qualifying tuples
    2460             :          */
    2461           0 :         SPI_finish();
    2462             : 
    2463           0 :         return NULL;
    2464             :     }
    2465             : 
    2466             :     /*
    2467             :      * never reached, but keep compiler quiet
    2468             :      */
    2469             :     return NULL;
    2470             : }
    2471             : 
    2472             : /*
    2473             :  * Open the relation named by relname_text with lock level lockmode, then
    2474             :  * verify via pg_class_aclcheck() that the current user holds aclmode on it.
    2475             :  * Caller must close rel when done with it.
    2476             :  */
    2477             : static Relation
    2478          42 : get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode)
    2479             : {
    2480             :     RangeVar   *relvar;
    2481             :     Relation    rel;
    2482             :     AclResult   aclresult;
    2483             : 
    2484          42 :     relvar = makeRangeVarFromNameList(textToQualifiedNameList(relname_text));
    2485          42 :     rel = table_openrv(relvar, lockmode);
    2486             :     /* The ACL check runs only after the relation is open and locked */
    2487          42 :     aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
    2488             :                                   aclmode);
    2489          42 :     if (aclresult != ACLCHECK_OK)
    2490           0 :         aclcheck_error(aclresult, get_relkind_objtype(rel->rd_rel->relkind),
    2491           0 :                        RelationGetRelationName(rel));
    2492             : 
    2493          42 :     return rel;
    2494             : }
    2495             : 
    2496             : /*
    2497             :  * generate_relation_name - copied from ruleutils.c
    2498             :  *      Compute the name to display for a relation
    2499             :  * The name is schema-qualified only when RelationIsVisible() reports that
    2500             :  * rel is not visible; quote_qualified_identifier() builds the final string.
    2501             :  */
    2502             : static char *
    2503          40 : generate_relation_name(Relation rel)
    2504             : {
    2505             :     char       *nspname;
    2506             :     char       *result;
    2507             : 
    2508             :     /* Qualify the name if not visible in search path */
    2509          40 :     if (RelationIsVisible(RelationGetRelid(rel)))
    2510          30 :         nspname = NULL;
    2511             :     else
    2512          10 :         nspname = get_namespace_name(rel->rd_rel->relnamespace);
    2513             :     /* nspname is NULL here when no schema prefix is wanted */
    2514          40 :     result = quote_qualified_identifier(nspname, RelationGetRelationName(rel));
    2515             : 
    2516          40 :     return result;
    2517             : }
    2518             : 
    2519             : /* Return the remoteConn registered under name, or NULL if there is none. */
    2520             : static remoteConn *
    2521         150 : getConnectionByName(const char *name)
    2522             : {
    2523             :     remoteConnHashEnt *hentry;
    2524             :     char       *key;
    2525             : 
    2526         150 :     if (!remoteConnHash)
    2527           4 :         remoteConnHash = createConnHash();
    2528             :     /* Probe with a truncated private copy of name; name itself is untouched */
    2529         150 :     key = pstrdup(name);
    2530         150 :     truncate_identifier(key, strlen(key), false);
    2531         150 :     hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
    2532             :                                                key, HASH_FIND, NULL);
    2533             : 
    2534         150 :     if (hentry)
    2535         136 :         return hentry->rconn;
    2536             : 
    2537          14 :     return NULL;
    2538             : }
    2539             : /* Create the hash table of remoteConnHashEnt entries used for named conns. */
    2540             : static HTAB *
    2541           6 : createConnHash(void)
    2542             : {
    2543             :     HASHCTL     ctl;
    2544             : 
    2545           6 :     ctl.keysize = NAMEDATALEN;
    2546           6 :     ctl.entrysize = sizeof(remoteConnHashEnt);
    2547             :     /* String keys (HASH_STRINGS); NUMCONN is passed as the size argument */
    2548           6 :     return hash_create("Remote Con hash", NUMCONN, &ctl,
    2549             :                        HASH_ELEM | HASH_STRINGS);
    2550             : }
    2551             : /* Register rconn under name; a duplicate name releases rconn, raises ERROR. */
    2552             : static void
    2553          20 : createNewConnection(const char *name, remoteConn *rconn)
    2554             : {
    2555             :     remoteConnHashEnt *hentry;
    2556             :     bool        found;
    2557             :     char       *key;
    2558             : 
    2559          20 :     if (!remoteConnHash)
    2560           2 :         remoteConnHash = createConnHash();
    2561             :     /* Truncation flag is true here, but false in the lookup and delete paths */
    2562          20 :     key = pstrdup(name);
    2563          20 :     truncate_identifier(key, strlen(key), true);
    2564          20 :     hentry = (remoteConnHashEnt *) hash_search(remoteConnHash, key,
    2565             :                                                HASH_ENTER, &found);
    2566             : 
    2567          20 :     if (found)
    2568             :     {
    2569           2 :         libpqsrv_disconnect(rconn->conn);
    2570           2 :         pfree(rconn);
    2571             :         /* The existing entry is left as is; only the new rconn is discarded */
    2572           2 :         ereport(ERROR,
    2573             :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
    2574             :                  errmsg("duplicate connection name")));
    2575             :     }
    2576             : 
    2577          18 :     hentry->rconn = rconn;
    2578          18 : }
    2579             : /* Remove the hash entry for name; raises ERROR if no such entry exists. */
    2580             : static void
    2581          16 : deleteConnection(const char *name)
    2582             : {
    2583             :     remoteConnHashEnt *hentry;
    2584             :     bool        found;
    2585             :     char       *key;
    2586             : 
    2587          16 :     if (!remoteConnHash)
    2588           0 :         remoteConnHash = createConnHash();
    2589             :     /* found is passed to hash_search but never examined; hentry is tested */
    2590          16 :     key = pstrdup(name);
    2591          16 :     truncate_identifier(key, strlen(key), false);
    2592          16 :     hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
    2593             :                                                key, HASH_REMOVE, &found);
    2594             :     /* A NULL result from HASH_REMOVE is treated as "no such connection" */
    2595          16 :     if (!hentry)
    2596           0 :         ereport(ERROR,
    2597             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    2598             :                  errmsg("undefined connection name")));
    2599          16 : }
    2600             : 
    2601             : /*
    2602             :  * Verify that an established connection authenticated with credentials
    2603             :  * supplied by the calling user.  On failure, conn is closed, rconn (if
    2604             :  * not NULL) is freed, and an ERROR is raised.  Superusers are exempt.
    2605             :  */
    2606             : static void
    2607          38 : dblink_security_check(PGconn *conn, remoteConn *rconn, const char *connstr)
    2608             : {
    2609             :     /* Superuser bypasses security check */
    2610          38 :     if (superuser())
    2611          38 :         return;
    2612             : 
    2613             :     /* If password was used to connect, make sure it was one provided */
    2614           0 :     if (PQconnectionUsedPassword(conn) && dblink_connstr_has_pw(connstr))
    2615           0 :         return;
    2616             : 
    2617             : #ifdef ENABLE_GSS
    2618             :     /* If GSSAPI creds used to connect, make sure it was one delegated */
    2619             :     if (PQconnectionUsedGSSAPI(conn) && be_gssapi_get_delegation(MyProcPort))
    2620             :         return;
    2621             : #endif
    2622             : 
    2623             :     /* Otherwise, fail out */
    2624           0 :     libpqsrv_disconnect(conn);
    2625           0 :     if (rconn)
    2626           0 :         pfree(rconn);
    2627             :     /* conn is already closed and rconn freed before the error is raised */
    2628           0 :     ereport(ERROR,
    2629             :             (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
    2630             :              errmsg("password or GSSAPI delegated credentials required"),
    2631             :              errdetail("Non-superusers may only connect using credentials they provide, eg: password in connection string or delegated GSSAPI credentials"),
    2632             :              errhint("Ensure provided credentials match target server's authentication method.")));
    2633             : }
    2634             : 
    2635             : /*
    2636             :  * Report whether connstr explicitly supplies a non-empty "password"
    2637             :  * option, as parsed by PQconninfoParse().  Non-superuser password auth
    2638             :  * must use a provided password, not one picked up from the environment.
    2639             :  * Returns false when PQconninfoParse() returns NULL.
    2640             :  */
    2641             : static bool
    2642           2 : dblink_connstr_has_pw(const char *connstr)
    2643             : {
    2644             :     PQconninfoOption *options;
    2645             :     PQconninfoOption *option;
    2646           2 :     bool        connstr_gives_password = false;
    2647             : 
    2648           2 :     options = PQconninfoParse(connstr, NULL);
    2649           2 :     if (options)
    2650             :     {
    2651          84 :         for (option = options; option->keyword != NULL; option++)
    2652             :         {
    2653          82 :             if (strcmp(option->keyword, "password") == 0)
    2654             :             {
    2655           2 :                 if (option->val != NULL && option->val[0] != '\0')
    2656             :                 {
    2657           0 :                     connstr_gives_password = true;
    2658           0 :                     break;
    2659             :                 }
    2660             :             }
    2661             :         }
    2662           2 :         PQconninfoFree(options);
    2663             :     }
    2664             : 
    2665           2 :     return connstr_gives_password;
    2666             : }
    2667             : 
    2668             : /*
    2669             :  * For non-superusers, insist that the connstr specify a password, except
    2670             :  * if GSSAPI credentials have been delegated (and we check that they are used
    2671             :  * for the connection in dblink_security_check later); otherwise raise ERROR.
    2672             :  * This prevents a password or GSSAPI credentials from being picked up from
    2673             :  * .pgpass, a service file, the environment, etc.  We don't want the postgres
    2674             :  * user's passwords or Kerberos credentials to be accessible to non-superusers.
    2675             :  */
    2676             : static void
    2677          44 : dblink_connstr_check(const char *connstr)
    2678             : {
    2679          44 :     if (superuser())
    2680          42 :         return;
    2681             :     /* An explicit, non-empty password in connstr is sufficient */
    2682           2 :     if (dblink_connstr_has_pw(connstr))
    2683           0 :         return;
    2684             : 
    2685             : #ifdef ENABLE_GSS
    2686             :     if (be_gssapi_get_delegation(MyProcPort))
    2687             :         return;
    2688             : #endif
    2689             : 
    2690           2 :     ereport(ERROR,
    2691             :             (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
    2692             :              errmsg("password or GSSAPI delegated credentials required"),
    2693             :              errdetail("Non-superusers must provide a password in the connection string or send delegated GSSAPI credentials.")));
    2694             : }
    2695             : 
    2696             : /*
    2697             :  * Report an error received from the remote server
    2698             :  * conn: PQerrorMessage() fallback source; conname: NULL if unnamed
    2699             :  * res: the received error result (will be freed)
    2700             :  * fail: true for ERROR ereport, false for NOTICE
    2701             :  * fmt and following args: sprintf-style format and values for errcontext;
    2702             :  * the resulting string should be worded like "while <some action>"
    2703             :  */
    2704             : static void
    2705          24 : dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
    2706             :                  bool fail, const char *fmt,...)
    2707             : {
    2708             :     int         level;
    2709          24 :     char       *pg_diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
    2710          24 :     char       *pg_diag_message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
    2711          24 :     char       *pg_diag_message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
    2712          24 :     char       *pg_diag_message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
    2713          24 :     char       *pg_diag_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
    2714             :     int         sqlstate;
    2715             :     char       *message_primary;
    2716             :     char       *message_detail;
    2717             :     char       *message_hint;
    2718             :     char       *message_context;
    2719             :     va_list     ap;
    2720             :     char        dblink_context_msg[512];
    2721             : 
    2722          24 :     if (fail)
    2723           6 :         level = ERROR;
    2724             :     else
    2725          18 :         level = NOTICE;
    2726             :     /* Rebuild the 5-character remote SQLSTATE, if the result carried one */
    2727          24 :     if (pg_diag_sqlstate)
    2728          24 :         sqlstate = MAKE_SQLSTATE(pg_diag_sqlstate[0],
    2729             :                                  pg_diag_sqlstate[1],
    2730             :                                  pg_diag_sqlstate[2],
    2731             :                                  pg_diag_sqlstate[3],
    2732             :                                  pg_diag_sqlstate[4]);
    2733             :     else
    2734           0 :         sqlstate = ERRCODE_CONNECTION_FAILURE;
    2735             :     /* xpstrdup results may be NULL; only message_primary gets a fallback */
    2736          24 :     message_primary = xpstrdup(pg_diag_message_primary);
    2737          24 :     message_detail = xpstrdup(pg_diag_message_detail);
    2738          24 :     message_hint = xpstrdup(pg_diag_message_hint);
    2739          24 :     message_context = xpstrdup(pg_diag_context);
    2740             : 
    2741             :     /*
    2742             :      * If we don't get a message from the PGresult, try the PGconn.  This is
    2743             :      * needed because for connection-level failures, PQgetResult may just
    2744             :      * return NULL, not a PGresult at all.
    2745             :      */
    2746          24 :     if (message_primary == NULL)
    2747           0 :         message_primary = pchomp(PQerrorMessage(conn));
    2748             : 
    2749             :     /*
    2750             :      * Now that we've copied all the data we need out of the PGresult, it's
    2751             :      * safe to free it.  We must do this to avoid PGresult leakage.  We're
    2752             :      * leaking all the strings too, but those are in palloc'd memory that will
    2753             :      * get cleaned up eventually.
    2754             :      */
    2755          24 :     PQclear(res);
    2756             : 
    2757             :     /*
    2758             :      * Format the basic errcontext string.  Below, we'll add on something
    2759             :      * about the connection name.  That's a violation of the translatability
    2760             :      * guidelines about constructing error messages out of parts, but since
    2761             :      * there's no translation support for dblink, there's no need to worry
    2762             :      * about that (yet).
    2763             :      */
    2764          24 :     va_start(ap, fmt);
    2765          24 :     vsnprintf(dblink_context_msg, sizeof(dblink_context_msg), fmt, ap);
    2766          24 :     va_end(ap);
    2767             :     /* vsnprintf bounds the text to the 512-byte dblink_context_msg buffer */
    2768          24 :     ereport(level,
    2769             :             (errcode(sqlstate),
    2770             :              (message_primary != NULL && message_primary[0] != '\0') ?
    2771             :              errmsg_internal("%s", message_primary) :
    2772             :              errmsg("could not obtain message string for remote error"),
    2773             :              message_detail ? errdetail_internal("%s", message_detail) : 0,
    2774             :              message_hint ? errhint("%s", message_hint) : 0,
    2775             :              message_context ? (errcontext("%s", message_context)) : 0,
    2776             :              conname ?
    2777             :              (errcontext("%s on dblink connection named \"%s\"",
    2778             :                          dblink_context_msg, conname)) :
    2779             :              (errcontext("%s on unnamed dblink connection",
    2780             :                          dblink_context_msg))));
    2781          18 : }
    2782             : 
    2783             : /*
    2784             :  * Obtain connection string for foreign server servername (NULL if none)
    2785             :  */
    2786             : static char *
    2787          44 : get_connect_string(const char *servername)
    2788             : {
    2789          44 :     ForeignServer *foreign_server = NULL;
    2790             :     UserMapping *user_mapping;
    2791             :     ListCell   *cell;
    2792             :     StringInfoData buf;
    2793             :     ForeignDataWrapper *fdw;
    2794             :     AclResult   aclresult;
    2795             :     char       *srvname;
    2796             : 
    2797             :     static const PQconninfoOption *options = NULL;
    2798             : 
    2799          44 :     initStringInfo(&buf);
    2800             : 
    2801             :     /*
    2802             :      * Get list of valid libpq options.
    2803             :      *
    2804             :      * To avoid unnecessary work, we get the list once and use it throughout
    2805             :      * the lifetime of this backend process.  We don't need to care about
    2806             :      * memory context issues, because PQconndefaults allocates with malloc.
    2807             :      */
    2808          44 :     if (!options)
    2809             :     {
    2810           6 :         options = PQconndefaults();
    2811           6 :         if (!options)           /* assume reason for failure is OOM */
    2812           0 :             ereport(ERROR,
    2813             :                     (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
    2814             :                      errmsg("out of memory"),
    2815             :                      errdetail("Could not get libpq's default connection options.")));
    2816             :     }
    2817             : 
    2818             :     /* first gather the server connstr options */
    2819          44 :     srvname = pstrdup(servername);
    2820          44 :     truncate_identifier(srvname, strlen(srvname), false);
    2821          44 :     foreign_server = GetForeignServerByName(srvname, true);
    2822             :     /* A NULL foreign_server makes this function return NULL (see the else) */
    2823          44 :     if (foreign_server)
    2824             :     {
    2825           4 :         Oid         serverid = foreign_server->serverid;
    2826           4 :         Oid         fdwid = foreign_server->fdwid;
    2827           4 :         Oid         userid = GetUserId();
    2828             :         /* NOTE(review): mapping and FDW are fetched before the USAGE check */
    2829           4 :         user_mapping = GetUserMapping(userid, serverid);
    2830           4 :         fdw = GetForeignDataWrapper(fdwid);
    2831             : 
    2832             :         /* Check permissions, user must have usage on the server. */
    2833           4 :         aclresult = object_aclcheck(ForeignServerRelationId, serverid, userid, ACL_USAGE);
    2834           4 :         if (aclresult != ACLCHECK_OK)
    2835           0 :             aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER, foreign_server->servername);
    2836             :         /* Append accepted options in order: FDW, server, then user mapping */
    2837           4 :         foreach(cell, fdw->options)
    2838             :         {
    2839           0 :             DefElem    *def = lfirst(cell);
    2840             : 
    2841           0 :             if (is_valid_dblink_option(options, def->defname, ForeignDataWrapperRelationId))
    2842           0 :                 appendStringInfo(&buf, "%s='%s' ", def->defname,
    2843           0 :                                  escape_param_str(strVal(def->arg)));
    2844             :         }
    2845             : 
    2846          12 :         foreach(cell, foreign_server->options)
    2847             :         {
    2848           8 :             DefElem    *def = lfirst(cell);
    2849             : 
    2850           8 :             if (is_valid_dblink_option(options, def->defname, ForeignServerRelationId))
    2851           8 :                 appendStringInfo(&buf, "%s='%s' ", def->defname,
    2852           8 :                                  escape_param_str(strVal(def->arg)));
    2853             :         }
    2854             : 
    2855           8 :         foreach(cell, user_mapping->options)
    2856             :         {
    2857             :             /* Only options accepted for UserMappingRelationId are emitted */
    2858           4 :             DefElem    *def = lfirst(cell);
    2859             : 
    2860           4 :             if (is_valid_dblink_option(options, def->defname, UserMappingRelationId))
    2861           4 :                 appendStringInfo(&buf, "%s='%s' ", def->defname,
    2862           4 :                                  escape_param_str(strVal(def->arg)));
    2863             :         }
    2864             : 
    2865           4 :         return buf.data;
    2866             :     }
    2867             :     else
    2868          40 :         return NULL;
    2869             : }
    2870             : 
    2871             : /*
    2872             :  * Escape a value for use inside a quoted libpq connect parameter.
    2873             :  * Returns a new string built in a StringInfo; str itself is not modified.
    2874             :  * Replaces "'" with "\'" and "\" with "\\".
    2875             :  */
    2876             : static char *
    2877          12 : escape_param_str(const char *str)
    2878             : {
    2879             :     const char *cp;
    2880             :     StringInfoData buf;
    2881             : 
    2882          12 :     initStringInfo(&buf);
    2883             :     /* Emit a backslash before each quote or backslash, then the char itself */
    2884         136 :     for (cp = str; *cp; cp++)
    2885             :     {
    2886         124 :         if (*cp == '\\' || *cp == '\'')
    2887           0 :             appendStringInfoChar(&buf, '\\');
    2888         124 :         appendStringInfoChar(&buf, *cp);
    2889             :     }
    2890             : 
    2891          12 :     return buf.data;
    2892             : }
    2893             : 
    2894             : /*
    2895             :  * Validate the PK-attnums argument for dblink_build_sql_insert() and related
    2896             :  * functions, and translate to the internal representation.
    2897             :  *
    2898             :  * The user supplies an int2vector of 1-based logical attnums, plus a count
    2899             :  * argument (the need for the separate count argument is historical, but we
    2900             :  * still check it).  We check that each attnum corresponds to a valid,
    2901             :  * non-dropped attribute of the rel.  We do *not* prevent attnums from being
    2902             :  * listed twice, though the actual use-case for such things is dubious.
    2903             :  * Note that before Postgres 9.0, the user's attnums were interpreted as
    2904             :  * physical not logical column numbers; this was changed for future-proofing.
    2905             :  *
    2906             :  * The internal representation is a palloc'd int array of 0-based physical
    2907             :  * attnums, returned in *pkattnums with its length in *pknumatts.
    2908             :  */
    2909             : static void
    2910          36 : validate_pkattnums(Relation rel,
    2911             :                    int2vector *pkattnums_arg, int32 pknumatts_arg,
    2912             :                    int **pkattnums, int *pknumatts)
    2913             : {
    2914          36 :     TupleDesc   tupdesc = rel->rd_att;
    2915          36 :     int         natts = tupdesc->natts;
    2916             :     int         i;
    2917             : 
    2918             :     /* Don't take more array elements than there are */
    2919          36 :     pknumatts_arg = Min(pknumatts_arg, pkattnums_arg->dim1);
    2920             : 
    2921             :     /* Must have at least one pk attnum selected */
    2922          36 :     if (pknumatts_arg <= 0)
    2923           0 :         ereport(ERROR,
    2924             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    2925             :                  errmsg("number of key attributes must be > 0")));
    2926             : 
    2927             :     /* Allocate output array */
    2928          36 :     *pkattnums = palloc_array(int, pknumatts_arg);
    2929          36 :     *pknumatts = pknumatts_arg;
    2930             : 
    2931             :     /* Validate attnums and convert to internal form */
    2932         114 :     for (i = 0; i < pknumatts_arg; i++)
    2933             :     {
    2934          90 :         int         pkattnum = pkattnums_arg->values[i];
    2935             :         int         lnum;
    2936             :         int         j;
    2937             : 
    2938             :         /* Can throw error immediately if out of range */
    2939          90 :         if (pkattnum <= 0 || pkattnum > natts)
    2940          12 :             ereport(ERROR,
    2941             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    2942             :                      errmsg("invalid attribute number %d", pkattnum)));
    2943             : 
    2944             :         /* Identify which physical column has this logical number */
    2945          78 :         lnum = 0;
    2946         138 :         for (j = 0; j < natts; j++)
    2947             :         {
    2948             :             /* dropped columns don't count */
    2949         138 :             if (TupleDescAttr(tupdesc, j)->attisdropped)
    2950           6 :                 continue;
    2951             : 
    2952         132 :             if (++lnum == pkattnum)
    2953          78 :                 break;
    2954             :         }
    2955             :         /* j == natts means fewer than pkattnum non-dropped columns exist */
    2956          78 :         if (j < natts)
    2957          78 :             (*pkattnums)[i] = j;
    2958             :         else
    2959           0 :             ereport(ERROR,
    2960             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    2961             :                      errmsg("invalid attribute number %d", pkattnum)));
    2962             :     }
    2963          24 : }
    2964             : 
    2965             : /*
    2966             :  * Check if the specified connection option is valid for the given context
    2967             :  * (a catalog OID such as UserMappingRelationId or ForeignServerRelationId).
    2968             :  * We basically allow whatever libpq thinks is an option, with these
    2969             :  * restrictions:
    2970             :  *      debug options: disallowed
    2971             :  *      "client_encoding": disallowed
    2972             :  *      "user": valid only in USER MAPPING options
    2973             :  *      secure options (eg password): valid only in USER MAPPING options
    2974             :  *      others: valid only in FOREIGN SERVER options
    2975             :  *
    2976             :  * We disallow client_encoding because it would be overridden anyway via
    2977             :  * PQclientEncoding; allowing it to be specified would merely promote
    2978             :  * confusion.
    2979             :  */
    2980             : static bool
    2981         186 : is_valid_dblink_option(const PQconninfoOption *options, const char *option,
    2982             :                        Oid context)
    2983             : {
    2984             :     const PQconninfoOption *opt;
    2985             : 
    2986             :     /* Look up the option in libpq result */
    2987        3726 :     for (opt = options; opt->keyword; opt++)
    2988             :     {
    2989        3722 :         if (strcmp(opt->keyword, option) == 0)
    2990         182 :             break;
    2991             :     }
    2992         186 :     if (opt->keyword == NULL)
    2993           4 :         return false;
    2994             : 
    2995             :     /* Disallow debug options (particularly "replication") */
    2996         182 :     if (strchr(opt->dispchar, 'D'))
    2997           4 :         return false;
    2998             : 
    2999             :     /* Disallow "client_encoding" */
    3000         178 :     if (strcmp(opt->keyword, "client_encoding") == 0)
    3001           4 :         return false;
    3002             : 
    3003             :     /*
    3004             :      * If the option is "user" or marked secure, it should be specified only
    3005             :      * in USER MAPPING.  Others should be specified only in SERVER.
    3006             :      */
    3007         174 :     if (strcmp(opt->keyword, "user") == 0 || strchr(opt->dispchar, '*'))
    3008             :     {
    3009          18 :         if (context != UserMappingRelationId)
    3010           6 :             return false;
    3011             :     }
    3012             :     else
    3013             :     {
    3014         156 :         if (context != ForeignServerRelationId)
    3015         144 :             return false;
    3016             :     }
    3017             :     /* NOTE(review): context ForeignDataWrapperRelationId never reaches here */
    3018          24 :     return true;
    3019             : }
    3020             : 
    3021             : /*
    3022             :  * Copy the remote session's values of GUCs that affect datatype I/O
    3023             :  * and apply them locally in a new GUC nesting level.  Returns the new
    3024             :  * nestlevel (which is needed by restoreLocalGucs to undo the settings),
    3025             :  * or -1 if no new nestlevel was needed.
    3026             :  * conn is the remote connection; its reported values are read with PQparameterStatus.
    3027             :  * We use the equivalent of a function SET option to allow the settings to
    3028             :  * persist only until the caller calls restoreLocalGucs.  If an error is
    3029             :  * thrown in between, guc.c will take care of undoing the settings.
    3030             :  */
    3031             : static int
    3032          64 : applyRemoteGucs(PGconn *conn)
    3033             : {
    3034             :     static const char *const GUCsAffectingIO[] = {  /* the GUCs copied from the remote session */
    3035             :         "DateStyle",
    3036             :         "IntervalStyle"
    3037             :     };
    3038             : 
    3039          64 :     int         nestlevel = -1;     /* stays -1 unless some GUC actually has to be changed */
    3040             :     int         i;
    3041             : 
    3042         192 :     for (i = 0; i < lengthof(GUCsAffectingIO); i++)
    3043             :     {
    3044         128 :         const char *gucName = GUCsAffectingIO[i];
    3045         128 :         const char *remoteVal = PQparameterStatus(conn, gucName);
    3046             :         const char *localVal;
    3047             : 
    3048             :         /*
    3049             :          * If the remote server is pre-8.4, it won't have IntervalStyle, but
    3050             :          * that's okay because its output format won't be ambiguous.  So just
    3051             :          * skip the GUC if we don't get a value for it.  (We might eventually
    3052             :          * need more complicated logic with remote-version checks here.)
    3053             :          */
    3054         128 :         if (remoteVal == NULL)
    3055           0 :             continue;               /* remote side reported no value for this GUC */
    3056             : 
    3057             :         /*
    3058             :          * Avoid GUC-setting overhead if the remote and local GUCs already
    3059             :          * have the same value (compared as strings).
    3060             :          */
    3061         128 :         localVal = GetConfigOption(gucName, false, false);
    3062             :         Assert(localVal != NULL);
    3063             : 
    3064         128 :         if (strcmp(remoteVal, localVal) == 0)
    3065          94 :             continue;               /* already identical, nothing to apply */
    3066             : 
    3067             :         /* Create new GUC nest level if we didn't already; it is shared by all GUCs set in this call */
    3068          34 :         if (nestlevel < 0)
    3069          18 :             nestlevel = NewGUCNestLevel();
    3070             : 
    3071             :         /* Apply the remote value with GUC_ACTION_SAVE (this will throw error on failure) */
    3072          34 :         (void) set_config_option(gucName, remoteVal,
    3073             :                                  PGC_USERSET, PGC_S_SESSION,
    3074             :                                  GUC_ACTION_SAVE, true, 0, false);
    3075             :     }
    3076             : 
    3077          64 :     return nestlevel;               /* -1 if nothing was changed, else the level to pass to restoreLocalGucs */
    3078             : }
    3079             : 
    3080             : /*
    3081             :  * Restore local GUCs after they have been overlaid with remote settings; nestlevel is the value returned by applyRemoteGucs.
    3082             :  */
    3083             : static void
    3084          66 : restoreLocalGucs(int nestlevel)
    3085             : {
    3086             :     /* Do nothing if no new nestlevel was created (applyRemoteGucs reports that case as -1) */
    3087          66 :     if (nestlevel > 0)
    3088          16 :         AtEOXact_GUC(true, nestlevel);
    3089          66 : }

Generated by: LCOV version 1.14