LCOV - code coverage report
Current view: top level - contrib/dblink - dblink.c (source / functions) Hit Total Coverage
Test: PostgreSQL 15devel Lines: 945 1093 86.5 %
Date: 2021-11-29 06:09:26 Functions: 74 76 97.4 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :  * dblink.c
       3             :  *
       4             :  * Functions returning results from a remote database
       5             :  *
       6             :  * Joe Conway <mail@joeconway.com>
       7             :  * And contributors:
       8             :  * Darko Prenosil <Darko.Prenosil@finteh.hr>
       9             :  * Shridhar Daithankar <shridhar_daithankar@persistent.co.in>
      10             :  *
      11             :  * contrib/dblink/dblink.c
      12             :  * Copyright (c) 2001-2021, PostgreSQL Global Development Group
      13             :  * ALL RIGHTS RESERVED;
      14             :  *
      15             :  * Permission to use, copy, modify, and distribute this software and its
      16             :  * documentation for any purpose, without fee, and without a written agreement
      17             :  * is hereby granted, provided that the above copyright notice and this
      18             :  * paragraph and the following two paragraphs appear in all copies.
      19             :  *
      20             :  * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
      21             :  * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
      22             :  * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
      23             :  * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
      24             :  * POSSIBILITY OF SUCH DAMAGE.
      25             :  *
      26             :  * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
      27             :  * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
      28             :  * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
      29             :  * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
      30             :  * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
      31             :  *
      32             :  */
      33             : #include "postgres.h"
      34             : 
      35             : #include <limits.h>
      36             : 
      37             : #include "access/htup_details.h"
      38             : #include "access/relation.h"
      39             : #include "access/reloptions.h"
      40             : #include "access/table.h"
      41             : #include "catalog/namespace.h"
      42             : #include "catalog/pg_foreign_data_wrapper.h"
      43             : #include "catalog/pg_foreign_server.h"
      44             : #include "catalog/pg_type.h"
      45             : #include "catalog/pg_user_mapping.h"
      46             : #include "executor/spi.h"
      47             : #include "foreign/foreign.h"
      48             : #include "funcapi.h"
      49             : #include "lib/stringinfo.h"
      50             : #include "libpq-fe.h"
      51             : #include "mb/pg_wchar.h"
      52             : #include "miscadmin.h"
      53             : #include "parser/scansup.h"
      54             : #include "utils/acl.h"
      55             : #include "utils/builtins.h"
      56             : #include "utils/fmgroids.h"
      57             : #include "utils/guc.h"
      58             : #include "utils/lsyscache.h"
      59             : #include "utils/memutils.h"
      60             : #include "utils/rel.h"
      61             : #include "utils/varlena.h"
      62             : 
      63           6 : PG_MODULE_MAGIC;
      64             : 
      65             : typedef struct remoteConn
      66             : {
      67             :     PGconn     *conn;           /* Hold the remote connection */
      68             :     int         openCursorCount;    /* The number of open cursors */
      69             :     bool        newXactForCursor;   /* Opened a transaction for a cursor */
      70             : } remoteConn;
      71             : 
      72             : typedef struct storeInfo
      73             : {
      74             :     FunctionCallInfo fcinfo;
      75             :     Tuplestorestate *tuplestore;
      76             :     AttInMetadata *attinmeta;
      77             :     MemoryContext tmpcontext;
      78             :     char      **cstrs;
      79             :     /* temp storage for results to avoid leaks on exception */
      80             :     PGresult   *last_res;
      81             :     PGresult   *cur_res;
      82             : } storeInfo;
      83             : 
      84             : /*
      85             :  * Internal declarations
      86             :  */
      87             : static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
      88             : static void prepTuplestoreResult(FunctionCallInfo fcinfo);
      89             : static void materializeResult(FunctionCallInfo fcinfo, PGconn *conn,
      90             :                               PGresult *res);
      91             : static void materializeQueryResult(FunctionCallInfo fcinfo,
      92             :                                    PGconn *conn,
      93             :                                    const char *conname,
      94             :                                    const char *sql,
      95             :                                    bool fail);
      96             : static PGresult *storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql);
      97             : static void storeRow(volatile storeInfo *sinfo, PGresult *res, bool first);
      98             : static remoteConn *getConnectionByName(const char *name);
      99             : static HTAB *createConnHash(void);
     100             : static void createNewConnection(const char *name, remoteConn *rconn);
     101             : static void deleteConnection(const char *name);
     102             : static char **get_pkey_attnames(Relation rel, int16 *indnkeyatts);
     103             : static char **get_text_array_contents(ArrayType *array, int *numitems);
     104             : static char *get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
     105             : static char *get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals);
     106             : static char *get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
     107             : static char *quote_ident_cstr(char *rawstr);
     108             : static int  get_attnum_pk_pos(int *pkattnums, int pknumatts, int key);
     109             : static HeapTuple get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals);
     110             : static Relation get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode);
     111             : static char *generate_relation_name(Relation rel);
     112             : static void dblink_connstr_check(const char *connstr);
     113             : static void dblink_security_check(PGconn *conn, remoteConn *rconn);
     114             : static void dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
     115             :                              bool fail, const char *fmt,...) pg_attribute_printf(5, 6);
     116             : static char *get_connect_string(const char *servername);
     117             : static char *escape_param_str(const char *from);
     118             : static void validate_pkattnums(Relation rel,
     119             :                                int2vector *pkattnums_arg, int32 pknumatts_arg,
     120             :                                int **pkattnums, int *pknumatts);
     121             : static bool is_valid_dblink_option(const PQconninfoOption *options,
     122             :                                    const char *option, Oid context);
     123             : static int  applyRemoteGucs(PGconn *conn);
     124             : static void restoreLocalGucs(int nestlevel);
     125             : 
     126             : /* Global */
     127             : static remoteConn *pconn = NULL;
     128             : static HTAB *remoteConnHash = NULL;
     129             : 
     130             : /*
     131             :  *  Following is list that holds multiple remote connections.
     132             :  *  Calling convention of each dblink function changes to accept
     133             :  *  connection name as the first parameter. The connection list is
     134             :  *  much like ecpg e.g. a mapping between a name and a PGconn object.
     135             :  */
     136             : 
     137             : typedef struct remoteConnHashEnt
     138             : {
     139             :     char        name[NAMEDATALEN];
     140             :     remoteConn *rconn;
     141             : } remoteConnHashEnt;
     142             : 
     143             : /* initial number of connection hashes */
     144             : #define NUMCONN 16
     145             : 
     146             : static char *
     147          96 : xpstrdup(const char *in)
     148             : {
     149          96 :     if (in == NULL)
     150          72 :         return NULL;
     151          24 :     return pstrdup(in);
     152             : }
     153             : 
     154             : static void
     155             : pg_attribute_noreturn()
     156           0 : dblink_res_internalerror(PGconn *conn, PGresult *res, const char *p2)
     157             : {
     158           0 :     char       *msg = pchomp(PQerrorMessage(conn));
     159             : 
     160           0 :     if (res)
     161           0 :         PQclear(res);
     162           0 :     elog(ERROR, "%s: %s", p2, msg);
     163             : }
     164             : 
     165             : static void
     166             : pg_attribute_noreturn()
     167           6 : dblink_conn_not_avail(const char *conname)
     168             : {
     169           6 :     if (conname)
     170           2 :         ereport(ERROR,
     171             :                 (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
     172             :                  errmsg("connection \"%s\" not available", conname)));
     173             :     else
     174           4 :         ereport(ERROR,
     175             :                 (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
     176             :                  errmsg("connection not available")));
     177             : }
     178             : 
     179             : static void
     180          66 : dblink_get_conn(char *conname_or_str,
     181             :                 PGconn *volatile *conn_p, char **conname_p, volatile bool *freeconn_p)
     182             : {
     183          66 :     remoteConn *rconn = getConnectionByName(conname_or_str);
     184             :     PGconn     *conn;
     185             :     char       *conname;
     186             :     bool        freeconn;
     187             : 
     188          66 :     if (rconn)
     189             :     {
     190          54 :         conn = rconn->conn;
     191          54 :         conname = conname_or_str;
     192          54 :         freeconn = false;
     193             :     }
     194             :     else
     195             :     {
     196             :         const char *connstr;
     197             : 
     198          12 :         connstr = get_connect_string(conname_or_str);
     199          12 :         if (connstr == NULL)
     200          12 :             connstr = conname_or_str;
     201          12 :         dblink_connstr_check(connstr);
     202             : 
     203             :         /*
     204             :          * We must obey fd.c's limit on non-virtual file descriptors.  Assume
     205             :          * that a PGconn represents one long-lived FD.  (Doing this here also
     206             :          * ensures that VFDs are closed if needed to make room.)
     207             :          */
     208          12 :         if (!AcquireExternalFD())
     209             :         {
     210             : #ifndef WIN32                   /* can't write #if within ereport() macro */
     211           0 :             ereport(ERROR,
     212             :                     (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
     213             :                      errmsg("could not establish connection"),
     214             :                      errdetail("There are too many open files on the local server."),
     215             :                      errhint("Raise the server's max_files_per_process and/or \"ulimit -n\" limits.")));
     216             : #else
     217             :             ereport(ERROR,
     218             :                     (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
     219             :                      errmsg("could not establish connection"),
     220             :                      errdetail("There are too many open files on the local server."),
     221             :                      errhint("Raise the server's max_files_per_process setting.")));
     222             : #endif
     223             :         }
     224             : 
     225             :         /* OK to make connection */
     226          12 :         conn = PQconnectdb(connstr);
     227             : 
     228          12 :         if (PQstatus(conn) == CONNECTION_BAD)
     229             :         {
     230           4 :             char       *msg = pchomp(PQerrorMessage(conn));
     231             : 
     232           4 :             PQfinish(conn);
     233           4 :             ReleaseExternalFD();
     234           4 :             ereport(ERROR,
     235             :                     (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
     236             :                      errmsg("could not establish connection"),
     237             :                      errdetail_internal("%s", msg)));
     238             :         }
     239           8 :         dblink_security_check(conn, rconn);
     240           8 :         if (PQclientEncoding(conn) != GetDatabaseEncoding())
     241           0 :             PQsetClientEncoding(conn, GetDatabaseEncodingName());
     242           8 :         freeconn = true;
     243           8 :         conname = NULL;
     244             :     }
     245             : 
     246          62 :     *conn_p = conn;
     247          62 :     *conname_p = conname;
     248          62 :     *freeconn_p = freeconn;
     249          62 : }
     250             : 
     251             : static PGconn *
     252          34 : dblink_get_named_conn(const char *conname)
     253             : {
     254          34 :     remoteConn *rconn = getConnectionByName(conname);
     255             : 
     256          34 :     if (rconn)
     257          34 :         return rconn->conn;
     258             : 
     259           0 :     dblink_conn_not_avail(conname);
     260             :     return NULL;                /* keep compiler quiet */
     261             : }
     262             : 
     263             : static void
     264         234 : dblink_init(void)
     265             : {
     266         234 :     if (!pconn)
     267             :     {
     268           6 :         pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn));
     269           6 :         pconn->conn = NULL;
     270           6 :         pconn->openCursorCount = 0;
     271           6 :         pconn->newXactForCursor = false;
     272             :     }
     273         234 : }
     274             : 
     275             : /*
     276             :  * Create a persistent connection to another database
     277             :  */
     278          18 : PG_FUNCTION_INFO_V1(dblink_connect);
     279             : Datum
     280          28 : dblink_connect(PG_FUNCTION_ARGS)
     281             : {
     282          28 :     char       *conname_or_str = NULL;
     283          28 :     char       *connstr = NULL;
     284          28 :     char       *connname = NULL;
     285             :     char       *msg;
     286          28 :     PGconn     *conn = NULL;
     287          28 :     remoteConn *rconn = NULL;
     288             : 
     289          28 :     dblink_init();
     290             : 
     291          28 :     if (PG_NARGS() == 2)
     292             :     {
     293          22 :         conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(1));
     294          22 :         connname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     295             :     }
     296           6 :     else if (PG_NARGS() == 1)
     297           6 :         conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0));
     298             : 
     299          28 :     if (connname)
     300             :     {
     301          22 :         rconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext,
     302             :                                                   sizeof(remoteConn));
     303          22 :         rconn->conn = NULL;
     304          22 :         rconn->openCursorCount = 0;
     305          22 :         rconn->newXactForCursor = false;
     306             :     }
     307             : 
     308             :     /* first check for valid foreign data server */
     309          28 :     connstr = get_connect_string(conname_or_str);
     310          28 :     if (connstr == NULL)
     311          24 :         connstr = conname_or_str;
     312             : 
     313             :     /* check password in connection string if not superuser */
     314          28 :     dblink_connstr_check(connstr);
     315             : 
     316             :     /*
     317             :      * We must obey fd.c's limit on non-virtual file descriptors.  Assume that
     318             :      * a PGconn represents one long-lived FD.  (Doing this here also ensures
     319             :      * that VFDs are closed if needed to make room.)
     320             :      */
     321          26 :     if (!AcquireExternalFD())
     322             :     {
     323             : #ifndef WIN32                   /* can't write #if within ereport() macro */
     324           0 :         ereport(ERROR,
     325             :                 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
     326             :                  errmsg("could not establish connection"),
     327             :                  errdetail("There are too many open files on the local server."),
     328             :                  errhint("Raise the server's max_files_per_process and/or \"ulimit -n\" limits.")));
     329             : #else
     330             :         ereport(ERROR,
     331             :                 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
     332             :                  errmsg("could not establish connection"),
     333             :                  errdetail("There are too many open files on the local server."),
     334             :                  errhint("Raise the server's max_files_per_process setting.")));
     335             : #endif
     336             :     }
     337             : 
     338             :     /* OK to make connection */
     339          26 :     conn = PQconnectdb(connstr);
     340             : 
     341          26 :     if (PQstatus(conn) == CONNECTION_BAD)
     342             :     {
     343           0 :         msg = pchomp(PQerrorMessage(conn));
     344           0 :         PQfinish(conn);
     345           0 :         ReleaseExternalFD();
     346           0 :         if (rconn)
     347           0 :             pfree(rconn);
     348             : 
     349           0 :         ereport(ERROR,
     350             :                 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
     351             :                  errmsg("could not establish connection"),
     352             :                  errdetail_internal("%s", msg)));
     353             :     }
     354             : 
     355             :     /* check password actually used if not superuser */
     356          26 :     dblink_security_check(conn, rconn);
     357             : 
     358             :     /* attempt to set client encoding to match server encoding, if needed */
     359          26 :     if (PQclientEncoding(conn) != GetDatabaseEncoding())
     360           0 :         PQsetClientEncoding(conn, GetDatabaseEncodingName());
     361             : 
     362          26 :     if (connname)
     363             :     {
     364          20 :         rconn->conn = conn;
     365          20 :         createNewConnection(connname, rconn);
     366             :     }
     367             :     else
     368             :     {
     369           6 :         if (pconn->conn)
     370             :         {
     371           0 :             PQfinish(pconn->conn);
     372           0 :             ReleaseExternalFD();
     373             :         }
     374           6 :         pconn->conn = conn;
     375             :     }
     376             : 
     377          24 :     PG_RETURN_TEXT_P(cstring_to_text("OK"));
     378             : }
     379             : 
     380             : /*
     381             :  * Clear a persistent connection to another database
     382             :  */
     383          12 : PG_FUNCTION_INFO_V1(dblink_disconnect);
     384             : Datum
     385          24 : dblink_disconnect(PG_FUNCTION_ARGS)
     386             : {
     387          24 :     char       *conname = NULL;
     388          24 :     remoteConn *rconn = NULL;
     389          24 :     PGconn     *conn = NULL;
     390             : 
     391          24 :     dblink_init();
     392             : 
     393          24 :     if (PG_NARGS() == 1)
     394             :     {
     395          18 :         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     396          18 :         rconn = getConnectionByName(conname);
     397          18 :         if (rconn)
     398          16 :             conn = rconn->conn;
     399             :     }
     400             :     else
     401           6 :         conn = pconn->conn;
     402             : 
     403          24 :     if (!conn)
     404           2 :         dblink_conn_not_avail(conname);
     405             : 
     406          22 :     PQfinish(conn);
     407          22 :     ReleaseExternalFD();
     408          22 :     if (rconn)
     409             :     {
     410          16 :         deleteConnection(conname);
     411          16 :         pfree(rconn);
     412             :     }
     413             :     else
     414           6 :         pconn->conn = NULL;
     415             : 
     416          22 :     PG_RETURN_TEXT_P(cstring_to_text("OK"));
     417             : }
     418             : 
     419             : /*
     420             :  * opens a cursor using a persistent connection
     421             :  */
     422          18 : PG_FUNCTION_INFO_V1(dblink_open);
     423             : Datum
     424          18 : dblink_open(PG_FUNCTION_ARGS)
     425             : {
     426          18 :     PGresult   *res = NULL;
     427             :     PGconn     *conn;
     428          18 :     char       *curname = NULL;
     429          18 :     char       *sql = NULL;
     430          18 :     char       *conname = NULL;
     431             :     StringInfoData buf;
     432          18 :     remoteConn *rconn = NULL;
     433          18 :     bool        fail = true;    /* default to backward compatible behavior */
     434             : 
     435          18 :     dblink_init();
     436          18 :     initStringInfo(&buf);
     437             : 
     438          18 :     if (PG_NARGS() == 2)
     439             :     {
     440             :         /* text,text */
     441           4 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     442           4 :         sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
     443           4 :         rconn = pconn;
     444             :     }
     445          14 :     else if (PG_NARGS() == 3)
     446             :     {
     447             :         /* might be text,text,text or text,text,bool */
     448          12 :         if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
     449             :         {
     450           2 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     451           2 :             sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
     452           2 :             fail = PG_GETARG_BOOL(2);
     453           2 :             rconn = pconn;
     454             :         }
     455             :         else
     456             :         {
     457          10 :             conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     458          10 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
     459          10 :             sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
     460          10 :             rconn = getConnectionByName(conname);
     461             :         }
     462             :     }
     463           2 :     else if (PG_NARGS() == 4)
     464             :     {
     465             :         /* text,text,text,bool */
     466           2 :         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     467           2 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
     468           2 :         sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
     469           2 :         fail = PG_GETARG_BOOL(3);
     470           2 :         rconn = getConnectionByName(conname);
     471             :     }
     472             : 
     473          18 :     if (!rconn || !rconn->conn)
     474           0 :         dblink_conn_not_avail(conname);
     475             : 
     476          18 :     conn = rconn->conn;
     477             : 
     478             :     /* If we are not in a transaction, start one */
     479          18 :     if (PQtransactionStatus(conn) == PQTRANS_IDLE)
     480             :     {
     481          14 :         res = PQexec(conn, "BEGIN");
     482          14 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
     483           0 :             dblink_res_internalerror(conn, res, "begin error");
     484          14 :         PQclear(res);
     485          14 :         rconn->newXactForCursor = true;
     486             : 
     487             :         /*
     488             :          * Since transaction state was IDLE, we force cursor count to
     489             :          * initially be 0. This is needed as a previous ABORT might have wiped
     490             :          * out our transaction without maintaining the cursor count for us.
     491             :          */
     492          14 :         rconn->openCursorCount = 0;
     493             :     }
     494             : 
     495             :     /* if we started a transaction, increment cursor count */
     496          18 :     if (rconn->newXactForCursor)
     497          18 :         (rconn->openCursorCount)++;
     498             : 
     499          18 :     appendStringInfo(&buf, "DECLARE %s CURSOR FOR %s", curname, sql);
     500          18 :     res = PQexec(conn, buf.data);
     501          18 :     if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
     502             :     {
     503           4 :         dblink_res_error(conn, conname, res, fail,
     504             :                          "while opening cursor \"%s\"", curname);
     505           4 :         PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
     506             :     }
     507             : 
     508          14 :     PQclear(res);
     509          14 :     PG_RETURN_TEXT_P(cstring_to_text("OK"));
     510             : }
     511             : 
     512             : /*
     513             :  * closes a cursor
     514             :  */
     515          12 : PG_FUNCTION_INFO_V1(dblink_close);
     516             : Datum
     517          10 : dblink_close(PG_FUNCTION_ARGS)
     518             : {
     519             :     PGconn     *conn;
     520          10 :     PGresult   *res = NULL;
     521          10 :     char       *curname = NULL;
     522          10 :     char       *conname = NULL;
     523             :     StringInfoData buf;
     524          10 :     remoteConn *rconn = NULL;
     525          10 :     bool        fail = true;    /* default to backward compatible behavior */
     526             : 
     527          10 :     dblink_init();
     528          10 :     initStringInfo(&buf);
     529             : 
     530          10 :     if (PG_NARGS() == 1)
     531             :     {
     532             :         /* text */
     533           0 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     534           0 :         rconn = pconn;
     535             :     }
     536          10 :     else if (PG_NARGS() == 2)
     537             :     {
     538             :         /* might be text,text or text,bool */
     539          10 :         if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
     540             :         {
     541           4 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     542           4 :             fail = PG_GETARG_BOOL(1);
     543           4 :             rconn = pconn;
     544             :         }
     545             :         else
     546             :         {
     547           6 :             conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     548           6 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
     549           6 :             rconn = getConnectionByName(conname);
     550             :         }
     551             :     }
     552          10 :     if (PG_NARGS() == 3)
     553             :     {
     554             :         /* text,text,bool */
     555           0 :         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     556           0 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
     557           0 :         fail = PG_GETARG_BOOL(2);
     558           0 :         rconn = getConnectionByName(conname);
     559             :     }
     560             : 
     561          10 :     if (!rconn || !rconn->conn)
     562           0 :         dblink_conn_not_avail(conname);
     563             : 
     564          10 :     conn = rconn->conn;
     565             : 
     566          10 :     appendStringInfo(&buf, "CLOSE %s", curname);
     567             : 
     568             :     /* close the cursor */
     569          10 :     res = PQexec(conn, buf.data);
     570          10 :     if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
     571             :     {
     572           2 :         dblink_res_error(conn, conname, res, fail,
     573             :                          "while closing cursor \"%s\"", curname);
     574           2 :         PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
     575             :     }
     576             : 
     577           8 :     PQclear(res);
     578             : 
     579             :     /* if we started a transaction, decrement cursor count */
     580           8 :     if (rconn->newXactForCursor)
     581             :     {
     582           8 :         (rconn->openCursorCount)--;
     583             : 
     584             :         /* if count is zero, commit the transaction */
     585           8 :         if (rconn->openCursorCount == 0)
     586             :         {
     587           4 :             rconn->newXactForCursor = false;
     588             : 
     589           4 :             res = PQexec(conn, "COMMIT");
     590           4 :             if (PQresultStatus(res) != PGRES_COMMAND_OK)
     591           0 :                 dblink_res_internalerror(conn, res, "commit error");
     592           4 :             PQclear(res);
     593             :         }
     594             :     }
     595             : 
     596           8 :     PG_RETURN_TEXT_P(cstring_to_text("OK"));
     597             : }
     598             : 
     599             : /*
     600             :  * Fetch results from an open cursor
     601             :  */
     602          18 : PG_FUNCTION_INFO_V1(dblink_fetch);
     603             : Datum
     604          26 : dblink_fetch(PG_FUNCTION_ARGS)
     605             : {
     606          26 :     PGresult   *res = NULL;
     607          26 :     char       *conname = NULL;
     608          26 :     remoteConn *rconn = NULL;
     609          26 :     PGconn     *conn = NULL;
     610             :     StringInfoData buf;
     611          26 :     char       *curname = NULL;
     612          26 :     int         howmany = 0;
     613          26 :     bool        fail = true;    /* default to backward compatible */
     614             : 
     615          26 :     prepTuplestoreResult(fcinfo);
     616             : 
     617          26 :     dblink_init();
     618             : 
     619          26 :     if (PG_NARGS() == 4)
     620             :     {
     621             :         /* text,text,int,bool */
     622           2 :         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     623           2 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
     624           2 :         howmany = PG_GETARG_INT32(2);
     625           2 :         fail = PG_GETARG_BOOL(3);
     626             : 
     627           2 :         rconn = getConnectionByName(conname);
     628           2 :         if (rconn)
     629           2 :             conn = rconn->conn;
     630             :     }
     631          24 :     else if (PG_NARGS() == 3)
     632             :     {
     633             :         /* text,text,int or text,int,bool */
     634          16 :         if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
     635             :         {
     636           4 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     637           4 :             howmany = PG_GETARG_INT32(1);
     638           4 :             fail = PG_GETARG_BOOL(2);
     639           4 :             conn = pconn->conn;
     640             :         }
     641             :         else
     642             :         {
     643          12 :             conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     644          12 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
     645          12 :             howmany = PG_GETARG_INT32(2);
     646             : 
     647          12 :             rconn = getConnectionByName(conname);
     648          12 :             if (rconn)
     649          12 :                 conn = rconn->conn;
     650             :         }
     651             :     }
     652           8 :     else if (PG_NARGS() == 2)
     653             :     {
     654             :         /* text,int */
     655           8 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     656           8 :         howmany = PG_GETARG_INT32(1);
     657           8 :         conn = pconn->conn;
     658             :     }
     659             : 
     660          26 :     if (!conn)
     661           0 :         dblink_conn_not_avail(conname);
     662             : 
     663          26 :     initStringInfo(&buf);
     664          26 :     appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
     665             : 
     666             :     /*
     667             :      * Try to execute the query.  Note that since libpq uses malloc, the
     668             :      * PGresult will be long-lived even though we are still in a short-lived
     669             :      * memory context.
     670             :      */
     671          26 :     res = PQexec(conn, buf.data);
     672          52 :     if (!res ||
     673          52 :         (PQresultStatus(res) != PGRES_COMMAND_OK &&
     674          26 :          PQresultStatus(res) != PGRES_TUPLES_OK))
     675             :     {
     676          10 :         dblink_res_error(conn, conname, res, fail,
     677             :                          "while fetching from cursor \"%s\"", curname);
     678           6 :         return (Datum) 0;
     679             :     }
     680          16 :     else if (PQresultStatus(res) == PGRES_COMMAND_OK)
     681             :     {
     682             :         /* cursor does not exist - closed already or bad name */
     683           0 :         PQclear(res);
     684           0 :         ereport(ERROR,
     685             :                 (errcode(ERRCODE_INVALID_CURSOR_NAME),
     686             :                  errmsg("cursor \"%s\" does not exist", curname)));
     687             :     }
     688             : 
     689          16 :     materializeResult(fcinfo, conn, res);
     690          14 :     return (Datum) 0;
     691             : }
     692             : 
     693             : /*
     694             :  * Note: this is the new preferred version of dblink
     695             :  */
     696          22 : PG_FUNCTION_INFO_V1(dblink_record);
     697             : Datum
     698          50 : dblink_record(PG_FUNCTION_ARGS)
     699             : {
     700          50 :     return dblink_record_internal(fcinfo, false);
     701             : }
     702             : 
     703           6 : PG_FUNCTION_INFO_V1(dblink_send_query);
     704             : Datum
     705          12 : dblink_send_query(PG_FUNCTION_ARGS)
     706             : {
     707             :     PGconn     *conn;
     708             :     char       *sql;
     709             :     int         retval;
     710             : 
     711          12 :     if (PG_NARGS() == 2)
     712             :     {
     713          12 :         conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
     714          12 :         sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
     715             :     }
     716             :     else
     717             :         /* shouldn't happen */
     718           0 :         elog(ERROR, "wrong number of arguments");
     719             : 
     720             :     /* async query send */
     721          12 :     retval = PQsendQuery(conn, sql);
     722          12 :     if (retval != 1)
     723           0 :         elog(NOTICE, "could not send query: %s", pchomp(PQerrorMessage(conn)));
     724             : 
     725          12 :     PG_RETURN_INT32(retval);
     726             : }
     727             : 
     728           8 : PG_FUNCTION_INFO_V1(dblink_get_result);
     729             : Datum
     730          16 : dblink_get_result(PG_FUNCTION_ARGS)
     731             : {
     732          16 :     return dblink_record_internal(fcinfo, true);
     733             : }
     734             : 
     735             : static Datum
     736          66 : dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
     737             : {
     738          66 :     PGconn     *volatile conn = NULL;
     739          66 :     volatile bool freeconn = false;
     740             : 
     741          66 :     prepTuplestoreResult(fcinfo);
     742             : 
     743          66 :     dblink_init();
     744             : 
     745          66 :     PG_TRY();
     746             :     {
     747          66 :         char       *sql = NULL;
     748          66 :         char       *conname = NULL;
     749          66 :         bool        fail = true;    /* default to backward compatible */
     750             : 
     751          66 :         if (!is_async)
     752             :         {
     753          50 :             if (PG_NARGS() == 3)
     754             :             {
     755             :                 /* text,text,bool */
     756           2 :                 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     757           2 :                 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
     758           2 :                 fail = PG_GETARG_BOOL(2);
     759           2 :                 dblink_get_conn(conname, &conn, &conname, &freeconn);
     760             :             }
     761          48 :             else if (PG_NARGS() == 2)
     762             :             {
     763             :                 /* text,text or text,bool */
     764          34 :                 if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
     765             :                 {
     766           2 :                     sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
     767           2 :                     fail = PG_GETARG_BOOL(1);
     768           2 :                     conn = pconn->conn;
     769             :                 }
     770             :                 else
     771             :                 {
     772          32 :                     conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     773          32 :                     sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
     774          32 :                     dblink_get_conn(conname, &conn, &conname, &freeconn);
     775             :                 }
     776             :             }
     777          14 :             else if (PG_NARGS() == 1)
     778             :             {
     779             :                 /* text */
     780          14 :                 conn = pconn->conn;
     781          14 :                 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
     782             :             }
     783             :             else
     784             :                 /* shouldn't happen */
     785           0 :                 elog(ERROR, "wrong number of arguments");
     786             :         }
     787             :         else                    /* is_async */
     788             :         {
     789             :             /* get async result */
     790          16 :             conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     791             : 
     792          16 :             if (PG_NARGS() == 2)
     793             :             {
     794             :                 /* text,bool */
     795           0 :                 fail = PG_GETARG_BOOL(1);
     796           0 :                 conn = dblink_get_named_conn(conname);
     797             :             }
     798          16 :             else if (PG_NARGS() == 1)
     799             :             {
     800             :                 /* text */
     801          16 :                 conn = dblink_get_named_conn(conname);
     802             :             }
     803             :             else
     804             :                 /* shouldn't happen */
     805           0 :                 elog(ERROR, "wrong number of arguments");
     806             :         }
     807             : 
     808          62 :         if (!conn)
     809           4 :             dblink_conn_not_avail(conname);
     810             : 
     811          58 :         if (!is_async)
     812             :         {
     813             :             /* synchronous query, use efficient tuple collection method */
     814          42 :             materializeQueryResult(fcinfo, conn, conname, sql, fail);
     815             :         }
     816             :         else
     817             :         {
     818             :             /* async result retrieval, do it the old way */
     819          16 :             PGresult   *res = PQgetResult(conn);
     820             : 
     821             :             /* NULL means we're all done with the async results */
     822          16 :             if (res)
     823             :             {
     824          10 :                 if (PQresultStatus(res) != PGRES_COMMAND_OK &&
     825          10 :                     PQresultStatus(res) != PGRES_TUPLES_OK)
     826             :                 {
     827           0 :                     dblink_res_error(conn, conname, res, fail,
     828             :                                      "while executing query");
     829             :                     /* if fail isn't set, we'll return an empty query result */
     830             :                 }
     831             :                 else
     832             :                 {
     833          10 :                     materializeResult(fcinfo, conn, res);
     834             :                 }
     835             :             }
     836             :         }
     837             :     }
     838           8 :     PG_FINALLY();
     839             :     {
     840             :         /* if needed, close the connection to the database */
     841          66 :         if (freeconn)
     842             :         {
     843           6 :             PQfinish(conn);
     844           6 :             ReleaseExternalFD();
     845             :         }
     846             :     }
     847          66 :     PG_END_TRY();
     848             : 
     849          58 :     return (Datum) 0;
     850             : }
     851             : 
     852             : /*
     853             :  * Verify function caller can handle a tuplestore result, and set up for that.
     854             :  *
     855             :  * Note: if the caller returns without actually creating a tuplestore, the
     856             :  * executor will treat the function result as an empty set.
     857             :  */
     858             : static void
     859          96 : prepTuplestoreResult(FunctionCallInfo fcinfo)
     860             : {
     861          96 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
     862             : 
     863             :     /* check to see if query supports us returning a tuplestore */
     864          96 :     if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
     865           0 :         ereport(ERROR,
     866             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     867             :                  errmsg("set-valued function called in context that cannot accept a set")));
     868          96 :     if (!(rsinfo->allowedModes & SFRM_Materialize))
     869           0 :         ereport(ERROR,
     870             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     871             :                  errmsg("materialize mode required, but it is not allowed in this context")));
     872             : 
     873             :     /* let the executor know we're sending back a tuplestore */
     874          96 :     rsinfo->returnMode = SFRM_Materialize;
     875             : 
     876             :     /* caller must fill these to return a non-empty result */
     877          96 :     rsinfo->setResult = NULL;
     878          96 :     rsinfo->setDesc = NULL;
     879          96 : }
     880             : 
     881             : /*
     882             :  * Copy the contents of the PGresult into a tuplestore to be returned
     883             :  * as the result of the current function.
     884             :  * The PGresult will be released in this function.
     885             :  */
     886             : static void
     887          26 : materializeResult(FunctionCallInfo fcinfo, PGconn *conn, PGresult *res)
     888             : {
     889          26 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
     890             : 
     891             :     /* prepTuplestoreResult must have been called previously */
     892             :     Assert(rsinfo->returnMode == SFRM_Materialize);
     893             : 
     894          26 :     PG_TRY();
     895             :     {
     896             :         TupleDesc   tupdesc;
     897             :         bool        is_sql_cmd;
     898             :         int         ntuples;
     899             :         int         nfields;
     900             : 
     901          26 :         if (PQresultStatus(res) == PGRES_COMMAND_OK)
     902             :         {
     903           0 :             is_sql_cmd = true;
     904             : 
     905             :             /*
     906             :              * need a tuple descriptor representing one TEXT column to return
     907             :              * the command status string as our result tuple
     908             :              */
     909           0 :             tupdesc = CreateTemplateTupleDesc(1);
     910           0 :             TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
     911             :                                TEXTOID, -1, 0);
     912           0 :             ntuples = 1;
     913           0 :             nfields = 1;
     914             :         }
     915             :         else
     916             :         {
     917             :             Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
     918             : 
     919          26 :             is_sql_cmd = false;
     920             : 
     921             :             /* get a tuple descriptor for our result type */
     922          26 :             switch (get_call_result_type(fcinfo, NULL, &tupdesc))
     923             :             {
     924          26 :                 case TYPEFUNC_COMPOSITE:
     925             :                     /* success */
     926          26 :                     break;
     927           0 :                 case TYPEFUNC_RECORD:
     928             :                     /* failed to determine actual type of RECORD */
     929           0 :                     ereport(ERROR,
     930             :                             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     931             :                              errmsg("function returning record called in context "
     932             :                                     "that cannot accept type record")));
     933             :                     break;
     934           0 :                 default:
     935             :                     /* result type isn't composite */
     936           0 :                     elog(ERROR, "return type must be a row type");
     937             :                     break;
     938             :             }
     939             : 
     940             :             /* make sure we have a persistent copy of the tupdesc */
     941          26 :             tupdesc = CreateTupleDescCopy(tupdesc);
     942          26 :             ntuples = PQntuples(res);
     943          26 :             nfields = PQnfields(res);
     944             :         }
     945             : 
     946             :         /*
     947             :          * check result and tuple descriptor have the same number of columns
     948             :          */
     949          26 :         if (nfields != tupdesc->natts)
     950           0 :             ereport(ERROR,
     951             :                     (errcode(ERRCODE_DATATYPE_MISMATCH),
     952             :                      errmsg("remote query result rowtype does not match "
     953             :                             "the specified FROM clause rowtype")));
     954             : 
     955          26 :         if (ntuples > 0)
     956             :         {
     957             :             AttInMetadata *attinmeta;
     958          26 :             int         nestlevel = -1;
     959             :             Tuplestorestate *tupstore;
     960             :             MemoryContext oldcontext;
     961             :             int         row;
     962             :             char      **values;
     963             : 
     964          26 :             attinmeta = TupleDescGetAttInMetadata(tupdesc);
     965             : 
     966             :             /* Set GUCs to ensure we read GUC-sensitive data types correctly */
     967          26 :             if (!is_sql_cmd)
     968          26 :                 nestlevel = applyRemoteGucs(conn);
     969             : 
     970          26 :             oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
     971          26 :             tupstore = tuplestore_begin_heap(true, false, work_mem);
     972          26 :             rsinfo->setResult = tupstore;
     973          26 :             rsinfo->setDesc = tupdesc;
     974          26 :             MemoryContextSwitchTo(oldcontext);
     975             : 
     976          26 :             values = (char **) palloc(nfields * sizeof(char *));
     977             : 
     978             :             /* put all tuples into the tuplestore */
     979          98 :             for (row = 0; row < ntuples; row++)
     980             :             {
     981             :                 HeapTuple   tuple;
     982             : 
     983          74 :                 if (!is_sql_cmd)
     984             :                 {
     985             :                     int         i;
     986             : 
     987         276 :                     for (i = 0; i < nfields; i++)
     988             :                     {
     989         202 :                         if (PQgetisnull(res, row, i))
     990           0 :                             values[i] = NULL;
     991             :                         else
     992         202 :                             values[i] = PQgetvalue(res, row, i);
     993             :                     }
     994             :                 }
     995             :                 else
     996             :                 {
     997           0 :                     values[0] = PQcmdStatus(res);
     998             :                 }
     999             : 
    1000             :                 /* build the tuple and put it into the tuplestore. */
    1001          74 :                 tuple = BuildTupleFromCStrings(attinmeta, values);
    1002          72 :                 tuplestore_puttuple(tupstore, tuple);
    1003             :             }
    1004             : 
    1005             :             /* clean up GUC settings, if we changed any */
    1006          24 :             restoreLocalGucs(nestlevel);
    1007             : 
    1008             :             /* clean up and return the tuplestore */
    1009             :             tuplestore_donestoring(tupstore);
    1010             :         }
    1011             :     }
    1012           2 :     PG_FINALLY();
    1013             :     {
    1014             :         /* be sure to release the libpq result */
    1015          26 :         PQclear(res);
    1016             :     }
    1017          26 :     PG_END_TRY();
    1018          24 : }
    1019             : 
    1020             : /*
    1021             :  * Execute the given SQL command and store its results into a tuplestore
    1022             :  * to be returned as the result of the current function.
    1023             :  *
    1024             :  * This is equivalent to PQexec followed by materializeResult, but we make
    1025             :  * use of libpq's single-row mode to avoid accumulating the whole result
    1026             :  * inside libpq before it gets transferred to the tuplestore.
    1027             :  */
    1028             : static void
    1029          42 : materializeQueryResult(FunctionCallInfo fcinfo,
    1030             :                        PGconn *conn,
    1031             :                        const char *conname,
    1032             :                        const char *sql,
    1033             :                        bool fail)
    1034             : {
    1035          42 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    1036          42 :     PGresult   *volatile res = NULL;
    1037          42 :     volatile storeInfo sinfo = {0};
    1038             : 
    1039             :     /* prepTuplestoreResult must have been called previously */
    1040             :     Assert(rsinfo->returnMode == SFRM_Materialize);
    1041             : 
    1042          42 :     sinfo.fcinfo = fcinfo;
    1043             : 
    1044          42 :     PG_TRY();
    1045             :     {
    1046             :         /* Create short-lived memory context for data conversions */
    1047          42 :         sinfo.tmpcontext = AllocSetContextCreate(CurrentMemoryContext,
    1048             :                                                  "dblink temporary context",
    1049             :                                                  ALLOCSET_DEFAULT_SIZES);
    1050             : 
    1051             :         /* execute query, collecting any tuples into the tuplestore */
    1052          42 :         res = storeQueryResult(&sinfo, conn, sql);
    1053             : 
    1054          42 :         if (!res ||
    1055          42 :             (PQresultStatus(res) != PGRES_COMMAND_OK &&
    1056          42 :              PQresultStatus(res) != PGRES_TUPLES_OK))
    1057           4 :         {
    1058             :             /*
    1059             :              * dblink_res_error will clear the passed PGresult, so we need
    1060             :              * this ugly dance to avoid doing so twice during error exit
    1061             :              */
    1062           4 :             PGresult   *res1 = res;
    1063             : 
    1064           4 :             res = NULL;
    1065           4 :             dblink_res_error(conn, conname, res1, fail,
    1066             :                              "while executing query");
    1067             :             /* if fail isn't set, we'll return an empty query result */
    1068             :         }
    1069          38 :         else if (PQresultStatus(res) == PGRES_COMMAND_OK)
    1070             :         {
    1071             :             /*
    1072             :              * storeRow didn't get called, so we need to convert the command
    1073             :              * status string to a tuple manually
    1074             :              */
    1075             :             TupleDesc   tupdesc;
    1076             :             AttInMetadata *attinmeta;
    1077             :             Tuplestorestate *tupstore;
    1078             :             HeapTuple   tuple;
    1079             :             char       *values[1];
    1080             :             MemoryContext oldcontext;
    1081             : 
    1082             :             /*
    1083             :              * need a tuple descriptor representing one TEXT column to return
    1084             :              * the command status string as our result tuple
    1085             :              */
    1086           0 :             tupdesc = CreateTemplateTupleDesc(1);
    1087           0 :             TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
    1088             :                                TEXTOID, -1, 0);
    1089           0 :             attinmeta = TupleDescGetAttInMetadata(tupdesc);
    1090             : 
    1091           0 :             oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
    1092           0 :             tupstore = tuplestore_begin_heap(true, false, work_mem);
    1093           0 :             rsinfo->setResult = tupstore;
    1094           0 :             rsinfo->setDesc = tupdesc;
    1095           0 :             MemoryContextSwitchTo(oldcontext);
    1096             : 
    1097           0 :             values[0] = PQcmdStatus(res);
    1098             : 
    1099             :             /* build the tuple and put it into the tuplestore. */
    1100           0 :             tuple = BuildTupleFromCStrings(attinmeta, values);
    1101           0 :             tuplestore_puttuple(tupstore, tuple);
    1102             : 
    1103           0 :             PQclear(res);
    1104           0 :             res = NULL;
    1105             :         }
    1106             :         else
    1107             :         {
    1108             :             Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
    1109             :             /* storeRow should have created a tuplestore */
    1110             :             Assert(rsinfo->setResult != NULL);
    1111             : 
    1112          38 :             PQclear(res);
    1113          38 :             res = NULL;
    1114             :         }
    1115             : 
    1116             :         /* clean up data conversion short-lived memory context */
    1117          42 :         if (sinfo.tmpcontext != NULL)
    1118          42 :             MemoryContextDelete(sinfo.tmpcontext);
    1119          42 :         sinfo.tmpcontext = NULL;
    1120             : 
    1121          42 :         PQclear(sinfo.last_res);
    1122          42 :         sinfo.last_res = NULL;
    1123          42 :         PQclear(sinfo.cur_res);
    1124          42 :         sinfo.cur_res = NULL;
    1125             :     }
    1126           0 :     PG_CATCH();
    1127             :     {
    1128             :         /* be sure to release any libpq result we collected */
    1129           0 :         PQclear(res);
    1130           0 :         PQclear(sinfo.last_res);
    1131           0 :         PQclear(sinfo.cur_res);
    1132             :         /* and clear out any pending data in libpq */
    1133           0 :         while ((res = PQgetResult(conn)) != NULL)
    1134           0 :             PQclear(res);
    1135           0 :         PG_RE_THROW();
    1136             :     }
    1137          42 :     PG_END_TRY();
    1138          42 : }
    1139             : 
    1140             : /*
    1141             :  * Execute query, and send any result rows to sinfo->tuplestore.
    1142             :  */
    1143             : static PGresult *
    1144          42 : storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql)
    1145             : {
    1146          42 :     bool        first = true;
    1147          42 :     int         nestlevel = -1;
    1148             :     PGresult   *res;
    1149             : 
    1150          42 :     if (!PQsendQuery(conn, sql))
    1151           0 :         elog(ERROR, "could not send query: %s", pchomp(PQerrorMessage(conn)));
    1152             : 
    1153          42 :     if (!PQsetSingleRowMode(conn))  /* shouldn't fail */
    1154           0 :         elog(ERROR, "failed to set single-row mode for dblink query");
    1155             : 
    1156             :     for (;;)
    1157             :     {
    1158         348 :         CHECK_FOR_INTERRUPTS();
    1159             : 
    1160         348 :         sinfo->cur_res = PQgetResult(conn);
    1161         348 :         if (!sinfo->cur_res)
    1162          42 :             break;
    1163             : 
    1164         306 :         if (PQresultStatus(sinfo->cur_res) == PGRES_SINGLE_TUPLE)
    1165             :         {
    1166             :             /* got one row from possibly-bigger resultset */
    1167             : 
    1168             :             /*
    1169             :              * Set GUCs to ensure we read GUC-sensitive data types correctly.
    1170             :              * We shouldn't do this until we have a row in hand, to ensure
    1171             :              * libpq has seen any earlier ParameterStatus protocol messages.
    1172             :              */
    1173         264 :             if (first && nestlevel < 0)
    1174          38 :                 nestlevel = applyRemoteGucs(conn);
    1175             : 
    1176         264 :             storeRow(sinfo, sinfo->cur_res, first);
    1177             : 
    1178         264 :             PQclear(sinfo->cur_res);
    1179         264 :             sinfo->cur_res = NULL;
    1180         264 :             first = false;
    1181             :         }
    1182             :         else
    1183             :         {
    1184             :             /* if empty resultset, fill tuplestore header */
    1185          42 :             if (first && PQresultStatus(sinfo->cur_res) == PGRES_TUPLES_OK)
    1186           0 :                 storeRow(sinfo, sinfo->cur_res, first);
    1187             : 
    1188             :             /* store completed result at last_res */
    1189          42 :             PQclear(sinfo->last_res);
    1190          42 :             sinfo->last_res = sinfo->cur_res;
    1191          42 :             sinfo->cur_res = NULL;
    1192          42 :             first = true;
    1193             :         }
    1194             :     }
    1195             : 
    1196             :     /* clean up GUC settings, if we changed any */
    1197          42 :     restoreLocalGucs(nestlevel);
    1198             : 
    1199             :     /* return last_res */
    1200          42 :     res = sinfo->last_res;
    1201          42 :     sinfo->last_res = NULL;
    1202          42 :     return res;
    1203             : }
    1204             : 
    1205             : /*
    1206             :  * Send single row to sinfo->tuplestore.
    1207             :  *
    1208             :  * If "first" is true, create the tuplestore using PGresult's metadata
    1209             :  * (in this case the PGresult might contain either zero or one row).
    1210             :  */
    1211             : static void
    1212         264 : storeRow(volatile storeInfo *sinfo, PGresult *res, bool first)
    1213             : {
    1214         264 :     int         nfields = PQnfields(res);
    1215             :     HeapTuple   tuple;
    1216             :     int         i;
    1217             :     MemoryContext oldcontext;
    1218             : 
    1219         264 :     if (first)
    1220             :     {
    1221             :         /* Prepare for new result set */
    1222          38 :         ReturnSetInfo *rsinfo = (ReturnSetInfo *) sinfo->fcinfo->resultinfo;
    1223             :         TupleDesc   tupdesc;
    1224             : 
    1225             :         /*
    1226             :          * It's possible to get more than one result set if the query string
    1227             :          * contained multiple SQL commands.  In that case, we follow PQexec's
    1228             :          * traditional behavior of throwing away all but the last result.
    1229             :          */
    1230          38 :         if (sinfo->tuplestore)
    1231           0 :             tuplestore_end(sinfo->tuplestore);
    1232          38 :         sinfo->tuplestore = NULL;
    1233             : 
    1234             :         /* get a tuple descriptor for our result type */
    1235          38 :         switch (get_call_result_type(sinfo->fcinfo, NULL, &tupdesc))
    1236             :         {
    1237          38 :             case TYPEFUNC_COMPOSITE:
    1238             :                 /* success */
    1239          38 :                 break;
    1240           0 :             case TYPEFUNC_RECORD:
    1241             :                 /* failed to determine actual type of RECORD */
    1242           0 :                 ereport(ERROR,
    1243             :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1244             :                          errmsg("function returning record called in context "
    1245             :                                 "that cannot accept type record")));
    1246             :                 break;
    1247           0 :             default:
    1248             :                 /* result type isn't composite */
    1249           0 :                 elog(ERROR, "return type must be a row type");
    1250             :                 break;
    1251             :         }
    1252             : 
    1253             :         /* make sure we have a persistent copy of the tupdesc */
    1254          38 :         tupdesc = CreateTupleDescCopy(tupdesc);
    1255             : 
    1256             :         /* check result and tuple descriptor have the same number of columns */
    1257          38 :         if (nfields != tupdesc->natts)
    1258           0 :             ereport(ERROR,
    1259             :                     (errcode(ERRCODE_DATATYPE_MISMATCH),
    1260             :                      errmsg("remote query result rowtype does not match "
    1261             :                             "the specified FROM clause rowtype")));
    1262             : 
    1263             :         /* Prepare attinmeta for later data conversions */
    1264          38 :         sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
    1265             : 
    1266             :         /* Create a new, empty tuplestore */
    1267          38 :         oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
    1268          38 :         sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
    1269          38 :         rsinfo->setResult = sinfo->tuplestore;
    1270          38 :         rsinfo->setDesc = tupdesc;
    1271          38 :         MemoryContextSwitchTo(oldcontext);
    1272             : 
    1273             :         /* Done if empty resultset */
    1274          38 :         if (PQntuples(res) == 0)
    1275           0 :             return;
    1276             : 
    1277             :         /*
    1278             :          * Set up sufficiently-wide string pointers array; this won't change
    1279             :          * in size so it's easy to preallocate.
    1280             :          */
    1281          38 :         if (sinfo->cstrs)
    1282           0 :             pfree(sinfo->cstrs);
    1283          38 :         sinfo->cstrs = (char **) palloc(nfields * sizeof(char *));
    1284             :     }
    1285             : 
    1286             :     /* Should have a single-row result if we get here */
    1287             :     Assert(PQntuples(res) == 1);
    1288             : 
    1289             :     /*
    1290             :      * Do the following work in a temp context that we reset after each tuple.
    1291             :      * This cleans up not only the data we have direct access to, but any
    1292             :      * cruft the I/O functions might leak.
    1293             :      */
    1294         264 :     oldcontext = MemoryContextSwitchTo(sinfo->tmpcontext);
    1295             : 
    1296             :     /*
    1297             :      * Fill cstrs with null-terminated strings of column values.
    1298             :      */
    1299        1020 :     for (i = 0; i < nfields; i++)
    1300             :     {
    1301         756 :         if (PQgetisnull(res, 0, i))
    1302           0 :             sinfo->cstrs[i] = NULL;
    1303             :         else
    1304         756 :             sinfo->cstrs[i] = PQgetvalue(res, 0, i);
    1305             :     }
    1306             : 
    1307             :     /* Convert row to a tuple, and add it to the tuplestore */
    1308         264 :     tuple = BuildTupleFromCStrings(sinfo->attinmeta, sinfo->cstrs);
    1309             : 
    1310         264 :     tuplestore_puttuple(sinfo->tuplestore, tuple);
    1311             : 
    1312             :     /* Clean up */
    1313         264 :     MemoryContextSwitchTo(oldcontext);
    1314         264 :     MemoryContextReset(sinfo->tmpcontext);
    1315             : }
    1316             : 
    1317             : /*
    1318             :  * List all open dblink connections by name.
    1319             :  * Returns an array of all connection names.
    1320             :  * Takes no params
    1321             :  */
    1322           4 : PG_FUNCTION_INFO_V1(dblink_get_connections);
    1323             : Datum
    1324           2 : dblink_get_connections(PG_FUNCTION_ARGS)
    1325             : {
    1326             :     HASH_SEQ_STATUS status;
    1327             :     remoteConnHashEnt *hentry;
    1328           2 :     ArrayBuildState *astate = NULL;
    1329             : 
    1330           2 :     if (remoteConnHash)
    1331             :     {
    1332           2 :         hash_seq_init(&status, remoteConnHash);
    1333           8 :         while ((hentry = (remoteConnHashEnt *) hash_seq_search(&status)) != NULL)
    1334             :         {
    1335             :             /* stash away current value */
    1336           6 :             astate = accumArrayResult(astate,
    1337           6 :                                       CStringGetTextDatum(hentry->name),
    1338             :                                       false, TEXTOID, CurrentMemoryContext);
    1339             :         }
    1340             :     }
    1341             : 
    1342           2 :     if (astate)
    1343           2 :         PG_RETURN_ARRAYTYPE_P(makeArrayResult(astate,
    1344             :                                               CurrentMemoryContext));
    1345             :     else
    1346           0 :         PG_RETURN_NULL();
    1347             : }
    1348             : 
    1349             : /*
    1350             :  * Checks if a given remote connection is busy
    1351             :  *
    1352             :  * Returns 1 if the connection is busy, 0 otherwise
    1353             :  * Params:
    1354             :  *  text connection_name - name of the connection to check
    1355             :  *
    1356             :  */
    1357           4 : PG_FUNCTION_INFO_V1(dblink_is_busy);
    1358             : Datum
    1359           2 : dblink_is_busy(PG_FUNCTION_ARGS)
    1360             : {
    1361             :     PGconn     *conn;
    1362             : 
    1363           2 :     dblink_init();
    1364           2 :     conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
    1365             : 
    1366           2 :     PQconsumeInput(conn);
    1367           2 :     PG_RETURN_INT32(PQisBusy(conn));
    1368             : }
    1369             : 
    1370             : /*
    1371             :  * Cancels a running request on a connection
    1372             :  *
    1373             :  * Returns text:
    1374             :  *  "OK" if the cancel request has been sent correctly,
    1375             :  *      an error message otherwise
    1376             :  *
    1377             :  * Params:
    1378             :  *  text connection_name - name of the connection to check
    1379             :  *
    1380             :  */
    1381           4 : PG_FUNCTION_INFO_V1(dblink_cancel_query);
    1382             : Datum
    1383           2 : dblink_cancel_query(PG_FUNCTION_ARGS)
    1384             : {
    1385             :     int         res;
    1386             :     PGconn     *conn;
    1387             :     PGcancel   *cancel;
    1388             :     char        errbuf[256];
    1389             : 
    1390           2 :     dblink_init();
    1391           2 :     conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
    1392           2 :     cancel = PQgetCancel(conn);
    1393             : 
    1394           2 :     res = PQcancel(cancel, errbuf, 256);
    1395           2 :     PQfreeCancel(cancel);
    1396             : 
    1397           2 :     if (res == 1)
    1398           2 :         PG_RETURN_TEXT_P(cstring_to_text("OK"));
    1399             :     else
    1400           0 :         PG_RETURN_TEXT_P(cstring_to_text(errbuf));
    1401             : }
    1402             : 
    1403             : 
    1404             : /*
    1405             :  * Get error message from a connection
    1406             :  *
    1407             :  * Returns text:
    1408             :  *  "OK" if no error, an error message otherwise
    1409             :  *
    1410             :  * Params:
    1411             :  *  text connection_name - name of the connection to check
    1412             :  *
    1413             :  */
    1414           4 : PG_FUNCTION_INFO_V1(dblink_error_message);
    1415             : Datum
    1416           2 : dblink_error_message(PG_FUNCTION_ARGS)
    1417             : {
    1418             :     char       *msg;
    1419             :     PGconn     *conn;
    1420             : 
    1421           2 :     dblink_init();
    1422           2 :     conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
    1423             : 
    1424           2 :     msg = PQerrorMessage(conn);
    1425           2 :     if (msg == NULL || msg[0] == '\0')
    1426           2 :         PG_RETURN_TEXT_P(cstring_to_text("OK"));
    1427             :     else
    1428           0 :         PG_RETURN_TEXT_P(cstring_to_text(pchomp(msg)));
    1429             : }
    1430             : 
    1431             : /*
    1432             :  * Execute an SQL non-SELECT command
    1433             :  */
    1434          18 : PG_FUNCTION_INFO_V1(dblink_exec);
    1435             : Datum
    1436          52 : dblink_exec(PG_FUNCTION_ARGS)
    1437             : {
    1438          52 :     text       *volatile sql_cmd_status = NULL;
    1439          52 :     PGconn     *volatile conn = NULL;
    1440          52 :     volatile bool freeconn = false;
    1441             : 
    1442          52 :     dblink_init();
    1443             : 
    1444          52 :     PG_TRY();
    1445             :     {
    1446          52 :         PGresult   *res = NULL;
    1447          52 :         char       *sql = NULL;
    1448          52 :         char       *conname = NULL;
    1449          52 :         bool        fail = true;    /* default to backward compatible behavior */
    1450             : 
    1451          52 :         if (PG_NARGS() == 3)
    1452             :         {
    1453             :             /* must be text,text,bool */
    1454           0 :             conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
    1455           0 :             sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
    1456           0 :             fail = PG_GETARG_BOOL(2);
    1457           0 :             dblink_get_conn(conname, &conn, &conname, &freeconn);
    1458             :         }
    1459          52 :         else if (PG_NARGS() == 2)
    1460             :         {
    1461             :             /* might be text,text or text,bool */
    1462          34 :             if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
    1463             :             {
    1464           2 :                 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
    1465           2 :                 fail = PG_GETARG_BOOL(1);
    1466           2 :                 conn = pconn->conn;
    1467             :             }
    1468             :             else
    1469             :             {
    1470          32 :                 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
    1471          32 :                 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
    1472          32 :                 dblink_get_conn(conname, &conn, &conname, &freeconn);
    1473             :             }
    1474             :         }
    1475          18 :         else if (PG_NARGS() == 1)
    1476             :         {
    1477             :             /* must be single text argument */
    1478          18 :             conn = pconn->conn;
    1479          18 :             sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
    1480             :         }
    1481             :         else
    1482             :             /* shouldn't happen */
    1483           0 :             elog(ERROR, "wrong number of arguments");
    1484             : 
    1485          52 :         if (!conn)
    1486           0 :             dblink_conn_not_avail(conname);
    1487             : 
    1488          52 :         res = PQexec(conn, sql);
    1489          52 :         if (!res ||
    1490          52 :             (PQresultStatus(res) != PGRES_COMMAND_OK &&
    1491           4 :              PQresultStatus(res) != PGRES_TUPLES_OK))
    1492             :         {
    1493           4 :             dblink_res_error(conn, conname, res, fail,
    1494             :                              "while executing command");
    1495             : 
    1496             :             /*
    1497             :              * and save a copy of the command status string to return as our
    1498             :              * result tuple
    1499             :              */
    1500           2 :             sql_cmd_status = cstring_to_text("ERROR");
    1501             :         }
    1502          48 :         else if (PQresultStatus(res) == PGRES_COMMAND_OK)
    1503             :         {
    1504             :             /*
    1505             :              * and save a copy of the command status string to return as our
    1506             :              * result tuple
    1507             :              */
    1508          48 :             sql_cmd_status = cstring_to_text(PQcmdStatus(res));
    1509          48 :             PQclear(res);
    1510             :         }
    1511             :         else
    1512             :         {
    1513           0 :             PQclear(res);
    1514           0 :             ereport(ERROR,
    1515             :                     (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
    1516             :                      errmsg("statement returning results not allowed")));
    1517             :         }
    1518             :     }
    1519           2 :     PG_FINALLY();
    1520             :     {
    1521             :         /* if needed, close the connection to the database */
    1522          52 :         if (freeconn)
    1523             :         {
    1524           2 :             PQfinish(conn);
    1525           2 :             ReleaseExternalFD();
    1526             :         }
    1527             :     }
    1528          52 :     PG_END_TRY();
    1529             : 
    1530          50 :     PG_RETURN_TEXT_P(sql_cmd_status);
    1531             : }
    1532             : 
    1533             : 
    1534             : /*
    1535             :  * dblink_get_pkey
    1536             :  *
    1537             :  * Return list of primary key fields for the supplied relation,
    1538             :  * or NULL if none exists.
    1539             :  */
    1540           4 : PG_FUNCTION_INFO_V1(dblink_get_pkey);
    1541             : Datum
    1542          18 : dblink_get_pkey(PG_FUNCTION_ARGS)
    1543             : {
    1544             :     int16       indnkeyatts;
    1545             :     char      **results;
    1546             :     FuncCallContext *funcctx;
    1547             :     int32       call_cntr;
    1548             :     int32       max_calls;
    1549             :     AttInMetadata *attinmeta;
    1550             :     MemoryContext oldcontext;
    1551             : 
    1552             :     /* stuff done only on the first call of the function */
    1553          18 :     if (SRF_IS_FIRSTCALL())
    1554             :     {
    1555             :         Relation    rel;
    1556             :         TupleDesc   tupdesc;
    1557             : 
    1558             :         /* create a function context for cross-call persistence */
    1559           6 :         funcctx = SRF_FIRSTCALL_INIT();
    1560             : 
    1561             :         /*
    1562             :          * switch to memory context appropriate for multiple function calls
    1563             :          */
    1564           6 :         oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
    1565             : 
    1566             :         /* open target relation */
    1567           6 :         rel = get_rel_from_relname(PG_GETARG_TEXT_PP(0), AccessShareLock, ACL_SELECT);
    1568             : 
    1569             :         /* get the array of attnums */
    1570           6 :         results = get_pkey_attnames(rel, &indnkeyatts);
    1571             : 
    1572           6 :         relation_close(rel, AccessShareLock);
    1573             : 
    1574             :         /*
    1575             :          * need a tuple descriptor representing one INT and one TEXT column
    1576             :          */
    1577           6 :         tupdesc = CreateTemplateTupleDesc(2);
    1578           6 :         TupleDescInitEntry(tupdesc, (AttrNumber) 1, "position",
    1579             :                            INT4OID, -1, 0);
    1580           6 :         TupleDescInitEntry(tupdesc, (AttrNumber) 2, "colname",
    1581             :                            TEXTOID, -1, 0);
    1582             : 
    1583             :         /*
    1584             :          * Generate attribute metadata needed later to produce tuples from raw
    1585             :          * C strings
    1586             :          */
    1587           6 :         attinmeta = TupleDescGetAttInMetadata(tupdesc);
    1588           6 :         funcctx->attinmeta = attinmeta;
    1589             : 
    1590           6 :         if ((results != NULL) && (indnkeyatts > 0))
    1591             :         {
    1592           6 :             funcctx->max_calls = indnkeyatts;
    1593             : 
    1594             :             /* got results, keep track of them */
    1595           6 :             funcctx->user_fctx = results;
    1596             :         }
    1597             :         else
    1598             :         {
    1599             :             /* fast track when no results */
    1600           0 :             MemoryContextSwitchTo(oldcontext);
    1601           0 :             SRF_RETURN_DONE(funcctx);
    1602             :         }
    1603             : 
    1604           6 :         MemoryContextSwitchTo(oldcontext);
    1605             :     }
    1606             : 
    1607             :     /* stuff done on every call of the function */
    1608          18 :     funcctx = SRF_PERCALL_SETUP();
    1609             : 
    1610             :     /*
    1611             :      * initialize per-call variables
    1612             :      */
    1613          18 :     call_cntr = funcctx->call_cntr;
    1614          18 :     max_calls = funcctx->max_calls;
    1615             : 
    1616          18 :     results = (char **) funcctx->user_fctx;
    1617          18 :     attinmeta = funcctx->attinmeta;
    1618             : 
    1619          18 :     if (call_cntr < max_calls)   /* do when there is more left to send */
    1620             :     {
    1621             :         char      **values;
    1622             :         HeapTuple   tuple;
    1623             :         Datum       result;
    1624             : 
    1625          12 :         values = (char **) palloc(2 * sizeof(char *));
    1626          12 :         values[0] = psprintf("%d", call_cntr + 1);
    1627          12 :         values[1] = results[call_cntr];
    1628             : 
    1629             :         /* build the tuple */
    1630          12 :         tuple = BuildTupleFromCStrings(attinmeta, values);
    1631             : 
    1632             :         /* make the tuple into a datum */
    1633          12 :         result = HeapTupleGetDatum(tuple);
    1634             : 
    1635          12 :         SRF_RETURN_NEXT(funcctx, result);
    1636             :     }
    1637             :     else
    1638             :     {
    1639             :         /* do when there is no more left */
    1640           6 :         SRF_RETURN_DONE(funcctx);
    1641             :     }
    1642             : }
    1643             : 
    1644             : 
    1645             : /*
    1646             :  * dblink_build_sql_insert
    1647             :  *
    1648             :  * Used to generate an SQL insert statement
    1649             :  * based on an existing tuple in a local relation.
    1650             :  * This is useful for selectively replicating data
    1651             :  * to another server via dblink.
    1652             :  *
    1653             :  * API:
    1654             :  * <relname> - name of local table of interest
    1655             :  * <pkattnums> - an int2vector of attnums which will be used
    1656             :  * to identify the local tuple of interest
    1657             :  * <pknumatts> - number of attnums in pkattnums
    1658             :  * <src_pkattvals_arry> - text array of key values which will be used
    1659             :  * to identify the local tuple of interest
    1660             :  * <tgt_pkattvals_arry> - text array of key values which will be used
    1661             :  * to build the string for execution remotely. These are substituted
    1662             :  * for their counterparts in src_pkattvals_arry
    1663             :  */
    1664           6 : PG_FUNCTION_INFO_V1(dblink_build_sql_insert);
    1665             : Datum
    1666          12 : dblink_build_sql_insert(PG_FUNCTION_ARGS)
    1667             : {
    1668          12 :     text       *relname_text = PG_GETARG_TEXT_PP(0);
    1669          12 :     int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
    1670          12 :     int32       pknumatts_arg = PG_GETARG_INT32(2);
    1671          12 :     ArrayType  *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
    1672          12 :     ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
    1673             :     Relation    rel;
    1674             :     int        *pkattnums;
    1675             :     int         pknumatts;
    1676             :     char      **src_pkattvals;
    1677             :     char      **tgt_pkattvals;
    1678             :     int         src_nitems;
    1679             :     int         tgt_nitems;
    1680             :     char       *sql;
    1681             : 
    1682             :     /*
    1683             :      * Open target relation.
    1684             :      */
    1685          12 :     rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
    1686             : 
    1687             :     /*
    1688             :      * Process pkattnums argument.
    1689             :      */
    1690          12 :     validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
    1691             :                        &pkattnums, &pknumatts);
    1692             : 
    1693             :     /*
    1694             :      * Source array is made up of key values that will be used to locate the
    1695             :      * tuple of interest from the local system.
    1696             :      */
    1697           8 :     src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
    1698             : 
    1699             :     /*
    1700             :      * There should be one source array key value for each key attnum
    1701             :      */
    1702           8 :     if (src_nitems != pknumatts)
    1703           0 :         ereport(ERROR,
    1704             :                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
    1705             :                  errmsg("source key array length must match number of key attributes")));
    1706             : 
    1707             :     /*
    1708             :      * Target array is made up of key values that will be used to build the
    1709             :      * SQL string for use on the remote system.
    1710             :      */
    1711           8 :     tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
    1712             : 
    1713             :     /*
    1714             :      * There should be one target array key value for each key attnum
    1715             :      */
    1716           8 :     if (tgt_nitems != pknumatts)
    1717           0 :         ereport(ERROR,
    1718             :                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
    1719             :                  errmsg("target key array length must match number of key attributes")));
    1720             : 
    1721             :     /*
    1722             :      * Prep work is finally done. Go get the SQL string.
    1723             :      */
    1724           8 :     sql = get_sql_insert(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
    1725             : 
    1726             :     /*
    1727             :      * Now we can close the relation.
    1728             :      */
    1729           8 :     relation_close(rel, AccessShareLock);
    1730             : 
    1731             :     /*
    1732             :      * And send it
    1733             :      */
    1734           8 :     PG_RETURN_TEXT_P(cstring_to_text(sql));
    1735             : }
    1736             : 
    1737             : 
    1738             : /*
    1739             :  * dblink_build_sql_delete
    1740             :  *
    1741             :  * Used to generate an SQL delete statement.
    1742             :  * This is useful for selectively replicating a
    1743             :  * delete to another server via dblink.
    1744             :  *
    1745             :  * API:
    1746             :  * <relname> - name of remote table of interest
    1747             :  * <pkattnums> - an int2vector of attnums which will be used
    1748             :  * to identify the remote tuple of interest
    1749             :  * <pknumatts> - number of attnums in pkattnums
    1750             :  * <tgt_pkattvals_arry> - text array of key values which will be used
    1751             :  * to build the string for execution remotely.
    1752             :  */
    1753           6 : PG_FUNCTION_INFO_V1(dblink_build_sql_delete);
    1754             : Datum
    1755          12 : dblink_build_sql_delete(PG_FUNCTION_ARGS)
    1756             : {
    1757          12 :     text       *relname_text = PG_GETARG_TEXT_PP(0);
    1758          12 :     int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
    1759          12 :     int32       pknumatts_arg = PG_GETARG_INT32(2);
    1760          12 :     ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
    1761             :     Relation    rel;
    1762             :     int        *pkattnums;
    1763             :     int         pknumatts;
    1764             :     char      **tgt_pkattvals;
    1765             :     int         tgt_nitems;
    1766             :     char       *sql;
    1767             : 
    1768             :     /*
    1769             :      * Open target relation.
    1770             :      */
    1771          12 :     rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
    1772             : 
    1773             :     /*
    1774             :      * Process pkattnums argument.
    1775             :      */
    1776          12 :     validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
    1777             :                        &pkattnums, &pknumatts);
    1778             : 
    1779             :     /*
    1780             :      * Target array is made up of key values that will be used to build the
    1781             :      * SQL string for use on the remote system.
    1782             :      */
    1783           8 :     tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
    1784             : 
    1785             :     /*
    1786             :      * There should be one target array key value for each key attnum
    1787             :      */
    1788           8 :     if (tgt_nitems != pknumatts)
    1789           0 :         ereport(ERROR,
    1790             :                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
    1791             :                  errmsg("target key array length must match number of key attributes")));
    1792             : 
    1793             :     /*
    1794             :      * Prep work is finally done. Go get the SQL string.
    1795             :      */
    1796           8 :     sql = get_sql_delete(rel, pkattnums, pknumatts, tgt_pkattvals);
    1797             : 
    1798             :     /*
    1799             :      * Now we can close the relation.
    1800             :      */
    1801           8 :     relation_close(rel, AccessShareLock);
    1802             : 
    1803             :     /*
    1804             :      * And send it
    1805             :      */
    1806           8 :     PG_RETURN_TEXT_P(cstring_to_text(sql));
    1807             : }
    1808             : 
    1809             : 
    1810             : /*
    1811             :  * dblink_build_sql_update
    1812             :  *
    1813             :  * Used to generate an SQL update statement
    1814             :  * based on an existing tuple in a local relation.
    1815             :  * This is useful for selectively replicating data
    1816             :  * to another server via dblink.
    1817             :  *
    1818             :  * API:
    1819             :  * <relname> - name of local table of interest
    1820             :  * <pkattnums> - an int2vector of attnums which will be used
    1821             :  * to identify the local tuple of interest
    1822             :  * <pknumatts> - number of attnums in pkattnums
    1823             :  * <src_pkattvals_arry> - text array of key values which will be used
    1824             :  * to identify the local tuple of interest
    1825             :  * <tgt_pkattvals_arry> - text array of key values which will be used
    1826             :  * to build the string for execution remotely. These are substituted
    1827             :  * for their counterparts in src_pkattvals_arry
    1828             :  */
    1829           6 : PG_FUNCTION_INFO_V1(dblink_build_sql_update);
    1830             : Datum
    1831          12 : dblink_build_sql_update(PG_FUNCTION_ARGS)
    1832             : {
    1833          12 :     text       *relname_text = PG_GETARG_TEXT_PP(0);
    1834          12 :     int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
    1835          12 :     int32       pknumatts_arg = PG_GETARG_INT32(2);
    1836          12 :     ArrayType  *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
    1837          12 :     ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
    1838             :     Relation    rel;
    1839             :     int        *pkattnums;
    1840             :     int         pknumatts;
    1841             :     char      **src_pkattvals;
    1842             :     char      **tgt_pkattvals;
    1843             :     int         src_nitems;
    1844             :     int         tgt_nitems;
    1845             :     char       *sql;
    1846             : 
    1847             :     /*
    1848             :      * Open target relation.
    1849             :      */
    1850          12 :     rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
    1851             : 
    1852             :     /*
    1853             :      * Process pkattnums argument.
    1854             :      */
    1855          12 :     validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
    1856             :                        &pkattnums, &pknumatts);
    1857             : 
    1858             :     /*
    1859             :      * Source array is made up of key values that will be used to locate the
    1860             :      * tuple of interest from the local system.
    1861             :      */
    1862           8 :     src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
    1863             : 
    1864             :     /*
    1865             :      * There should be one source array key value for each key attnum
    1866             :      */
    1867           8 :     if (src_nitems != pknumatts)
    1868           0 :         ereport(ERROR,
    1869             :                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
    1870             :                  errmsg("source key array length must match number of key attributes")));
    1871             : 
    1872             :     /*
    1873             :      * Target array is made up of key values that will be used to build the
    1874             :      * SQL string for use on the remote system.
    1875             :      */
    1876           8 :     tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
    1877             : 
    1878             :     /*
    1879             :      * There should be one target array key value for each key attnum
    1880             :      */
    1881           8 :     if (tgt_nitems != pknumatts)
    1882           0 :         ereport(ERROR,
    1883             :                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
    1884             :                  errmsg("target key array length must match number of key attributes")));
    1885             : 
    1886             :     /*
    1887             :      * Prep work is finally done. Go get the SQL string.
    1888             :      */
    1889           8 :     sql = get_sql_update(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
    1890             : 
    1891             :     /*
    1892             :      * Now we can close the relation.
    1893             :      */
    1894           8 :     relation_close(rel, AccessShareLock);
    1895             : 
    1896             :     /*
    1897             :      * And send it
    1898             :      */
    1899           8 :     PG_RETURN_TEXT_P(cstring_to_text(sql));
    1900             : }
    1901             : 
    1902             : /*
    1903             :  * dblink_current_query
    1904             :  * return the current query string
    1905             :  * to allow its use in (among other things)
    1906             :  * rewrite rules
    1907             :  */
    1908           2 : PG_FUNCTION_INFO_V1(dblink_current_query);
    1909             : Datum
    1910           0 : dblink_current_query(PG_FUNCTION_ARGS)
    1911             : {
    1912             :     /* This is now just an alias for the built-in function current_query() */
    1913           0 :     PG_RETURN_DATUM(current_query(fcinfo));
    1914             : }
    1915             : 
    1916             : /*
    1917             :  * Retrieve async notifications for a connection.
    1918             :  *
    1919             :  * Returns a setof record of notifications, or an empty set if none received.
    1920             :  * Can optionally take a named connection as parameter, but uses the unnamed
    1921             :  * connection per default.
    1922             :  *
    1923             :  */
    1924             : #define DBLINK_NOTIFY_COLS      3
    1925             : 
    1926           6 : PG_FUNCTION_INFO_V1(dblink_get_notify);
    1927             : Datum
    1928           4 : dblink_get_notify(PG_FUNCTION_ARGS)
    1929             : {
    1930             :     PGconn     *conn;
    1931             :     PGnotify   *notify;
    1932           4 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    1933             :     TupleDesc   tupdesc;
    1934             :     Tuplestorestate *tupstore;
    1935             :     MemoryContext per_query_ctx;
    1936             :     MemoryContext oldcontext;
    1937             : 
    1938           4 :     prepTuplestoreResult(fcinfo);
    1939             : 
    1940           4 :     dblink_init();
    1941           4 :     if (PG_NARGS() == 1)
    1942           0 :         conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
    1943             :     else
    1944           4 :         conn = pconn->conn;
    1945             : 
    1946             :     /* create the tuplestore in per-query memory */
    1947           4 :     per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
    1948           4 :     oldcontext = MemoryContextSwitchTo(per_query_ctx);
    1949             : 
    1950           4 :     tupdesc = CreateTemplateTupleDesc(DBLINK_NOTIFY_COLS);
    1951           4 :     TupleDescInitEntry(tupdesc, (AttrNumber) 1, "notify_name",
    1952             :                        TEXTOID, -1, 0);
    1953           4 :     TupleDescInitEntry(tupdesc, (AttrNumber) 2, "be_pid",
    1954             :                        INT4OID, -1, 0);
    1955           4 :     TupleDescInitEntry(tupdesc, (AttrNumber) 3, "extra",
    1956             :                        TEXTOID, -1, 0);
    1957             : 
    1958           4 :     tupstore = tuplestore_begin_heap(true, false, work_mem);
    1959           4 :     rsinfo->setResult = tupstore;
    1960           4 :     rsinfo->setDesc = tupdesc;
    1961             : 
    1962           4 :     MemoryContextSwitchTo(oldcontext);
    1963             : 
    1964           4 :     PQconsumeInput(conn);
    1965           8 :     while ((notify = PQnotifies(conn)) != NULL)
    1966             :     {
    1967             :         Datum       values[DBLINK_NOTIFY_COLS];
    1968             :         bool        nulls[DBLINK_NOTIFY_COLS];
    1969             : 
    1970           4 :         memset(values, 0, sizeof(values));
    1971           4 :         memset(nulls, 0, sizeof(nulls));
    1972             : 
    1973           4 :         if (notify->relname != NULL)
    1974           4 :             values[0] = CStringGetTextDatum(notify->relname);
    1975             :         else
    1976           0 :             nulls[0] = true;
    1977             : 
    1978           4 :         values[1] = Int32GetDatum(notify->be_pid);
    1979             : 
    1980           4 :         if (notify->extra != NULL)
    1981           4 :             values[2] = CStringGetTextDatum(notify->extra);
    1982             :         else
    1983           0 :             nulls[2] = true;
    1984             : 
    1985           4 :         tuplestore_putvalues(tupstore, tupdesc, values, nulls);
    1986             : 
    1987           4 :         PQfreemem(notify);
    1988           4 :         PQconsumeInput(conn);
    1989             :     }
    1990             : 
    1991             :     /* clean up and return the tuplestore */
    1992             :     tuplestore_donestoring(tupstore);
    1993             : 
    1994           4 :     return (Datum) 0;
    1995             : }
    1996             : 
    1997             : /*
    1998             :  * Validate the options given to a dblink foreign server or user mapping.
    1999             :  * Raise an error if any option is invalid.
    2000             :  *
    2001             :  * We just check the names of options here, so semantic errors in options,
    2002             :  * such as invalid numeric format, will be detected at the attempt to connect.
    2003             :  */
    2004           6 : PG_FUNCTION_INFO_V1(dblink_fdw_validator);
    2005             : Datum
    2006          10 : dblink_fdw_validator(PG_FUNCTION_ARGS)
    2007             : {
    2008          10 :     List       *options_list = untransformRelOptions(PG_GETARG_DATUM(0));
    2009          10 :     Oid         context = PG_GETARG_OID(1);
    2010             :     ListCell   *cell;
    2011             : 
    2012             :     static const PQconninfoOption *options = NULL;
    2013             : 
    2014             :     /*
    2015             :      * Get list of valid libpq options.
    2016             :      *
    2017             :      * To avoid unnecessary work, we get the list once and use it throughout
    2018             :      * the lifetime of this backend process.  We don't need to care about
    2019             :      * memory context issues, because PQconndefaults allocates with malloc.
    2020             :      */
    2021          10 :     if (!options)
    2022             :     {
    2023           4 :         options = PQconndefaults();
    2024           4 :         if (!options)           /* assume reason for failure is OOM */
    2025           0 :             ereport(ERROR,
    2026             :                     (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
    2027             :                      errmsg("out of memory"),
    2028             :                      errdetail("Could not get libpq's default connection options.")));
    2029             :     }
    2030             : 
    2031             :     /* Validate each supplied option. */
    2032          16 :     foreach(cell, options_list)
    2033             :     {
    2034          10 :         DefElem    *def = (DefElem *) lfirst(cell);
    2035             : 
    2036          10 :         if (!is_valid_dblink_option(options, def->defname, context))
    2037             :         {
    2038             :             /*
    2039             :              * Unknown option, or invalid option for the context specified, so
    2040             :              * complain about it.  Provide a hint with list of valid options
    2041             :              * for the context.
    2042             :              */
    2043             :             StringInfoData buf;
    2044             :             const PQconninfoOption *opt;
    2045             : 
    2046           4 :             initStringInfo(&buf);
    2047         148 :             for (opt = options; opt->keyword; opt++)
    2048             :             {
    2049         144 :                 if (is_valid_dblink_option(options, opt->keyword, context))
    2050           6 :                     appendStringInfo(&buf, "%s%s",
    2051           6 :                                      (buf.len > 0) ? ", " : "",
    2052             :                                      opt->keyword);
    2053             :             }
    2054           4 :             ereport(ERROR,
    2055             :                     (errcode(ERRCODE_FDW_OPTION_NAME_NOT_FOUND),
    2056             :                      errmsg("invalid option \"%s\"", def->defname),
    2057             :                      buf.len > 0
    2058             :                      ? errhint("Valid options in this context are: %s",
    2059             :                                buf.data)
    2060             :                      : errhint("There are no valid options in this context.")));
    2061             :         }
    2062             :     }
    2063             : 
    2064           6 :     PG_RETURN_VOID();
    2065             : }
    2066             : 
    2067             : 
    2068             : /*************************************************************
    2069             :  * internal functions
    2070             :  */
    2071             : 
    2072             : 
    2073             : /*
    2074             :  * get_pkey_attnames
    2075             :  *
    2076             :  * Get the primary key attnames for the given relation.
    2077             :  * Return NULL, and set indnkeyatts = 0, if no primary key exists.
    2078             :  */
    2079             : static char **
    2080           6 : get_pkey_attnames(Relation rel, int16 *indnkeyatts)
    2081             : {
    2082             :     Relation    indexRelation;
    2083             :     ScanKeyData skey;
    2084             :     SysScanDesc scan;
    2085             :     HeapTuple   indexTuple;
    2086             :     int         i;
    2087           6 :     char      **result = NULL;
    2088             :     TupleDesc   tupdesc;
    2089             : 
    2090             :     /* initialize indnkeyatts to 0 in case no primary key exists */
    2091           6 :     *indnkeyatts = 0;
    2092             : 
    2093           6 :     tupdesc = rel->rd_att;
    2094             : 
    2095             :     /* Prepare to scan pg_index for entries having indrelid = this rel. */
    2096           6 :     indexRelation = table_open(IndexRelationId, AccessShareLock);
    2097           6 :     ScanKeyInit(&skey,
    2098             :                 Anum_pg_index_indrelid,
    2099             :                 BTEqualStrategyNumber, F_OIDEQ,
    2100           6 :                 ObjectIdGetDatum(RelationGetRelid(rel)));
    2101             : 
    2102           6 :     scan = systable_beginscan(indexRelation, IndexIndrelidIndexId, true,
    2103             :                               NULL, 1, &skey);
    2104             : 
    2105           6 :     while (HeapTupleIsValid(indexTuple = systable_getnext(scan)))
    2106             :     {
    2107           6 :         Form_pg_index index = (Form_pg_index) GETSTRUCT(indexTuple);
    2108             : 
    2109             :         /* we're only interested if it is the primary key */
    2110           6 :         if (index->indisprimary)
    2111             :         {
    2112           6 :             *indnkeyatts = index->indnkeyatts;
    2113           6 :             if (*indnkeyatts > 0)
    2114             :             {
    2115           6 :                 result = (char **) palloc(*indnkeyatts * sizeof(char *));
    2116             : 
    2117          18 :                 for (i = 0; i < *indnkeyatts; i++)
    2118          12 :                     result[i] = SPI_fname(tupdesc, index->indkey.values[i]);
    2119             :             }
    2120           6 :             break;
    2121             :         }
    2122             :     }
    2123             : 
    2124           6 :     systable_endscan(scan);
    2125           6 :     table_close(indexRelation, AccessShareLock);
    2126             : 
    2127           6 :     return result;
    2128             : }
    2129             : 
    2130             : /*
    2131             :  * Deconstruct a text[] into C-strings (note any NULL elements will be
    2132             :  * returned as NULL pointers)
    2133             :  */
    2134             : static char **
    2135          40 : get_text_array_contents(ArrayType *array, int *numitems)
    2136             : {
    2137          40 :     int         ndim = ARR_NDIM(array);
    2138          40 :     int        *dims = ARR_DIMS(array);
    2139             :     int         nitems;
    2140             :     int16       typlen;
    2141             :     bool        typbyval;
    2142             :     char        typalign;
    2143             :     char      **values;
    2144             :     char       *ptr;
    2145             :     bits8      *bitmap;
    2146             :     int         bitmask;
    2147             :     int         i;
    2148             : 
    2149             :     Assert(ARR_ELEMTYPE(array) == TEXTOID);
    2150             : 
    2151          40 :     *numitems = nitems = ArrayGetNItems(ndim, dims);
    2152             : 
    2153          40 :     get_typlenbyvalalign(ARR_ELEMTYPE(array),
    2154             :                          &typlen, &typbyval, &typalign);
    2155             : 
    2156          40 :     values = (char **) palloc(nitems * sizeof(char *));
    2157             : 
    2158          40 :     ptr = ARR_DATA_PTR(array);
    2159          40 :     bitmap = ARR_NULLBITMAP(array);
    2160          40 :     bitmask = 1;
    2161             : 
    2162         110 :     for (i = 0; i < nitems; i++)
    2163             :     {
    2164          70 :         if (bitmap && (*bitmap & bitmask) == 0)
    2165             :         {
    2166           0 :             values[i] = NULL;
    2167             :         }
    2168             :         else
    2169             :         {
    2170          70 :             values[i] = TextDatumGetCString(PointerGetDatum(ptr));
    2171          70 :             ptr = att_addlength_pointer(ptr, typlen, ptr);
    2172          70 :             ptr = (char *) att_align_nominal(ptr, typalign);
    2173             :         }
    2174             : 
    2175             :         /* advance bitmap pointer if any */
    2176          70 :         if (bitmap)
    2177             :         {
    2178           0 :             bitmask <<= 1;
    2179           0 :             if (bitmask == 0x100)
    2180             :             {
    2181           0 :                 bitmap++;
    2182           0 :                 bitmask = 1;
    2183             :             }
    2184             :         }
    2185             :     }
    2186             : 
    2187          40 :     return values;
    2188             : }
    2189             : 
    2190             : static char *
    2191           8 : get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
    2192             : {
    2193             :     char       *relname;
    2194             :     HeapTuple   tuple;
    2195             :     TupleDesc   tupdesc;
    2196             :     int         natts;
    2197             :     StringInfoData buf;
    2198             :     char       *val;
    2199             :     int         key;
    2200             :     int         i;
    2201             :     bool        needComma;
    2202             : 
    2203           8 :     initStringInfo(&buf);
    2204             : 
    2205             :     /* get relation name including any needed schema prefix and quoting */
    2206           8 :     relname = generate_relation_name(rel);
    2207             : 
    2208           8 :     tupdesc = rel->rd_att;
    2209           8 :     natts = tupdesc->natts;
    2210             : 
    2211           8 :     tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
    2212           8 :     if (!tuple)
    2213           0 :         ereport(ERROR,
    2214             :                 (errcode(ERRCODE_CARDINALITY_VIOLATION),
    2215             :                  errmsg("source row not found")));
    2216             : 
    2217           8 :     appendStringInfo(&buf, "INSERT INTO %s(", relname);
    2218             : 
    2219           8 :     needComma = false;
    2220          38 :     for (i = 0; i < natts; i++)
    2221             :     {
    2222          30 :         Form_pg_attribute att = TupleDescAttr(tupdesc, i);
    2223             : 
    2224          30 :         if (att->attisdropped)
    2225           4 :             continue;
    2226             : 
    2227          26 :         if (needComma)
    2228          18 :             appendStringInfoChar(&buf, ',');
    2229             : 
    2230          26 :         appendStringInfoString(&buf,
    2231          26 :                                quote_ident_cstr(NameStr(att->attname)));
    2232          26 :         needComma = true;
    2233             :     }
    2234             : 
    2235           8 :     appendStringInfoString(&buf, ") VALUES(");
    2236             : 
    2237             :     /*
    2238             :      * Note: i is physical column number (counting from 0).
    2239             :      */
    2240           8 :     needComma = false;
    2241          38 :     for (i = 0; i < natts; i++)
    2242             :     {
    2243          30 :         if (TupleDescAttr(tupdesc, i)->attisdropped)
    2244           4 :             continue;
    2245             : 
    2246          26 :         if (needComma)
    2247          18 :             appendStringInfoChar(&buf, ',');
    2248             : 
    2249          26 :         key = get_attnum_pk_pos(pkattnums, pknumatts, i);
    2250             : 
    2251          26 :         if (key >= 0)
    2252          14 :             val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
    2253             :         else
    2254          12 :             val = SPI_getvalue(tuple, tupdesc, i + 1);
    2255             : 
    2256          26 :         if (val != NULL)
    2257             :         {
    2258          26 :             appendStringInfoString(&buf, quote_literal_cstr(val));
    2259          26 :             pfree(val);
    2260             :         }
    2261             :         else
    2262           0 :             appendStringInfoString(&buf, "NULL");
    2263          26 :         needComma = true;
    2264             :     }
    2265           8 :     appendStringInfoChar(&buf, ')');
    2266             : 
    2267           8 :     return buf.data;
    2268             : }
    2269             : 
    2270             : static char *
    2271           8 : get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals)
    2272             : {
    2273             :     char       *relname;
    2274             :     TupleDesc   tupdesc;
    2275             :     StringInfoData buf;
    2276             :     int         i;
    2277             : 
    2278           8 :     initStringInfo(&buf);
    2279             : 
    2280             :     /* get relation name including any needed schema prefix and quoting */
    2281           8 :     relname = generate_relation_name(rel);
    2282             : 
    2283           8 :     tupdesc = rel->rd_att;
    2284             : 
    2285           8 :     appendStringInfo(&buf, "DELETE FROM %s WHERE ", relname);
    2286          22 :     for (i = 0; i < pknumatts; i++)
    2287             :     {
    2288          14 :         int         pkattnum = pkattnums[i];
    2289          14 :         Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
    2290             : 
    2291          14 :         if (i > 0)
    2292           6 :             appendStringInfoString(&buf, " AND ");
    2293             : 
    2294          14 :         appendStringInfoString(&buf,
    2295          14 :                                quote_ident_cstr(NameStr(attr->attname)));
    2296             : 
    2297          14 :         if (tgt_pkattvals[i] != NULL)
    2298          14 :             appendStringInfo(&buf, " = %s",
    2299          14 :                              quote_literal_cstr(tgt_pkattvals[i]));
    2300             :         else
    2301           0 :             appendStringInfoString(&buf, " IS NULL");
    2302             :     }
    2303             : 
    2304           8 :     return buf.data;
    2305             : }
    2306             : 
    2307             : static char *
    2308           8 : get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
    2309             : {
    2310             :     char       *relname;
    2311             :     HeapTuple   tuple;
    2312             :     TupleDesc   tupdesc;
    2313             :     int         natts;
    2314             :     StringInfoData buf;
    2315             :     char       *val;
    2316             :     int         key;
    2317             :     int         i;
    2318             :     bool        needComma;
    2319             : 
    2320           8 :     initStringInfo(&buf);
    2321             : 
    2322             :     /* get relation name including any needed schema prefix and quoting */
    2323           8 :     relname = generate_relation_name(rel);
    2324             : 
    2325           8 :     tupdesc = rel->rd_att;
    2326           8 :     natts = tupdesc->natts;
    2327             : 
    2328           8 :     tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
    2329           8 :     if (!tuple)
    2330           0 :         ereport(ERROR,
    2331             :                 (errcode(ERRCODE_CARDINALITY_VIOLATION),
    2332             :                  errmsg("source row not found")));
    2333             : 
    2334           8 :     appendStringInfo(&buf, "UPDATE %s SET ", relname);
    2335             : 
    2336             :     /*
    2337             :      * Note: i is physical column number (counting from 0).
    2338             :      */
    2339           8 :     needComma = false;
    2340          38 :     for (i = 0; i < natts; i++)
    2341             :     {
    2342          30 :         Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
    2343             : 
    2344          30 :         if (attr->attisdropped)
    2345           4 :             continue;
    2346             : 
    2347          26 :         if (needComma)
    2348          18 :             appendStringInfoString(&buf, ", ");
    2349             : 
    2350          26 :         appendStringInfo(&buf, "%s = ",
    2351          26 :                          quote_ident_cstr(NameStr(attr->attname)));
    2352             : 
    2353          26 :         key = get_attnum_pk_pos(pkattnums, pknumatts, i);
    2354             : 
    2355          26 :         if (key >= 0)
    2356          14 :             val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
    2357             :         else
    2358          12 :             val = SPI_getvalue(tuple, tupdesc, i + 1);
    2359             : 
    2360          26 :         if (val != NULL)
    2361             :         {
    2362          26 :             appendStringInfoString(&buf, quote_literal_cstr(val));
    2363          26 :             pfree(val);
    2364             :         }
    2365             :         else
    2366           0 :             appendStringInfoString(&buf, "NULL");
    2367          26 :         needComma = true;
    2368             :     }
    2369             : 
    2370           8 :     appendStringInfoString(&buf, " WHERE ");
    2371             : 
    2372          22 :     for (i = 0; i < pknumatts; i++)
    2373             :     {
    2374          14 :         int         pkattnum = pkattnums[i];
    2375          14 :         Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
    2376             : 
    2377          14 :         if (i > 0)
    2378           6 :             appendStringInfoString(&buf, " AND ");
    2379             : 
    2380          14 :         appendStringInfoString(&buf,
    2381          14 :                                quote_ident_cstr(NameStr(attr->attname)));
    2382             : 
    2383          14 :         val = tgt_pkattvals[i];
    2384             : 
    2385          14 :         if (val != NULL)
    2386          14 :             appendStringInfo(&buf, " = %s", quote_literal_cstr(val));
    2387             :         else
    2388           0 :             appendStringInfoString(&buf, " IS NULL");
    2389             :     }
    2390             : 
    2391           8 :     return buf.data;
    2392             : }
    2393             : 
    2394             : /*
    2395             :  * Return a properly quoted identifier.
    2396             :  * Uses quote_ident in quote.c
    2397             :  */
    2398             : static char *
    2399         160 : quote_ident_cstr(char *rawstr)
    2400             : {
    2401             :     text       *rawstr_text;
    2402             :     text       *result_text;
    2403             :     char       *result;
    2404             : 
    2405         160 :     rawstr_text = cstring_to_text(rawstr);
    2406         160 :     result_text = DatumGetTextPP(DirectFunctionCall1(quote_ident,
    2407             :                                                      PointerGetDatum(rawstr_text)));
    2408         160 :     result = text_to_cstring(result_text);
    2409             : 
    2410         160 :     return result;
    2411             : }
    2412             : 
    2413             : static int
    2414          52 : get_attnum_pk_pos(int *pkattnums, int pknumatts, int key)
    2415             : {
    2416             :     int         i;
    2417             : 
    2418             :     /*
    2419             :      * Not likely a long list anyway, so just scan for the value
    2420             :      */
    2421         100 :     for (i = 0; i < pknumatts; i++)
    2422          76 :         if (key == pkattnums[i])
    2423          28 :             return i;
    2424             : 
    2425          24 :     return -1;
    2426             : }
    2427             : 
    2428             : static HeapTuple
    2429          16 : get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals)
    2430             : {
    2431             :     char       *relname;
    2432             :     TupleDesc   tupdesc;
    2433             :     int         natts;
    2434             :     StringInfoData buf;
    2435             :     int         ret;
    2436             :     HeapTuple   tuple;
    2437             :     int         i;
    2438             : 
    2439             :     /*
    2440             :      * Connect to SPI manager
    2441             :      */
    2442          16 :     if ((ret = SPI_connect()) < 0)
    2443             :         /* internal error */
    2444           0 :         elog(ERROR, "SPI connect failure - returned %d", ret);
    2445             : 
    2446          16 :     initStringInfo(&buf);
    2447             : 
    2448             :     /* get relation name including any needed schema prefix and quoting */
    2449          16 :     relname = generate_relation_name(rel);
    2450             : 
    2451          16 :     tupdesc = rel->rd_att;
    2452          16 :     natts = tupdesc->natts;
    2453             : 
    2454             :     /*
    2455             :      * Build sql statement to look up tuple of interest, ie, the one matching
    2456             :      * src_pkattvals.  We used to use "SELECT *" here, but it's simpler to
    2457             :      * generate a result tuple that matches the table's physical structure,
    2458             :      * with NULLs for any dropped columns.  Otherwise we have to deal with two
    2459             :      * different tupdescs and everything's very confusing.
    2460             :      */
    2461          16 :     appendStringInfoString(&buf, "SELECT ");
    2462             : 
    2463          76 :     for (i = 0; i < natts; i++)
    2464             :     {
    2465          60 :         Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
    2466             : 
    2467          60 :         if (i > 0)
    2468          44 :             appendStringInfoString(&buf, ", ");
    2469             : 
    2470          60 :         if (attr->attisdropped)
    2471           8 :             appendStringInfoString(&buf, "NULL");
    2472             :         else
    2473          52 :             appendStringInfoString(&buf,
    2474          52 :                                    quote_ident_cstr(NameStr(attr->attname)));
    2475             :     }
    2476             : 
    2477          16 :     appendStringInfo(&buf, " FROM %s WHERE ", relname);
    2478             : 
    2479          44 :     for (i = 0; i < pknumatts; i++)
    2480             :     {
    2481          28 :         int         pkattnum = pkattnums[i];
    2482          28 :         Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
    2483             : 
    2484          28 :         if (i > 0)
    2485          12 :             appendStringInfoString(&buf, " AND ");
    2486             : 
    2487          28 :         appendStringInfoString(&buf,
    2488          28 :                                quote_ident_cstr(NameStr(attr->attname)));
    2489             : 
    2490          28 :         if (src_pkattvals[i] != NULL)
    2491          28 :             appendStringInfo(&buf, " = %s",
    2492          28 :                              quote_literal_cstr(src_pkattvals[i]));
    2493             :         else
    2494           0 :             appendStringInfoString(&buf, " IS NULL");
    2495             :     }
    2496             : 
    2497             :     /*
    2498             :      * Retrieve the desired tuple
    2499             :      */
    2500          16 :     ret = SPI_exec(buf.data, 0);
    2501          16 :     pfree(buf.data);
    2502             : 
    2503             :     /*
    2504             :      * Only allow one qualifying tuple
    2505             :      */
    2506          16 :     if ((ret == SPI_OK_SELECT) && (SPI_processed > 1))
    2507           0 :         ereport(ERROR,
    2508             :                 (errcode(ERRCODE_CARDINALITY_VIOLATION),
    2509             :                  errmsg("source criteria matched more than one record")));
    2510             : 
    2511          16 :     else if (ret == SPI_OK_SELECT && SPI_processed == 1)
    2512             :     {
    2513          16 :         SPITupleTable *tuptable = SPI_tuptable;
    2514             : 
    2515          16 :         tuple = SPI_copytuple(tuptable->vals[0]);
    2516          16 :         SPI_finish();
    2517             : 
    2518          16 :         return tuple;
    2519             :     }
    2520             :     else
    2521             :     {
    2522             :         /*
    2523             :          * no qualifying tuples
    2524             :          */
    2525           0 :         SPI_finish();
    2526             : 
    2527           0 :         return NULL;
    2528             :     }
    2529             : 
    2530             :     /*
    2531             :      * never reached, but keep compiler quiet
    2532             :      */
    2533             :     return NULL;
    2534             : }
    2535             : 
    2536             : /*
    2537             :  * Open the relation named by relname_text, acquire specified type of lock,
    2538             :  * verify we have specified permissions.
    2539             :  * Caller must close rel when done with it.
    2540             :  */
    2541             : static Relation
    2542          42 : get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode)
    2543             : {
    2544             :     RangeVar   *relvar;
    2545             :     Relation    rel;
    2546             :     AclResult   aclresult;
    2547             : 
    2548          42 :     relvar = makeRangeVarFromNameList(textToQualifiedNameList(relname_text));
    2549          42 :     rel = table_openrv(relvar, lockmode);
    2550             : 
    2551          42 :     aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
    2552             :                                   aclmode);
    2553          42 :     if (aclresult != ACLCHECK_OK)
    2554           0 :         aclcheck_error(aclresult, get_relkind_objtype(rel->rd_rel->relkind),
    2555           0 :                        RelationGetRelationName(rel));
    2556             : 
    2557          42 :     return rel;
    2558             : }
    2559             : 
    2560             : /*
    2561             :  * generate_relation_name - copied from ruleutils.c
    2562             :  *      Compute the name to display for a relation
    2563             :  *
    2564             :  * The result includes all necessary quoting and schema-prefixing.
    2565             :  */
    2566             : static char *
    2567          40 : generate_relation_name(Relation rel)
    2568             : {
    2569             :     char       *nspname;
    2570             :     char       *result;
    2571             : 
    2572             :     /* Qualify the name if not visible in search path */
    2573          40 :     if (RelationIsVisible(RelationGetRelid(rel)))
    2574          30 :         nspname = NULL;
    2575             :     else
    2576          10 :         nspname = get_namespace_name(rel->rd_rel->relnamespace);
    2577             : 
    2578          40 :     result = quote_qualified_identifier(nspname, RelationGetRelationName(rel));
    2579             : 
    2580          40 :     return result;
    2581             : }
    2582             : 
    2583             : 
    2584             : static remoteConn *
    2585         150 : getConnectionByName(const char *name)
    2586             : {
    2587             :     remoteConnHashEnt *hentry;
    2588             :     char       *key;
    2589             : 
    2590         150 :     if (!remoteConnHash)
    2591           4 :         remoteConnHash = createConnHash();
    2592             : 
    2593         150 :     key = pstrdup(name);
    2594         150 :     truncate_identifier(key, strlen(key), false);
    2595         150 :     hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
    2596             :                                                key, HASH_FIND, NULL);
    2597             : 
    2598         150 :     if (hentry)
    2599         136 :         return hentry->rconn;
    2600             : 
    2601          14 :     return NULL;
    2602             : }
    2603             : 
    2604             : static HTAB *
    2605           6 : createConnHash(void)
    2606             : {
    2607             :     HASHCTL     ctl;
    2608             : 
    2609           6 :     ctl.keysize = NAMEDATALEN;
    2610           6 :     ctl.entrysize = sizeof(remoteConnHashEnt);
    2611             : 
    2612           6 :     return hash_create("Remote Con hash", NUMCONN, &ctl,
    2613             :                        HASH_ELEM | HASH_STRINGS);
    2614             : }
    2615             : 
    2616             : static void
    2617          20 : createNewConnection(const char *name, remoteConn *rconn)
    2618             : {
    2619             :     remoteConnHashEnt *hentry;
    2620             :     bool        found;
    2621             :     char       *key;
    2622             : 
    2623          20 :     if (!remoteConnHash)
    2624           2 :         remoteConnHash = createConnHash();
    2625             : 
    2626          20 :     key = pstrdup(name);
    2627          20 :     truncate_identifier(key, strlen(key), true);
    2628          20 :     hentry = (remoteConnHashEnt *) hash_search(remoteConnHash, key,
    2629             :                                                HASH_ENTER, &found);
    2630             : 
    2631          20 :     if (found)
    2632             :     {
    2633           2 :         PQfinish(rconn->conn);
    2634           2 :         ReleaseExternalFD();
    2635           2 :         pfree(rconn);
    2636             : 
    2637           2 :         ereport(ERROR,
    2638             :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
    2639             :                  errmsg("duplicate connection name")));
    2640             :     }
    2641             : 
    2642          18 :     hentry->rconn = rconn;
    2643          18 :     strlcpy(hentry->name, name, sizeof(hentry->name));
    2644          18 : }
    2645             : 
    2646             : static void
    2647          16 : deleteConnection(const char *name)
    2648             : {
    2649             :     remoteConnHashEnt *hentry;
    2650             :     bool        found;
    2651             :     char       *key;
    2652             : 
    2653          16 :     if (!remoteConnHash)
    2654           0 :         remoteConnHash = createConnHash();
    2655             : 
    2656          16 :     key = pstrdup(name);
    2657          16 :     truncate_identifier(key, strlen(key), false);
    2658          16 :     hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
    2659             :                                                key, HASH_REMOVE, &found);
    2660             : 
    2661          16 :     if (!hentry)
    2662           0 :         ereport(ERROR,
    2663             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    2664             :                  errmsg("undefined connection name")));
    2665             : 
    2666          16 : }
    2667             : 
    2668             : static void
    2669          34 : dblink_security_check(PGconn *conn, remoteConn *rconn)
    2670             : {
    2671          34 :     if (!superuser())
    2672             :     {
    2673           0 :         if (!PQconnectionUsedPassword(conn))
    2674             :         {
    2675           0 :             PQfinish(conn);
    2676           0 :             ReleaseExternalFD();
    2677           0 :             if (rconn)
    2678           0 :                 pfree(rconn);
    2679             : 
    2680           0 :             ereport(ERROR,
    2681             :                     (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
    2682             :                      errmsg("password is required"),
    2683             :                      errdetail("Non-superuser cannot connect if the server does not request a password."),
    2684             :                      errhint("Target server's authentication method must be changed.")));
    2685             :         }
    2686             :     }
    2687          34 : }
    2688             : 
    2689             : /*
    2690             :  * For non-superusers, insist that the connstr specify a password.  This
    2691             :  * prevents a password from being picked up from .pgpass, a service file,
    2692             :  * the environment, etc.  We don't want the postgres user's passwords
    2693             :  * to be accessible to non-superusers.
    2694             :  */
    2695             : static void
    2696          40 : dblink_connstr_check(const char *connstr)
    2697             : {
    2698          40 :     if (!superuser())
    2699             :     {
    2700             :         PQconninfoOption *options;
    2701             :         PQconninfoOption *option;
    2702           2 :         bool        connstr_gives_password = false;
    2703             : 
    2704           2 :         options = PQconninfoParse(connstr, NULL);
    2705           2 :         if (options)
    2706             :         {
    2707          74 :             for (option = options; option->keyword != NULL; option++)
    2708             :             {
    2709          72 :                 if (strcmp(option->keyword, "password") == 0)
    2710             :                 {
    2711           2 :                     if (option->val != NULL && option->val[0] != '\0')
    2712             :                     {
    2713           0 :                         connstr_gives_password = true;
    2714           0 :                         break;
    2715             :                     }
    2716             :                 }
    2717             :             }
    2718           2 :             PQconninfoFree(options);
    2719             :         }
    2720             : 
    2721           2 :         if (!connstr_gives_password)
    2722           2 :             ereport(ERROR,
    2723             :                     (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
    2724             :                      errmsg("password is required"),
    2725             :                      errdetail("Non-superusers must provide a password in the connection string.")));
    2726             :     }
    2727          38 : }
    2728             : 
    2729             : /*
    2730             :  * Report an error received from the remote server
    2731             :  *
    2732             :  * res: the received error result (will be freed)
    2733             :  * fail: true for ERROR ereport, false for NOTICE
    2734             :  * fmt and following args: sprintf-style format and values for errcontext;
    2735             :  * the resulting string should be worded like "while <some action>"
    2736             :  */
    2737             : static void
    2738          24 : dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
    2739             :                  bool fail, const char *fmt,...)
    2740             : {
    2741             :     int         level;
    2742          24 :     char       *pg_diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
    2743          24 :     char       *pg_diag_message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
    2744          24 :     char       *pg_diag_message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
    2745          24 :     char       *pg_diag_message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
    2746          24 :     char       *pg_diag_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
    2747             :     int         sqlstate;
    2748             :     char       *message_primary;
    2749             :     char       *message_detail;
    2750             :     char       *message_hint;
    2751             :     char       *message_context;
    2752             :     va_list     ap;
    2753             :     char        dblink_context_msg[512];
    2754             : 
    2755          24 :     if (fail)
    2756           6 :         level = ERROR;
    2757             :     else
    2758          18 :         level = NOTICE;
    2759             : 
    2760          24 :     if (pg_diag_sqlstate)
    2761          24 :         sqlstate = MAKE_SQLSTATE(pg_diag_sqlstate[0],
    2762             :                                  pg_diag_sqlstate[1],
    2763             :                                  pg_diag_sqlstate[2],
    2764             :                                  pg_diag_sqlstate[3],
    2765             :                                  pg_diag_sqlstate[4]);
    2766             :     else
    2767           0 :         sqlstate = ERRCODE_CONNECTION_FAILURE;
    2768             : 
    2769          24 :     message_primary = xpstrdup(pg_diag_message_primary);
    2770          24 :     message_detail = xpstrdup(pg_diag_message_detail);
    2771          24 :     message_hint = xpstrdup(pg_diag_message_hint);
    2772          24 :     message_context = xpstrdup(pg_diag_context);
    2773             : 
    2774             :     /*
    2775             :      * If we don't get a message from the PGresult, try the PGconn.  This is
    2776             :      * needed because for connection-level failures, PQexec may just return
    2777             :      * NULL, not a PGresult at all.
    2778             :      */
    2779          24 :     if (message_primary == NULL)
    2780           0 :         message_primary = pchomp(PQerrorMessage(conn));
    2781             : 
    2782             :     /*
    2783             :      * Now that we've copied all the data we need out of the PGresult, it's
    2784             :      * safe to free it.  We must do this to avoid PGresult leakage.  We're
    2785             :      * leaking all the strings too, but those are in palloc'd memory that will
    2786             :      * get cleaned up eventually.
    2787             :      */
    2788          24 :     if (res)
    2789          24 :         PQclear(res);
    2790             : 
    2791             :     /*
    2792             :      * Format the basic errcontext string.  Below, we'll add on something
    2793             :      * about the connection name.  That's a violation of the translatability
    2794             :      * guidelines about constructing error messages out of parts, but since
    2795             :      * there's no translation support for dblink, there's no need to worry
    2796             :      * about that (yet).
    2797             :      */
    2798          24 :     va_start(ap, fmt);
    2799          24 :     vsnprintf(dblink_context_msg, sizeof(dblink_context_msg), fmt, ap);
    2800          24 :     va_end(ap);
    2801             : 
    2802          24 :     ereport(level,
    2803             :             (errcode(sqlstate),
    2804             :              message_primary ? errmsg_internal("%s", message_primary) :
    2805             :              errmsg("could not obtain message string for remote error"),
    2806             :              message_detail ? errdetail_internal("%s", message_detail) : 0,
    2807             :              message_hint ? errhint("%s", message_hint) : 0,
    2808             :              message_context ? (errcontext("%s", message_context)) : 0,
    2809             :              conname ?
    2810             :              (errcontext("%s on dblink connection named \"%s\"",
    2811             :                          dblink_context_msg, conname)) :
    2812             :              (errcontext("%s on unnamed dblink connection",
    2813             :                          dblink_context_msg))));
    2814          18 : }
    2815             : 
    2816             : /*
    2817             :  * Obtain connection string for a foreign server
    2818             :  */
    2819             : static char *
    2820          40 : get_connect_string(const char *servername)
    2821             : {
    2822          40 :     ForeignServer *foreign_server = NULL;
    2823             :     UserMapping *user_mapping;
    2824             :     ListCell   *cell;
    2825             :     StringInfoData buf;
    2826             :     ForeignDataWrapper *fdw;
    2827             :     AclResult   aclresult;
    2828             :     char       *srvname;
    2829             : 
    2830             :     static const PQconninfoOption *options = NULL;
    2831             : 
    2832          40 :     initStringInfo(&buf);
    2833             : 
    2834             :     /*
    2835             :      * Get list of valid libpq options.
    2836             :      *
    2837             :      * To avoid unnecessary work, we get the list once and use it throughout
    2838             :      * the lifetime of this backend process.  We don't need to care about
    2839             :      * memory context issues, because PQconndefaults allocates with malloc.
    2840             :      */
    2841          40 :     if (!options)
    2842             :     {
    2843           6 :         options = PQconndefaults();
    2844           6 :         if (!options)           /* assume reason for failure is OOM */
    2845           0 :             ereport(ERROR,
    2846             :                     (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
    2847             :                      errmsg("out of memory"),
    2848             :                      errdetail("Could not get libpq's default connection options.")));
    2849             :     }
    2850             : 
    2851             :     /* first gather the server connstr options */
    2852          40 :     srvname = pstrdup(servername);
    2853          40 :     truncate_identifier(srvname, strlen(srvname), false);
    2854          40 :     foreign_server = GetForeignServerByName(srvname, true);
    2855             : 
    2856          40 :     if (foreign_server)
    2857             :     {
    2858           4 :         Oid         serverid = foreign_server->serverid;
    2859           4 :         Oid         fdwid = foreign_server->fdwid;
    2860           4 :         Oid         userid = GetUserId();
    2861             : 
    2862           4 :         user_mapping = GetUserMapping(userid, serverid);
    2863           4 :         fdw = GetForeignDataWrapper(fdwid);
    2864             : 
    2865             :         /* Check permissions, user must have usage on the server. */
    2866           4 :         aclresult = pg_foreign_server_aclcheck(serverid, userid, ACL_USAGE);
    2867           4 :         if (aclresult != ACLCHECK_OK)
    2868           0 :             aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER, foreign_server->servername);
    2869             : 
    2870           4 :         foreach(cell, fdw->options)
    2871             :         {
    2872           0 :             DefElem    *def = lfirst(cell);
    2873             : 
    2874           0 :             if (is_valid_dblink_option(options, def->defname, ForeignDataWrapperRelationId))
    2875           0 :                 appendStringInfo(&buf, "%s='%s' ", def->defname,
    2876           0 :                                  escape_param_str(strVal(def->arg)));
    2877             :         }
    2878             : 
    2879          12 :         foreach(cell, foreign_server->options)
    2880             :         {
    2881           8 :             DefElem    *def = lfirst(cell);
    2882             : 
    2883           8 :             if (is_valid_dblink_option(options, def->defname, ForeignServerRelationId))
    2884           8 :                 appendStringInfo(&buf, "%s='%s' ", def->defname,
    2885           8 :                                  escape_param_str(strVal(def->arg)));
    2886             :         }
    2887             : 
    2888           8 :         foreach(cell, user_mapping->options)
    2889             :         {
    2890             : 
    2891           4 :             DefElem    *def = lfirst(cell);
    2892             : 
    2893           4 :             if (is_valid_dblink_option(options, def->defname, UserMappingRelationId))
    2894           4 :                 appendStringInfo(&buf, "%s='%s' ", def->defname,
    2895           4 :                                  escape_param_str(strVal(def->arg)));
    2896             :         }
    2897             : 
    2898           4 :         return buf.data;
    2899             :     }
    2900             :     else
    2901          36 :         return NULL;
    2902             : }
    2903             : 
    2904             : /*
    2905             :  * Escaping libpq connect parameter strings.
    2906             :  *
    2907             :  * Replaces "'" with "\'" and "\" with "\\".
    2908             :  */
    2909             : static char *
    2910          12 : escape_param_str(const char *str)
    2911             : {
    2912             :     const char *cp;
    2913             :     StringInfoData buf;
    2914             : 
    2915          12 :     initStringInfo(&buf);
    2916             : 
    2917         136 :     for (cp = str; *cp; cp++)
    2918             :     {
    2919         124 :         if (*cp == '\\' || *cp == '\'')
    2920           0 :             appendStringInfoChar(&buf, '\\');
    2921         124 :         appendStringInfoChar(&buf, *cp);
    2922             :     }
    2923             : 
    2924          12 :     return buf.data;
    2925             : }
    2926             : 
    2927             : /*
    2928             :  * Validate the PK-attnums argument for dblink_build_sql_insert() and related
    2929             :  * functions, and translate to the internal representation.
    2930             :  *
    2931             :  * The user supplies an int2vector of 1-based logical attnums, plus a count
    2932             :  * argument (the need for the separate count argument is historical, but we
    2933             :  * still check it).  We check that each attnum corresponds to a valid,
    2934             :  * non-dropped attribute of the rel.  We do *not* prevent attnums from being
    2935             :  * listed twice, though the actual use-case for such things is dubious.
    2936             :  * Note that before Postgres 9.0, the user's attnums were interpreted as
    2937             :  * physical not logical column numbers; this was changed for future-proofing.
    2938             :  *
    2939             :  * The internal representation is a palloc'd int array of 0-based physical
    2940             :  * attnums.
    2941             :  */
    2942             : static void
    2943          36 : validate_pkattnums(Relation rel,
    2944             :                    int2vector *pkattnums_arg, int32 pknumatts_arg,
    2945             :                    int **pkattnums, int *pknumatts)
    2946             : {
    2947          36 :     TupleDesc   tupdesc = rel->rd_att;
    2948          36 :     int         natts = tupdesc->natts;
    2949             :     int         i;
    2950             : 
    2951             :     /* Don't take more array elements than there are */
    2952          36 :     pknumatts_arg = Min(pknumatts_arg, pkattnums_arg->dim1);
    2953             : 
    2954             :     /* Must have at least one pk attnum selected */
    2955          36 :     if (pknumatts_arg <= 0)
    2956           0 :         ereport(ERROR,
    2957             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    2958             :                  errmsg("number of key attributes must be > 0")));
    2959             : 
    2960             :     /* Allocate output array */
    2961          36 :     *pkattnums = (int *) palloc(pknumatts_arg * sizeof(int));
    2962          36 :     *pknumatts = pknumatts_arg;
    2963             : 
    2964             :     /* Validate attnums and convert to internal form */
    2965         114 :     for (i = 0; i < pknumatts_arg; i++)
    2966             :     {
    2967          90 :         int         pkattnum = pkattnums_arg->values[i];
    2968             :         int         lnum;
    2969             :         int         j;
    2970             : 
    2971             :         /* Can throw error immediately if out of range */
    2972          90 :         if (pkattnum <= 0 || pkattnum > natts)
    2973          12 :             ereport(ERROR,
    2974             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    2975             :                      errmsg("invalid attribute number %d", pkattnum)));
    2976             : 
    2977             :         /* Identify which physical column has this logical number */
    2978          78 :         lnum = 0;
    2979         138 :         for (j = 0; j < natts; j++)
    2980             :         {
    2981             :             /* dropped columns don't count */
    2982         138 :             if (TupleDescAttr(tupdesc, j)->attisdropped)
    2983           6 :                 continue;
    2984             : 
    2985         132 :             if (++lnum == pkattnum)
    2986          78 :                 break;
    2987             :         }
    2988             : 
    2989          78 :         if (j < natts)
    2990          78 :             (*pkattnums)[i] = j;
    2991             :         else
    2992           0 :             ereport(ERROR,
    2993             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    2994             :                      errmsg("invalid attribute number %d", pkattnum)));
    2995             :     }
    2996          24 : }
    2997             : 
    2998             : /*
    2999             :  * Check if the specified connection option is valid.
    3000             :  *
    3001             :  * We basically allow whatever libpq thinks is an option, with these
    3002             :  * restrictions:
    3003             :  *      debug options: disallowed
    3004             :  *      "client_encoding": disallowed
    3005             :  *      "user": valid only in USER MAPPING options
    3006             :  *      secure options (eg password): valid only in USER MAPPING options
    3007             :  *      others: valid only in FOREIGN SERVER options
    3008             :  *
    3009             :  * We disallow client_encoding because it would be overridden anyway via
    3010             :  * PQclientEncoding; allowing it to be specified would merely promote
    3011             :  * confusion.
    3012             :  */
    3013             : static bool
    3014         166 : is_valid_dblink_option(const PQconninfoOption *options, const char *option,
    3015             :                        Oid context)
    3016             : {
    3017             :     const PQconninfoOption *opt;
    3018             : 
    3019             :     /* Look up the option in libpq result */
    3020        2926 :     for (opt = options; opt->keyword; opt++)
    3021             :     {
    3022        2922 :         if (strcmp(opt->keyword, option) == 0)
    3023         162 :             break;
    3024             :     }
    3025         166 :     if (opt->keyword == NULL)
    3026           4 :         return false;
    3027             : 
    3028             :     /* Disallow debug options (particularly "replication") */
    3029         162 :     if (strchr(opt->dispchar, 'D'))
    3030           4 :         return false;
    3031             : 
    3032             :     /* Disallow "client_encoding" */
    3033         158 :     if (strcmp(opt->keyword, "client_encoding") == 0)
    3034           4 :         return false;
    3035             : 
    3036             :     /*
    3037             :      * If the option is "user" or marked secure, it should be specified only
    3038             :      * in USER MAPPING.  Others should be specified only in SERVER.
    3039             :      */
    3040         154 :     if (strcmp(opt->keyword, "user") == 0 || strchr(opt->dispchar, '*'))
    3041             :     {
    3042          18 :         if (context != UserMappingRelationId)
    3043           6 :             return false;
    3044             :     }
    3045             :     else
    3046             :     {
    3047         136 :         if (context != ForeignServerRelationId)
    3048         124 :             return false;
    3049             :     }
    3050             : 
    3051          24 :     return true;
    3052             : }
    3053             : 
    3054             : /*
    3055             :  * Copy the remote session's values of GUCs that affect datatype I/O
    3056             :  * and apply them locally in a new GUC nesting level.  Returns the new
    3057             :  * nestlevel (which is needed by restoreLocalGucs to undo the settings),
    3058             :  * or -1 if no new nestlevel was needed.
    3059             :  *
    3060             :  * We use the equivalent of a function SET option to allow the settings to
    3061             :  * persist only until the caller calls restoreLocalGucs.  If an error is
    3062             :  * thrown in between, guc.c will take care of undoing the settings.
    3063             :  */
    3064             : static int
    3065          64 : applyRemoteGucs(PGconn *conn)
    3066             : {
    3067             :     static const char *const GUCsAffectingIO[] = {
    3068             :         "DateStyle",
    3069             :         "IntervalStyle"
    3070             :     };
    3071             : 
    3072          64 :     int         nestlevel = -1;
    3073             :     int         i;
    3074             : 
    3075         192 :     for (i = 0; i < lengthof(GUCsAffectingIO); i++)
    3076             :     {
    3077         128 :         const char *gucName = GUCsAffectingIO[i];
    3078         128 :         const char *remoteVal = PQparameterStatus(conn, gucName);
    3079             :         const char *localVal;
    3080             : 
    3081             :         /*
    3082             :          * If the remote server is pre-8.4, it won't have IntervalStyle, but
    3083             :          * that's okay because its output format won't be ambiguous.  So just
    3084             :          * skip the GUC if we don't get a value for it.  (We might eventually
    3085             :          * need more complicated logic with remote-version checks here.)
    3086             :          */
    3087         128 :         if (remoteVal == NULL)
    3088           0 :             continue;
    3089             : 
    3090             :         /*
    3091             :          * Avoid GUC-setting overhead if the remote and local GUCs already
    3092             :          * have the same value.
    3093             :          */
    3094         128 :         localVal = GetConfigOption(gucName, false, false);
    3095             :         Assert(localVal != NULL);
    3096             : 
    3097         128 :         if (strcmp(remoteVal, localVal) == 0)
    3098          94 :             continue;
    3099             : 
    3100             :         /* Create new GUC nest level if we didn't already */
    3101          34 :         if (nestlevel < 0)
    3102          18 :             nestlevel = NewGUCNestLevel();
    3103             : 
    3104             :         /* Apply the option (this will throw error on failure) */
    3105          34 :         (void) set_config_option(gucName, remoteVal,
    3106             :                                  PGC_USERSET, PGC_S_SESSION,
    3107             :                                  GUC_ACTION_SAVE, true, 0, false);
    3108             :     }
    3109             : 
    3110          64 :     return nestlevel;
    3111             : }
    3112             : 
    3113             : /*
    3114             :  * Restore local GUCs after they have been overlaid with remote settings.
    3115             :  */
    3116             : static void
    3117          66 : restoreLocalGucs(int nestlevel)
    3118             : {
    3119             :     /* Do nothing if no new nestlevel was created */
    3120          66 :     if (nestlevel > 0)
    3121          16 :         AtEOXact_GUC(true, nestlevel);
    3122          66 : }

Generated by: LCOV version 1.14