LCOV - code coverage report
Current view: top level - contrib/dblink - dblink.c (source / functions) Hit Total Coverage
Test: PostgreSQL 12beta2 Lines: 939 1082 86.8 %
Date: 2019-06-19 14:06:47 Functions: 74 76 97.4 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*
       2             :  * dblink.c
       3             :  *
       4             :  * Functions returning results from a remote database
       5             :  *
       6             :  * Joe Conway <mail@joeconway.com>
       7             :  * And contributors:
       8             :  * Darko Prenosil <Darko.Prenosil@finteh.hr>
       9             :  * Shridhar Daithankar <shridhar_daithankar@persistent.co.in>
      10             :  *
      11             :  * contrib/dblink/dblink.c
      12             :  * Copyright (c) 2001-2019, PostgreSQL Global Development Group
      13             :  * ALL RIGHTS RESERVED;
      14             :  *
      15             :  * Permission to use, copy, modify, and distribute this software and its
      16             :  * documentation for any purpose, without fee, and without a written agreement
      17             :  * is hereby granted, provided that the above copyright notice and this
      18             :  * paragraph and the following two paragraphs appear in all copies.
      19             :  *
      20             :  * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
      21             :  * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
      22             :  * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
      23             :  * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
      24             :  * POSSIBILITY OF SUCH DAMAGE.
      25             :  *
      26             :  * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
      27             :  * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
      28             :  * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
      29             :  * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
      30             :  * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
      31             :  *
      32             :  */
      33             : #include "postgres.h"
      34             : 
      35             : #include <limits.h>
      36             : 
      37             : #include "libpq-fe.h"
      38             : 
      39             : #include "access/htup_details.h"
      40             : #include "access/relation.h"
      41             : #include "access/reloptions.h"
      42             : #include "access/table.h"
      43             : #include "catalog/indexing.h"
      44             : #include "catalog/namespace.h"
      45             : #include "catalog/pg_foreign_data_wrapper.h"
      46             : #include "catalog/pg_foreign_server.h"
      47             : #include "catalog/pg_type.h"
      48             : #include "catalog/pg_user_mapping.h"
      49             : #include "executor/spi.h"
      50             : #include "foreign/foreign.h"
      51             : #include "funcapi.h"
      52             : #include "lib/stringinfo.h"
      53             : #include "mb/pg_wchar.h"
      54             : #include "miscadmin.h"
      55             : #include "parser/scansup.h"
      56             : #include "utils/acl.h"
      57             : #include "utils/builtins.h"
      58             : #include "utils/fmgroids.h"
      59             : #include "utils/guc.h"
      60             : #include "utils/lsyscache.h"
      61             : #include "utils/memutils.h"
      62             : #include "utils/rel.h"
      63             : #include "utils/varlena.h"
      64             : 
      65           6 : PG_MODULE_MAGIC;
      66             : 
      67             : typedef struct remoteConn
      68             : {
      69             :     PGconn     *conn;           /* Hold the remote connection */
      70             :     int         openCursorCount;    /* The number of open cursors */
      71             :     bool        newXactForCursor;   /* Opened a transaction for a cursor */
      72             : } remoteConn;
      73             : 
      74             : typedef struct storeInfo
      75             : {
      76             :     FunctionCallInfo fcinfo;
      77             :     Tuplestorestate *tuplestore;
      78             :     AttInMetadata *attinmeta;
      79             :     MemoryContext tmpcontext;
      80             :     char      **cstrs;
      81             :     /* temp storage for results to avoid leaks on exception */
      82             :     PGresult   *last_res;
      83             :     PGresult   *cur_res;
      84             : } storeInfo;
      85             : 
      86             : /*
      87             :  * Internal declarations
      88             :  */
      89             : static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
      90             : static void prepTuplestoreResult(FunctionCallInfo fcinfo);
      91             : static void materializeResult(FunctionCallInfo fcinfo, PGconn *conn,
      92             :                               PGresult *res);
      93             : static void materializeQueryResult(FunctionCallInfo fcinfo,
      94             :                                    PGconn *conn,
      95             :                                    const char *conname,
      96             :                                    const char *sql,
      97             :                                    bool fail);
      98             : static PGresult *storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql);
      99             : static void storeRow(volatile storeInfo *sinfo, PGresult *res, bool first);
     100             : static remoteConn *getConnectionByName(const char *name);
     101             : static HTAB *createConnHash(void);
     102             : static void createNewConnection(const char *name, remoteConn *rconn);
     103             : static void deleteConnection(const char *name);
     104             : static char **get_pkey_attnames(Relation rel, int16 *indnkeyatts);
     105             : static char **get_text_array_contents(ArrayType *array, int *numitems);
     106             : static char *get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
     107             : static char *get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals);
     108             : static char *get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
     109             : static char *quote_ident_cstr(char *rawstr);
     110             : static int  get_attnum_pk_pos(int *pkattnums, int pknumatts, int key);
     111             : static HeapTuple get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals);
     112             : static Relation get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode);
     113             : static char *generate_relation_name(Relation rel);
     114             : static void dblink_connstr_check(const char *connstr);
     115             : static void dblink_security_check(PGconn *conn, remoteConn *rconn);
     116             : static void dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
     117             :                              bool fail, const char *fmt,...) pg_attribute_printf(5, 6);
     118             : static char *get_connect_string(const char *servername);
     119             : static char *escape_param_str(const char *from);
     120             : static void validate_pkattnums(Relation rel,
     121             :                                int2vector *pkattnums_arg, int32 pknumatts_arg,
     122             :                                int **pkattnums, int *pknumatts);
     123             : static bool is_valid_dblink_option(const PQconninfoOption *options,
     124             :                                    const char *option, Oid context);
     125             : static int  applyRemoteGucs(PGconn *conn);
     126             : static void restoreLocalGucs(int nestlevel);
     127             : 
     128             : /* Global */
     129             : static remoteConn *pconn = NULL;
     130             : static HTAB *remoteConnHash = NULL;
     131             : 
     132             : /*
     133             :  *  Following is list that holds multiple remote connections.
     134             :  *  Calling convention of each dblink function changes to accept
     135             :  *  connection name as the first parameter. The connection list is
     136             :  *  much like ecpg e.g. a mapping between a name and a PGconn object.
     137             :  */
     138             : 
     139             : typedef struct remoteConnHashEnt
     140             : {
     141             :     char        name[NAMEDATALEN];
     142             :     remoteConn *rconn;
     143             : } remoteConnHashEnt;
     144             : 
     145             : /* initial number of connection hashes */
     146             : #define NUMCONN 16
     147             : 
     148             : static char *
     149          96 : xpstrdup(const char *in)
     150             : {
     151          96 :     if (in == NULL)
     152          72 :         return NULL;
     153          24 :     return pstrdup(in);
     154             : }
     155             : 
     156             : static void
     157             : pg_attribute_noreturn()
     158           0 : dblink_res_internalerror(PGconn *conn, PGresult *res, const char *p2)
     159             : {
     160           0 :     char       *msg = pchomp(PQerrorMessage(conn));
     161             : 
     162           0 :     if (res)
     163           0 :         PQclear(res);
     164           0 :     elog(ERROR, "%s: %s", p2, msg);
     165             : }
     166             : 
     167             : static void
     168             : pg_attribute_noreturn()
     169           6 : dblink_conn_not_avail(const char *conname)
     170             : {
     171           6 :     if (conname)
     172           2 :         ereport(ERROR,
     173             :                 (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
     174             :                  errmsg("connection \"%s\" not available", conname)));
     175             :     else
     176           4 :         ereport(ERROR,
     177             :                 (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
     178             :                  errmsg("connection not available")));
     179             : }
     180             : 
     181             : static void
     182          66 : dblink_get_conn(char *conname_or_str,
     183             :                 PGconn *volatile *conn_p, char **conname_p, volatile bool *freeconn_p)
     184             : {
     185          66 :     remoteConn *rconn = getConnectionByName(conname_or_str);
     186             :     PGconn     *conn;
     187             :     char       *conname;
     188             :     bool        freeconn;
     189             : 
     190          66 :     if (rconn)
     191             :     {
     192          54 :         conn = rconn->conn;
     193          54 :         conname = conname_or_str;
     194          54 :         freeconn = false;
     195             :     }
     196             :     else
     197             :     {
     198             :         const char *connstr;
     199             : 
     200          12 :         connstr = get_connect_string(conname_or_str);
     201          12 :         if (connstr == NULL)
     202          12 :             connstr = conname_or_str;
     203          12 :         dblink_connstr_check(connstr);
     204          12 :         conn = PQconnectdb(connstr);
     205          12 :         if (PQstatus(conn) == CONNECTION_BAD)
     206             :         {
     207           4 :             char       *msg = pchomp(PQerrorMessage(conn));
     208             : 
     209           4 :             PQfinish(conn);
     210           4 :             ereport(ERROR,
     211             :                     (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
     212             :                      errmsg("could not establish connection"),
     213             :                      errdetail_internal("%s", msg)));
     214             :         }
     215           8 :         dblink_security_check(conn, rconn);
     216           8 :         if (PQclientEncoding(conn) != GetDatabaseEncoding())
     217           0 :             PQsetClientEncoding(conn, GetDatabaseEncodingName());
     218           8 :         freeconn = true;
     219           8 :         conname = NULL;
     220             :     }
     221             : 
     222          62 :     *conn_p = conn;
     223          62 :     *conname_p = conname;
     224          62 :     *freeconn_p = freeconn;
     225          62 : }
     226             : 
     227             : static PGconn *
     228          34 : dblink_get_named_conn(const char *conname)
     229             : {
     230          34 :     remoteConn *rconn = getConnectionByName(conname);
     231             : 
     232          34 :     if (rconn)
     233          34 :         return rconn->conn;
     234             : 
     235           0 :     dblink_conn_not_avail(conname);
     236             :     return NULL;                /* keep compiler quiet */
     237             : }
     238             : 
     239             : static void
     240         234 : dblink_init(void)
     241             : {
     242         234 :     if (!pconn)
     243             :     {
     244           6 :         pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn));
     245           6 :         pconn->conn = NULL;
     246           6 :         pconn->openCursorCount = 0;
     247           6 :         pconn->newXactForCursor = false;
     248             :     }
     249         234 : }
     250             : 
     251             : /*
     252             :  * Create a persistent connection to another database
     253             :  */
     254          18 : PG_FUNCTION_INFO_V1(dblink_connect);
     255             : Datum
     256          28 : dblink_connect(PG_FUNCTION_ARGS)
     257             : {
     258          28 :     char       *conname_or_str = NULL;
     259          28 :     char       *connstr = NULL;
     260          28 :     char       *connname = NULL;
     261             :     char       *msg;
     262          28 :     PGconn     *conn = NULL;
     263          28 :     remoteConn *rconn = NULL;
     264             : 
     265          28 :     dblink_init();
     266             : 
     267          28 :     if (PG_NARGS() == 2)
     268             :     {
     269          22 :         conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(1));
     270          22 :         connname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     271             :     }
     272           6 :     else if (PG_NARGS() == 1)
     273           6 :         conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0));
     274             : 
     275          28 :     if (connname)
     276          22 :         rconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext,
     277             :                                                   sizeof(remoteConn));
     278             : 
     279             :     /* first check for valid foreign data server */
     280          28 :     connstr = get_connect_string(conname_or_str);
     281          28 :     if (connstr == NULL)
     282          24 :         connstr = conname_or_str;
     283             : 
     284             :     /* check password in connection string if not superuser */
     285          28 :     dblink_connstr_check(connstr);
     286          26 :     conn = PQconnectdb(connstr);
     287             : 
     288          26 :     if (PQstatus(conn) == CONNECTION_BAD)
     289             :     {
     290           0 :         msg = pchomp(PQerrorMessage(conn));
     291           0 :         PQfinish(conn);
     292           0 :         if (rconn)
     293           0 :             pfree(rconn);
     294             : 
     295           0 :         ereport(ERROR,
     296             :                 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
     297             :                  errmsg("could not establish connection"),
     298             :                  errdetail_internal("%s", msg)));
     299             :     }
     300             : 
     301             :     /* check password actually used if not superuser */
     302          26 :     dblink_security_check(conn, rconn);
     303             : 
     304             :     /* attempt to set client encoding to match server encoding, if needed */
     305          26 :     if (PQclientEncoding(conn) != GetDatabaseEncoding())
     306           0 :         PQsetClientEncoding(conn, GetDatabaseEncodingName());
     307             : 
     308          26 :     if (connname)
     309             :     {
     310          20 :         rconn->conn = conn;
     311          20 :         createNewConnection(connname, rconn);
     312             :     }
     313             :     else
     314             :     {
     315           6 :         if (pconn->conn)
     316           0 :             PQfinish(pconn->conn);
     317           6 :         pconn->conn = conn;
     318             :     }
     319             : 
     320          24 :     PG_RETURN_TEXT_P(cstring_to_text("OK"));
     321             : }
     322             : 
     323             : /*
     324             :  * Clear a persistent connection to another database
     325             :  */
     326          12 : PG_FUNCTION_INFO_V1(dblink_disconnect);
     327             : Datum
     328          24 : dblink_disconnect(PG_FUNCTION_ARGS)
     329             : {
     330          24 :     char       *conname = NULL;
     331          24 :     remoteConn *rconn = NULL;
     332          24 :     PGconn     *conn = NULL;
     333             : 
     334          24 :     dblink_init();
     335             : 
     336          24 :     if (PG_NARGS() == 1)
     337             :     {
     338          18 :         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     339          18 :         rconn = getConnectionByName(conname);
     340          18 :         if (rconn)
     341          16 :             conn = rconn->conn;
     342             :     }
     343             :     else
     344           6 :         conn = pconn->conn;
     345             : 
     346          24 :     if (!conn)
     347           2 :         dblink_conn_not_avail(conname);
     348             : 
     349          22 :     PQfinish(conn);
     350          22 :     if (rconn)
     351             :     {
     352          16 :         deleteConnection(conname);
     353          16 :         pfree(rconn);
     354             :     }
     355             :     else
     356           6 :         pconn->conn = NULL;
     357             : 
     358          22 :     PG_RETURN_TEXT_P(cstring_to_text("OK"));
     359             : }
     360             : 
     361             : /*
     362             :  * opens a cursor using a persistent connection
     363             :  */
     364          18 : PG_FUNCTION_INFO_V1(dblink_open);
     365             : Datum
     366          18 : dblink_open(PG_FUNCTION_ARGS)
     367             : {
     368          18 :     PGresult   *res = NULL;
     369             :     PGconn     *conn;
     370          18 :     char       *curname = NULL;
     371          18 :     char       *sql = NULL;
     372          18 :     char       *conname = NULL;
     373             :     StringInfoData buf;
     374          18 :     remoteConn *rconn = NULL;
     375          18 :     bool        fail = true;    /* default to backward compatible behavior */
     376             : 
     377          18 :     dblink_init();
     378          18 :     initStringInfo(&buf);
     379             : 
     380          18 :     if (PG_NARGS() == 2)
     381             :     {
     382             :         /* text,text */
     383           4 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     384           4 :         sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
     385           4 :         rconn = pconn;
     386             :     }
     387          14 :     else if (PG_NARGS() == 3)
     388             :     {
     389             :         /* might be text,text,text or text,text,bool */
     390          12 :         if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
     391             :         {
     392           2 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     393           2 :             sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
     394           2 :             fail = PG_GETARG_BOOL(2);
     395           2 :             rconn = pconn;
     396             :         }
     397             :         else
     398             :         {
     399          10 :             conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     400          10 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
     401          10 :             sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
     402          10 :             rconn = getConnectionByName(conname);
     403             :         }
     404             :     }
     405           2 :     else if (PG_NARGS() == 4)
     406             :     {
     407             :         /* text,text,text,bool */
     408           2 :         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     409           2 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
     410           2 :         sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
     411           2 :         fail = PG_GETARG_BOOL(3);
     412           2 :         rconn = getConnectionByName(conname);
     413             :     }
     414             : 
     415          18 :     if (!rconn || !rconn->conn)
     416           0 :         dblink_conn_not_avail(conname);
     417             : 
     418          18 :     conn = rconn->conn;
     419             : 
     420             :     /* If we are not in a transaction, start one */
     421          18 :     if (PQtransactionStatus(conn) == PQTRANS_IDLE)
     422             :     {
     423          14 :         res = PQexec(conn, "BEGIN");
     424          14 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
     425           0 :             dblink_res_internalerror(conn, res, "begin error");
     426          14 :         PQclear(res);
     427          14 :         rconn->newXactForCursor = true;
     428             : 
     429             :         /*
     430             :          * Since transaction state was IDLE, we force cursor count to
     431             :          * initially be 0. This is needed as a previous ABORT might have wiped
     432             :          * out our transaction without maintaining the cursor count for us.
     433             :          */
     434          14 :         rconn->openCursorCount = 0;
     435             :     }
     436             : 
     437             :     /* if we started a transaction, increment cursor count */
     438          18 :     if (rconn->newXactForCursor)
     439          18 :         (rconn->openCursorCount)++;
     440             : 
     441          18 :     appendStringInfo(&buf, "DECLARE %s CURSOR FOR %s", curname, sql);
     442          18 :     res = PQexec(conn, buf.data);
     443          18 :     if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
     444             :     {
     445           4 :         dblink_res_error(conn, conname, res, fail,
     446             :                          "while opening cursor \"%s\"", curname);
     447           4 :         PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
     448             :     }
     449             : 
     450          14 :     PQclear(res);
     451          14 :     PG_RETURN_TEXT_P(cstring_to_text("OK"));
     452             : }
     453             : 
     454             : /*
     455             :  * closes a cursor
     456             :  */
     457          12 : PG_FUNCTION_INFO_V1(dblink_close);
     458             : Datum
     459          10 : dblink_close(PG_FUNCTION_ARGS)
     460             : {
     461             :     PGconn     *conn;
     462          10 :     PGresult   *res = NULL;
     463          10 :     char       *curname = NULL;
     464          10 :     char       *conname = NULL;
     465             :     StringInfoData buf;
     466          10 :     remoteConn *rconn = NULL;
     467          10 :     bool        fail = true;    /* default to backward compatible behavior */
     468             : 
     469          10 :     dblink_init();
     470          10 :     initStringInfo(&buf);
     471             : 
     472          10 :     if (PG_NARGS() == 1)
     473             :     {
     474             :         /* text */
     475           0 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     476           0 :         rconn = pconn;
     477             :     }
     478          10 :     else if (PG_NARGS() == 2)
     479             :     {
     480             :         /* might be text,text or text,bool */
     481          10 :         if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
     482             :         {
     483           4 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     484           4 :             fail = PG_GETARG_BOOL(1);
     485           4 :             rconn = pconn;
     486             :         }
     487             :         else
     488             :         {
     489           6 :             conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     490           6 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
     491           6 :             rconn = getConnectionByName(conname);
     492             :         }
     493             :     }
     494          10 :     if (PG_NARGS() == 3)
     495             :     {
     496             :         /* text,text,bool */
     497           0 :         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     498           0 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
     499           0 :         fail = PG_GETARG_BOOL(2);
     500           0 :         rconn = getConnectionByName(conname);
     501             :     }
     502             : 
     503          10 :     if (!rconn || !rconn->conn)
     504           0 :         dblink_conn_not_avail(conname);
     505             : 
     506          10 :     conn = rconn->conn;
     507             : 
     508          10 :     appendStringInfo(&buf, "CLOSE %s", curname);
     509             : 
     510             :     /* close the cursor */
     511          10 :     res = PQexec(conn, buf.data);
     512          10 :     if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
     513             :     {
     514           2 :         dblink_res_error(conn, conname, res, fail,
     515             :                          "while closing cursor \"%s\"", curname);
     516           2 :         PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
     517             :     }
     518             : 
     519           8 :     PQclear(res);
     520             : 
     521             :     /* if we started a transaction, decrement cursor count */
     522           8 :     if (rconn->newXactForCursor)
     523             :     {
     524           8 :         (rconn->openCursorCount)--;
     525             : 
     526             :         /* if count is zero, commit the transaction */
     527           8 :         if (rconn->openCursorCount == 0)
     528             :         {
     529           4 :             rconn->newXactForCursor = false;
     530             : 
     531           4 :             res = PQexec(conn, "COMMIT");
     532           4 :             if (PQresultStatus(res) != PGRES_COMMAND_OK)
     533           0 :                 dblink_res_internalerror(conn, res, "commit error");
     534           4 :             PQclear(res);
     535             :         }
     536             :     }
     537             : 
     538           8 :     PG_RETURN_TEXT_P(cstring_to_text("OK"));
     539             : }
     540             : 
     541             : /*
     542             :  * Fetch results from an open cursor
     543             :  */
     544          18 : PG_FUNCTION_INFO_V1(dblink_fetch);
     545             : Datum
     546          26 : dblink_fetch(PG_FUNCTION_ARGS)
     547             : {
     548          26 :     PGresult   *res = NULL;
     549          26 :     char       *conname = NULL;
     550          26 :     remoteConn *rconn = NULL;
     551          26 :     PGconn     *conn = NULL;
     552             :     StringInfoData buf;
     553          26 :     char       *curname = NULL;
     554          26 :     int         howmany = 0;
     555          26 :     bool        fail = true;    /* default to backward compatible */
     556             : 
     557          26 :     prepTuplestoreResult(fcinfo);
     558             : 
     559          26 :     dblink_init();
     560             : 
     561          26 :     if (PG_NARGS() == 4)
     562             :     {
     563             :         /* text,text,int,bool */
     564           2 :         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     565           2 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
     566           2 :         howmany = PG_GETARG_INT32(2);
     567           2 :         fail = PG_GETARG_BOOL(3);
     568             : 
     569           2 :         rconn = getConnectionByName(conname);
     570           2 :         if (rconn)
     571           2 :             conn = rconn->conn;
     572             :     }
     573          24 :     else if (PG_NARGS() == 3)
     574             :     {
     575             :         /* text,text,int or text,int,bool */
     576          16 :         if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
     577             :         {
     578           4 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     579           4 :             howmany = PG_GETARG_INT32(1);
     580           4 :             fail = PG_GETARG_BOOL(2);
     581           4 :             conn = pconn->conn;
     582             :         }
     583             :         else
     584             :         {
     585          12 :             conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     586          12 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
     587          12 :             howmany = PG_GETARG_INT32(2);
     588             : 
     589          12 :             rconn = getConnectionByName(conname);
     590          12 :             if (rconn)
     591          12 :                 conn = rconn->conn;
     592             :         }
     593             :     }
     594           8 :     else if (PG_NARGS() == 2)
     595             :     {
     596             :         /* text,int */
     597           8 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     598           8 :         howmany = PG_GETARG_INT32(1);
     599           8 :         conn = pconn->conn;
     600             :     }
     601             : 
     602          26 :     if (!conn)
     603           0 :         dblink_conn_not_avail(conname);
     604             : 
     605          26 :     initStringInfo(&buf);
     606          26 :     appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
     607             : 
     608             :     /*
     609             :      * Try to execute the query.  Note that since libpq uses malloc, the
     610             :      * PGresult will be long-lived even though we are still in a short-lived
     611             :      * memory context.
     612             :      */
     613          26 :     res = PQexec(conn, buf.data);
     614          52 :     if (!res ||
     615          52 :         (PQresultStatus(res) != PGRES_COMMAND_OK &&
     616          26 :          PQresultStatus(res) != PGRES_TUPLES_OK))
     617             :     {
     618          10 :         dblink_res_error(conn, conname, res, fail,
     619             :                          "while fetching from cursor \"%s\"", curname);
     620           6 :         return (Datum) 0;
     621             :     }
     622          16 :     else if (PQresultStatus(res) == PGRES_COMMAND_OK)
     623             :     {
     624             :         /* cursor does not exist - closed already or bad name */
     625           0 :         PQclear(res);
     626           0 :         ereport(ERROR,
     627             :                 (errcode(ERRCODE_INVALID_CURSOR_NAME),
     628             :                  errmsg("cursor \"%s\" does not exist", curname)));
     629             :     }
     630             : 
     631          16 :     materializeResult(fcinfo, conn, res);
     632          14 :     return (Datum) 0;
     633             : }
     634             : 
     635             : /*
     636             :  * Note: this is the new preferred version of dblink
                     :  *
                     :  * SQL-callable entry point for synchronous queries.  All of the work is
                     :  * delegated to dblink_record_internal() with is_async = false, so the
                     :  * accepted argument lists are the ones decoded there: (text),
                     :  * (text, text), (text, bool) and (text, text, bool).
     637             :  */
     638          22 : PG_FUNCTION_INFO_V1(dblink_record);
     639             : Datum
     640          50 : dblink_record(PG_FUNCTION_ARGS)
     641             : {
     642          50 :     return dblink_record_internal(fcinfo, false);
     643             : }
     644             : /*
                     :  * dblink_send_query
                     :  *
                     :  * SQL-callable function taking (connection name, SQL text).  The query
                     :  * is handed to PQsendQuery() on the connection returned by
                     :  * dblink_get_named_conn(); no result is read here.
                     :  *
                     :  * Returns PQsendQuery()'s integer result.  Any value other than 1 is
                     :  * reported as a NOTICE (not an ERROR) carrying the connection's libpq
                     :  * error message, and is still returned to the caller.
                     :  */
     645           6 : PG_FUNCTION_INFO_V1(dblink_send_query);
     646             : Datum
     647          12 : dblink_send_query(PG_FUNCTION_ARGS)
     648             : {
     649             :     PGconn     *conn;
     650             :     char       *sql;
     651             :     int         retval;
     652             : 
     653          12 :     if (PG_NARGS() == 2)
     654             :     {
     655          12 :         conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
     656          12 :         sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
     657             :     }
     658             :     else
     659             :         /* shouldn't happen: only the two-argument form is handled */
     660           0 :         elog(ERROR, "wrong number of arguments");
     661             : 
     662             :     /* async query send; the result is not collected in this function */
     663          12 :     retval = PQsendQuery(conn, sql);
     664          12 :     if (retval != 1)
     665           0 :         elog(NOTICE, "could not send query: %s", pchomp(PQerrorMessage(conn)));
     666             : 
     667          12 :     PG_RETURN_INT32(retval);
     668             : }
     669             : /*
                     :  * dblink_get_result
                     :  *
                     :  * SQL-callable entry point for collecting an asynchronous result.
                     :  * Delegates to dblink_record_internal() with is_async = true, which
                     :  * accepts (connection name) or (connection name, fail flag) and reads
                     :  * one result from that connection with PQgetResult().
                     :  */
     670           8 : PG_FUNCTION_INFO_V1(dblink_get_result);
     671             : Datum
     672          16 : dblink_get_result(PG_FUNCTION_ARGS)
     673             : {
     674          16 :     return dblink_record_internal(fcinfo, true);
     675             : }
     676             : /*
                     :  * dblink_record_internal
                     :  *
                     :  * Common implementation of dblink_record() (is_async = false) and
                     :  * dblink_get_result() (is_async = true).  Any result set is handed back
                     :  * through the ReturnSetInfo prepared by prepTuplestoreResult(); the
                     :  * Datum return value is always 0.
                     :  *
                     :  * Synchronous argument lists:
                     :  *   (sql)                 - run on pconn->conn
                     :  *   (sql, fail)           - likewise, with an explicit fail flag
                     :  *   (conname, sql)        - connection obtained from dblink_get_conn()
                     :  *   (conname, sql, fail)  - likewise, with an explicit fail flag
                     :  * Asynchronous argument lists:
                     :  *   (conname)             - connection from dblink_get_named_conn()
                     :  *   (conname, fail)       - likewise, with an explicit fail flag
                     :  *
                     :  * "fail" defaults to true and is forwarded to materializeQueryResult()
                     :  * or dblink_res_error(), which decide how a remote error is reported.
                     :  *
                     :  * conn and freeconn are declared volatile; both are assigned inside
                     :  * PG_TRY and read again in PG_CATCH.  When freeconn has been set (it is
                     :  * passed by address to dblink_get_conn()), the connection is closed
                     :  * with PQfinish() on both the normal and the error exit path.
                     :  */
     677             : static Datum
     678          66 : dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
     679             : {
     680          66 :     PGconn     *volatile conn = NULL;
     681          66 :     volatile bool freeconn = false;
     682             : 
     683          66 :     prepTuplestoreResult(fcinfo);
     684             : 
     685          66 :     dblink_init();
     686             : 
     687          66 :     PG_TRY();
     688             :     {
     689          66 :         char       *sql = NULL;
     690          66 :         char       *conname = NULL;
     691          66 :         bool        fail = true;    /* default to backward compatible */
     692             : 
     693          66 :         if (!is_async)
     694             :         {
     695          50 :             if (PG_NARGS() == 3)
     696             :             {
     697             :                 /* text,text,bool: conname, sql, fail */
     698           2 :                 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     699           2 :                 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
     700           2 :                 fail = PG_GETARG_BOOL(2);
     701           2 :                 dblink_get_conn(conname, &conn, &conname, &freeconn);
     702             :             }
     703          48 :             else if (PG_NARGS() == 2)
     704             :             {
     705             :                 /* text,text or text,bool; told apart by arg 1's declared type */
     706          34 :                 if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
     707             :                 {
     708           2 :                     sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
     709           2 :                     fail = PG_GETARG_BOOL(1);
     710           2 :                     conn = pconn->conn;
     711             :                 }
     712             :                 else
     713             :                 {
     714          32 :                     conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     715          32 :                     sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
     716          32 :                     dblink_get_conn(conname, &conn, &conname, &freeconn);
     717             :                 }
     718             :             }
     719          14 :             else if (PG_NARGS() == 1)
     720             :             {
     721             :                 /* text: sql only, run on pconn->conn */
     722          14 :                 conn = pconn->conn;
     723          14 :                 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
     724             :             }
     725             :             else
     726             :                 /* shouldn't happen */
     727           0 :                 elog(ERROR, "wrong number of arguments");
     728             :         }
     729             :         else                    /* is_async */
     730             :         {
     731             :             /* get async result; the first argument is the connection name */
     732          16 :             conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     733             : 
     734          16 :             if (PG_NARGS() == 2)
     735             :             {
     736             :                 /* text,bool: conname, fail */
     737           0 :                 fail = PG_GETARG_BOOL(1);
     738           0 :                 conn = dblink_get_named_conn(conname);
     739             :             }
     740          16 :             else if (PG_NARGS() == 1)
     741             :             {
     742             :                 /* text: conname only, fail keeps its default of true */
     743          16 :                 conn = dblink_get_named_conn(conname);
     744             :             }
     745             :             else
     746             :                 /* shouldn't happen */
     747           0 :                 elog(ERROR, "wrong number of arguments");
     748             :         }
     749             : 
     750          62 :         if (!conn)
     751           4 :             dblink_conn_not_avail(conname);
     752             : 
     753          58 :         if (!is_async)
     754             :         {
     755             :             /* synchronous query, use efficient tuple collection method */
     756          42 :             materializeQueryResult(fcinfo, conn, conname, sql, fail);
     757             :         }
     758             :         else
     759             :         {
     760             :             /* async result retrieval, do it the old way */
     761          16 :             PGresult   *res = PQgetResult(conn);
     762             : 
     763             :             /* NULL means we're all done with the async results */
     764          16 :             if (res)
     765             :             {
     766          20 :                 if (PQresultStatus(res) != PGRES_COMMAND_OK &&
     767          10 :                     PQresultStatus(res) != PGRES_TUPLES_OK)
     768             :                 {
     769           0 :                     dblink_res_error(conn, conname, res, fail,
     770             :                                      "while executing query");
     771             :                     /* if fail isn't set, we'll return an empty query result */
     772             :                 }
     773             :                 else
     774             :                 {
     775          10 :                     materializeResult(fcinfo, conn, res);
     776             :                 }
     777             :             }
     778             :         }
     779             :     }
     780           8 :     PG_CATCH();
     781             :     {
     782             :         /* if needed, close the connection to the database */
     783           8 :         if (freeconn)
     784           0 :             PQfinish(conn);
     785           8 :         PG_RE_THROW();
     786             :     }
     787          58 :     PG_END_TRY();
     788             : 
     789             :     /* if needed, close the connection to the database */
     790          58 :     if (freeconn)
     791           6 :         PQfinish(conn);
     792             : 
     793          58 :     return (Datum) 0;
     794             : }
     795             : 
     796             : /*
     797             :  * Verify function caller can handle a tuplestore result, and set up for that.
     798             :  *
     799             :  * Note: if the caller returns without actually creating a tuplestore, the
     800             :  * executor will treat the function result as an empty set.
                     :  *
                     :  * Raises ERRCODE_FEATURE_NOT_SUPPORTED when fcinfo->resultinfo is NULL
                     :  * or not a ReturnSetInfo, or when SFRM_Materialize is not among its
                     :  * allowedModes.  On success, returnMode is set to SFRM_Materialize and
                     :  * setResult / setDesc are reset to NULL.
     801             :  */
     802             : static void
     803          96 : prepTuplestoreResult(FunctionCallInfo fcinfo)
     804             : {
     805          96 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
     806             : 
     807             :     /* check to see if query supports us returning a tuplestore */
     808          96 :     if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
     809           0 :         ereport(ERROR,
     810             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     811             :                  errmsg("set-valued function called in context that cannot accept a set")));
     812          96 :     if (!(rsinfo->allowedModes & SFRM_Materialize))
     813           0 :         ereport(ERROR,
     814             :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     815             :                  errmsg("materialize mode required, but it is not allowed in this context")));
     816             : 
     817             :     /* let the executor know we're sending back a tuplestore */
     818          96 :     rsinfo->returnMode = SFRM_Materialize;
     819             : 
     820             :     /* caller must fill these to return a non-empty result */
     821          96 :     rsinfo->setResult = NULL;
     822          96 :     rsinfo->setDesc = NULL;
     823          96 : }
     824             : 
     825             : /*
     826             :  * Copy the contents of the PGresult into a tuplestore to be returned
     827             :  * as the result of the current function.
     828             :  * The PGresult will be released in this function.
                     :  *
                     :  * A PGRES_COMMAND_OK result yields one single-column row (TEXT column
                     :  * "status") holding PQcmdStatus(res).  Otherwise the result is expected
                     :  * to be PGRES_TUPLES_OK; its values are passed as C strings to
                     :  * BuildTupleFromCStrings() using the row type reported by
                     :  * get_call_result_type().  A column-count mismatch between the result
                     :  * and that row type raises ERRCODE_DATATYPE_MISMATCH.
                     :  *
                     :  * No tuplestore is created when the result contains zero rows.  The
                     :  * PGresult is cleared with PQclear() on the normal path and, through
                     :  * PG_CATCH, on the error path as well.
     829             :  */
     830             : static void
     831          26 : materializeResult(FunctionCallInfo fcinfo, PGconn *conn, PGresult *res)
     832             : {
     833          26 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
     834             : 
     835             :     /* prepTuplestoreResult must have been called previously */
     836             :     Assert(rsinfo->returnMode == SFRM_Materialize);
     837             : 
     838          26 :     PG_TRY();
     839             :     {
     840             :         TupleDesc   tupdesc;
     841             :         bool        is_sql_cmd;
     842             :         int         ntuples;
     843             :         int         nfields;
     844             : 
     845          26 :         if (PQresultStatus(res) == PGRES_COMMAND_OK)
     846             :         {
     847           0 :             is_sql_cmd = true;
     848             : 
     849             :             /*
     850             :              * need a tuple descriptor representing one TEXT column to return
     851             :              * the command status string as our result tuple
     852             :              */
     853           0 :             tupdesc = CreateTemplateTupleDesc(1);
     854           0 :             TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
     855             :                                TEXTOID, -1, 0);
     856           0 :             ntuples = 1;
     857           0 :             nfields = 1;
     858             :         }
     859             :         else
     860             :         {
     861             :             Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
     862             : 
     863          26 :             is_sql_cmd = false;
     864             : 
     865             :             /* get a tuple descriptor for our result type */
     866          26 :             switch (get_call_result_type(fcinfo, NULL, &tupdesc))
     867             :             {
     868             :                 case TYPEFUNC_COMPOSITE:
     869             :                     /* success */
     870          26 :                     break;
     871             :                 case TYPEFUNC_RECORD:
     872             :                     /* failed to determine actual type of RECORD */
     873           0 :                     ereport(ERROR,
     874             :                             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     875             :                              errmsg("function returning record called in context "
     876             :                                     "that cannot accept type record")));
     877             :                     break;
     878             :                 default:
     879             :                     /* result type isn't composite */
     880           0 :                     elog(ERROR, "return type must be a row type");
     881             :                     break;
     882             :             }
     883             : 
     884             :             /* make sure we have a persistent copy of the tupdesc */
     885          26 :             tupdesc = CreateTupleDescCopy(tupdesc);
     886          26 :             ntuples = PQntuples(res);
     887          26 :             nfields = PQnfields(res);
     888             :         }
     889             : 
     890             :         /*
     891             :          * check result and tuple descriptor have the same number of columns
     892             :          */
     893          26 :         if (nfields != tupdesc->natts)
     894           0 :             ereport(ERROR,
     895             :                     (errcode(ERRCODE_DATATYPE_MISMATCH),
     896             :                      errmsg("remote query result rowtype does not match "
     897             :                             "the specified FROM clause rowtype")));
     898             : 
     899          26 :         if (ntuples > 0)
     900             :         {
     901             :             AttInMetadata *attinmeta;
     902          26 :             int         nestlevel = -1;
     903             :             Tuplestorestate *tupstore;
     904             :             MemoryContext oldcontext;
     905             :             int         row;
     906             :             char      **values;
     907             : 
     908          26 :             attinmeta = TupleDescGetAttInMetadata(tupdesc);
     909             : 
     910             :             /* Set GUCs to ensure we read GUC-sensitive data types correctly */
     911          26 :             if (!is_sql_cmd)
     912          26 :                 nestlevel = applyRemoteGucs(conn);
     913             : 
     914          26 :             oldcontext = MemoryContextSwitchTo(
     915          26 :                                                rsinfo->econtext->ecxt_per_query_memory);
     916          26 :             tupstore = tuplestore_begin_heap(true, false, work_mem);
     917          26 :             rsinfo->setResult = tupstore;
     918          26 :             rsinfo->setDesc = tupdesc;
     919          26 :             MemoryContextSwitchTo(oldcontext);
     920             : 
     921          26 :             values = (char **) palloc(nfields * sizeof(char *));
     922             : 
     923             :             /* put all tuples into the tuplestore */
     924          98 :             for (row = 0; row < ntuples; row++)
     925             :             {
     926             :                 HeapTuple   tuple;
     927             : 
     928          74 :                 if (!is_sql_cmd)
     929             :                 {
     930             :                     int         i;
     931             : 
     932         276 :                     for (i = 0; i < nfields; i++)
     933             :                     {
     934         202 :                         if (PQgetisnull(res, row, i))
     935           0 :                             values[i] = NULL;
     936             :                         else
     937         202 :                             values[i] = PQgetvalue(res, row, i);
     938             :                     }
     939             :                 }
     940             :                 else
     941             :                 {
     942           0 :                     values[0] = PQcmdStatus(res);
     943             :                 }
     944             : 
     945             :                 /* build the tuple and put it into the tuplestore. */
     946          74 :                 tuple = BuildTupleFromCStrings(attinmeta, values);
     947          72 :                 tuplestore_puttuple(tupstore, tuple);
     948             :             }
     949             : 
     950             :             /* clean up GUC settings, if we changed any */
     951          24 :             restoreLocalGucs(nestlevel);
     952             : 
     953             :             /* clean up and return the tuplestore */
     954             :             tuplestore_donestoring(tupstore);
     955             :         }
     956             : 
     957          24 :         PQclear(res);
     958             :     }
     959           2 :     PG_CATCH();
     960             :     {
     961             :         /*
                     :          * be sure to release the libpq result.  NOTE(review):
                     :          * restoreLocalGucs() is not called on this path; presumably
                     :          * error recovery undoes the GUC changes -- confirm.
                     :          */
     962           2 :         PQclear(res);
     963           2 :         PG_RE_THROW();
     964             :     }
     965          24 :     PG_END_TRY();
     966          24 : }
     967             : 
     968             : /*
     969             :  * Execute the given SQL command and store its results into a tuplestore
     970             :  * to be returned as the result of the current function.
     971             :  *
     972             :  * This is equivalent to PQexec followed by materializeResult, but we make
     973             :  * use of libpq's single-row mode to avoid accumulating the whole result
     974             :  * inside libpq before it gets transferred to the tuplestore.
                     :  *
                     :  * Row collection is done by storeQueryResult(); this function handles
                     :  * the final PGresult it returns:
                     :  *   - NULL or a status other than COMMAND_OK / TUPLES_OK is passed to
                     :  *     dblink_res_error() together with conname and fail;
                     :  *   - COMMAND_OK is turned into a single row with one TEXT column
                     :  *     "status" holding PQcmdStatus(res);
                     :  *   - TUPLES_OK needs no further work beyond clearing the result.
                     :  *
                     :  * On error exit, every PGresult still referenced (res, sinfo.last_res,
                     :  * sinfo.cur_res) is cleared and the connection is drained with
                     :  * PQgetResult() until it returns NULL, before the error is re-thrown.
     975             :  */
     976             : static void
     977          42 : materializeQueryResult(FunctionCallInfo fcinfo,
     978             :                        PGconn *conn,
     979             :                        const char *conname,
     980             :                        const char *sql,
     981             :                        bool fail)
     982             : {
     983          42 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
     984          42 :     PGresult   *volatile res = NULL;
     985          42 :     volatile storeInfo sinfo = {0};
     986             : 
     987             :     /* prepTuplestoreResult must have been called previously */
     988             :     Assert(rsinfo->returnMode == SFRM_Materialize);
     989             : 
     990          42 :     sinfo.fcinfo = fcinfo;
     991             : 
     992          42 :     PG_TRY();
     993             :     {
     994             :         /* Create short-lived memory context for data conversions */
     995          42 :         sinfo.tmpcontext = AllocSetContextCreate(CurrentMemoryContext,
     996             :                                                  "dblink temporary context",
     997             :                                                  ALLOCSET_DEFAULT_SIZES);
     998             : 
     999             :         /* execute query, collecting any tuples into the tuplestore */
    1000          42 :         res = storeQueryResult(&sinfo, conn, sql);
    1001             : 
    1002          84 :         if (!res ||
    1003          84 :             (PQresultStatus(res) != PGRES_COMMAND_OK &&
    1004          42 :              PQresultStatus(res) != PGRES_TUPLES_OK))
    1005           4 :         {
    1006             :             /*
    1007             :              * dblink_res_error will clear the passed PGresult, so we need
    1008             :              * this ugly dance to avoid doing so twice during error exit
    1009             :              */
    1010           4 :             PGresult   *res1 = res;
    1011             : 
    1012           4 :             res = NULL;
    1013           4 :             dblink_res_error(conn, conname, res1, fail,
    1014             :                              "while executing query");
    1015             :             /* if fail isn't set, we'll return an empty query result */
    1016             :         }
    1017          38 :         else if (PQresultStatus(res) == PGRES_COMMAND_OK)
    1018             :         {
    1019             :             /*
    1020             :              * storeRow didn't get called, so we need to convert the command
    1021             :              * status string to a tuple manually
    1022             :              */
    1023             :             TupleDesc   tupdesc;
    1024             :             AttInMetadata *attinmeta;
    1025             :             Tuplestorestate *tupstore;
    1026             :             HeapTuple   tuple;
    1027             :             char       *values[1];
    1028             :             MemoryContext oldcontext;
    1029             : 
    1030             :             /*
    1031             :              * need a tuple descriptor representing one TEXT column to return
    1032             :              * the command status string as our result tuple
    1033             :              */
    1034           0 :             tupdesc = CreateTemplateTupleDesc(1);
    1035           0 :             TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
    1036             :                                TEXTOID, -1, 0);
    1037           0 :             attinmeta = TupleDescGetAttInMetadata(tupdesc);
    1038             : 
    1039           0 :             oldcontext = MemoryContextSwitchTo(
    1040           0 :                                                rsinfo->econtext->ecxt_per_query_memory);
    1041           0 :             tupstore = tuplestore_begin_heap(true, false, work_mem);
    1042           0 :             rsinfo->setResult = tupstore;
    1043           0 :             rsinfo->setDesc = tupdesc;
    1044           0 :             MemoryContextSwitchTo(oldcontext);
    1045             : 
    1046           0 :             values[0] = PQcmdStatus(res);
    1047             : 
    1048             :             /* build the tuple and put it into the tuplestore. */
    1049           0 :             tuple = BuildTupleFromCStrings(attinmeta, values);
    1050           0 :             tuplestore_puttuple(tupstore, tuple);
    1051             : 
    1052           0 :             PQclear(res);
    1053           0 :             res = NULL;
    1054             :         }
    1055             :         else
    1056             :         {
    1057             :             Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
    1058             :             /* storeRow should have created a tuplestore */
    1059             :             Assert(rsinfo->setResult != NULL);
    1060             : 
    1061          38 :             PQclear(res);
    1062          38 :             res = NULL;
    1063             :         }
    1064             : 
    1065             :         /* clean up data conversion short-lived memory context */
    1066          42 :         if (sinfo.tmpcontext != NULL)
    1067          42 :             MemoryContextDelete(sinfo.tmpcontext);
    1068          42 :         sinfo.tmpcontext = NULL;
    1069             : 
    1070          42 :         PQclear(sinfo.last_res);
    1071          42 :         sinfo.last_res = NULL;
    1072          42 :         PQclear(sinfo.cur_res);
    1073          42 :         sinfo.cur_res = NULL;
    1074             :     }
    1075           0 :     PG_CATCH();
    1076             :     {
    1077             :         /* be sure to release any libpq result we collected */
    1078           0 :         PQclear(res);
    1079           0 :         PQclear(sinfo.last_res);
    1080           0 :         PQclear(sinfo.cur_res);
    1081             :         /* and clear out any pending data in libpq */
    1082           0 :         while ((res = PQgetResult(conn)) != NULL)
    1083           0 :             PQclear(res);
    1084           0 :         PG_RE_THROW();
    1085             :     }
    1086          42 :     PG_END_TRY();
    1087          42 : }
    1088             : 
    1089             : /*
    1090             :  * Execute query, and send any result rows to sinfo->tuplestore.
                     :  *
                     :  * The query is sent with PQsendQuery() and the connection is switched
                     :  * to single-row mode; failure of either step raises an ERROR.  Results
                     :  * are then read with PQgetResult() until it returns NULL:
                     :  *   - a PGRES_SINGLE_TUPLE result is passed to storeRow() and cleared;
                     :  *   - any other result replaces sinfo->last_res (the previous one is
                     :  *     cleared), so only the last such result survives.
                     :  *
                     :  * Returns the last non-row result, or NULL if there was none.
                     :  * sinfo->last_res is reset to NULL before returning, so the returned
                     :  * PGresult is no longer referenced by sinfo.  While the loop runs, the
                     :  * result being processed is kept in sinfo->cur_res.
    1091             :  */
    1092             : static PGresult *
    1093          42 : storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql)
    1094             : {
    1095          42 :     bool        first = true;
    1096          42 :     int         nestlevel = -1;
    1097             :     PGresult   *res;
    1098             : 
    1099          42 :     if (!PQsendQuery(conn, sql))
    1100           0 :         elog(ERROR, "could not send query: %s", pchomp(PQerrorMessage(conn)));
    1101             : 
    1102          42 :     if (!PQsetSingleRowMode(conn))  /* shouldn't fail */
    1103           0 :         elog(ERROR, "failed to set single-row mode for dblink query");
    1104             : 
    1105             :     for (;;)
    1106             :     {
    1107         654 :         CHECK_FOR_INTERRUPTS();
    1108             : 
    1109         348 :         sinfo->cur_res = PQgetResult(conn);
    1110         348 :         if (!sinfo->cur_res)
    1111          42 :             break;
    1112             : 
    1113         306 :         if (PQresultStatus(sinfo->cur_res) == PGRES_SINGLE_TUPLE)
    1114             :         {
    1115             :             /* got one row from possibly-bigger resultset */
    1116             : 
    1117             :             /*
    1118             :              * Set GUCs to ensure we read GUC-sensitive data types correctly.
    1119             :              * We shouldn't do this until we have a row in hand, to ensure
    1120             :              * libpq has seen any earlier ParameterStatus protocol messages.
                     :              * This is only attempted on the first row of a result set and
                     :              * only while nestlevel is still negative.
    1121             :              */
    1122         264 :             if (first && nestlevel < 0)
    1123          38 :                 nestlevel = applyRemoteGucs(conn);
    1124             : 
    1125         264 :             storeRow(sinfo, sinfo->cur_res, first);
    1126             : 
    1127         264 :             PQclear(sinfo->cur_res);
    1128         264 :             sinfo->cur_res = NULL;
    1129         264 :             first = false;
    1130             :         }
    1131             :         else
    1132             :         {
    1133             :             /* if empty resultset, fill tuplestore header */
    1134          42 :             if (first && PQresultStatus(sinfo->cur_res) == PGRES_TUPLES_OK)
    1135           0 :                 storeRow(sinfo, sinfo->cur_res, first);
    1136             : 
    1137             :             /* store completed result at last_res */
    1138          42 :             PQclear(sinfo->last_res);
    1139          42 :             sinfo->last_res = sinfo->cur_res;
    1140          42 :             sinfo->cur_res = NULL;
    1141          42 :             first = true;
    1142             :         }
    1143             :     }
    1144             : 
    1145             :     /* clean up GUC settings, if we changed any */
    1146          42 :     restoreLocalGucs(nestlevel);
    1147             : 
    1148             :     /* return last_res; sinfo gives up its reference to it */
    1149          42 :     res = sinfo->last_res;
    1150          42 :     sinfo->last_res = NULL;
    1151          42 :     return res;
    1152             : }
    1153             : 
    1154             : /*
    1155             :  * Convert the single row held in res to a tuple and append it to sinfo->tuplestore.
    1156             :  *
    1157             :  * If "first" is true, first set up a new tuplestore and attinmeta; the row type comes from get_call_result_type and res only supplies the column count checked against it
    1158             :  * (in this case the PGresult might contain either zero or one row; with zero rows we return right after that setup).
    1159             :  */
    1160             : static void
    1161         264 : storeRow(volatile storeInfo *sinfo, PGresult *res, bool first)
    1162             : {
    1163         264 :     int         nfields = PQnfields(res);  /* number of columns in the remote result */
    1164             :     HeapTuple   tuple;
    1165             :     int         i;
    1166             :     MemoryContext oldcontext;
    1167             : 
    1168         264 :     if (first)
    1169             :     {
    1170             :         /* Prepare for new result set */
    1171          38 :         ReturnSetInfo *rsinfo = (ReturnSetInfo *) sinfo->fcinfo->resultinfo;
    1172             :         TupleDesc   tupdesc;
    1173             : 
    1174             :         /*
    1175             :          * It's possible to get more than one result set if the query string
    1176             :          * contained multiple SQL commands.  In that case, we follow PQexec's
    1177             :          * traditional behavior of throwing away all but the last result.
    1178             :          */
    1179          38 :         if (sinfo->tuplestore)
    1180           0 :             tuplestore_end(sinfo->tuplestore);
    1181          38 :         sinfo->tuplestore = NULL;
    1182             : 
    1183             :         /* get a tuple descriptor for our result type */
    1184          38 :         switch (get_call_result_type(sinfo->fcinfo, NULL, &tupdesc))
    1185             :         {
    1186             :             case TYPEFUNC_COMPOSITE:
    1187             :                 /* success */
    1188          38 :                 break;
    1189             :             case TYPEFUNC_RECORD:
    1190             :                 /* failed to determine actual type of RECORD */
    1191           0 :                 ereport(ERROR,
    1192             :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1193             :                          errmsg("function returning record called in context "
    1194             :                                 "that cannot accept type record")));
    1195             :                 break;
    1196             :             default:
    1197             :                 /* result type isn't composite */
    1198           0 :                 elog(ERROR, "return type must be a row type");
    1199             :                 break;
    1200             :         }
    1201             : 
    1202             :         /* make sure we have a persistent copy of the tupdesc */
    1203          38 :         tupdesc = CreateTupleDescCopy(tupdesc);
    1204             : 
    1205             :         /* check result and tuple descriptor have the same number of columns */
    1206          38 :         if (nfields != tupdesc->natts)
    1207           0 :             ereport(ERROR,
    1208             :                     (errcode(ERRCODE_DATATYPE_MISMATCH),
    1209             :                      errmsg("remote query result rowtype does not match "
    1210             :                             "the specified FROM clause rowtype")));
    1211             : 
    1212             :         /* Prepare attinmeta for later data conversions */
    1213          38 :         sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
    1214             : 
    1215             :         /* Create a new, empty tuplestore in the per-query context and publish it, with tupdesc, through rsinfo */
    1216          38 :         oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
    1217          38 :         sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
    1218          38 :         rsinfo->setResult = sinfo->tuplestore;
    1219          38 :         rsinfo->setDesc = tupdesc;
    1220          38 :         MemoryContextSwitchTo(oldcontext);
    1221             : 
    1222             :         /* Done if empty resultset (the cstrs array below is not set up in that case) */
    1223          38 :         if (PQntuples(res) == 0)
    1224           0 :             return;
    1225             : 
    1226             :         /*
    1227             :          * Set up sufficiently-wide string pointers array; this won't change
    1228             :          * in size so it's easy to preallocate.
    1229             :          */
    1230          38 :         if (sinfo->cstrs)
    1231           0 :             pfree(sinfo->cstrs);
    1232          38 :         sinfo->cstrs = (char **) palloc(nfields * sizeof(char *));
    1233             :     }
    1234             : 
    1235             :     /* Should have a single-row result if we get here */
    1236             :     Assert(PQntuples(res) == 1);
    1237             : 
    1238             :     /*
    1239             :      * Do the following work in a temp context that we reset after each tuple.
    1240             :      * This cleans up not only the data we have direct access to, but any
    1241             :      * cruft the I/O functions might leak.
    1242             :      */
    1243         264 :     oldcontext = MemoryContextSwitchTo(sinfo->tmpcontext);
    1244             : 
    1245             :     /*
    1246             :      * Fill cstrs with null-terminated strings of column values; SQL NULLs become NULL pointers, other entries are the pointers PQgetvalue returns (no copy is made here).
    1247             :      */
    1248        1020 :     for (i = 0; i < nfields; i++)
    1249             :     {
    1250         756 :         if (PQgetisnull(res, 0, i))
    1251           0 :             sinfo->cstrs[i] = NULL;
    1252             :         else
    1253         756 :             sinfo->cstrs[i] = PQgetvalue(res, 0, i);
    1254             :     }
    1255             : 
    1256             :     /* Convert row to a tuple, and add it to the tuplestore */
    1257         264 :     tuple = BuildTupleFromCStrings(sinfo->attinmeta, sinfo->cstrs);
    1258             : 
    1259         264 :     tuplestore_puttuple(sinfo->tuplestore, tuple);
    1260             : 
    1261             :     /* Clean up: switch back to the caller's context, then reset the temp context used for this row */
    1262         264 :     MemoryContextSwitchTo(oldcontext);
    1263         264 :     MemoryContextReset(sinfo->tmpcontext);
    1264             : }
    1265             : 
    1266             : /*
    1267             :  * List all open dblink connections by name.
    1268             :  * Returns a text array with the name of every entry in remoteConnHash, or SQL NULL if the hash is unset or yields no entries.
    1269             :  * Takes no params
    1270             :  */
    1271           4 : PG_FUNCTION_INFO_V1(dblink_get_connections);
    1272             : Datum
    1273           2 : dblink_get_connections(PG_FUNCTION_ARGS)
    1274             : {
    1275             :     HASH_SEQ_STATUS status;
    1276             :     remoteConnHashEnt *hentry;
    1277           2 :     ArrayBuildState *astate = NULL;  /* still NULL below means no name was accumulated */
    1278             : 
    1279           2 :     if (remoteConnHash)
    1280             :     {
    1281           2 :         hash_seq_init(&status, remoteConnHash);
    1282          10 :         while ((hentry = (remoteConnHashEnt *) hash_seq_search(&status)) != NULL)
    1283             :         {
    1284             :             /* append this entry's name, as a text datum, to the array being built */
    1285          12 :             astate = accumArrayResult(astate,
    1286           6 :                                       CStringGetTextDatum(hentry->name),
    1287             :                                       false, TEXTOID, CurrentMemoryContext);
    1288             :         }
    1289             :     }
    1290             : 
    1291           2 :     if (astate)
    1292           2 :         PG_RETURN_ARRAYTYPE_P(makeArrayResult(astate,
    1293             :                                               CurrentMemoryContext));
    1294             :     else
    1295           0 :         PG_RETURN_NULL();
    1296             : }
    1297             : 
    1298             : /*
    1299             :  * Checks if a given remote connection is busy
    1300             :  *
    1301             :  * Returns the PQisBusy result as int4: 1 if the connection is busy, 0 otherwise
    1302             :  * Params:
    1303             :  *  text connection_name - name of the connection to check
    1304             :  *
    1305             :  */
    1306           4 : PG_FUNCTION_INFO_V1(dblink_is_busy);
    1307             : Datum
    1308           2 : dblink_is_busy(PG_FUNCTION_ARGS)
    1309             : {
    1310             :     PGconn     *conn;
    1311             : 
    1312           2 :     dblink_init();
    1313           2 :     conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
    1314             :     /* consume any input already received before asking PQisBusy; the PQconsumeInput result is not checked */
    1315           2 :     PQconsumeInput(conn);
    1316           2 :     PG_RETURN_INT32(PQisBusy(conn));
    1317             : }
    1318             : 
    1319             : /*
    1320             :  * Cancels a running request on a connection
    1321             :  *
    1322             :  * Returns text:
    1323             :  *  "OK" if the cancel request has been sent correctly (PQcancel returned 1),
    1324             :  *      the message PQcancel left in errbuf otherwise
    1325             :  *
    1326             :  * Params:
    1327             :  *  text connection_name - name of the connection whose request is to be cancelled
    1328             :  *
    1329             :  */
    1330           4 : PG_FUNCTION_INFO_V1(dblink_cancel_query);
    1331             : Datum
    1332           2 : dblink_cancel_query(PG_FUNCTION_ARGS)
    1333             : {
    1334             :     int         res;
    1335             :     PGconn     *conn;
    1336             :     PGcancel   *cancel;
    1337             :     char        errbuf[256];  /* receives PQcancel's error text on failure */
    1338             : 
    1339           2 :     dblink_init();
    1340           2 :     conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
    1341           2 :     cancel = PQgetCancel(conn);  /* NOTE(review): result is used unchecked; confirm PQcancel/PQfreeCancel tolerate NULL */
    1342             : 
    1343           2 :     res = PQcancel(cancel, errbuf, 256);  /* 256 is the size of errbuf */
    1344           2 :     PQfreeCancel(cancel);
    1345             : 
    1346           2 :     if (res == 1)
    1347           2 :         PG_RETURN_TEXT_P(cstring_to_text("OK"));
    1348             :     else
    1349           0 :         PG_RETURN_TEXT_P(cstring_to_text(errbuf));
    1350             : }
    1351             : 
    1352             : 
    1353             : /*
    1354             :  * Get error message from a connection
    1355             :  *
    1356             :  * Returns text:
    1357             :  *  "OK" if PQerrorMessage gives NULL or an empty string, otherwise that message passed through pchomp
    1358             :  *
    1359             :  * Params:
    1360             :  *  text connection_name - name of the connection to check
    1361             :  *
    1362             :  */
    1363           4 : PG_FUNCTION_INFO_V1(dblink_error_message);
    1364             : Datum
    1365           2 : dblink_error_message(PG_FUNCTION_ARGS)
    1366             : {
    1367             :     char       *msg;
    1368             :     PGconn     *conn;
    1369             : 
    1370           2 :     dblink_init();
    1371           2 :     conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
    1372             : 
    1373           2 :     msg = PQerrorMessage(conn);
    1374           2 :     if (msg == NULL || msg[0] == '\0')
    1375           2 :         PG_RETURN_TEXT_P(cstring_to_text("OK"));
    1376             :     else
    1377           0 :         PG_RETURN_TEXT_P(cstring_to_text(pchomp(msg)));  /* pchomp is defined elsewhere; presumably trims the trailing newline - confirm */
    1378             : }
    1379             : 
    1380             : /*
    1381             :  * Execute an SQL non-SELECT command; returns its command status string as text, or "ERROR" when dblink_res_error returns after a failed command
    1382             :  */
    1383          18 : PG_FUNCTION_INFO_V1(dblink_exec);
    1384             : Datum
    1385          52 : dblink_exec(PG_FUNCTION_ARGS)
    1386             : {
    1387          52 :     text       *volatile sql_cmd_status = NULL;
    1388          52 :     PGconn     *volatile conn = NULL;
    1389          52 :     volatile bool freeconn = false;
    1390             :     /* the locals above are set inside PG_TRY and read in PG_CATCH or after PG_END_TRY, hence (presumably) the volatile qualifiers */
    1391          52 :     dblink_init();
    1392             : 
    1393          52 :     PG_TRY();
    1394             :     {
    1395          52 :         PGresult   *res = NULL;
    1396          52 :         char       *sql = NULL;
    1397          52 :         char       *conname = NULL;
    1398          52 :         bool        fail = true;    /* default to backward compatible behavior */
    1399             : 
    1400          52 :         if (PG_NARGS() == 3)
    1401             :         {
    1402             :             /* must be text,text,bool: connection name, command, fail flag */
    1403           0 :             conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
    1404           0 :             sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
    1405           0 :             fail = PG_GETARG_BOOL(2);
    1406           0 :             dblink_get_conn(conname, &conn, &conname, &freeconn);
    1407             :         }
    1408          52 :         else if (PG_NARGS() == 2)
    1409             :         {
    1410             :             /* might be text,text or text,bool */
    1411          34 :             if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
    1412             :             {
    1413           2 :                 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
    1414           2 :                 fail = PG_GETARG_BOOL(1);
    1415           2 :                 conn = pconn->conn;
    1416             :             }
    1417             :             else
    1418             :             {
    1419          32 :                 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
    1420          32 :                 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
    1421          32 :                 dblink_get_conn(conname, &conn, &conname, &freeconn);
    1422             :             }
    1423             :         }
    1424          18 :         else if (PG_NARGS() == 1)
    1425             :         {
    1426             :             /* must be single text argument: the command, run on pconn->conn */
    1427          18 :             conn = pconn->conn;
    1428          18 :             sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
    1429             :         }
    1430             :         else
    1431             :             /* shouldn't happen */
    1432           0 :             elog(ERROR, "wrong number of arguments");
    1433             : 
    1434          52 :         if (!conn)
    1435           0 :             dblink_conn_not_avail(conname);
    1436             : 
    1437          52 :         res = PQexec(conn, sql);
    1438         104 :         if (!res ||
    1439          56 :             (PQresultStatus(res) != PGRES_COMMAND_OK &&
    1440           4 :              PQresultStatus(res) != PGRES_TUPLES_OK))
    1441             :         {
    1442           4 :             dblink_res_error(conn, conname, res, fail,
    1443             :                              "while executing command");
    1444             : 
    1445             :             /*
    1446             :              * dblink_res_error returned rather than raising an error, so report the
    1447             :              * literal string "ERROR" as our result (NOTE(review): res is not PQclear'ed here; presumably dblink_res_error releases it - confirm)
    1448             :              */
    1449           2 :             sql_cmd_status = cstring_to_text("ERROR");
    1450             :         }
    1451          48 :         else if (PQresultStatus(res) == PGRES_COMMAND_OK)
    1452             :         {
    1453             :             /*
    1454             :              * save a copy of the command status string (PQcmdStatus) to return as
    1455             :              * our result, then release the PGresult
    1456             :              */
    1457          48 :             sql_cmd_status = cstring_to_text(PQcmdStatus(res));
    1458          48 :             PQclear(res);
    1459             :         }
    1460             :         else  /* the only status left is PGRES_TUPLES_OK: the command returned rows */
    1461             :         {
    1462           0 :             PQclear(res);
    1463           0 :             ereport(ERROR,
    1464             :                     (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
    1465             :                      errmsg("statement returning results not allowed")));
    1466             :         }
    1467             :     }
    1468           2 :     PG_CATCH();
    1469             :     {
    1470             :         /* if freeconn was set (only dblink_get_conn receives its address), close the connection before re-throwing */
    1471           2 :         if (freeconn)
    1472           0 :             PQfinish(conn);
    1473           2 :         PG_RE_THROW();
    1474             :     }
    1475          50 :     PG_END_TRY();
    1476             : 
    1477             :     /* if needed, close the connection to the database */
    1478          50 :     if (freeconn)
    1479           2 :         PQfinish(conn);
    1480             : 
    1481          50 :     PG_RETURN_TEXT_P(sql_cmd_status);
    1482             : }
    1483             : 
    1484             : 
    1485             : /*
    1486             :  * dblink_get_pkey
    1487             :  *
    1488             :  * Set-returning function: one (position int4, colname text) row per primary key
    1489             :  * field of the supplied relation; returns no rows if get_pkey_attnames yields none.
    1490             :  */
    1491           4 : PG_FUNCTION_INFO_V1(dblink_get_pkey);
    1492             : Datum
    1493          18 : dblink_get_pkey(PG_FUNCTION_ARGS)
    1494             : {
    1495             :     int16       indnkeyatts;
    1496             :     char      **results;
    1497             :     FuncCallContext *funcctx;
    1498             :     int32       call_cntr;
    1499             :     int32       max_calls;
    1500             :     AttInMetadata *attinmeta;
    1501             :     MemoryContext oldcontext;
    1502             : 
    1503             :     /* stuff done only on the first call of the function */
    1504          18 :     if (SRF_IS_FIRSTCALL())
    1505             :     {
    1506             :         Relation    rel;
    1507             :         TupleDesc   tupdesc;
    1508             : 
    1509             :         /* create a function context for cross-call persistence */
    1510           6 :         funcctx = SRF_FIRSTCALL_INIT();
    1511             : 
    1512             :         /*
    1513             :          * switch to memory context appropriate for multiple function calls
    1514             :          */
    1515           6 :         oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
    1516             : 
    1517             :         /* open target relation */
    1518           6 :         rel = get_rel_from_relname(PG_GETARG_TEXT_PP(0), AccessShareLock, ACL_SELECT);
    1519             : 
    1520             :         /* get the array of pkey attribute names; indnkeyatts receives how many there are */
    1521           6 :         results = get_pkey_attnames(rel, &indnkeyatts);
    1522             : 
    1523           6 :         relation_close(rel, AccessShareLock);
    1524             : 
    1525             :         /*
    1526             :          * need a tuple descriptor representing one INT and one TEXT column
    1527             :          */
    1528           6 :         tupdesc = CreateTemplateTupleDesc(2);
    1529           6 :         TupleDescInitEntry(tupdesc, (AttrNumber) 1, "position",
    1530             :                            INT4OID, -1, 0);
    1531           6 :         TupleDescInitEntry(tupdesc, (AttrNumber) 2, "colname",
    1532             :                            TEXTOID, -1, 0);
    1533             : 
    1534             :         /*
    1535             :          * Generate attribute metadata needed later to produce tuples from raw
    1536             :          * C strings
    1537             :          */
    1538           6 :         attinmeta = TupleDescGetAttInMetadata(tupdesc);
    1539           6 :         funcctx->attinmeta = attinmeta;
    1540             : 
    1541           6 :         if ((results != NULL) && (indnkeyatts > 0))
    1542             :         {
    1543           6 :             funcctx->max_calls = indnkeyatts;
    1544             : 
    1545             :             /* got results, keep track of them */
    1546           6 :             funcctx->user_fctx = results;
    1547             :         }
    1548             :         else
    1549             :         {
    1550             :             /* fast track when no results: restore the context and end the set immediately */
    1551           0 :             MemoryContextSwitchTo(oldcontext);
    1552           0 :             SRF_RETURN_DONE(funcctx);
    1553             :         }
    1554             : 
    1555           6 :         MemoryContextSwitchTo(oldcontext);
    1556             :     }
    1557             : 
    1558             :     /* stuff done on every call of the function */
    1559          18 :     funcctx = SRF_PERCALL_SETUP();
    1560             : 
    1561             :     /*
    1562             :      * initialize per-call variables
    1563             :      */
    1564          18 :     call_cntr = funcctx->call_cntr;
    1565          18 :     max_calls = funcctx->max_calls;
    1566             : 
    1567          18 :     results = (char **) funcctx->user_fctx;
    1568          18 :     attinmeta = funcctx->attinmeta;
    1569             : 
    1570          18 :     if (call_cntr < max_calls)   /* do when there is more left to send */
    1571             :     {
    1572             :         char      **values;
    1573             :         HeapTuple   tuple;
    1574             :         Datum       result;
    1575             : 
    1576          12 :         values = (char **) palloc(2 * sizeof(char *));
    1577          12 :         values[0] = psprintf("%d", call_cntr + 1);  /* position is 1-based */
    1578          12 :         values[1] = results[call_cntr];
    1579             : 
    1580             :         /* build the tuple */
    1581          12 :         tuple = BuildTupleFromCStrings(attinmeta, values);
    1582             : 
    1583             :         /* make the tuple into a datum */
    1584          12 :         result = HeapTupleGetDatum(tuple);
    1585             : 
    1586          12 :         SRF_RETURN_NEXT(funcctx, result);
    1587             :     }
    1588             :     else
    1589             :     {
    1590             :         /* do when there is no more left */
    1591           6 :         SRF_RETURN_DONE(funcctx);
    1592             :     }
    1593             : }
    1594             : 
    1595             : 
    1596             : /*
    1597             :  * dblink_build_sql_insert
    1598             :  *
    1599             :  * Used to generate an SQL insert statement
    1600             :  * based on an existing tuple in a local relation.
    1601             :  * This is useful for selectively replicating data
    1602             :  * to another server via dblink.
    1603             :  *
    1604             :  * API:
    1605             :  * <relname> - name of local table of interest
    1606             :  * <pkattnums> - an int2vector of attnums which will be used
    1607             :  * to identify the local tuple of interest
    1608             :  * <pknumatts> - number of attnums in pkattnums
    1609             :  * <src_pkattvals_arry> - text array of key values which will be used
    1610             :  * to identify the local tuple of interest
    1611             :  * <tgt_pkattvals_arry> - text array of key values which will be used
    1612             :  * to build the string for execution remotely. These are substituted
    1613             :  * for their counterparts in src_pkattvals_arry
    1614             :  */
    1615           6 : PG_FUNCTION_INFO_V1(dblink_build_sql_insert);
    1616             : Datum
    1617          12 : dblink_build_sql_insert(PG_FUNCTION_ARGS)
    1618             : {
    1619          12 :     text       *relname_text = PG_GETARG_TEXT_PP(0);
    1620          12 :     int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
    1621          12 :     int32       pknumatts_arg = PG_GETARG_INT32(2);
    1622          12 :     ArrayType  *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
    1623          12 :     ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
    1624             :     Relation    rel;
    1625             :     int        *pkattnums;
    1626             :     int         pknumatts;
    1627             :     char      **src_pkattvals;
    1628             :     char      **tgt_pkattvals;
    1629             :     int         src_nitems;
    1630             :     int         tgt_nitems;
    1631             :     char       *sql;
    1632             : 
    1633             :     /*
    1634             :      * Open target relation (AccessShareLock; ACL_SELECT is passed on to get_rel_from_relname).
    1635             :      */
    1636          12 :     rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
    1637             : 
    1638             :     /*
    1639             :      * Process pkattnums argument: validate_pkattnums fills pkattnums and pknumatts from the raw arguments.
    1640             :      */
    1641          12 :     validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
    1642             :                        &pkattnums, &pknumatts);
    1643             : 
    1644             :     /*
    1645             :      * Source array is made up of key values that will be used to locate the
    1646             :      * tuple of interest from the local system.
    1647             :      */
    1648           8 :     src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
    1649             : 
    1650             :     /*
    1651             :      * There should be one source array key value for each key attnum
    1652             :      */
    1653           8 :     if (src_nitems != pknumatts)
    1654           0 :         ereport(ERROR,
    1655             :                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
    1656             :                  errmsg("source key array length must match number of key " \
    1657             :                         "attributes")));
    1658             : 
    1659             :     /*
    1660             :      * Target array is made up of key values that will be used to build the
    1661             :      * SQL string for use on the remote system.
    1662             :      */
    1663           8 :     tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
    1664             : 
    1665             :     /*
    1666             :      * There should be one target array key value for each key attnum
    1667             :      */
    1668           8 :     if (tgt_nitems != pknumatts)
    1669           0 :         ereport(ERROR,
    1670             :                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
    1671             :                  errmsg("target key array length must match number of key " \
    1672             :                         "attributes")));
    1673             : 
    1674             :     /*
    1675             :      * Prep work is finally done. Go get the SQL string.
    1676             :      */
    1677           8 :     sql = get_sql_insert(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
    1678             : 
    1679             :     /*
    1680             :      * Now we can close the relation.
    1681             :      */
    1682           8 :     relation_close(rel, AccessShareLock);
    1683             : 
    1684             :     /*
    1685             :      * And return the generated statement as text
    1686             :      */
    1687           8 :     PG_RETURN_TEXT_P(cstring_to_text(sql));
    1688             : }
    1689             : 
    1690             : 
    1691             : /*
    1692             :  * dblink_build_sql_delete
    1693             :  *
    1694             :  * Used to generate an SQL delete statement.
    1695             :  * This is useful for selectively replicating a
    1696             :  * delete to another server via dblink.
    1697             :  *
    1698             :  * API:
    1699             :  * <relname> - name of table of interest (opened in this backend via get_rel_from_relname; the statement is meant for the remote side)
    1700             :  * <pkattnums> - an int2vector of attnums which will be used
    1701             :  * to identify the remote tuple of interest
    1702             :  * <pknumatts> - number of attnums in pkattnums
    1703             :  * <tgt_pkattvals_arry> - text array of key values which will be used
    1704             :  * to build the string for execution remotely.
    1705             :  */
    1706           6 : PG_FUNCTION_INFO_V1(dblink_build_sql_delete);
    1707             : Datum
    1708          12 : dblink_build_sql_delete(PG_FUNCTION_ARGS)
    1709             : {
    1710          12 :     text       *relname_text = PG_GETARG_TEXT_PP(0);
    1711          12 :     int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
    1712          12 :     int32       pknumatts_arg = PG_GETARG_INT32(2);
    1713          12 :     ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
    1714             :     Relation    rel;
    1715             :     int        *pkattnums;
    1716             :     int         pknumatts;
    1717             :     char      **tgt_pkattvals;
    1718             :     int         tgt_nitems;
    1719             :     char       *sql;
    1720             : 
    1721             :     /*
    1722             :      * Open target relation (AccessShareLock; ACL_SELECT is passed on to get_rel_from_relname).
    1723             :      */
    1724          12 :     rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
    1725             : 
    1726             :     /*
    1727             :      * Process pkattnums argument: validate_pkattnums fills pkattnums and pknumatts from the raw arguments.
    1728             :      */
    1729          12 :     validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
    1730             :                        &pkattnums, &pknumatts);
    1731             : 
    1732             :     /*
    1733             :      * Target array is made up of key values that will be used to build the
    1734             :      * SQL string for use on the remote system.
    1735             :      */
    1736           8 :     tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
    1737             : 
    1738             :     /*
    1739             :      * There should be one target array key value for each key attnum
    1740             :      */
    1741           8 :     if (tgt_nitems != pknumatts)
    1742           0 :         ereport(ERROR,
    1743             :                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
    1744             :                  errmsg("target key array length must match number of key " \
    1745             :                         "attributes")));
    1746             : 
    1747             :     /*
    1748             :      * Prep work is finally done. Go get the SQL string.
    1749             :      */
    1750           8 :     sql = get_sql_delete(rel, pkattnums, pknumatts, tgt_pkattvals);
    1751             : 
    1752             :     /*
    1753             :      * Now we can close the relation.
    1754             :      */
    1755           8 :     relation_close(rel, AccessShareLock);
    1756             : 
    1757             :     /*
    1758             :      * And return the generated statement as text
    1759             :      */
    1760           8 :     PG_RETURN_TEXT_P(cstring_to_text(sql));
    1761             : }
    1762             : 
    1763             : 
    1764             : /*
    1765             :  * dblink_build_sql_update
    1766             :  *
    1767             :  * Used to generate an SQL update statement
    1768             :  * based on an existing tuple in a local relation.
    1769             :  * This is useful for selectively replicating data
    1770             :  * to another server via dblink.
    1771             :  *
    1772             :  * API:
    1773             :  * <relname> - name of local table of interest
    1774             :  * <pkattnums> - an int2vector of attnums which will be used
    1775             :  * to identify the local tuple of interest
    1776             :  * <pknumatts> - number of attnums in pkattnums
    1777             :  * <src_pkattvals_arry> - text array of key values which will be used
    1778             :  * to identify the local tuple of interest
    1779             :  * <tgt_pkattvals_arry> - text array of key values which will be used
    1780             :  * to build the string for execution remotely. These are substituted
    1781             :  * for their counterparts in src_pkattvals_arry
                     :  *
                     :  * Returns the statement built by get_sql_update() as text.  An error is
                     :  * raised if either key array does not hold exactly one element per
                     :  * validated key attnum.  The relation is opened with AccessShareLock
                     :  * and closed again before returning.
    1782             :  */
    1783           6 : PG_FUNCTION_INFO_V1(dblink_build_sql_update);
    1784             : Datum
    1785          12 : dblink_build_sql_update(PG_FUNCTION_ARGS)
    1786             : {
    1787          12 :     text       *relname_text = PG_GETARG_TEXT_PP(0);
    1788          12 :     int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
    1789          12 :     int32       pknumatts_arg = PG_GETARG_INT32(2);
    1790          12 :     ArrayType  *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
    1791          12 :     ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
    1792             :     Relation    rel;
    1793             :     int        *pkattnums;
    1794             :     int         pknumatts;
    1795             :     char      **src_pkattvals;
    1796             :     char      **tgt_pkattvals;
    1797             :     int         src_nitems;
    1798             :     int         tgt_nitems;
    1799             :     char       *sql;
    1800             : 
    1801             :     /*
    1802             :      * Open target relation.
    1803             :      */
    1804          12 :     rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
    1805             : 
    1806             :     /*
    1807             :      * Process pkattnums argument.  The validated list and its length come
                     :      * back in pkattnums and pknumatts, which are used from here on.
    1808             :      */
    1809          12 :     validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
    1810             :                        &pkattnums, &pknumatts);
    1811             : 
    1812             :     /*
    1813             :      * Source array is made up of key values that will be used to locate the
    1814             :      * tuple of interest from the local system.
    1815             :      */
    1816           8 :     src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
    1817             : 
    1818             :     /*
    1819             :      * There should be one source array key value for each key attnum
    1820             :      */
    1821           8 :     if (src_nitems != pknumatts)
    1822           0 :         ereport(ERROR,
    1823             :                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
    1824             :                  errmsg("source key array length must match number of key " \
    1825             :                         "attributes")));
    1826             : 
    1827             :     /*
    1828             :      * Target array is made up of key values that will be used to build the
    1829             :      * SQL string for use on the remote system.
    1830             :      */
    1831           8 :     tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
    1832             : 
    1833             :     /*
    1834             :      * There should be one target array key value for each key attnum
    1835             :      */
    1836           8 :     if (tgt_nitems != pknumatts)
    1837           0 :         ereport(ERROR,
    1838             :                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
    1839             :                  errmsg("target key array length must match number of key " \
    1840             :                         "attributes")));
    1841             : 
    1842             :     /*
    1843             :      * Prep work is finally done. Go get the SQL string.
    1844             :      */
    1845           8 :     sql = get_sql_update(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
    1846             : 
    1847             :     /*
    1848             :      * Now we can close the relation.
    1849             :      */
    1850           8 :     relation_close(rel, AccessShareLock);
    1851             : 
    1852             :     /*
    1853             :      * Hand the statement back to the caller as a text value.
    1854             :      */
    1855           8 :     PG_RETURN_TEXT_P(cstring_to_text(sql));
    1856             : }
    1857             : 
    1858             : /*
    1859             :  * dblink_current_query
    1860             :  *
    1861             :  * Return the current query string, to allow its use in (among other
    1862             :  * things) rewrite rules.
                     :  *
                     :  * This is a thin wrapper: fcinfo is passed straight through to
                     :  * current_query() and that result is returned unchanged.
    1863             :  */
    1864           2 : PG_FUNCTION_INFO_V1(dblink_current_query);
    1865             : Datum
    1866           0 : dblink_current_query(PG_FUNCTION_ARGS)
    1867             : {
    1868             :     /* This is now just an alias for the built-in function current_query() */
    1869           0 :     PG_RETURN_DATUM(current_query(fcinfo));
    1870             : }
    1871             : 
    1872             : /*
    1873             :  * Retrieve async notifications for a connection.
    1874             :  *
    1875             :  * Returns a setof record of notifications, or an empty set if none received.
    1876             :  * Can optionally take a named connection as parameter, but uses the unnamed
    1877             :  * connection by default.
    1878             :  *
                     :  * Each result row has DBLINK_NOTIFY_COLS columns: notify_name (text),
                     :  * be_pid (int4) and extra (text).  notify_name and extra are returned as
                     :  * SQL NULL when the corresponding PGnotify string pointer is NULL.  Rows
                     :  * are handed back through a tuplestore created in the per-query memory
                     :  * context; the function itself returns a zero Datum.
    1879             :  */
    1880             : #define DBLINK_NOTIFY_COLS      3
    1881             : 
    1882           6 : PG_FUNCTION_INFO_V1(dblink_get_notify);
    1883             : Datum
    1884           4 : dblink_get_notify(PG_FUNCTION_ARGS)
    1885             : {
    1886             :     PGconn     *conn;
    1887             :     PGnotify   *notify;
    1888           4 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    1889             :     TupleDesc   tupdesc;
    1890             :     Tuplestorestate *tupstore;
    1891             :     MemoryContext per_query_ctx;
    1892             :     MemoryContext oldcontext;
    1893             : 
    1894           4 :     prepTuplestoreResult(fcinfo);
    1895             : 
    1896           4 :     dblink_init();
    1897           4 :     if (PG_NARGS() == 1)
    1898           0 :         conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
    1899             :     else
    1900           4 :         conn = pconn->conn;
    1901             : 
    1902             :     /* create the tuplestore in per-query memory */
    1903           4 :     per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
    1904           4 :     oldcontext = MemoryContextSwitchTo(per_query_ctx);
    1905             : 
    1906           4 :     tupdesc = CreateTemplateTupleDesc(DBLINK_NOTIFY_COLS);
    1907           4 :     TupleDescInitEntry(tupdesc, (AttrNumber) 1, "notify_name",
    1908             :                        TEXTOID, -1, 0);
    1909           4 :     TupleDescInitEntry(tupdesc, (AttrNumber) 2, "be_pid",
    1910             :                        INT4OID, -1, 0);
    1911           4 :     TupleDescInitEntry(tupdesc, (AttrNumber) 3, "extra",
    1912             :                        TEXTOID, -1, 0);
    1913             : 
    1914           4 :     tupstore = tuplestore_begin_heap(true, false, work_mem);
    1915           4 :     rsinfo->setResult = tupstore;
    1916           4 :     rsinfo->setDesc = tupdesc;
    1917             : 
    1918           4 :     MemoryContextSwitchTo(oldcontext);
    1919             :     /*
                     :      * Consume any pending input, then store one row for every
                     :      * notification PQnotifies hands back.  Input is consumed again after
                     :      * each notification before the next one is requested.
                     :      */
    1920           4 :     PQconsumeInput(conn);
    1921          12 :     while ((notify = PQnotifies(conn)) != NULL)
    1922             :     {
    1923             :         Datum       values[DBLINK_NOTIFY_COLS];
    1924             :         bool        nulls[DBLINK_NOTIFY_COLS];
    1925             : 
    1926           4 :         memset(values, 0, sizeof(values));
    1927           4 :         memset(nulls, 0, sizeof(nulls));
    1928             : 
    1929           4 :         if (notify->relname != NULL)
    1930           4 :             values[0] = CStringGetTextDatum(notify->relname);
    1931             :         else
    1932           0 :             nulls[0] = true;
    1933             : 
    1934           4 :         values[1] = Int32GetDatum(notify->be_pid);
    1935             : 
    1936           4 :         if (notify->extra != NULL)
    1937           4 :             values[2] = CStringGetTextDatum(notify->extra);
    1938             :         else
    1939           0 :             nulls[2] = true;
    1940             : 
    1941           4 :         tuplestore_putvalues(tupstore, tupdesc, values, nulls);
    1942             :         /* release the PGnotify obtained from PQnotifies above */
    1943           4 :         PQfreemem(notify);
    1944           4 :         PQconsumeInput(conn);
    1945             :     }
    1946             : 
    1947             :     /* clean up and return the tuplestore */
    1948             :     tuplestore_donestoring(tupstore);
    1949             : 
    1950           4 :     return (Datum) 0;
    1951             : }
    1952             : 
    1953             : /*
    1954             :  * Validate the options given to a dblink foreign server or user mapping.
    1955             :  * Raise an error if any option is invalid.
    1956             :  *
    1957             :  * We just check the names of options here, so semantic errors in options,
    1958             :  * such as invalid numeric format, will be detected at the attempt to connect.
                     :  *
                     :  * Argument 0 is the option list, decoded with untransformRelOptions().
                     :  * Argument 1 is an Oid naming the context; it is passed unchanged to
                     :  * is_valid_dblink_option() together with each option name.  The error
                     :  * for a rejected option carries a hint listing every libpq keyword that
                     :  * is_valid_dblink_option() accepts for that context.  Returns void.
    1959             :  */
    1960           4 : PG_FUNCTION_INFO_V1(dblink_fdw_validator);
    1961             : Datum
    1962           8 : dblink_fdw_validator(PG_FUNCTION_ARGS)
    1963             : {
    1964           8 :     List       *options_list = untransformRelOptions(PG_GETARG_DATUM(0));
    1965           8 :     Oid         context = PG_GETARG_OID(1);
    1966             :     ListCell   *cell;
    1967             : 
    1968             :     static const PQconninfoOption *options = NULL;
    1969             : 
    1970             :     /*
    1971             :      * Get list of valid libpq options.
    1972             :      *
    1973             :      * To avoid unnecessary work, we get the list once and use it throughout
    1974             :      * the lifetime of this backend process.  We don't need to care about
    1975             :      * memory context issues, because PQconndefaults allocates with malloc.
    1976             :      */
    1977           8 :     if (!options)
    1978             :     {
    1979           2 :         options = PQconndefaults();
    1980           2 :         if (!options)           /* assume reason for failure is OOM */
    1981           0 :             ereport(ERROR,
    1982             :                     (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
    1983             :                      errmsg("out of memory"),
    1984             :                      errdetail("Could not get libpq's default connection options.")));
    1985             :     }
    1986             : 
    1987             :     /* Validate each supplied option. */
    1988          14 :     foreach(cell, options_list)
    1989             :     {
    1990           8 :         DefElem    *def = (DefElem *) lfirst(cell);
    1991             : 
    1992           8 :         if (!is_valid_dblink_option(options, def->defname, context))
    1993             :         {
    1994             :             /*
    1995             :              * Unknown option, or invalid option for the context specified, so
    1996             :              * complain about it.  Provide a hint with list of valid options
    1997             :              * for the context.
    1998             :              */
    1999             :             StringInfoData buf;
    2000             :             const PQconninfoOption *opt;
    2001             :             /*
                     :              * Build the hint text: every keyword accepted in this context,
                     :              * separated by commas.  The scan stops at the array entry whose
                     :              * keyword is NULL.
                     :              */
    2002           2 :             initStringInfo(&buf);
    2003          62 :             for (opt = options; opt->keyword; opt++)
    2004             :             {
    2005          60 :                 if (is_valid_dblink_option(options, opt->keyword, context))
    2006           8 :                     appendStringInfo(&buf, "%s%s",
    2007           4 :                                      (buf.len > 0) ? ", " : "",
    2008             :                                      opt->keyword);
    2009             :             }
    2010           2 :             ereport(ERROR,
    2011             :                     (errcode(ERRCODE_FDW_OPTION_NAME_NOT_FOUND),
    2012             :                      errmsg("invalid option \"%s\"", def->defname),
    2013             :                      errhint("Valid options in this context are: %s",
    2014             :                              buf.data)));
    2015             :         }
    2016             :     }
    2017             : 
    2018           6 :     PG_RETURN_VOID();
    2019             : }
    2020             : 
    2021             : 
    2022             : /*************************************************************
    2023             :  * internal functions
    2024             :  */
    2025             : 
    2026             : 
    2027             : /*
    2028             :  * get_pkey_attnames
    2029             :  *
    2030             :  * Get the primary key attnames for the given relation.
    2031             :  * Return NULL, and set indnkeyatts = 0, if no primary key exists.
                     :  *
                     :  * Otherwise *indnkeyatts is copied from the first pg_index row for the
                     :  * relation that has indisprimary set, and (when that count is positive)
                     :  * the result is a palloc-allocated array with one column name per key
                     :  * column, each obtained from SPI_fname().  The scan stops at that row.
    2032             :  */
    2033             : static char **
    2034           6 : get_pkey_attnames(Relation rel, int16 *indnkeyatts)
    2035             : {
    2036             :     Relation    indexRelation;
    2037             :     ScanKeyData skey;
    2038             :     SysScanDesc scan;
    2039             :     HeapTuple   indexTuple;
    2040             :     int         i;
    2041           6 :     char      **result = NULL;
    2042             :     TupleDesc   tupdesc;
    2043             : 
    2044             :     /* initialize indnkeyatts to 0 in case no primary key exists */
    2045           6 :     *indnkeyatts = 0;
    2046             : 
    2047           6 :     tupdesc = rel->rd_att;
    2048             : 
    2049             :     /* Prepare to scan pg_index for entries having indrelid = this rel. */
    2050           6 :     indexRelation = table_open(IndexRelationId, AccessShareLock);
    2051           6 :     ScanKeyInit(&skey,
    2052             :                 Anum_pg_index_indrelid,
    2053             :                 BTEqualStrategyNumber, F_OIDEQ,
    2054           6 :                 ObjectIdGetDatum(RelationGetRelid(rel)));
    2055             : 
    2056           6 :     scan = systable_beginscan(indexRelation, IndexIndrelidIndexId, true,
    2057             :                               NULL, 1, &skey);
    2058             : 
    2059           6 :     while (HeapTupleIsValid(indexTuple = systable_getnext(scan)))
    2060             :     {
    2061           6 :         Form_pg_index index = (Form_pg_index) GETSTRUCT(indexTuple);
    2062             : 
    2063             :         /* we're only interested if it is the primary key */
    2064           6 :         if (index->indisprimary)
    2065             :         {
    2066           6 :             *indnkeyatts = index->indnkeyatts;
    2067           6 :             if (*indnkeyatts > 0)
    2068             :             {
    2069           6 :                 result = (char **) palloc(*indnkeyatts * sizeof(char *));
    2070             : 
    2071          18 :                 for (i = 0; i < *indnkeyatts; i++)
    2072          12 :                     result[i] = SPI_fname(tupdesc, index->indkey.values[i]);
    2073             :             }
    2074           6 :             break;
    2075             :         }
    2076             :     }
    2077             : 
    2078           6 :     systable_endscan(scan);
    2079           6 :     table_close(indexRelation, AccessShareLock);
    2080             : 
    2081           6 :     return result;
    2082             : }
    2083             : 
    2084             : /*
    2085             :  * Deconstruct a text[] into C-strings (note any NULL elements will be
    2086             :  * returned as NULL pointers)
                     :  *
                     :  * *numitems is set to the total element count over all dimensions of the
                     :  * array.  The result is a palloc-allocated array with one pointer per
                     :  * element, in storage order.  The element type is only checked by an
                     :  * Assert, so callers must pass a text array.
    2087             :  */
    2088             : static char **
    2089          40 : get_text_array_contents(ArrayType *array, int *numitems)
    2090             : {
    2091          40 :     int         ndim = ARR_NDIM(array);
    2092          40 :     int        *dims = ARR_DIMS(array);
    2093             :     int         nitems;
    2094             :     int16       typlen;
    2095             :     bool        typbyval;
    2096             :     char        typalign;
    2097             :     char      **values;
    2098             :     char       *ptr;
    2099             :     bits8      *bitmap;
    2100             :     int         bitmask;
    2101             :     int         i;
    2102             : 
    2103             :     Assert(ARR_ELEMTYPE(array) == TEXTOID);
    2104             : 
    2105          40 :     *numitems = nitems = ArrayGetNItems(ndim, dims);
    2106             : 
    2107          40 :     get_typlenbyvalalign(ARR_ELEMTYPE(array),
    2108             :                          &typlen, &typbyval, &typalign);
    2109             : 
    2110          40 :     values = (char **) palloc(nitems * sizeof(char *));
    2111             : 
    2112          40 :     ptr = ARR_DATA_PTR(array);
    2113          40 :     bitmap = ARR_NULLBITMAP(array);
    2114          40 :     bitmask = 1;
    2115             :     /*
                     :      * Walk the elements.  When a null bitmap exists, a clear bit marks a
                     :      * NULL element; the data pointer is not advanced for such an element.
                     :      */
    2116         110 :     for (i = 0; i < nitems; i++)
    2117             :     {
    2118          70 :         if (bitmap && (*bitmap & bitmask) == 0)
    2119             :         {
    2120           0 :             values[i] = NULL;
    2121             :         }
    2122             :         else
    2123             :         {
    2124          70 :             values[i] = TextDatumGetCString(PointerGetDatum(ptr));
    2125          70 :             ptr = att_addlength_pointer(ptr, typlen, ptr);
    2126          70 :             ptr = (char *) att_align_nominal(ptr, typalign);
    2127             :         }
    2128             : 
    2129             :         /* advance to the next bitmap bit, moving to the next byte after 8 bits */
    2130          70 :         if (bitmap)
    2131             :         {
    2132           0 :             bitmask <<= 1;
    2133           0 :             if (bitmask == 0x100)
    2134             :             {
    2135           0 :                 bitmap++;
    2136           0 :                 bitmask = 1;
    2137             :             }
    2138             :         }
    2139             :     }
    2140             : 
    2141          40 :     return values;
    2142             : }
    2143             : /*
                     :  * get_sql_insert
                     :  *
                     :  * Build an INSERT statement for rel from the local tuple that
                     :  * get_tuple_of_interest() finds for src_pkattvals.
                     :  *
                     :  * Dropped columns are skipped in both the column list and the value
                     :  * list.  A column whose physical position (counting from 0) appears in
                     :  * pkattnums takes its value from the matching entry of tgt_pkattvals;
                     :  * every other column takes its value from the local tuple.  Non-NULL
                     :  * values are emitted through quote_literal_cstr(), NULL values as the
                     :  * keyword NULL.
                     :  *
                     :  * Raises an error if no source row is found.  Returns the string
                     :  * accumulated in a StringInfo buffer.
                     :  */
    2144             : static char *
    2145           8 : get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
    2146             : {
    2147             :     char       *relname;
    2148             :     HeapTuple   tuple;
    2149             :     TupleDesc   tupdesc;
    2150             :     int         natts;
    2151             :     StringInfoData buf;
    2152             :     char       *val;
    2153             :     int         key;
    2154             :     int         i;
    2155             :     bool        needComma;
    2156             : 
    2157           8 :     initStringInfo(&buf);
    2158             : 
    2159             :     /* get relation name including any needed schema prefix and quoting */
    2160           8 :     relname = generate_relation_name(rel);
    2161             : 
    2162           8 :     tupdesc = rel->rd_att;
    2163           8 :     natts = tupdesc->natts;
    2164             : 
    2165           8 :     tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
    2166           8 :     if (!tuple)
    2167           0 :         ereport(ERROR,
    2168             :                 (errcode(ERRCODE_CARDINALITY_VIOLATION),
    2169             :                  errmsg("source row not found")));
    2170             : 
    2171           8 :     appendStringInfo(&buf, "INSERT INTO %s(", relname);
    2172             :     /* first pass: comma-separated list of quoted column names */
    2173           8 :     needComma = false;
    2174          38 :     for (i = 0; i < natts; i++)
    2175             :     {
    2176          30 :         Form_pg_attribute att = TupleDescAttr(tupdesc, i);
    2177             : 
    2178          30 :         if (att->attisdropped)
    2179           4 :             continue;
    2180             : 
    2181          26 :         if (needComma)
    2182          18 :             appendStringInfoChar(&buf, ',');
    2183             : 
    2184          26 :         appendStringInfoString(&buf,
    2185          26 :                                quote_ident_cstr(NameStr(att->attname)));
    2186          26 :         needComma = true;
    2187             :     }
    2188             : 
    2189           8 :     appendStringInfoString(&buf, ") VALUES(");
    2190             : 
    2191             :     /*
    2192             :      * Note: i is physical column number (counting from 0).
    2193             :      */
    2194           8 :     needComma = false;
    2195          38 :     for (i = 0; i < natts; i++)
    2196             :     {
    2197          30 :         if (TupleDescAttr(tupdesc, i)->attisdropped)
    2198           4 :             continue;
    2199             : 
    2200          26 :         if (needComma)
    2201          18 :             appendStringInfoChar(&buf, ',');
    2202             : 
    2203          26 :         key = get_attnum_pk_pos(pkattnums, pknumatts, i);
    2204             :         /* key columns use the target value (copied so it can be pfreed below) */
    2205          26 :         if (key >= 0)
    2206          14 :             val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
    2207             :         else
    2208          12 :             val = SPI_getvalue(tuple, tupdesc, i + 1);
    2209             : 
    2210          26 :         if (val != NULL)
    2211             :         {
    2212          26 :             appendStringInfoString(&buf, quote_literal_cstr(val));
    2213          26 :             pfree(val);
    2214             :         }
    2215             :         else
    2216           0 :             appendStringInfoString(&buf, "NULL");
    2217          26 :         needComma = true;
    2218             :     }
    2219           8 :     appendStringInfoChar(&buf, ')');
    2220             : 
    2221           8 :     return buf.data;
    2222             : }
    2223             : /*
                     :  * get_sql_delete
                     :  *
                     :  * Build a DELETE statement for rel.  The WHERE clause has one test per
                     :  * key column, joined with AND: the quoted column name compared for
                     :  * equality with the quoted literal from tgt_pkattvals[i], or IS NULL
                     :  * when that entry is a NULL pointer.
                     :  *
                     :  * pkattnums[i] is used directly as an index into the tuple descriptor.
                     :  * No local tuple is looked up.  Returns the string accumulated in a
                     :  * StringInfo buffer.
                     :  */
    2224             : static char *
    2225           8 : get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals)
    2226             : {
    2227             :     char       *relname;
    2228             :     TupleDesc   tupdesc;
    2229             :     StringInfoData buf;
    2230             :     int         i;
    2231             : 
    2232           8 :     initStringInfo(&buf);
    2233             : 
    2234             :     /* get relation name including any needed schema prefix and quoting */
    2235           8 :     relname = generate_relation_name(rel);
    2236             : 
    2237           8 :     tupdesc = rel->rd_att;
    2238             : 
    2239           8 :     appendStringInfo(&buf, "DELETE FROM %s WHERE ", relname);
    2240          22 :     for (i = 0; i < pknumatts; i++)
    2241             :     {
    2242          14 :         int         pkattnum = pkattnums[i];
    2243          14 :         Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
    2244             : 
    2245          14 :         if (i > 0)
    2246           6 :             appendStringInfoString(&buf, " AND ");
    2247             : 
    2248          14 :         appendStringInfoString(&buf,
    2249          14 :                                quote_ident_cstr(NameStr(attr->attname)));
    2250             : 
    2251          14 :         if (tgt_pkattvals[i] != NULL)
    2252          14 :             appendStringInfo(&buf, " = %s",
    2253          14 :                              quote_literal_cstr(tgt_pkattvals[i]));
    2254             :         else
    2255           0 :             appendStringInfoString(&buf, " IS NULL");
    2256             :     }
    2257             : 
    2258           8 :     return buf.data;
    2259             : }
    2260             : /*
                     :  * get_sql_update
                     :  *
                     :  * Build an UPDATE statement for rel from the local tuple that
                     :  * get_tuple_of_interest() finds for src_pkattvals.
                     :  *
                     :  * The SET list assigns every non-dropped column.  A column whose
                     :  * physical position (counting from 0) appears in pkattnums is assigned
                     :  * the matching entry of tgt_pkattvals; every other column is assigned
                     :  * its value from the local tuple.  The WHERE clause tests each key
                     :  * column against tgt_pkattvals, using IS NULL for NULL entries.
                     :  *
                     :  * Raises an error if no source row is found.  Returns the string
                     :  * accumulated in a StringInfo buffer.
                     :  */
    2261             : static char *
    2262           8 : get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
    2263             : {
    2264             :     char       *relname;
    2265             :     HeapTuple   tuple;
    2266             :     TupleDesc   tupdesc;
    2267             :     int         natts;
    2268             :     StringInfoData buf;
    2269             :     char       *val;
    2270             :     int         key;
    2271             :     int         i;
    2272             :     bool        needComma;
    2273             : 
    2274           8 :     initStringInfo(&buf);
    2275             : 
    2276             :     /* get relation name including any needed schema prefix and quoting */
    2277           8 :     relname = generate_relation_name(rel);
    2278             : 
    2279           8 :     tupdesc = rel->rd_att;
    2280           8 :     natts = tupdesc->natts;
    2281             : 
    2282           8 :     tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
    2283           8 :     if (!tuple)
    2284           0 :         ereport(ERROR,
    2285             :                 (errcode(ERRCODE_CARDINALITY_VIOLATION),
    2286             :                  errmsg("source row not found")));
    2287             : 
    2288           8 :     appendStringInfo(&buf, "UPDATE %s SET ", relname);
    2289             : 
    2290             :     /*
    2291             :      * Note: i is physical column number (counting from 0).
    2292             :      */
    2293           8 :     needComma = false;
    2294          38 :     for (i = 0; i < natts; i++)
    2295             :     {
    2296          30 :         Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
    2297             : 
    2298          30 :         if (attr->attisdropped)
    2299           4 :             continue;
    2300             : 
    2301          26 :         if (needComma)
    2302          18 :             appendStringInfoString(&buf, ", ");
    2303             : 
    2304          26 :         appendStringInfo(&buf, "%s = ",
    2305          26 :                          quote_ident_cstr(NameStr(attr->attname)));
    2306             : 
    2307          26 :         key = get_attnum_pk_pos(pkattnums, pknumatts, i);
    2308             :         /* key columns use the target value (copied so it can be pfreed below) */
    2309          26 :         if (key >= 0)
    2310          14 :             val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
    2311             :         else
    2312          12 :             val = SPI_getvalue(tuple, tupdesc, i + 1);
    2313             : 
    2314          26 :         if (val != NULL)
    2315             :         {
    2316          26 :             appendStringInfoString(&buf, quote_literal_cstr(val));
    2317          26 :             pfree(val);
    2318             :         }
    2319             :         else
    2320           0 :             appendStringInfoString(&buf, "NULL");
    2321          26 :         needComma = true;
    2322             :     }
    2323             : 
    2324           8 :     appendStringInfoString(&buf, " WHERE ");
    2325             :     /* one test per key column; val only borrows the caller string here */
    2326          22 :     for (i = 0; i < pknumatts; i++)
    2327             :     {
    2328          14 :         int         pkattnum = pkattnums[i];
    2329          14 :         Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
    2330             : 
    2331          14 :         if (i > 0)
    2332           6 :             appendStringInfoString(&buf, " AND ");
    2333             : 
    2334          14 :         appendStringInfoString(&buf,
    2335          14 :                                quote_ident_cstr(NameStr(attr->attname)));
    2336             : 
    2337          14 :         val = tgt_pkattvals[i];
    2338             : 
    2339          14 :         if (val != NULL)
    2340          14 :             appendStringInfo(&buf, " = %s", quote_literal_cstr(val));
    2341             :         else
    2342           0 :             appendStringInfoString(&buf, " IS NULL");
    2343             :     }
    2344             : 
    2345           8 :     return buf.data;
    2346             : }
    2347             : 
    2348             : /*
    2349             :  * Return a properly quoted identifier.
    2350             :  * Uses quote_ident in quote.c
                     :  *
                     :  * The C string is converted to a text value, passed to quote_ident
                     :  * through DirectFunctionCall1, and the text result is converted back
                     :  * to a C string, which is what the caller receives.
    2351             :  */
    2352             : static char *
    2353         160 : quote_ident_cstr(char *rawstr)
    2354             : {
    2355             :     text       *rawstr_text;
    2356             :     text       *result_text;
    2357             :     char       *result;
    2358             : 
    2359         160 :     rawstr_text = cstring_to_text(rawstr);
    2360         160 :     result_text = DatumGetTextPP(DirectFunctionCall1(quote_ident,
    2361             :                                                      PointerGetDatum(rawstr_text)));
    2362         160 :     result = text_to_cstring(result_text);
    2363             : 
    2364         160 :     return result;
    2365             : }
    2366             : /*
                     :  * get_attnum_pk_pos
                     :  *
                     :  * Return the index of the first of the pknumatts entries in pkattnums
                     :  * that equals key, or -1 if key does not appear among them.
                     :  */
    2367             : static int
    2368          52 : get_attnum_pk_pos(int *pkattnums, int pknumatts, int key)
    2369             : {
    2370             :     int         i;
    2371             : 
    2372             :     /*
    2373             :      * Not likely a long list anyway, so just scan for the value
    2374             :      */
    2375         100 :     for (i = 0; i < pknumatts; i++)
    2376          76 :         if (key == pkattnums[i])
    2377          28 :             return i;
    2378             : 
    2379          24 :     return -1;
    2380             : }
    2381             : 
    2382             : static HeapTuple
    2383          16 : get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals)
    2384             : {
    2385             :     char       *relname;
    2386             :     TupleDesc   tupdesc;
    2387             :     int         natts;
    2388             :     StringInfoData buf;
    2389             :     int         ret;
    2390             :     HeapTuple   tuple;
    2391             :     int         i;
    2392             : 
    2393             :     /*
    2394             :      * Connect to SPI manager
    2395             :      */
    2396          16 :     if ((ret = SPI_connect()) < 0)
    2397             :         /* internal error */
    2398           0 :         elog(ERROR, "SPI connect failure - returned %d", ret);
    2399             : 
    2400          16 :     initStringInfo(&buf);
    2401             : 
    2402             :     /* get relation name including any needed schema prefix and quoting */
    2403          16 :     relname = generate_relation_name(rel);
    2404             : 
    2405          16 :     tupdesc = rel->rd_att;
    2406          16 :     natts = tupdesc->natts;
    2407             : 
    2408             :     /*
    2409             :      * Build sql statement to look up tuple of interest, ie, the one matching
    2410             :      * src_pkattvals.  We used to use "SELECT *" here, but it's simpler to
    2411             :      * generate a result tuple that matches the table's physical structure,
    2412             :      * with NULLs for any dropped columns.  Otherwise we have to deal with two
    2413             :      * different tupdescs and everything's very confusing.
    2414             :      */
    2415          16 :     appendStringInfoString(&buf, "SELECT ");
    2416             : 
    2417          76 :     for (i = 0; i < natts; i++)
    2418             :     {
    2419          60 :         Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
    2420             : 
    2421          60 :         if (i > 0)
    2422          44 :             appendStringInfoString(&buf, ", ");
    2423             : 
    2424          60 :         if (attr->attisdropped)
    2425           8 :             appendStringInfoString(&buf, "NULL");
    2426             :         else
    2427          52 :             appendStringInfoString(&buf,
    2428          52 :                                    quote_ident_cstr(NameStr(attr->attname)));
    2429             :     }
    2430             : 
    2431          16 :     appendStringInfo(&buf, " FROM %s WHERE ", relname);
    2432             : 
    2433          44 :     for (i = 0; i < pknumatts; i++)
    2434             :     {
    2435          28 :         int         pkattnum = pkattnums[i];
    2436          28 :         Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
    2437             : 
    2438          28 :         if (i > 0)
    2439          12 :             appendStringInfoString(&buf, " AND ");
    2440             : 
    2441          28 :         appendStringInfoString(&buf,
    2442          28 :                                quote_ident_cstr(NameStr(attr->attname)));
    2443             : 
    2444          28 :         if (src_pkattvals[i] != NULL)
    2445          28 :             appendStringInfo(&buf, " = %s",
    2446          28 :                              quote_literal_cstr(src_pkattvals[i]));
    2447             :         else
    2448           0 :             appendStringInfoString(&buf, " IS NULL");
    2449             :     }
    2450             : 
    2451             :     /*
    2452             :      * Retrieve the desired tuple
    2453             :      */
    2454          16 :     ret = SPI_exec(buf.data, 0);
    2455          16 :     pfree(buf.data);
    2456             : 
    2457             :     /*
    2458             :      * Only allow one qualifying tuple
    2459             :      */
    2460          16 :     if ((ret == SPI_OK_SELECT) && (SPI_processed > 1))
    2461           0 :         ereport(ERROR,
    2462             :                 (errcode(ERRCODE_CARDINALITY_VIOLATION),
    2463             :                  errmsg("source criteria matched more than one record")));
    2464             : 
    2465          16 :     else if (ret == SPI_OK_SELECT && SPI_processed == 1)
    2466             :     {
    2467          16 :         SPITupleTable *tuptable = SPI_tuptable;
    2468             : 
    2469          16 :         tuple = SPI_copytuple(tuptable->vals[0]);
    2470          16 :         SPI_finish();
    2471             : 
    2472          16 :         return tuple;
    2473             :     }
    2474             :     else
    2475             :     {
    2476             :         /*
    2477             :          * no qualifying tuples
    2478             :          */
    2479           0 :         SPI_finish();
    2480             : 
    2481           0 :         return NULL;
    2482             :     }
    2483             : 
    2484             :     /*
    2485             :      * never reached, but keep compiler quiet
    2486             :      */
    2487             :     return NULL;
    2488             : }
    2489             : 
    2490             : /*
    2491             :  * Open the relation named by relname_text, acquire specified type of lock,
    2492             :  * verify we have specified permissions.
    2493             :  * Caller must close rel when done with it.
    2494             :  */
    2495             : static Relation
    2496          42 : get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode)
    2497             : {
    2498             :     RangeVar   *relvar;
    2499             :     Relation    rel;
    2500             :     AclResult   aclresult;
    2501             : 
    2502          42 :     relvar = makeRangeVarFromNameList(textToQualifiedNameList(relname_text));
    2503          42 :     rel = table_openrv(relvar, lockmode);
    2504             : 
    2505          42 :     aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
    2506             :                                   aclmode);
    2507          42 :     if (aclresult != ACLCHECK_OK)
    2508           0 :         aclcheck_error(aclresult, get_relkind_objtype(rel->rd_rel->relkind),
    2509           0 :                        RelationGetRelationName(rel));
    2510             : 
    2511          42 :     return rel;
    2512             : }
    2513             : 
    2514             : /*
    2515             :  * generate_relation_name - copied from ruleutils.c
    2516             :  *      Compute the name to display for a relation
    2517             :  *
    2518             :  * The result includes all necessary quoting and schema-prefixing.
    2519             :  */
    2520             : static char *
    2521          40 : generate_relation_name(Relation rel)
    2522             : {
    2523             :     char       *nspname;
    2524             :     char       *result;
    2525             : 
    2526             :     /* Qualify the name if not visible in search path */
    2527          40 :     if (RelationIsVisible(RelationGetRelid(rel)))
    2528          30 :         nspname = NULL;
    2529             :     else
    2530          10 :         nspname = get_namespace_name(rel->rd_rel->relnamespace);
    2531             : 
    2532          40 :     result = quote_qualified_identifier(nspname, RelationGetRelationName(rel));
    2533             : 
    2534          40 :     return result;
    2535             : }
    2536             : 
    2537             : 
    2538             : static remoteConn *
    2539         150 : getConnectionByName(const char *name)
    2540             : {
    2541             :     remoteConnHashEnt *hentry;
    2542             :     char       *key;
    2543             : 
    2544         150 :     if (!remoteConnHash)
    2545           4 :         remoteConnHash = createConnHash();
    2546             : 
    2547         150 :     key = pstrdup(name);
    2548         150 :     truncate_identifier(key, strlen(key), false);
    2549         150 :     hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
    2550             :                                                key, HASH_FIND, NULL);
    2551             : 
    2552         150 :     if (hentry)
    2553         136 :         return hentry->rconn;
    2554             : 
    2555          14 :     return NULL;
    2556             : }
    2557             : 
    2558             : static HTAB *
    2559           6 : createConnHash(void)
    2560             : {
    2561             :     HASHCTL     ctl;
    2562             : 
    2563           6 :     ctl.keysize = NAMEDATALEN;
    2564           6 :     ctl.entrysize = sizeof(remoteConnHashEnt);
    2565             : 
    2566           6 :     return hash_create("Remote Con hash", NUMCONN, &ctl, HASH_ELEM);
    2567             : }
    2568             : 
    2569             : static void
    2570          20 : createNewConnection(const char *name, remoteConn *rconn)
    2571             : {
    2572             :     remoteConnHashEnt *hentry;
    2573             :     bool        found;
    2574             :     char       *key;
    2575             : 
    2576          20 :     if (!remoteConnHash)
    2577           2 :         remoteConnHash = createConnHash();
    2578             : 
    2579          20 :     key = pstrdup(name);
    2580          20 :     truncate_identifier(key, strlen(key), true);
    2581          20 :     hentry = (remoteConnHashEnt *) hash_search(remoteConnHash, key,
    2582             :                                                HASH_ENTER, &found);
    2583             : 
    2584          20 :     if (found)
    2585             :     {
    2586           2 :         PQfinish(rconn->conn);
    2587           2 :         pfree(rconn);
    2588             : 
    2589           2 :         ereport(ERROR,
    2590             :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
    2591             :                  errmsg("duplicate connection name")));
    2592             :     }
    2593             : 
    2594          18 :     hentry->rconn = rconn;
    2595          18 :     strlcpy(hentry->name, name, sizeof(hentry->name));
    2596          18 : }
    2597             : 
    2598             : static void
    2599          16 : deleteConnection(const char *name)
    2600             : {
    2601             :     remoteConnHashEnt *hentry;
    2602             :     bool        found;
    2603             :     char       *key;
    2604             : 
    2605          16 :     if (!remoteConnHash)
    2606           0 :         remoteConnHash = createConnHash();
    2607             : 
    2608          16 :     key = pstrdup(name);
    2609          16 :     truncate_identifier(key, strlen(key), false);
    2610          16 :     hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
    2611             :                                                key, HASH_REMOVE, &found);
    2612             : 
    2613          16 :     if (!hentry)
    2614           0 :         ereport(ERROR,
    2615             :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    2616             :                  errmsg("undefined connection name")));
    2617             : 
    2618          16 : }
    2619             : 
    2620             : static void
    2621          34 : dblink_security_check(PGconn *conn, remoteConn *rconn)
    2622             : {
    2623          34 :     if (!superuser())
    2624             :     {
    2625           0 :         if (!PQconnectionUsedPassword(conn))
    2626             :         {
    2627           0 :             PQfinish(conn);
    2628           0 :             if (rconn)
    2629           0 :                 pfree(rconn);
    2630             : 
    2631           0 :             ereport(ERROR,
    2632             :                     (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
    2633             :                      errmsg("password is required"),
    2634             :                      errdetail("Non-superuser cannot connect if the server does not request a password."),
    2635             :                      errhint("Target server's authentication method must be changed.")));
    2636             :         }
    2637             :     }
    2638          34 : }
    2639             : 
    2640             : /*
    2641             :  * For non-superusers, insist that the connstr specify a password.  This
    2642             :  * prevents a password from being picked up from .pgpass, a service file,
    2643             :  * the environment, etc.  We don't want the postgres user's passwords
    2644             :  * to be accessible to non-superusers.
    2645             :  */
    2646             : static void
    2647          40 : dblink_connstr_check(const char *connstr)
    2648             : {
    2649          40 :     if (!superuser())
    2650             :     {
    2651             :         PQconninfoOption *options;
    2652             :         PQconninfoOption *option;
    2653           2 :         bool        connstr_gives_password = false;
    2654             : 
    2655           2 :         options = PQconninfoParse(connstr, NULL);
    2656           2 :         if (options)
    2657             :         {
    2658          62 :             for (option = options; option->keyword != NULL; option++)
    2659             :             {
    2660          60 :                 if (strcmp(option->keyword, "password") == 0)
    2661             :                 {
    2662           2 :                     if (option->val != NULL && option->val[0] != '\0')
    2663             :                     {
    2664           0 :                         connstr_gives_password = true;
    2665           0 :                         break;
    2666             :                     }
    2667             :                 }
    2668             :             }
    2669           2 :             PQconninfoFree(options);
    2670             :         }
    2671             : 
    2672           2 :         if (!connstr_gives_password)
    2673           2 :             ereport(ERROR,
    2674             :                     (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
    2675             :                      errmsg("password is required"),
    2676             :                      errdetail("Non-superusers must provide a password in the connection string.")));
    2677             :     }
    2678          38 : }
    2679             : 
    2680             : /*
    2681             :  * Report an error received from the remote server
    2682             :  *
    2683             :  * res: the received error result (will be freed)
    2684             :  * fail: true for ERROR ereport, false for NOTICE
    2685             :  * fmt and following args: sprintf-style format and values for errcontext;
    2686             :  * the resulting string should be worded like "while <some action>"
    2687             :  */
    2688             : static void
    2689          24 : dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
    2690             :                  bool fail, const char *fmt,...)
    2691             : {
    2692             :     int         level;
    2693          24 :     char       *pg_diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
    2694          24 :     char       *pg_diag_message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
    2695          24 :     char       *pg_diag_message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
    2696          24 :     char       *pg_diag_message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
    2697          24 :     char       *pg_diag_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
    2698             :     int         sqlstate;
    2699             :     char       *message_primary;
    2700             :     char       *message_detail;
    2701             :     char       *message_hint;
    2702             :     char       *message_context;
    2703             :     va_list     ap;
    2704             :     char        dblink_context_msg[512];
    2705             : 
    2706          24 :     if (fail)
    2707           6 :         level = ERROR;
    2708             :     else
    2709          18 :         level = NOTICE;
    2710             : 
    2711          24 :     if (pg_diag_sqlstate)
    2712          24 :         sqlstate = MAKE_SQLSTATE(pg_diag_sqlstate[0],
    2713             :                                  pg_diag_sqlstate[1],
    2714             :                                  pg_diag_sqlstate[2],
    2715             :                                  pg_diag_sqlstate[3],
    2716             :                                  pg_diag_sqlstate[4]);
    2717             :     else
    2718           0 :         sqlstate = ERRCODE_CONNECTION_FAILURE;
    2719             : 
    2720          24 :     message_primary = xpstrdup(pg_diag_message_primary);
    2721          24 :     message_detail = xpstrdup(pg_diag_message_detail);
    2722          24 :     message_hint = xpstrdup(pg_diag_message_hint);
    2723          24 :     message_context = xpstrdup(pg_diag_context);
    2724             : 
    2725             :     /*
    2726             :      * If we don't get a message from the PGresult, try the PGconn.  This is
    2727             :      * needed because for connection-level failures, PQexec may just return
    2728             :      * NULL, not a PGresult at all.
    2729             :      */
    2730          24 :     if (message_primary == NULL)
    2731           0 :         message_primary = pchomp(PQerrorMessage(conn));
    2732             : 
    2733             :     /*
    2734             :      * Now that we've copied all the data we need out of the PGresult, it's
    2735             :      * safe to free it.  We must do this to avoid PGresult leakage.  We're
    2736             :      * leaking all the strings too, but those are in palloc'd memory that will
    2737             :      * get cleaned up eventually.
    2738             :      */
    2739          24 :     if (res)
    2740          24 :         PQclear(res);
    2741             : 
    2742             :     /*
    2743             :      * Format the basic errcontext string.  Below, we'll add on something
    2744             :      * about the connection name.  That's a violation of the translatability
    2745             :      * guidelines about constructing error messages out of parts, but since
    2746             :      * there's no translation support for dblink, there's no need to worry
    2747             :      * about that (yet).
    2748             :      */
    2749          24 :     va_start(ap, fmt);
    2750          24 :     vsnprintf(dblink_context_msg, sizeof(dblink_context_msg), fmt, ap);
    2751          24 :     va_end(ap);
    2752             : 
    2753          24 :     ereport(level,
    2754             :             (errcode(sqlstate),
    2755             :              message_primary ? errmsg_internal("%s", message_primary) :
    2756             :              errmsg("could not obtain message string for remote error"),
    2757             :              message_detail ? errdetail_internal("%s", message_detail) : 0,
    2758             :              message_hint ? errhint("%s", message_hint) : 0,
    2759             :              message_context ? (errcontext("%s", message_context)) : 0,
    2760             :              conname ?
    2761             :              (errcontext("%s on dblink connection named \"%s\"",
    2762             :                          dblink_context_msg, conname)) :
    2763             :              (errcontext("%s on unnamed dblink connection",
    2764             :                          dblink_context_msg))));
    2765          18 : }
    2766             : 
    2767             : /*
    2768             :  * Obtain connection string for a foreign server
    2769             :  */
    2770             : static char *
    2771          40 : get_connect_string(const char *servername)
    2772             : {
    2773          40 :     ForeignServer *foreign_server = NULL;
    2774             :     UserMapping *user_mapping;
    2775             :     ListCell   *cell;
    2776             :     StringInfoData buf;
    2777             :     ForeignDataWrapper *fdw;
    2778             :     AclResult   aclresult;
    2779             :     char       *srvname;
    2780             : 
    2781             :     static const PQconninfoOption *options = NULL;
    2782             : 
    2783          40 :     initStringInfo(&buf);
    2784             : 
    2785             :     /*
    2786             :      * Get list of valid libpq options.
    2787             :      *
    2788             :      * To avoid unnecessary work, we get the list once and use it throughout
    2789             :      * the lifetime of this backend process.  We don't need to care about
    2790             :      * memory context issues, because PQconndefaults allocates with malloc.
    2791             :      */
    2792          40 :     if (!options)
    2793             :     {
    2794           6 :         options = PQconndefaults();
    2795           6 :         if (!options)           /* assume reason for failure is OOM */
    2796           0 :             ereport(ERROR,
    2797             :                     (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
    2798             :                      errmsg("out of memory"),
    2799             :                      errdetail("Could not get libpq's default connection options.")));
    2800             :     }
    2801             : 
    2802             :     /* first gather the server connstr options */
    2803          40 :     srvname = pstrdup(servername);
    2804          40 :     truncate_identifier(srvname, strlen(srvname), false);
    2805          40 :     foreign_server = GetForeignServerByName(srvname, true);
    2806             : 
    2807          40 :     if (foreign_server)
    2808             :     {
    2809           4 :         Oid         serverid = foreign_server->serverid;
    2810           4 :         Oid         fdwid = foreign_server->fdwid;
    2811           4 :         Oid         userid = GetUserId();
    2812             : 
    2813           4 :         user_mapping = GetUserMapping(userid, serverid);
    2814           4 :         fdw = GetForeignDataWrapper(fdwid);
    2815             : 
    2816             :         /* Check permissions, user must have usage on the server. */
    2817           4 :         aclresult = pg_foreign_server_aclcheck(serverid, userid, ACL_USAGE);
    2818           4 :         if (aclresult != ACLCHECK_OK)
    2819           0 :             aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER, foreign_server->servername);
    2820             : 
    2821           4 :         foreach(cell, fdw->options)
    2822             :         {
    2823           0 :             DefElem    *def = lfirst(cell);
    2824             : 
    2825           0 :             if (is_valid_dblink_option(options, def->defname, ForeignDataWrapperRelationId))
    2826           0 :                 appendStringInfo(&buf, "%s='%s' ", def->defname,
    2827           0 :                                  escape_param_str(strVal(def->arg)));
    2828             :         }
    2829             : 
    2830          12 :         foreach(cell, foreign_server->options)
    2831             :         {
    2832           8 :             DefElem    *def = lfirst(cell);
    2833             : 
    2834           8 :             if (is_valid_dblink_option(options, def->defname, ForeignServerRelationId))
    2835           8 :                 appendStringInfo(&buf, "%s='%s' ", def->defname,
    2836           8 :                                  escape_param_str(strVal(def->arg)));
    2837             :         }
    2838             : 
    2839           8 :         foreach(cell, user_mapping->options)
    2840             :         {
    2841             : 
    2842           4 :             DefElem    *def = lfirst(cell);
    2843             : 
    2844           4 :             if (is_valid_dblink_option(options, def->defname, UserMappingRelationId))
    2845           4 :                 appendStringInfo(&buf, "%s='%s' ", def->defname,
    2846           4 :                                  escape_param_str(strVal(def->arg)));
    2847             :         }
    2848             : 
    2849           4 :         return buf.data;
    2850             :     }
    2851             :     else
    2852          36 :         return NULL;
    2853             : }
    2854             : 
    2855             : /*
    2856             :  * Escaping libpq connect parameter strings.
    2857             :  *
    2858             :  * Replaces "'" with "\'" and "\" with "\\".
    2859             :  */
    2860             : static char *
    2861          12 : escape_param_str(const char *str)
    2862             : {
    2863             :     const char *cp;
    2864             :     StringInfoData buf;
    2865             : 
    2866          12 :     initStringInfo(&buf);
    2867             : 
    2868         136 :     for (cp = str; *cp; cp++)
    2869             :     {
    2870         124 :         if (*cp == '\\' || *cp == '\'')
    2871           0 :             appendStringInfoChar(&buf, '\\');
    2872         124 :         appendStringInfoChar(&buf, *cp);
    2873             :     }
    2874             : 
    2875          12 :     return buf.data;
    2876             : }
    2877             : 
    2878             : /*
    2879             :  * Validate the PK-attnums argument for dblink_build_sql_insert() and related
    2880             :  * functions, and translate to the internal representation.
    2881             :  *
    2882             :  * The user supplies an int2vector of 1-based logical attnums, plus a count
    2883             :  * argument (the need for the separate count argument is historical, but we
    2884             :  * still check it).  We check that each attnum corresponds to a valid,
    2885             :  * non-dropped attribute of the rel.  We do *not* prevent attnums from being
    2886             :  * listed twice, though the actual use-case for such things is dubious.
    2887             :  * Note that before Postgres 9.0, the user's attnums were interpreted as
    2888             :  * physical not logical column numbers; this was changed for future-proofing.
    2889             :  *
    2890             :  * The internal representation is a palloc'd int array of 0-based physical
    2891             :  * attnums.
    2892             :  */
    2893             : static void
    2894          36 : validate_pkattnums(Relation rel,
    2895             :                    int2vector *pkattnums_arg, int32 pknumatts_arg,
    2896             :                    int **pkattnums, int *pknumatts)
    2897             : {
    2898          36 :     TupleDesc   tupdesc = rel->rd_att;
    2899          36 :     int         natts = tupdesc->natts;
    2900             :     int         i;
    2901             : 
    2902             :     /* Don't take more array elements than there are */
    2903          36 :     pknumatts_arg = Min(pknumatts_arg, pkattnums_arg->dim1);
    2904             : 
    2905             :     /* Must have at least one pk attnum selected */
    2906          36 :     if (pknumatts_arg <= 0)
    2907           0 :         ereport(ERROR,
    2908             :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    2909             :                  errmsg("number of key attributes must be > 0")));
    2910             : 
    2911             :     /* Allocate output array */
    2912          36 :     *pkattnums = (int *) palloc(pknumatts_arg * sizeof(int));
    2913          36 :     *pknumatts = pknumatts_arg;
    2914             : 
    2915             :     /* Validate attnums and convert to internal form */
    2916         114 :     for (i = 0; i < pknumatts_arg; i++)
    2917             :     {
    2918          90 :         int         pkattnum = pkattnums_arg->values[i];
    2919             :         int         lnum;
    2920             :         int         j;
    2921             : 
    2922             :         /* Can throw error immediately if out of range */
    2923          90 :         if (pkattnum <= 0 || pkattnum > natts)
    2924          12 :             ereport(ERROR,
    2925             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    2926             :                      errmsg("invalid attribute number %d", pkattnum)));
    2927             : 
    2928             :         /* Identify which physical column has this logical number */
    2929          78 :         lnum = 0;
    2930         138 :         for (j = 0; j < natts; j++)
    2931             :         {
    2932             :             /* dropped columns don't count */
    2933         138 :             if (TupleDescAttr(tupdesc, j)->attisdropped)
    2934           6 :                 continue;
    2935             : 
    2936         132 :             if (++lnum == pkattnum)
    2937          78 :                 break;
    2938             :         }
    2939             : 
    2940          78 :         if (j < natts)
    2941          78 :             (*pkattnums)[i] = j;
    2942             :         else
    2943           0 :             ereport(ERROR,
    2944             :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    2945             :                      errmsg("invalid attribute number %d", pkattnum)));
    2946             :     }
    2947          24 : }
    2948             : 
    2949             : /*
    2950             :  * Check if the specified connection option is valid.
    2951             :  *
    2952             :  * We basically allow whatever libpq thinks is an option, with these
    2953             :  * restrictions:
    2954             :  *      debug options: disallowed
    2955             :  *      "client_encoding": disallowed
    2956             :  *      "user": valid only in USER MAPPING options
    2957             :  *      secure options (eg password): valid only in USER MAPPING options
    2958             :  *      others: valid only in FOREIGN SERVER options
    2959             :  *
    2960             :  * We disallow client_encoding because it would be overridden anyway via
    2961             :  * PQclientEncoding; allowing it to be specified would merely promote
    2962             :  * confusion.
    2963             :  */
    2964             : static bool
    2965          80 : is_valid_dblink_option(const PQconninfoOption *options, const char *option,
    2966             :                        Oid context)
    2967             : {
    2968             :     const PQconninfoOption *opt;
    2969             : 
    2970             :     /* Look up the option in libpq result */
    2971        1112 :     for (opt = options; opt->keyword; opt++)
    2972             :     {
    2973        1110 :         if (strcmp(opt->keyword, option) == 0)
    2974          78 :             break;
    2975             :     }
    2976          80 :     if (opt->keyword == NULL)
    2977           2 :         return false;
    2978             : 
    2979             :     /* Disallow debug options (particularly "replication") */
    2980          78 :     if (strchr(opt->dispchar, 'D'))
    2981           6 :         return false;
    2982             : 
    2983             :     /* Disallow "client_encoding" */
    2984          72 :     if (strcmp(opt->keyword, "client_encoding") == 0)
    2985           2 :         return false;
    2986             : 
    2987             :     /*
    2988             :      * If the option is "user" or marked secure, it should be specified only
    2989             :      * in USER MAPPING.  Others should be specified only in SERVER.
    2990             :      */
    2991          70 :     if (strcmp(opt->keyword, "user") == 0 || strchr(opt->dispchar, '*'))
    2992             :     {
    2993          20 :         if (context != UserMappingRelationId)
    2994           0 :             return false;
    2995             :     }
    2996             :     else
    2997             :     {
    2998          60 :         if (context != ForeignServerRelationId)
    2999          48 :             return false;
    3000             :     }
    3001             : 
    3002          22 :     return true;
    3003             : }
    3004             : 
    3005             : /*
    3006             :  * Copy the remote session's values of GUCs that affect datatype I/O
    3007             :  * and apply them locally in a new GUC nesting level.  Returns the new
    3008             :  * nestlevel (which is needed by restoreLocalGucs to undo the settings),
    3009             :  * or -1 if no new nestlevel was needed.
    3010             :  *
    3011             :  * We use the equivalent of a function SET option to allow the settings to
    3012             :  * persist only until the caller calls restoreLocalGucs.  If an error is
    3013             :  * thrown in between, guc.c will take care of undoing the settings.
    3014             :  */
    3015             : static int
    3016          64 : applyRemoteGucs(PGconn *conn)
    3017             : {
    3018             :     static const char *const GUCsAffectingIO[] = {
    3019             :         "DateStyle",
    3020             :         "IntervalStyle"
    3021             :     };
    3022             : 
    3023          64 :     int         nestlevel = -1;
    3024             :     int         i;
    3025             : 
    3026         192 :     for (i = 0; i < lengthof(GUCsAffectingIO); i++)
    3027             :     {
    3028         128 :         const char *gucName = GUCsAffectingIO[i];
    3029         128 :         const char *remoteVal = PQparameterStatus(conn, gucName);
    3030             :         const char *localVal;
    3031             : 
    3032             :         /*
    3033             :          * If the remote server is pre-8.4, it won't have IntervalStyle, but
    3034             :          * that's okay because its output format won't be ambiguous.  So just
    3035             :          * skip the GUC if we don't get a value for it.  (We might eventually
    3036             :          * need more complicated logic with remote-version checks here.)
    3037             :          */
    3038         128 :         if (remoteVal == NULL)
    3039           0 :             continue;
    3040             : 
    3041             :         /*
    3042             :          * Avoid GUC-setting overhead if the remote and local GUCs already
    3043             :          * have the same value.
    3044             :          */
    3045         128 :         localVal = GetConfigOption(gucName, false, false);
    3046             :         Assert(localVal != NULL);
    3047             : 
    3048         128 :         if (strcmp(remoteVal, localVal) == 0)
    3049          94 :             continue;
    3050             : 
    3051             :         /* Create new GUC nest level if we didn't already */
    3052          34 :         if (nestlevel < 0)
    3053          18 :             nestlevel = NewGUCNestLevel();
    3054             : 
    3055             :         /* Apply the option (this will throw error on failure) */
    3056          34 :         (void) set_config_option(gucName, remoteVal,
    3057             :                                  PGC_USERSET, PGC_S_SESSION,
    3058             :                                  GUC_ACTION_SAVE, true, 0, false);
    3059             :     }
    3060             : 
    3061          64 :     return nestlevel;
    3062             : }
    3063             : 
    3064             : /*
    3065             :  * Restore local GUCs after they have been overlaid with remote settings.
    3066             :  */
    3067             : static void
    3068          66 : restoreLocalGucs(int nestlevel)
    3069             : {
    3070             :     /* Do nothing if no new nestlevel was created */
    3071          66 :     if (nestlevel > 0)
    3072          16 :         AtEOXact_GUC(true, nestlevel);
    3073          66 : }

Generated by: LCOV version 1.13