LCOV - code coverage report
Current view: top level - contrib/dblink - dblink.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 86.8 % 1117 970
Test Date: 2026-03-11 10:16:42 Functions: 97.5 % 81 79
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*
       2              :  * dblink.c
       3              :  *
       4              :  * Functions returning results from a remote database
       5              :  *
       6              :  * Joe Conway <mail@joeconway.com>
       7              :  * And contributors:
       8              :  * Darko Prenosil <Darko.Prenosil@finteh.hr>
       9              :  * Shridhar Daithankar <shridhar_daithankar@persistent.co.in>
      10              :  *
      11              :  * contrib/dblink/dblink.c
      12              :  * Copyright (c) 2001-2026, PostgreSQL Global Development Group
      13              :  * ALL RIGHTS RESERVED;
      14              :  *
      15              :  * Permission to use, copy, modify, and distribute this software and its
      16              :  * documentation for any purpose, without fee, and without a written agreement
      17              :  * is hereby granted, provided that the above copyright notice and this
      18              :  * paragraph and the following two paragraphs appear in all copies.
      19              :  *
      20              :  * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
      21              :  * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
      22              :  * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
      23              :  * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
      24              :  * POSSIBILITY OF SUCH DAMAGE.
      25              :  *
      26              :  * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
      27              :  * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
      28              :  * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
      29              :  * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
      30              :  * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
      31              :  *
      32              :  */
      33              : #include "postgres.h"
      34              : 
      35              : #include <limits.h>
      36              : 
      37              : #include "access/htup_details.h"
      38              : #include "access/relation.h"
      39              : #include "access/reloptions.h"
      40              : #include "access/table.h"
      41              : #include "catalog/namespace.h"
      42              : #include "catalog/pg_foreign_data_wrapper.h"
      43              : #include "catalog/pg_foreign_server.h"
      44              : #include "catalog/pg_type.h"
      45              : #include "catalog/pg_user_mapping.h"
      46              : #include "commands/defrem.h"
      47              : #include "common/base64.h"
      48              : #include "executor/spi.h"
      49              : #include "foreign/foreign.h"
      50              : #include "funcapi.h"
      51              : #include "lib/stringinfo.h"
      52              : #include "libpq-fe.h"
      53              : #include "libpq/libpq-be.h"
      54              : #include "libpq/libpq-be-fe-helpers.h"
      55              : #include "mb/pg_wchar.h"
      56              : #include "miscadmin.h"
      57              : #include "parser/scansup.h"
      58              : #include "utils/acl.h"
      59              : #include "utils/builtins.h"
      60              : #include "utils/fmgroids.h"
      61              : #include "utils/guc.h"
      62              : #include "utils/lsyscache.h"
      63              : #include "utils/memutils.h"
      64              : #include "utils/rel.h"
      65              : #include "utils/varlena.h"
      66              : #include "utils/wait_event.h"
      67              : 
       68           16 : PG_MODULE_MAGIC_EXT(
       69              :                     .name = "dblink",   /* name recorded for this module */
       70              :                     .version = PG_VERSION   /* version string taken from PG_VERSION */
       71              : );
       72              : /*
                       :  * remoteConn
                       :  *
                       :  * State kept for one remote connection: the libpq connection handle plus
                       :  * the cursor bookkeeping that dblink_open() and dblink_close() maintain
                       :  * (how many cursors are open, and whether dblink_open() itself had to
                       :  * start the remote transaction those cursors live in).
                       :  */
       73              : typedef struct remoteConn
       74              : {
       75              :     PGconn     *conn;           /* Hold the remote connection */
       76              :     int         openCursorCount;    /* The number of open cursors */
       77              :     bool        newXactForCursor;   /* Opened a transaction for a cursor */
       78              : } remoteConn;
       79              : /*
                       :  * storeInfo
                       :  *
                       :  * Working state handed to storeQueryResult() and storeRow() (see their
                       :  * prototypes below).
                       :  *
                       :  * NOTE(review): how each field is used is not visible in this part of the
                       :  * file.  The names suggest the calling function's fcinfo, the tuplestore
                       :  * being filled, input-conversion metadata, a scratch memory context and a
                       :  * reusable array of C strings -- confirm against storeRow().
                       :  */
       80              : typedef struct storeInfo
       81              : {
       82              :     FunctionCallInfo fcinfo;
       83              :     Tuplestorestate *tuplestore;
       84              :     AttInMetadata *attinmeta;
       85              :     MemoryContext tmpcontext;
       86              :     char      **cstrs;
       87              :     /* temp storage for results to avoid leaks on exception */
       88              :     PGresult   *last_res;
       89              :     PGresult   *cur_res;
       90              : } storeInfo;
      91              : 
       92              : /*
       93              :  * Internal declarations
                       :  *
                       :  * Forward declarations of the file-local (static) helper functions, so
                       :  * that the functions in this file can call them regardless of the order
                       :  * in which they are defined.
       94              :  */
       95              : static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
       96              : static void prepTuplestoreResult(FunctionCallInfo fcinfo);
       97              : static void materializeResult(FunctionCallInfo fcinfo, PGconn *conn,
       98              :                               PGresult *res);
       99              : static void materializeQueryResult(FunctionCallInfo fcinfo,
      100              :                                    PGconn *conn,
      101              :                                    const char *conname,
      102              :                                    const char *sql,
      103              :                                    bool fail);
      104              : static PGresult *storeQueryResult(storeInfo *sinfo, PGconn *conn, const char *sql);
      105              : static void storeRow(storeInfo *sinfo, PGresult *res, bool first);
      106              : static remoteConn *getConnectionByName(const char *name);
      107              : static HTAB *createConnHash(void);
      108              : static remoteConn *createNewConnection(const char *name);
      109              : static void deleteConnection(const char *name);
      110              : static char **get_pkey_attnames(Relation rel, int16 *indnkeyatts);
      111              : static char **get_text_array_contents(ArrayType *array, int *numitems);
      112              : static char *get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
      113              : static char *get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals);
      114              : static char *get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
      115              : static char *quote_ident_cstr(char *rawstr);
      116              : static int  get_attnum_pk_pos(int *pkattnums, int pknumatts, int key);
      117              : static HeapTuple get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals);
      118              : static Relation get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode);
      119              : static char *generate_relation_name(Relation rel);
      120              : static void dblink_connstr_check(const char *connstr);
      121              : static bool dblink_connstr_has_pw(const char *connstr);
      122              : static void dblink_security_check(PGconn *conn, const char *connname,
      123              :                                   const char *connstr);
      124              : static void dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
      125              :                              bool fail, const char *fmt,...) pg_attribute_printf(5, 6);
      126              : static char *get_connect_string(const char *servername);
      127              : static char *escape_param_str(const char *str);
      128              : static void validate_pkattnums(Relation rel,
      129              :                                int2vector *pkattnums_arg, int32 pknumatts_arg,
      130              :                                int **pkattnums, int *pknumatts);
      131              : static bool is_valid_dblink_option(const PQconninfoOption *options,
      132              :                                    const char *option, Oid context);
      133              : static int  applyRemoteGucs(PGconn *conn);
      134              : static void restoreLocalGucs(int nestlevel);
      135              : static bool UseScramPassthrough(ForeignServer *foreign_server, UserMapping *user);
      136              : static void appendSCRAMKeysInfo(StringInfo buf);
      137              : static bool is_valid_dblink_fdw_option(const PQconninfoOption *options, const char *option,
      138              :                                        Oid context);
      139              : static bool dblink_connstr_has_required_scram_options(const char *connstr);
     140              : 
      141              : /*
                       :  * Global state
                       :  *
                       :  * pconn holds the unnamed connection (used when a function is called
                       :  * without a connection name); it is allocated on first use by
                       :  * dblink_init().  remoteConnHash is the hash table of named connections.
                       :  */
      142              : static remoteConn *pconn = NULL;
      143              : static HTAB *remoteConnHash = NULL;
      144              : 
      145              : /*
                       :  * Custom wait event values, retrieved from shared memory.  Zero means
                       :  * "not looked up yet": each one is filled in lazily, right before its
                       :  * first use, by calling WaitEventExtensionNew().
                       :  */
      146              : static uint32 dblink_we_connect = 0;
      147              : static uint32 dblink_we_get_conn = 0;
      148              : static uint32 dblink_we_get_result = 0;
     149              : 
      150              : /*
      151              :  *  The following hash table holds multiple named remote connections.
      152              :  *  The dblink functions accept a connection name as an optional
      153              :  *  first parameter. The connection hash is much like ecpg's, i.e. a
      154              :  *  mapping between a name and a PGconn object.
      155              :  *
      156              :  *  To avoid potentially leaking a PGconn object in case of out-of-memory
      157              :  *  errors, we first create the hash entry, then open the PGconn.
      158              :  *  Hence, a hash entry whose rconn.conn pointer is NULL must be
      159              :  *  understood as a leftover from a failed create; it should be ignored
      160              :  *  by lookup operations, and silently replaced by create operations.
      161              :  */
      162              : 
      163              : typedef struct remoteConnHashEnt
      164              : {
      165              :     char        name[NAMEDATALEN];  /* connection name */
      166              :     remoteConn  rconn;  /* connection state; conn == NULL means leftover */
      167              : } remoteConnHashEnt;
      168              : 
      169              : /* initial number of connection hashes */
      170              : #define NUMCONN 16
      171              : /*
                       :  * dblink_res_internalerror
                       :  *
                       :  * Report the failure of a command that dblink issued on its own behalf
                       :  * (callers pass a short label such as "begin error" in p2).  The ERROR
                       :  * text is "<p2>: <current libpq error message of conn>".
                       :  *
                       :  * The message is captured with pchomp() first, then res is released with
                       :  * PQclear() before the error is raised.  Declared pg_noreturn: control
                       :  * does not come back to the caller.
                       :  */
      172              : pg_noreturn static void
      173            0 : dblink_res_internalerror(PGconn *conn, PGresult *res, const char *p2)
      174              : {
      175            0 :     char       *msg = pchomp(PQerrorMessage(conn));
      176              : 
      177            0 :     PQclear(res);
      178            0 :     elog(ERROR, "%s: %s", p2, msg);
      179              : }
      180              : /*
                       :  * dblink_conn_not_avail
                       :  *
                       :  * Raise an ERROR with code ERRCODE_CONNECTION_DOES_NOT_EXIST.  The message
                       :  * names the connection when conname is non-NULL and is generic when it is
                       :  * NULL (callers pass NULL when the unnamed connection was requested).
                       :  * Declared pg_noreturn.
                       :  */
      181              : pg_noreturn static void
      182            3 : dblink_conn_not_avail(const char *conname)
      183              : {
      184            3 :     if (conname)
      185            1 :         ereport(ERROR,
      186              :                 (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
      187              :                  errmsg("connection \"%s\" not available", conname)));
      188              :     else
      189            2 :         ereport(ERROR,
      190              :                 (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
      191              :                  errmsg("connection not available")));
      192              : }
      193              : /*
                       :  * dblink_get_conn
                       :  *
                       :  * Resolve conname_or_str to a libpq connection.
                       :  *
                       :  * If it names an existing persistent connection, that connection is
                       :  * returned; *conname_p is set to the name and *freeconn_p to false.
                       :  *
                       :  * Otherwise it is tried as a foreign server name (get_connect_string())
                       :  * and, failing that, used as a literal connection string, and a new
                       :  * connection is opened.  In that case *conname_p is NULL and *freeconn_p
                       :  * is true.  NOTE(review): presumably the caller must then close *conn_p
                       :  * itself -- confirm against the callers.
                       :  *
                       :  * Raises ERROR if the connection cannot be established (the failed PGconn
                       :  * is released first).  dblink_connstr_check() and dblink_security_check()
                       :  * are also called on the new-connection path; what they reject is not
                       :  * visible here.  The three output parameters are assigned only at the
                       :  * very end, after all of the above has succeeded.
                       :  */
      194              : static void
      195           36 : dblink_get_conn(char *conname_or_str,
      196              :                 PGconn *volatile *conn_p, char **conname_p, volatile bool *freeconn_p)
      197              : {
      198           36 :     remoteConn *rconn = getConnectionByName(conname_or_str);
      199              :     PGconn     *conn;
      200              :     char       *conname;
      201              :     bool        freeconn;
      202              : 
      203           36 :     if (rconn)
      204              :     {
      205           27 :         conn = rconn->conn;
      206           27 :         conname = conname_or_str;
      207           27 :         freeconn = false;
      208              :     }
      209              :     else
      210              :     {
      211              :         const char *connstr;
      212              : 
      213            9 :         connstr = get_connect_string(conname_or_str);
      214            9 :         if (connstr == NULL)
      215            6 :             connstr = conname_or_str;
      216            9 :         dblink_connstr_check(connstr);
      217              : 
      218              :         /* first time, allocate or get the custom wait event */
      219            8 :         if (dblink_we_get_conn == 0)
      220            4 :             dblink_we_get_conn = WaitEventExtensionNew("DblinkGetConnect");
      221              : 
      222              :         /* OK to make connection */
      223            8 :         conn = libpqsrv_connect(connstr, dblink_we_get_conn);
      224              : 
      225            8 :         if (PQstatus(conn) == CONNECTION_BAD)
      226              :         {
      227            2 :             char       *msg = pchomp(PQerrorMessage(conn));
      228              : 
      229            2 :             libpqsrv_disconnect(conn);
      230            2 :             ereport(ERROR,
      231              :                     (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
      232              :                      errmsg("could not establish connection"),
      233              :                      errdetail_internal("%s", msg)));
      234              :         }
      235              : 
      236            6 :         PQsetNoticeReceiver(conn, libpqsrv_notice_receiver,
      237              :                             "received message via remote connection");
      238              : 
      239            6 :         dblink_security_check(conn, NULL, connstr);
      240            6 :         if (PQclientEncoding(conn) != GetDatabaseEncoding())
      241            0 :             PQsetClientEncoding(conn, GetDatabaseEncodingName());
      242            6 :         freeconn = true;
      243            6 :         conname = NULL;
      244              :     }
      245              : 
      246           33 :     *conn_p = conn;
      247           33 :     *conname_p = conname;
      248           33 :     *freeconn_p = freeconn;
      249           33 : }
      250              : /*
                       :  * dblink_get_named_conn
                       :  *
                       :  * Return the PGconn of the persistent connection called conname.  If
                       :  * getConnectionByName() finds no such connection, an error is raised via
                       :  * dblink_conn_not_avail(), which does not return.
                       :  */
      251              : static PGconn *
      252           17 : dblink_get_named_conn(const char *conname)
      253              : {
      254           17 :     remoteConn *rconn = getConnectionByName(conname);
      255              : 
      256           17 :     if (rconn)
      257           17 :         return rconn->conn;
      258              : 
      259            0 :     dblink_conn_not_avail(conname);
      260              :     return NULL;                /* keep compiler quiet */
      261              : }
      262              : /*
                       :  * dblink_init
                       :  *
                       :  * Lazy one-time setup, called by the SQL-callable entry points below
                       :  * before they touch pconn.
                       :  *
                       :  * While pconn is still NULL this allocates it in TopMemoryContext with no
                       :  * connection and cleared cursor bookkeeping, and obtains the
                       :  * "DblinkGetResult" wait event if that has not been done yet.  Once pconn
                       :  * is set, further calls do nothing.
                       :  */
      263              : static void
      264          123 : dblink_init(void)
      265              : {
      266          123 :     if (!pconn)
      267              :     {
      268            6 :         if (dblink_we_get_result == 0)
      269            6 :             dblink_we_get_result = WaitEventExtensionNew("DblinkGetResult");
      270              : 
      271            6 :         pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn));
      272            6 :         pconn->conn = NULL;
      273            6 :         pconn->openCursorCount = 0;
      274            6 :         pconn->newXactForCursor = false;
      275              :     }
      276          123 : }
     277              : 
      278              : /*
      279              :  * Create a persistent connection to another database
                       :  *
                       :  * Two arguments: (connection name, connection string or foreign server
                       :  * name).  A hash entry is created for the name before connecting, and the
                       :  * new connection is stored in it.
                       :  * One argument: (connection string or foreign server name).  The new
                       :  * connection becomes the unnamed connection held in pconn; a previously
                       :  * open unnamed connection is closed first.
                       :  *
                       :  * In both forms the string is first tried as a foreign server name via
                       :  * get_connect_string() and used verbatim if that returns NULL.
                       :  *
                       :  * Raises ERROR if the connection cannot be established; the failed PGconn
                       :  * is released and a hash entry created for the name is deleted again.
                       :  * Returns the text value "OK" on success.
      280              :  */
      281           13 : PG_FUNCTION_INFO_V1(dblink_connect);
      282              : Datum
      283           16 : dblink_connect(PG_FUNCTION_ARGS)
      284              : {
      285           16 :     char       *conname_or_str = NULL;
      286           16 :     char       *connstr = NULL;
      287           16 :     char       *connname = NULL;
      288              :     char       *msg;
      289           16 :     PGconn     *conn = NULL;
      290           16 :     remoteConn *rconn = NULL;
      291              : 
      292           16 :     dblink_init();
      293              : 
      294           16 :     if (PG_NARGS() == 2)
      295              :     {
      296           11 :         conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(1));
      297           11 :         connname = text_to_cstring(PG_GETARG_TEXT_PP(0));
      298              :     }
      299            5 :     else if (PG_NARGS() == 1)
      300            5 :         conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0));
      301              : 
      302              :     /* first check for valid foreign data server */
      303           16 :     connstr = get_connect_string(conname_or_str);
      304           16 :     if (connstr == NULL)
      305           14 :         connstr = conname_or_str;
      306              : 
      307              :     /* check password in connection string if not superuser */
      308           16 :     dblink_connstr_check(connstr);
      309              : 
      310              :     /* first time, allocate or get the custom wait event */
      311           15 :     if (dblink_we_connect == 0)
      312            2 :         dblink_we_connect = WaitEventExtensionNew("DblinkConnect");
      313              : 
      314              :     /* if we need a hashtable entry, make that first, since it might fail */
      315           15 :     if (connname)
      316              :     {
      317           10 :         rconn = createNewConnection(connname);
      318              :         Assert(rconn->conn == NULL);
      319              :     }
      320              : 
      321              :     /* OK to make connection */
      322           14 :     conn = libpqsrv_connect(connstr, dblink_we_connect);
      323              : 
      324           14 :     if (PQstatus(conn) == CONNECTION_BAD)
      325              :     {
      326            0 :         msg = pchomp(PQerrorMessage(conn));
      327            0 :         libpqsrv_disconnect(conn);
      328            0 :         if (connname)
      329            0 :             deleteConnection(connname);
      330              : 
      331            0 :         ereport(ERROR,
      332              :                 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
      333              :                  errmsg("could not establish connection"),
      334              :                  errdetail_internal("%s", msg)));
      335              :     }
      336              : 
      337           14 :     PQsetNoticeReceiver(conn, libpqsrv_notice_receiver,
      338              :                         "received message via remote connection");
      339              : 
      340              :     /* check password actually used if not superuser */
      341           14 :     dblink_security_check(conn, connname, connstr);
      342              : 
      343              :     /* attempt to set client encoding to match server encoding, if needed */
      344           14 :     if (PQclientEncoding(conn) != GetDatabaseEncoding())
      345            0 :         PQsetClientEncoding(conn, GetDatabaseEncodingName());
      346              : 
      347              :     /* all OK, save away the conn */
      348           14 :     if (connname)
      349              :     {
      350            9 :         rconn->conn = conn;
      351              :     }
      352              :     else
      353              :     {
      354            5 :         if (pconn->conn)
      355            1 :             libpqsrv_disconnect(pconn->conn);
      356            5 :         pconn->conn = conn;
      357              :     }
      358              : 
      359           14 :     PG_RETURN_TEXT_P(cstring_to_text("OK"));
      360              : }
     361              : 
      362              : /*
      363              :  * Clear a persistent connection to another database
                       :  *
                       :  * With one argument (the connection name) the named connection is closed
                       :  * and its entry deleted; otherwise the unnamed connection in pconn is
                       :  * closed and pconn->conn is reset to NULL.
                       :  *
                       :  * If no open connection is found, dblink_conn_not_avail() raises an
                       :  * error.  Returns the text value "OK" on success.
      364              :  */
      365            8 : PG_FUNCTION_INFO_V1(dblink_disconnect);
      366              : Datum
      367           13 : dblink_disconnect(PG_FUNCTION_ARGS)
      368              : {
      369           13 :     char       *conname = NULL;
      370           13 :     remoteConn *rconn = NULL;
      371           13 :     PGconn     *conn = NULL;
      372              : 
      373           13 :     dblink_init();
      374              : 
      375           13 :     if (PG_NARGS() == 1)
      376              :     {
      377            9 :         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
      378            9 :         rconn = getConnectionByName(conname);
      379            9 :         if (rconn)
      380            8 :             conn = rconn->conn;
      381              :     }
      382              :     else
      383            4 :         conn = pconn->conn;
      384              : 
      385           13 :     if (!conn)
      386            1 :         dblink_conn_not_avail(conname);
      387              : 
      388           12 :     libpqsrv_disconnect(conn);
      389           12 :     if (rconn)
      390            8 :         deleteConnection(conname);
      391              :     else
      392            4 :         pconn->conn = NULL;
      393              : 
      394           12 :     PG_RETURN_TEXT_P(cstring_to_text("OK"));
      395              : }
     396              : 
      397              : /*
      398              :  * opens a cursor using a persistent connection
                       :  *
                       :  * Accepted argument lists (see the PG_NARGS() dispatch below):
                       :  *   (curname, sql)
                       :  *   (curname, sql, fail)
                       :  *   (conname, curname, sql)
                       :  *   (conname, curname, sql, fail)
                       :  * The forms without conname use the unnamed connection in pconn.  The two
                       :  * three-argument forms are told apart by whether the third argument is of
                       :  * type bool.  fail defaults to true.
                       :  *
                       :  * If the remote session is idle (no transaction in progress), BEGIN is
                       :  * issued first and the connection is flagged with newXactForCursor so
                       :  * that dblink_close() can COMMIT once the cursor count drops back to zero.
                       :  *
                       :  * Returns the text value "OK" on success.  If DECLARE fails,
                       :  * dblink_res_error() is called; if that returns -- presumably when fail is
                       :  * false, its body is not visible here -- the result is "ERROR".  Raises an
                       :  * error via dblink_conn_not_avail() if the connection is not open.
      399              :  */
      400           13 : PG_FUNCTION_INFO_V1(dblink_open);
      401              : Datum
      402            9 : dblink_open(PG_FUNCTION_ARGS)
      403              : {
      404            9 :     PGresult   *res = NULL;
      405              :     PGconn     *conn;
      406            9 :     char       *curname = NULL;
      407            9 :     char       *sql = NULL;
      408            9 :     char       *conname = NULL;
      409              :     StringInfoData buf;
      410            9 :     remoteConn *rconn = NULL;
      411            9 :     bool        fail = true;    /* default to backward compatible behavior */
      412              : 
      413            9 :     dblink_init();
      414            9 :     initStringInfo(&buf);
      415              : 
      416            9 :     if (PG_NARGS() == 2)
      417              :     {
      418              :         /* text,text */
      419            2 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
      420            2 :         sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
      421            2 :         rconn = pconn;
      422              :     }
      423            7 :     else if (PG_NARGS() == 3)
      424              :     {
      425              :         /* might be text,text,text or text,text,bool */
      426            6 :         if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
      427              :         {
      428            1 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
      429            1 :             sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
      430            1 :             fail = PG_GETARG_BOOL(2);
      431            1 :             rconn = pconn;
      432              :         }
      433              :         else
      434              :         {
      435            5 :             conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
      436            5 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
      437            5 :             sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
      438            5 :             rconn = getConnectionByName(conname);
      439              :         }
      440              :     }
      441            1 :     else if (PG_NARGS() == 4)
      442              :     {
      443              :         /* text,text,text,bool */
      444            1 :         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
      445            1 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
      446            1 :         sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
      447            1 :         fail = PG_GETARG_BOOL(3);
      448            1 :         rconn = getConnectionByName(conname);
      449              :     }
      450              : 
      451            9 :     if (!rconn || !rconn->conn)
      452            0 :         dblink_conn_not_avail(conname);
      453              : 
      454            9 :     conn = rconn->conn;
      455              : 
      456              :     /* If we are not in a transaction, start one */
      457            9 :     if (PQtransactionStatus(conn) == PQTRANS_IDLE)
      458              :     {
      459            7 :         res = libpqsrv_exec(conn, "BEGIN", dblink_we_get_result);
      460            7 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
      461            0 :             dblink_res_internalerror(conn, res, "begin error");
      462            7 :         PQclear(res);
      463            7 :         rconn->newXactForCursor = true;
      464              : 
      465              :         /*
      466              :          * Since transaction state was IDLE, we force cursor count to
      467              :          * initially be 0. This is needed as a previous ABORT might have wiped
      468              :          * out our transaction without maintaining the cursor count for us.
      469              :          */
      470            7 :         rconn->openCursorCount = 0;
      471              :     }
      472              : 
      473              :     /* if we started a transaction, increment cursor count */
      474            9 :     if (rconn->newXactForCursor)
      475            9 :         (rconn->openCursorCount)++;
      476              : 
      477            9 :     appendStringInfo(&buf, "DECLARE %s CURSOR FOR %s", curname, sql);
      478            9 :     res = libpqsrv_exec(conn, buf.data, dblink_we_get_result);
      479            9 :     if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
      480              :     {
      481            2 :         dblink_res_error(conn, conname, res, fail,
      482              :                          "while opening cursor \"%s\"", curname);
      483            2 :         PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
      484              :     }
      485              : 
      486            7 :     PQclear(res);
      487            7 :     PG_RETURN_TEXT_P(cstring_to_text("OK"));
      488              : }
     489              : 
      490              : /*
      491              :  * closes a cursor
                       :  *
                       :  * Accepted argument lists (see the PG_NARGS() dispatch below):
                       :  *   (curname)
                       :  *   (curname, fail)
                       :  *   (conname, curname)
                       :  *   (conname, curname, fail)
                       :  * The forms without conname use the unnamed connection in pconn.  The two
                       :  * two-argument forms are told apart by whether the second argument is of
                       :  * type bool.  fail defaults to true.
                       :  *
                       :  * After a successful CLOSE, if the connection is flagged newXactForCursor,
                       :  * the open-cursor count is decremented; when it reaches zero the flag is
                       :  * cleared and COMMIT is sent to the remote session.
                       :  *
                       :  * Returns the text value "OK" on success.  If CLOSE fails,
                       :  * dblink_res_error() is called; if that returns -- presumably when fail is
                       :  * false, its body is not visible here -- the result is "ERROR" and the
                       :  * cursor bookkeeping is left untouched.  Raises an error via
                       :  * dblink_conn_not_avail() if the connection is not open.
      492              :  */
      493           10 : PG_FUNCTION_INFO_V1(dblink_close);
      494              : Datum
      495            5 : dblink_close(PG_FUNCTION_ARGS)
      496              : {
      497              :     PGconn     *conn;
      498            5 :     PGresult   *res = NULL;
      499            5 :     char       *curname = NULL;
      500            5 :     char       *conname = NULL;
      501              :     StringInfoData buf;
      502            5 :     remoteConn *rconn = NULL;
      503            5 :     bool        fail = true;    /* default to backward compatible behavior */
      504              : 
      505            5 :     dblink_init();
      506            5 :     initStringInfo(&buf);
      507              : 
      508            5 :     if (PG_NARGS() == 1)
      509              :     {
      510              :         /* text */
      511            0 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
      512            0 :         rconn = pconn;
      513              :     }
      514            5 :     else if (PG_NARGS() == 2)
      515              :     {
      516              :         /* might be text,text or text,bool */
      517            5 :         if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
      518              :         {
      519            2 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
      520            2 :             fail = PG_GETARG_BOOL(1);
      521            2 :             rconn = pconn;
      522              :         }
      523              :         else
      524              :         {
      525            3 :             conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
      526            3 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
      527            3 :             rconn = getConnectionByName(conname);
      528              :         }
      529              :     }
      530            5 :     if (PG_NARGS() == 3)    /* plain "if", not "else if"; the counts are exclusive anyway */
      531              :     {
      532              :         /* text,text,bool */
      533            0 :         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
      534            0 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
      535            0 :         fail = PG_GETARG_BOOL(2);
      536            0 :         rconn = getConnectionByName(conname);
      537              :     }
      538              : 
      539            5 :     if (!rconn || !rconn->conn)
      540            0 :         dblink_conn_not_avail(conname);
      541              : 
      542            5 :     conn = rconn->conn;
      543              : 
      544            5 :     appendStringInfo(&buf, "CLOSE %s", curname);
      545              : 
      546              :     /* close the cursor */
      547            5 :     res = libpqsrv_exec(conn, buf.data, dblink_we_get_result);
      548            5 :     if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
      549              :     {
      550            1 :         dblink_res_error(conn, conname, res, fail,
      551              :                          "while closing cursor \"%s\"", curname);
      552            1 :         PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
      553              :     }
      554              : 
      555            4 :     PQclear(res);
      556              : 
      557              :     /* if we started a transaction, decrement cursor count */
      558            4 :     if (rconn->newXactForCursor)
      559              :     {
      560            4 :         (rconn->openCursorCount)--;
      561              : 
      562              :         /* if count is zero, commit the transaction */
      563            4 :         if (rconn->openCursorCount == 0)
      564              :         {
      565            2 :             rconn->newXactForCursor = false;
      566              : 
      567            2 :             res = libpqsrv_exec(conn, "COMMIT", dblink_we_get_result);
      568            2 :             if (PQresultStatus(res) != PGRES_COMMAND_OK)
      569            0 :                 dblink_res_internalerror(conn, res, "commit error");
      570            2 :             PQclear(res);
      571              :         }
      572              :     }
      573              : 
      574            4 :     PG_RETURN_TEXT_P(cstring_to_text("OK"));
      575              : }
     576              : 
     577              : /*
     578              :  * Fetch results from an open cursor: runs FETCH howmany FROM curname remotely and materializes the rows
     579              :  */
     580           13 : PG_FUNCTION_INFO_V1(dblink_fetch);
     581              : Datum
     582           13 : dblink_fetch(PG_FUNCTION_ARGS)
     583              : {
     584           13 :     PGresult   *res = NULL;
     585           13 :     char       *conname = NULL;
     586           13 :     remoteConn *rconn = NULL;
     587           13 :     PGconn     *conn = NULL;
     588              :     StringInfoData buf;
     589           13 :     char       *curname = NULL;
     590           13 :     int         howmany = 0;
     591           13 :     bool        fail = true;    /* default to backward compatible */
     592              :     /* check that the caller accepts a tuplestore and select materialize mode */
     593           13 :     prepTuplestoreResult(fcinfo);
     594              : 
     595           13 :     dblink_init();
     596              :     /* decode the arguments; conname stays NULL for the forms without a connection name */
     597           13 :     if (PG_NARGS() == 4)
     598              :     {
     599              :         /* text,text,int,bool: connection name, cursor name, row count, fail flag */
     600            1 :         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     601            1 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
     602            1 :         howmany = PG_GETARG_INT32(2);
     603            1 :         fail = PG_GETARG_BOOL(3);
     604              : 
     605            1 :         rconn = getConnectionByName(conname);
     606            1 :         if (rconn)
     607            1 :             conn = rconn->conn;
     608              :     }
     609           12 :     else if (PG_NARGS() == 3)
     610              :     {
     611              :         /* text,text,int or text,int,bool: told apart by the declared type of argument 2 */
     612            8 :         if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
     613              :         {
     614            2 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     615            2 :             howmany = PG_GETARG_INT32(1);
     616            2 :             fail = PG_GETARG_BOOL(2);
     617            2 :             conn = pconn->conn;
     618              :         }
     619              :         else
     620              :         {
     621            6 :             conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     622            6 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
     623            6 :             howmany = PG_GETARG_INT32(2);
     624              : 
     625            6 :             rconn = getConnectionByName(conname);
     626            6 :             if (rconn)
     627            6 :                 conn = rconn->conn;
     628              :         }
     629              :     }
     630            4 :     else if (PG_NARGS() == 2)
     631              :     {
     632              :         /* text,int: cursor name and row count, using the connection held in pconn */
     633            4 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     634            4 :         howmany = PG_GETARG_INT32(1);
     635            4 :         conn = pconn->conn;
     636              :     }
     637              :     /* conn is still NULL if the connection lookup failed or yielded no PGconn */
     638           13 :     if (!conn)
     639            0 :         dblink_conn_not_avail(conname);
     640              : 
     641           13 :     initStringInfo(&buf);
     642           13 :     appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
     643              : 
     644              :     /*
     645              :      * Try to execute the query.  Note that since libpq uses malloc, the
     646              :      * PGresult will be long-lived even though we are still in a short-lived
     647              :      * memory context.
     648              :      */
     649           13 :     res = libpqsrv_exec(conn, buf.data, dblink_we_get_result);
     650           26 :     if (!res ||
     651           26 :         (PQresultStatus(res) != PGRES_COMMAND_OK &&
     652           13 :          PQresultStatus(res) != PGRES_TUPLES_OK))
     653              :     {
     654            5 :         dblink_res_error(conn, conname, res, fail,
     655              :                          "while fetching from cursor \"%s\"", curname);
     656            3 :         return (Datum) 0;
     657              :     }
     658            8 :     else if (PQresultStatus(res) == PGRES_COMMAND_OK)
     659              :     {
     660              :         /* cursor does not exist - closed already or bad name */
     661            0 :         PQclear(res);
     662            0 :         ereport(ERROR,
     663              :                 (errcode(ERRCODE_INVALID_CURSOR_NAME),
     664              :                  errmsg("cursor \"%s\" does not exist", curname)));
     665              :     }
     666              :     /* PGRES_TUPLES_OK: copy the rows into the tuplestore; materializeResult also releases res */
     667            8 :     materializeResult(fcinfo, conn, res);
     668            7 :     return (Datum) 0;
     669              : }
     670              : 
     671              : /*
     672              :  * Note: this is the new preferred version of dblink; it calls dblink_record_internal with is_async = false
     673              :  */
     674           18 : PG_FUNCTION_INFO_V1(dblink_record);
     675              : Datum
     676           28 : dblink_record(PG_FUNCTION_ARGS)
     677              : {
     678           28 :     return dblink_record_internal(fcinfo, false);
     679              : }
     680              : /* Send sql on a named connection with PQsendQuery and return its result code (1 means success) */
     681            4 : PG_FUNCTION_INFO_V1(dblink_send_query);
     682              : Datum
     683            6 : dblink_send_query(PG_FUNCTION_ARGS)
     684              : {
     685              :     PGconn     *conn;
     686              :     char       *sql;
     687              :     int         retval;
     688              :     /* the only accepted form is (connection name, sql) */
     689            6 :     if (PG_NARGS() == 2)
     690              :     {
     691            6 :         conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
     692            6 :         sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
     693              :     }
     694              :     else
     695              :         /* shouldn't happen */
     696            0 :         elog(ERROR, "wrong number of arguments");
     697              : 
     698              :     /* async query send; a failure is only reported as a NOTICE and retval is still returned */
     699            6 :     retval = PQsendQuery(conn, sql);
     700            6 :     if (retval != 1)
     701            0 :         elog(NOTICE, "could not send query: %s", pchomp(PQerrorMessage(conn)));
     702              : 
     703            6 :     PG_RETURN_INT32(retval);
     704              : }
     705              : /* Return the result of an asynchronous query: calls dblink_record_internal with is_async = true */
     706            6 : PG_FUNCTION_INFO_V1(dblink_get_result);
     707              : Datum
     708            8 : dblink_get_result(PG_FUNCTION_ARGS)
     709              : {
     710            8 :     return dblink_record_internal(fcinfo, true);
     711              : }
     712              : /* Common implementation of dblink_record (is_async false) and dblink_get_result (is_async true) */
     713              : static Datum
     714           36 : dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
     715              : {
     716           36 :     PGconn     *volatile conn = NULL;
     717           36 :     volatile bool freeconn = false;
     718              :     /* conn and freeconn above are assigned inside PG_TRY and read again in PG_FINALLY */
     719           36 :     prepTuplestoreResult(fcinfo);
     720              : 
     721           36 :     dblink_init();
     722              : 
     723           36 :     PG_TRY();
     724              :     {
     725           36 :         char       *sql = NULL;
     726           36 :         char       *conname = NULL;
     727           36 :         bool        fail = true;    /* default to backward compatible */
     728              :         /* decode the arguments; the accepted forms depend on is_async */
     729           36 :         if (!is_async)
     730              :         {
     731           28 :             if (PG_NARGS() == 3)
     732              :             {
     733              :                 /* text,text,bool: connection, sql, fail flag */
     734            1 :                 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     735            1 :                 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
     736            1 :                 fail = PG_GETARG_BOOL(2);
     737            1 :                 dblink_get_conn(conname, &conn, &conname, &freeconn);
     738              :             }
     739           27 :             else if (PG_NARGS() == 2)
     740              :             {
     741              :                 /* text,text or text,bool: told apart by the declared type of argument 1 */
     742           20 :                 if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
     743              :                 {
     744            1 :                     sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
     745            1 :                     fail = PG_GETARG_BOOL(1);
     746            1 :                     conn = pconn->conn;
     747              :                 }
     748              :                 else
     749              :                 {
     750           19 :                     conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     751           19 :                     sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
     752           19 :                     dblink_get_conn(conname, &conn, &conname, &freeconn);
     753              :                 }
     754              :             }
     755            7 :             else if (PG_NARGS() == 1)
     756              :             {
     757              :                 /* text: sql only, run on the connection held in pconn */
     758            7 :                 conn = pconn->conn;
     759            7 :                 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
     760              :             }
     761              :             else
     762              :                 /* shouldn't happen */
     763            0 :                 elog(ERROR, "wrong number of arguments");
     764              :         }
     765              :         else                    /* is_async */
     766              :         {
     767              :             /* get async result */
     768            8 :             conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     769              : 
     770            8 :             if (PG_NARGS() == 2)
     771              :             {
     772              :                 /* text,bool */
     773            0 :                 fail = PG_GETARG_BOOL(1);
     774            0 :                 conn = dblink_get_named_conn(conname);
     775              :             }
     776            8 :             else if (PG_NARGS() == 1)
     777              :             {
     778              :                 /* text */
     779            8 :                 conn = dblink_get_named_conn(conname);
     780              :             }
     781              :             else
     782              :                 /* shouldn't happen */
     783            0 :                 elog(ERROR, "wrong number of arguments");
     784              :         }
     785              :         /* no usable connection was obtained */
     786           33 :         if (!conn)
     787            2 :             dblink_conn_not_avail(conname);
     788              : 
     789           31 :         if (!is_async)
     790              :         {
     791              :             /* synchronous query, use efficient tuple collection method */
     792           23 :             materializeQueryResult(fcinfo, conn, conname, sql, fail);
     793              :         }
     794              :         else
     795              :         {
     796              :             /* async result retrieval, do it the old way */
     797            8 :             PGresult   *res = libpqsrv_get_result(conn, dblink_we_get_result);
     798              : 
     799              :             /* NULL means we're all done with the async results */
     800            8 :             if (res)
     801              :             {
     802            5 :                 if (PQresultStatus(res) != PGRES_COMMAND_OK &&
     803            5 :                     PQresultStatus(res) != PGRES_TUPLES_OK)
     804              :                 {
     805            0 :                     dblink_res_error(conn, conname, res, fail,
     806              :                                      "while executing query");
     807              :                     /* if fail isn't set, we'll return an empty query result */
     808              :                 }
     809              :                 else
     810              :                 {
     811            5 :                     materializeResult(fcinfo, conn, res);
     812              :                 }
     813              :             }
     814              :         }
     815              :     }
     816            5 :     PG_FINALLY();
     817              :     {
     818              :         /* if needed, close the connection to the database; freeconn is set by dblink_get_conn */
     819           36 :         if (freeconn)
     820            5 :             libpqsrv_disconnect(conn);
     821              :     }
     822           36 :     PG_END_TRY();
     823              :     /* any rows were handed back through the ReturnSetInfo, not the return value */
     824           31 :     return (Datum) 0;
     825              : }
     826              : 
     827              : /*
     828              :  * Verify function caller can handle a tuplestore result, and set up for that.
     829              :  * Raises ERROR if fcinfo->resultinfo is not a ReturnSetInfo or SFRM_Materialize is not allowed.
     830              :  * Note: if the caller returns without actually creating a tuplestore, the
     831              :  * executor will treat the function result as an empty set.
     832              :  */
     833              : static void
     834           49 : prepTuplestoreResult(FunctionCallInfo fcinfo)
     835              : {
     836           49 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
     837              : 
     838              :     /* check to see if query supports us returning a tuplestore */
     839           49 :     if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
     840            0 :         ereport(ERROR,
     841              :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     842              :                  errmsg("set-valued function called in context that cannot accept a set")));
     843           49 :     if (!(rsinfo->allowedModes & SFRM_Materialize))
     844            0 :         ereport(ERROR,
     845              :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     846              :                  errmsg("materialize mode required, but it is not allowed in this context")));
     847              : 
     848              :     /* let the executor know we're sending back a tuplestore */
     849           49 :     rsinfo->returnMode = SFRM_Materialize;
     850              : 
     851              :     /* caller must fill these to return a non-empty result */
     852           49 :     rsinfo->setResult = NULL;
     853           49 :     rsinfo->setDesc = NULL;
     854           49 : }
     855              : 
     856              : /*
     857              :  * Copy the contents of the PGresult into a tuplestore to be returned
     858              :  * as the result of the current function (through fcinfo->resultinfo).
     859              :  * The PGresult will be released in this function; conn is only used to apply remote GUC settings.
     860              :  */
     861              : static void
     862           13 : materializeResult(FunctionCallInfo fcinfo, PGconn *conn, PGresult *res)
     863              : {
     864           13 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
     865              :     TupleDesc   tupdesc;
     866              :     bool        is_sql_cmd;
     867              :     int         ntuples;
     868              :     int         nfields;
     869              : 
     870              :     /* prepTuplestoreResult must have been called previously */
     871              :     Assert(rsinfo->returnMode == SFRM_Materialize);
     872              :     /* a command result yields one status row; a tuples result yields its own rows */
     873           13 :     if (PQresultStatus(res) == PGRES_COMMAND_OK)
     874              :     {
     875            0 :         is_sql_cmd = true;
     876              : 
     877              :         /*
     878              :          * need a tuple descriptor representing one TEXT column to return the
     879              :          * command status string as our result tuple
     880              :          */
     881            0 :         tupdesc = CreateTemplateTupleDesc(1);
     882            0 :         TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
     883              :                            TEXTOID, -1, 0);
     884            0 :         ntuples = 1;
     885            0 :         nfields = 1;
     886              :     }
     887              :     else
     888              :     {
     889              :         Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
     890              : 
     891           13 :         is_sql_cmd = false;
     892              : 
     893              :         /* get a tuple descriptor for our result type */
     894           13 :         switch (get_call_result_type(fcinfo, NULL, &tupdesc))
     895              :         {
     896           13 :             case TYPEFUNC_COMPOSITE:
     897              :                 /* success */
     898           13 :                 break;
     899            0 :             case TYPEFUNC_RECORD:
     900              :                 /* failed to determine actual type of RECORD */
     901            0 :                 ereport(ERROR,
     902              :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     903              :                          errmsg("function returning record called in context "
     904              :                                 "that cannot accept type record")));
     905              :                 break;
     906            0 :             default:
     907              :                 /* result type isn't composite */
     908            0 :                 elog(ERROR, "return type must be a row type");
     909              :                 break;
     910              :         }
     911              : 
     912              :         /* make sure we have a persistent copy of the tupdesc */
     913           13 :         tupdesc = CreateTupleDescCopy(tupdesc);
     914           13 :         ntuples = PQntuples(res);
     915           13 :         nfields = PQnfields(res);
     916              :     }
     917              : 
     918              :     /*
     919              :      * check result and tuple descriptor have the same number of columns
     920              :      */
     921           13 :     if (nfields != tupdesc->natts)
     922            0 :         ereport(ERROR,
     923              :                 (errcode(ERRCODE_DATATYPE_MISMATCH),
     924              :                  errmsg("remote query result rowtype does not match "
     925              :                         "the specified FROM clause rowtype")));
     926              :     /* with zero rows no tuplestore is created and rsinfo->setResult is left untouched */
     927           13 :     if (ntuples > 0)
     928              :     {
     929              :         AttInMetadata *attinmeta;
     930           13 :         int         nestlevel = -1;
     931              :         Tuplestorestate *tupstore;
     932              :         MemoryContext oldcontext;
     933              :         int         row;
     934              :         char      **values;
     935              : 
     936           13 :         attinmeta = TupleDescGetAttInMetadata(tupdesc);
     937              : 
     938              :         /* Set GUCs to ensure we read GUC-sensitive data types correctly */
     939           13 :         if (!is_sql_cmd)
     940           13 :             nestlevel = applyRemoteGucs(conn);
     941              :         /* create the tuplestore in per-query memory and publish it through rsinfo */
     942           13 :         oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
     943           13 :         tupstore = tuplestore_begin_heap(true, false, work_mem);
     944           13 :         rsinfo->setResult = tupstore;
     945           13 :         rsinfo->setDesc = tupdesc;
     946           13 :         MemoryContextSwitchTo(oldcontext);
     947              :         /* per-column C-string pointers, refilled for every row */
     948           13 :         values = palloc_array(char *, nfields);
     949              : 
     950              :         /* put all tuples into the tuplestore */
     951           49 :         for (row = 0; row < ntuples; row++)
     952              :         {
     953              :             HeapTuple   tuple;
     954              : 
     955           37 :             if (!is_sql_cmd)
     956              :             {
     957              :                 int         i;
     958              : 
     959          138 :                 for (i = 0; i < nfields; i++)
     960              :                 {
     961          101 :                     if (PQgetisnull(res, row, i))
     962            0 :                         values[i] = NULL;
     963              :                     else
     964          101 :                         values[i] = PQgetvalue(res, row, i);
     965              :                 }
     966              :             }
     967              :             else
     968              :             {
     969            0 :                 values[0] = PQcmdStatus(res);
     970              :             }
     971              : 
     972              :             /* build the tuple and put it into the tuplestore. */
     973           37 :             tuple = BuildTupleFromCStrings(attinmeta, values);
     974           36 :             tuplestore_puttuple(tupstore, tuple);
     975              :         }
     976              : 
     977              :         /* clean up GUC settings, if we changed any */
     978           12 :         restoreLocalGucs(nestlevel);
     979              :     }
     980              : 
     981           12 :     PQclear(res);
     982           12 : }
     983              : 
     984              : /*
     985              :  * Execute the given SQL command and store its results into a tuplestore
     986              :  * to be returned as the result of the current function.
     987              :  * conname and fail are only passed on to dblink_res_error when the query fails.
     988              :  * This is equivalent to PQexec followed by materializeResult, but we make
     989              :  * use of libpq's single-row mode to avoid accumulating the whole result
     990              :  * inside libpq before it gets transferred to the tuplestore.
     991              :  */
     992              : static void
     993           23 : materializeQueryResult(FunctionCallInfo fcinfo,
     994              :                        PGconn *conn,
     995              :                        const char *conname,
     996              :                        const char *sql,
     997              :                        bool fail)
     998              : {
     999           23 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    1000              : 
    1001              :     /* prepTuplestoreResult must have been called previously */
    1002              :     Assert(rsinfo->returnMode == SFRM_Materialize);
    1003              : 
    1004              :     /* Use a PG_TRY block to ensure we pump libpq dry of results */
    1005           23 :     PG_TRY();
    1006              :     {
    1007           23 :         storeInfo   sinfo = {0};
    1008              :         PGresult   *res;
    1009              : 
    1010           23 :         sinfo.fcinfo = fcinfo;
    1011              :         /* Create short-lived memory context for data conversions */
    1012           23 :         sinfo.tmpcontext = AllocSetContextCreate(CurrentMemoryContext,
    1013              :                                                  "dblink temporary context",
    1014              :                                                  ALLOCSET_DEFAULT_SIZES);
    1015              : 
    1016              :         /* execute query, collecting any tuples into the tuplestore */
    1017           23 :         res = storeQueryResult(&sinfo, conn, sql);
    1018              :         /* res is the last completed PGresult, or NULL if libpq returned none */
    1019           23 :         if (!res ||
    1020           23 :             (PQresultStatus(res) != PGRES_COMMAND_OK &&
    1021           23 :              PQresultStatus(res) != PGRES_TUPLES_OK))
    1022              :         {
    1023            2 :             dblink_res_error(conn, conname, res, fail,
    1024              :                              "while executing query");
    1025              :             /* if fail isn't set, we'll return an empty query result */
    1026              :         }
    1027           21 :         else if (PQresultStatus(res) == PGRES_COMMAND_OK)
    1028              :         {
    1029              :             /*
    1030              :              * storeRow didn't get called, so we need to convert the command
    1031              :              * status string to a tuple manually
    1032              :              */
    1033              :             TupleDesc   tupdesc;
    1034              :             AttInMetadata *attinmeta;
    1035              :             Tuplestorestate *tupstore;
    1036              :             HeapTuple   tuple;
    1037              :             char       *values[1];
    1038              :             MemoryContext oldcontext;
    1039              : 
    1040              :             /*
    1041              :              * need a tuple descriptor representing one TEXT column to return
    1042              :              * the command status string as our result tuple
    1043              :              */
    1044            0 :             tupdesc = CreateTemplateTupleDesc(1);
    1045            0 :             TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
    1046              :                                TEXTOID, -1, 0);
    1047            0 :             attinmeta = TupleDescGetAttInMetadata(tupdesc);
    1048              : 
    1049            0 :             oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
    1050            0 :             tupstore = tuplestore_begin_heap(true, false, work_mem);
    1051            0 :             rsinfo->setResult = tupstore;
    1052            0 :             rsinfo->setDesc = tupdesc;
    1053            0 :             MemoryContextSwitchTo(oldcontext);
    1054              : 
    1055            0 :             values[0] = PQcmdStatus(res);
    1056              : 
    1057              :             /* build the tuple and put it into the tuplestore. */
    1058            0 :             tuple = BuildTupleFromCStrings(attinmeta, values);
    1059            0 :             tuplestore_puttuple(tupstore, tuple);
    1060              : 
    1061            0 :             PQclear(res);
    1062              :         }
    1063              :         else
    1064              :         {
    1065              :             Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
    1066              :             /* storeRow should have created a tuplestore */
    1067              :             Assert(rsinfo->setResult != NULL);
    1068              : 
    1069           21 :             PQclear(res);
    1070              :         }
    1071              : 
    1072              :         /* clean up data conversion short-lived memory context */
    1073           23 :         if (sinfo.tmpcontext != NULL)
    1074           23 :             MemoryContextDelete(sinfo.tmpcontext);
    1075              :         /* storeQueryResult leaves both fields NULL on return, so nothing is left to free here */
    1076           23 :         PQclear(sinfo.last_res);
    1077           23 :         PQclear(sinfo.cur_res);
    1078              :     }
    1079            0 :     PG_CATCH();
    1080              :     {
    1081              :         PGresult   *res;
    1082              : 
    1083              :         /* be sure to clear out any pending data in libpq */
    1084            0 :         while ((res = libpqsrv_get_result(conn, dblink_we_get_result)) !=
    1085              :                NULL)
    1086            0 :             PQclear(res);
    1087            0 :         PG_RE_THROW();
    1088              :     }
    1089           23 :     PG_END_TRY();
    1090           23 : }
    1091              : 
    1092              : /*
    1093              :  * Execute query, and send any result rows to sinfo->tuplestore; returns the last non-single-row PGresult, or NULL.
    1094              :  */
    1095              : static PGresult *
    1096           23 : storeQueryResult(storeInfo *sinfo, PGconn *conn, const char *sql)
    1097              : {
    1098           23 :     bool        first = true;
    1099           23 :     int         nestlevel = -1;
    1100              :     PGresult   *res;
    1101              : 
    1102           23 :     if (!PQsendQuery(conn, sql))
    1103            0 :         elog(ERROR, "could not send query: %s", pchomp(PQerrorMessage(conn)));
    1104              :     /* ask for single-row mode so each row arrives as its own PGresult */
    1105           23 :     if (!PQsetSingleRowMode(conn))  /* shouldn't fail */
    1106            0 :         elog(ERROR, "failed to set single-row mode for dblink query");
    1107              :     /* pull results until libpq has no more (NULL) */
    1108              :     for (;;)
    1109              :     {
    1110          198 :         CHECK_FOR_INTERRUPTS();
    1111              : 
    1112          198 :         sinfo->cur_res = libpqsrv_get_result(conn, dblink_we_get_result);
    1113          198 :         if (!sinfo->cur_res)
    1114           23 :             break;
    1115              : 
    1116          175 :         if (PQresultStatus(sinfo->cur_res) == PGRES_SINGLE_TUPLE)
    1117              :         {
    1118              :             /* got one row from possibly-bigger resultset */
    1119              : 
    1120              :             /*
    1121              :              * Set GUCs to ensure we read GUC-sensitive data types correctly.
    1122              :              * We shouldn't do this until we have a row in hand, to ensure
    1123              :              * libpq has seen any earlier ParameterStatus protocol messages.
    1124              :              */
    1125          152 :             if (first && nestlevel < 0)
    1126           21 :                 nestlevel = applyRemoteGucs(conn);
    1127              : 
    1128          152 :             storeRow(sinfo, sinfo->cur_res, first);
    1129              : 
    1130          152 :             PQclear(sinfo->cur_res);
    1131          152 :             sinfo->cur_res = NULL;
    1132          152 :             first = false;
    1133              :         }
    1134              :         else
    1135              :         {
    1136              :             /* if empty resultset, fill tuplestore header */
    1137           23 :             if (first && PQresultStatus(sinfo->cur_res) == PGRES_TUPLES_OK)
    1138            0 :                 storeRow(sinfo, sinfo->cur_res, first);
    1139              : 
    1140              :             /* store completed result at last_res, discarding any earlier one; the next row starts a new set */
    1141           23 :             PQclear(sinfo->last_res);
    1142           23 :             sinfo->last_res = sinfo->cur_res;
    1143           23 :             sinfo->cur_res = NULL;
    1144           23 :             first = true;
    1145              :         }
    1146              :     }
    1147              : 
    1148              :     /* clean up GUC settings, if we changed any */
    1149           23 :     restoreLocalGucs(nestlevel);
    1150              : 
    1151              :     /* return last_res */
    1152           23 :     res = sinfo->last_res;
    1153           23 :     sinfo->last_res = NULL;
    1154           23 :     return res;
    1155              : }
    1156              : 
    1157              : /*
    1158              :  * Send single row to sinfo->tuplestore.
    1159              :  *
    1160              :  * If "first" is true, create the tuplestore using PGresult's metadata
    1161              :  * (in this case the PGresult might contain either zero or one row).
    1162              :  */
    1163              : static void
    1164          152 : storeRow(storeInfo *sinfo, PGresult *res, bool first)
    1165              : {
    1166          152 :     int         nfields = PQnfields(res);
    1167              :     HeapTuple   tuple;
    1168              :     int         i;
    1169              :     MemoryContext oldcontext;
    1170              : 
    1171          152 :     if (first)
    1172              :     {
    1173              :         /* Prepare for new result set */
    1174           21 :         ReturnSetInfo *rsinfo = (ReturnSetInfo *) sinfo->fcinfo->resultinfo;
    1175              :         TupleDesc   tupdesc;
    1176              : 
    1177              :         /*
    1178              :          * It's possible to get more than one result set if the query string
    1179              :          * contained multiple SQL commands.  In that case, we follow PQexec's
    1180              :          * traditional behavior of throwing away all but the last result.
    1181              :          */
    1182           21 :         if (sinfo->tuplestore)
    1183            0 :             tuplestore_end(sinfo->tuplestore);
    1184           21 :         sinfo->tuplestore = NULL;
    1185              : 
    1186              :         /* get a tuple descriptor for our result type */
    1187           21 :         switch (get_call_result_type(sinfo->fcinfo, NULL, &tupdesc))
    1188              :         {
    1189           21 :             case TYPEFUNC_COMPOSITE:
    1190              :                 /* success */
    1191           21 :                 break;
    1192            0 :             case TYPEFUNC_RECORD:
    1193              :                 /* failed to determine actual type of RECORD */
    1194            0 :                 ereport(ERROR,
    1195              :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1196              :                          errmsg("function returning record called in context "
    1197              :                                 "that cannot accept type record")));
    1198              :                 break;
    1199            0 :             default:
    1200              :                 /* result type isn't composite */
    1201            0 :                 elog(ERROR, "return type must be a row type");
    1202              :                 break;
    1203              :         }
    1204              : 
    1205              :         /* make sure we have a persistent copy of the tupdesc */
    1206           21 :         tupdesc = CreateTupleDescCopy(tupdesc);
    1207              : 
    1208              :         /* check result and tuple descriptor have the same number of columns */
    1209           21 :         if (nfields != tupdesc->natts)
    1210            0 :             ereport(ERROR,
    1211              :                     (errcode(ERRCODE_DATATYPE_MISMATCH),
    1212              :                      errmsg("remote query result rowtype does not match "
    1213              :                             "the specified FROM clause rowtype")));
    1214              : 
    1215              :         /* Prepare attinmeta for later data conversions */
    1216           21 :         sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
    1217              : 
    1218              :         /* Create a new, empty tuplestore */
    1219           21 :         oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
    1220           21 :         sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
    1221           21 :         rsinfo->setResult = sinfo->tuplestore;
    1222           21 :         rsinfo->setDesc = tupdesc;
    1223           21 :         MemoryContextSwitchTo(oldcontext);
    1224              : 
    1225              :         /* Done if empty resultset */
    1226           21 :         if (PQntuples(res) == 0)
    1227            0 :             return;
    1228              : 
    1229              :         /*
    1230              :          * Set up sufficiently-wide string pointers array; this won't change
    1231              :          * in size so it's easy to preallocate.
    1232              :          */
    1233           21 :         if (sinfo->cstrs)
    1234            0 :             pfree(sinfo->cstrs);
    1235           21 :         sinfo->cstrs = palloc_array(char *, nfields);
    1236              :     }
    1237              : 
    1238              :     /* Should have a single-row result if we get here */
    1239              :     Assert(PQntuples(res) == 1);
    1240              : 
    1241              :     /*
    1242              :      * Do the following work in a temp context that we reset after each tuple.
    1243              :      * This cleans up not only the data we have direct access to, but any
    1244              :      * cruft the I/O functions might leak.
    1245              :      */
    1246          152 :     oldcontext = MemoryContextSwitchTo(sinfo->tmpcontext);
    1247              : 
    1248              :     /*
    1249              :      * Fill cstrs with null-terminated strings of column values.
    1250              :      */
    1251          570 :     for (i = 0; i < nfields; i++)
    1252              :     {
    1253          418 :         if (PQgetisnull(res, 0, i))
    1254            0 :             sinfo->cstrs[i] = NULL;
    1255              :         else
    1256          418 :             sinfo->cstrs[i] = PQgetvalue(res, 0, i);
    1257              :     }
    1258              : 
    1259              :     /* Convert row to a tuple, and add it to the tuplestore */
    1260          152 :     tuple = BuildTupleFromCStrings(sinfo->attinmeta, sinfo->cstrs);
    1261              : 
    1262          152 :     tuplestore_puttuple(sinfo->tuplestore, tuple);
    1263              : 
    1264              :     /* Clean up */
    1265          152 :     MemoryContextSwitchTo(oldcontext);
    1266          152 :     MemoryContextReset(sinfo->tmpcontext);
    1267              : }
    1268              : 
    1269              : /*
    1270              :  * List all open dblink connections by name.
    1271              :  * Returns an array of all connection names.
    1272              :  * Takes no params
    1273              :  */
    1274            3 : PG_FUNCTION_INFO_V1(dblink_get_connections);
    1275              : Datum
    1276            1 : dblink_get_connections(PG_FUNCTION_ARGS)
    1277              : {
    1278              :     HASH_SEQ_STATUS status;
    1279              :     remoteConnHashEnt *hentry;
    1280            1 :     ArrayBuildState *astate = NULL;
    1281              : 
    1282            1 :     if (remoteConnHash)
    1283              :     {
    1284            1 :         hash_seq_init(&status, remoteConnHash);
    1285            4 :         while ((hentry = (remoteConnHashEnt *) hash_seq_search(&status)) != NULL)
    1286              :         {
    1287              :             /* ignore it if it's not an open connection */
    1288            3 :             if (hentry->rconn.conn == NULL)
    1289            0 :                 continue;
    1290              :             /* stash away current value */
    1291            3 :             astate = accumArrayResult(astate,
    1292            3 :                                       CStringGetTextDatum(hentry->name),
    1293              :                                       false, TEXTOID, CurrentMemoryContext);
    1294              :         }
    1295              :     }
    1296              : 
    1297            1 :     if (astate)
    1298            1 :         PG_RETURN_DATUM(makeArrayResult(astate,
    1299              :                                         CurrentMemoryContext));
    1300              :     else
    1301            0 :         PG_RETURN_NULL();
    1302              : }
    1303              : 
    1304              : /*
    1305              :  * Checks if a given remote connection is busy
    1306              :  *
    1307              :  * Returns 1 if the connection is busy, 0 otherwise
    1308              :  * Params:
    1309              :  *  text connection_name - name of the connection to check
    1310              :  *
    1311              :  */
    1312            3 : PG_FUNCTION_INFO_V1(dblink_is_busy);
    1313              : Datum
    1314            1 : dblink_is_busy(PG_FUNCTION_ARGS)
    1315              : {
    1316              :     PGconn     *conn;
    1317              : 
    1318            1 :     dblink_init();
    1319            1 :     conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
    1320              : 
    1321            1 :     PQconsumeInput(conn);
    1322            1 :     PG_RETURN_INT32(PQisBusy(conn));
    1323              : }
    1324              : 
    1325              : /*
    1326              :  * Cancels a running request on a connection
    1327              :  *
    1328              :  * Returns text:
    1329              :  *  "OK" if the cancel request has been sent correctly,
    1330              :  *      an error message otherwise
    1331              :  *
    1332              :  * Params:
    1333              :  *  text connection_name - name of the connection to check
    1334              :  *
    1335              :  */
    1336            3 : PG_FUNCTION_INFO_V1(dblink_cancel_query);
    1337              : Datum
    1338            1 : dblink_cancel_query(PG_FUNCTION_ARGS)
    1339              : {
    1340              :     PGconn     *conn;
    1341              :     const char *msg;
    1342              :     TimestampTz endtime;
    1343              : 
    1344            1 :     dblink_init();
    1345            1 :     conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
    1346            1 :     endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
    1347              :                                           30000);
    1348            1 :     msg = libpqsrv_cancel(conn, endtime);
    1349            1 :     if (msg == NULL)
    1350            1 :         msg = "OK";
    1351              : 
    1352            1 :     PG_RETURN_TEXT_P(cstring_to_text(msg));
    1353              : }
    1354              : 
    1355              : 
    1356              : /*
    1357              :  * Get error message from a connection
    1358              :  *
    1359              :  * Returns text:
    1360              :  *  "OK" if no error, an error message otherwise
    1361              :  *
    1362              :  * Params:
    1363              :  *  text connection_name - name of the connection to check
    1364              :  *
    1365              :  */
    1366            3 : PG_FUNCTION_INFO_V1(dblink_error_message);
    1367              : Datum
    1368            1 : dblink_error_message(PG_FUNCTION_ARGS)
    1369              : {
    1370              :     char       *msg;
    1371              :     PGconn     *conn;
    1372              : 
    1373            1 :     dblink_init();
    1374            1 :     conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
    1375              : 
    1376            1 :     msg = PQerrorMessage(conn);
    1377            1 :     if (msg == NULL || msg[0] == '\0')
    1378            1 :         PG_RETURN_TEXT_P(cstring_to_text("OK"));
    1379              :     else
    1380            0 :         PG_RETURN_TEXT_P(cstring_to_text(pchomp(msg)));
    1381              : }
    1382              : 
    1383              : /*
    1384              :  * Execute an SQL non-SELECT command
    1385              :  */
    1386           13 : PG_FUNCTION_INFO_V1(dblink_exec);
    1387              : Datum
    1388           26 : dblink_exec(PG_FUNCTION_ARGS)
    1389              : {
    1390           26 :     text       *volatile sql_cmd_status = NULL;
    1391           26 :     PGconn     *volatile conn = NULL;
    1392           26 :     volatile bool freeconn = false;
    1393              : 
    1394           26 :     dblink_init();
    1395              : 
    1396           26 :     PG_TRY();
    1397              :     {
    1398           26 :         PGresult   *res = NULL;
    1399           26 :         char       *sql = NULL;
    1400           26 :         char       *conname = NULL;
    1401           26 :         bool        fail = true;    /* default to backward compatible behavior */
    1402              : 
    1403           26 :         if (PG_NARGS() == 3)
    1404              :         {
    1405              :             /* must be text,text,bool */
    1406            0 :             conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
    1407            0 :             sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
    1408            0 :             fail = PG_GETARG_BOOL(2);
    1409            0 :             dblink_get_conn(conname, &conn, &conname, &freeconn);
    1410              :         }
    1411           26 :         else if (PG_NARGS() == 2)
    1412              :         {
    1413              :             /* might be text,text or text,bool */
    1414           17 :             if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
    1415              :             {
    1416            1 :                 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
    1417            1 :                 fail = PG_GETARG_BOOL(1);
    1418            1 :                 conn = pconn->conn;
    1419              :             }
    1420              :             else
    1421              :             {
    1422           16 :                 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
    1423           16 :                 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
    1424           16 :                 dblink_get_conn(conname, &conn, &conname, &freeconn);
    1425              :             }
    1426              :         }
    1427            9 :         else if (PG_NARGS() == 1)
    1428              :         {
    1429              :             /* must be single text argument */
    1430            9 :             conn = pconn->conn;
    1431            9 :             sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
    1432              :         }
    1433              :         else
    1434              :             /* shouldn't happen */
    1435            0 :             elog(ERROR, "wrong number of arguments");
    1436              : 
    1437           26 :         if (!conn)
    1438            0 :             dblink_conn_not_avail(conname);
    1439              : 
    1440           26 :         res = libpqsrv_exec(conn, sql, dblink_we_get_result);
    1441           26 :         if (!res ||
    1442           26 :             (PQresultStatus(res) != PGRES_COMMAND_OK &&
    1443            2 :              PQresultStatus(res) != PGRES_TUPLES_OK))
    1444              :         {
    1445            2 :             dblink_res_error(conn, conname, res, fail,
    1446              :                              "while executing command");
    1447              : 
    1448              :             /*
    1449              :              * and save a copy of the command status string to return as our
    1450              :              * result tuple
    1451              :              */
    1452            1 :             sql_cmd_status = cstring_to_text("ERROR");
    1453              :         }
    1454           24 :         else if (PQresultStatus(res) == PGRES_COMMAND_OK)
    1455              :         {
    1456              :             /*
    1457              :              * and save a copy of the command status string to return as our
    1458              :              * result tuple
    1459              :              */
    1460           24 :             sql_cmd_status = cstring_to_text(PQcmdStatus(res));
    1461           24 :             PQclear(res);
    1462              :         }
    1463              :         else
    1464              :         {
    1465            0 :             PQclear(res);
    1466            0 :             ereport(ERROR,
    1467              :                     (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
    1468              :                      errmsg("statement returning results not allowed")));
    1469              :         }
    1470              :     }
    1471            1 :     PG_FINALLY();
    1472              :     {
    1473              :         /* if needed, close the connection to the database */
    1474           26 :         if (freeconn)
    1475            1 :             libpqsrv_disconnect(conn);
    1476              :     }
    1477           26 :     PG_END_TRY();
    1478              : 
    1479           25 :     PG_RETURN_TEXT_P(sql_cmd_status);
    1480              : }
    1481              : 
    1482              : 
    1483              : /*
    1484              :  * dblink_get_pkey
    1485              :  *
    1486              :  * Return list of primary key fields for the supplied relation,
    1487              :  * or NULL if none exists.
    1488              :  */
    1489            3 : PG_FUNCTION_INFO_V1(dblink_get_pkey);
    1490              : Datum
    1491            9 : dblink_get_pkey(PG_FUNCTION_ARGS)
    1492              : {
    1493              :     int16       indnkeyatts;
    1494              :     char      **results;
    1495              :     FuncCallContext *funcctx;
    1496              :     int32       call_cntr;
    1497              :     int32       max_calls;
    1498              :     AttInMetadata *attinmeta;
    1499              :     MemoryContext oldcontext;
    1500              : 
    1501              :     /* stuff done only on the first call of the function */
    1502            9 :     if (SRF_IS_FIRSTCALL())
    1503              :     {
    1504              :         Relation    rel;
    1505              :         TupleDesc   tupdesc;
    1506              : 
    1507              :         /* create a function context for cross-call persistence */
    1508            3 :         funcctx = SRF_FIRSTCALL_INIT();
    1509              : 
    1510              :         /*
    1511              :          * switch to memory context appropriate for multiple function calls
    1512              :          */
    1513            3 :         oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
    1514              : 
    1515              :         /* open target relation */
    1516            3 :         rel = get_rel_from_relname(PG_GETARG_TEXT_PP(0), AccessShareLock, ACL_SELECT);
    1517              : 
    1518              :         /* get the array of attnums */
    1519            3 :         results = get_pkey_attnames(rel, &indnkeyatts);
    1520              : 
    1521            3 :         relation_close(rel, AccessShareLock);
    1522              : 
    1523              :         /*
    1524              :          * need a tuple descriptor representing one INT and one TEXT column
    1525              :          */
    1526            3 :         tupdesc = CreateTemplateTupleDesc(2);
    1527            3 :         TupleDescInitEntry(tupdesc, (AttrNumber) 1, "position",
    1528              :                            INT4OID, -1, 0);
    1529            3 :         TupleDescInitEntry(tupdesc, (AttrNumber) 2, "colname",
    1530              :                            TEXTOID, -1, 0);
    1531              : 
    1532              :         /*
    1533              :          * Generate attribute metadata needed later to produce tuples from raw
    1534              :          * C strings
    1535              :          */
    1536            3 :         attinmeta = TupleDescGetAttInMetadata(tupdesc);
    1537            3 :         funcctx->attinmeta = attinmeta;
    1538              : 
    1539            3 :         if ((results != NULL) && (indnkeyatts > 0))
    1540              :         {
    1541            3 :             funcctx->max_calls = indnkeyatts;
    1542              : 
    1543              :             /* got results, keep track of them */
    1544            3 :             funcctx->user_fctx = results;
    1545              :         }
    1546              :         else
    1547              :         {
    1548              :             /* fast track when no results */
    1549            0 :             MemoryContextSwitchTo(oldcontext);
    1550            0 :             SRF_RETURN_DONE(funcctx);
    1551              :         }
    1552              : 
    1553            3 :         MemoryContextSwitchTo(oldcontext);
    1554              :     }
    1555              : 
    1556              :     /* stuff done on every call of the function */
    1557            9 :     funcctx = SRF_PERCALL_SETUP();
    1558              : 
    1559              :     /*
    1560              :      * initialize per-call variables
    1561              :      */
    1562            9 :     call_cntr = funcctx->call_cntr;
    1563            9 :     max_calls = funcctx->max_calls;
    1564              : 
    1565            9 :     results = (char **) funcctx->user_fctx;
    1566            9 :     attinmeta = funcctx->attinmeta;
    1567              : 
    1568            9 :     if (call_cntr < max_calls)   /* do when there is more left to send */
    1569              :     {
    1570              :         char      **values;
    1571              :         HeapTuple   tuple;
    1572              :         Datum       result;
    1573              : 
    1574            6 :         values = palloc_array(char *, 2);
    1575            6 :         values[0] = psprintf("%d", call_cntr + 1);
    1576            6 :         values[1] = results[call_cntr];
    1577              : 
    1578              :         /* build the tuple */
    1579            6 :         tuple = BuildTupleFromCStrings(attinmeta, values);
    1580              : 
    1581              :         /* make the tuple into a datum */
    1582            6 :         result = HeapTupleGetDatum(tuple);
    1583              : 
    1584            6 :         SRF_RETURN_NEXT(funcctx, result);
    1585              :     }
    1586              :     else
    1587              :     {
    1588              :         /* do when there is no more left */
    1589            3 :         SRF_RETURN_DONE(funcctx);
    1590              :     }
    1591              : }
    1592              : 
    1593              : 
    1594              : /*
    1595              :  * dblink_build_sql_insert
    1596              :  *
    1597              :  * Used to generate an SQL insert statement
    1598              :  * based on an existing tuple in a local relation.
    1599              :  * This is useful for selectively replicating data
    1600              :  * to another server via dblink.
    1601              :  *
    1602              :  * API:
    1603              :  * <relname> - name of local table of interest
    1604              :  * <pkattnums> - an int2vector of attnums which will be used
    1605              :  * to identify the local tuple of interest
    1606              :  * <pknumatts> - number of attnums in pkattnums
    1607              :  * <src_pkattvals_arry> - text array of key values which will be used
    1608              :  * to identify the local tuple of interest
    1609              :  * <tgt_pkattvals_arry> - text array of key values which will be used
    1610              :  * to build the string for execution remotely. These are substituted
    1611              :  * for their counterparts in src_pkattvals_arry
    1612              :  */
    1613            4 : PG_FUNCTION_INFO_V1(dblink_build_sql_insert);
    1614              : Datum
    1615            6 : dblink_build_sql_insert(PG_FUNCTION_ARGS)
    1616              : {
    1617            6 :     text       *relname_text = PG_GETARG_TEXT_PP(0);
    1618            6 :     int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
    1619            6 :     int32       pknumatts_arg = PG_GETARG_INT32(2);
    1620            6 :     ArrayType  *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
    1621            6 :     ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
    1622              :     Relation    rel;
    1623              :     int        *pkattnums;
    1624              :     int         pknumatts;
    1625              :     char      **src_pkattvals;
    1626              :     char      **tgt_pkattvals;
    1627              :     int         src_nitems;
    1628              :     int         tgt_nitems;
    1629              :     char       *sql;
    1630              : 
    1631              :     /*
    1632              :      * Open target relation.
    1633              :      */
    1634            6 :     rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
    1635              : 
    1636              :     /*
    1637              :      * Process pkattnums argument.
    1638              :      */
    1639            6 :     validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
    1640              :                        &pkattnums, &pknumatts);
    1641              : 
    1642              :     /*
    1643              :      * Source array is made up of key values that will be used to locate the
    1644              :      * tuple of interest from the local system.
    1645              :      */
    1646            4 :     src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
    1647              : 
    1648              :     /*
    1649              :      * There should be one source array key value for each key attnum
    1650              :      */
    1651            4 :     if (src_nitems != pknumatts)
    1652            0 :         ereport(ERROR,
    1653              :                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
    1654              :                  errmsg("source key array length must match number of key attributes")));
    1655              : 
    1656              :     /*
    1657              :      * Target array is made up of key values that will be used to build the
    1658              :      * SQL string for use on the remote system.
    1659              :      */
    1660            4 :     tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
    1661              : 
    1662              :     /*
    1663              :      * There should be one target array key value for each key attnum
    1664              :      */
    1665            4 :     if (tgt_nitems != pknumatts)
    1666            0 :         ereport(ERROR,
    1667              :                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
    1668              :                  errmsg("target key array length must match number of key attributes")));
    1669              : 
    1670              :     /*
    1671              :      * Prep work is finally done. Go get the SQL string.
    1672              :      */
    1673            4 :     sql = get_sql_insert(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
    1674              : 
    1675              :     /*
    1676              :      * Now we can close the relation.
    1677              :      */
    1678            4 :     relation_close(rel, AccessShareLock);
    1679              : 
    1680              :     /*
    1681              :      * And send it
    1682              :      */
    1683            4 :     PG_RETURN_TEXT_P(cstring_to_text(sql));
    1684              : }
    1685              : 
    1686              : 
    1687              : /*
    1688              :  * dblink_build_sql_delete
    1689              :  *
    1690              :  * Used to generate an SQL delete statement.
    1691              :  * This is useful for selectively replicating a
    1692              :  * delete to another server via dblink.
    1693              :  *
    1694              :  * API:
    1695              :  * <relname> - name of remote table of interest
    1696              :  * <pkattnums> - an int2vector of attnums which will be used
    1697              :  * to identify the remote tuple of interest
    1698              :  * <pknumatts> - number of attnums in pkattnums
    1699              :  * <tgt_pkattvals_arry> - text array of key values which will be used
    1700              :  * to build the string for execution remotely.
    1701              :  */
    1702            4 : PG_FUNCTION_INFO_V1(dblink_build_sql_delete);
    1703              : Datum
    1704            6 : dblink_build_sql_delete(PG_FUNCTION_ARGS)
    1705              : {
    1706            6 :     text       *relname_text = PG_GETARG_TEXT_PP(0);
    1707            6 :     int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
    1708            6 :     int32       pknumatts_arg = PG_GETARG_INT32(2);
    1709            6 :     ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
    1710              :     Relation    rel;
    1711              :     int        *pkattnums;
    1712              :     int         pknumatts;
    1713              :     char      **tgt_pkattvals;
    1714              :     int         tgt_nitems;
    1715              :     char       *sql;
    1716              : 
    1717              :     /*
    1718              :      * Open target relation.
    1719              :      */
    1720            6 :     rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
    1721              : 
    1722              :     /*
    1723              :      * Process pkattnums argument.
    1724              :      */
    1725            6 :     validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
    1726              :                        &pkattnums, &pknumatts);
    1727              : 
    1728              :     /*
    1729              :      * Target array is made up of key values that will be used to build the
    1730              :      * SQL string for use on the remote system.
    1731              :      */
    1732            4 :     tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
    1733              : 
    1734              :     /*
    1735              :      * There should be one target array key value for each key attnum
    1736              :      */
    1737            4 :     if (tgt_nitems != pknumatts)
    1738            0 :         ereport(ERROR,
    1739              :                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
    1740              :                  errmsg("target key array length must match number of key attributes")));
    1741              : 
    1742              :     /*
    1743              :      * Prep work is finally done. Go get the SQL string.
    1744              :      */
    1745            4 :     sql = get_sql_delete(rel, pkattnums, pknumatts, tgt_pkattvals);
    1746              : 
    1747              :     /*
    1748              :      * Now we can close the relation.
    1749              :      */
    1750            4 :     relation_close(rel, AccessShareLock);
    1751              : 
    1752              :     /*
    1753              :      * And send it
    1754              :      */
    1755            4 :     PG_RETURN_TEXT_P(cstring_to_text(sql));
    1756              : }
    1757              : 
    1758              : 
    1759              : /*
    1760              :  * dblink_build_sql_update
    1761              :  *
    1762              :  * Used to generate an SQL update statement
    1763              :  * based on an existing tuple in a local relation.
    1764              :  * This is useful for selectively replicating data
    1765              :  * to another server via dblink.
    1766              :  *
    1767              :  * API:
    1768              :  * <relname> - name of local table of interest
    1769              :  * <pkattnums> - an int2vector of attnums which will be used
    1770              :  * to identify the local tuple of interest
    1771              :  * <pknumatts> - number of attnums in pkattnums
    1772              :  * <src_pkattvals_arry> - text array of key values which will be used
    1773              :  * to identify the local tuple of interest
    1774              :  * <tgt_pkattvals_arry> - text array of key values which will be used
    1775              :  * to build the string for execution remotely. These are substituted
    1776              :  * for their counterparts in src_pkattvals_arry
    1777              :  */
    1778            4 : PG_FUNCTION_INFO_V1(dblink_build_sql_update);
    1779              : Datum
    1780            6 : dblink_build_sql_update(PG_FUNCTION_ARGS)
    1781              : {
    1782            6 :     text       *relname_text = PG_GETARG_TEXT_PP(0);
    1783            6 :     int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
    1784            6 :     int32       pknumatts_arg = PG_GETARG_INT32(2);
    1785            6 :     ArrayType  *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
    1786            6 :     ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
    1787              :     Relation    rel;
    1788              :     int        *pkattnums;
    1789              :     int         pknumatts;
    1790              :     char      **src_pkattvals;
    1791              :     char      **tgt_pkattvals;
    1792              :     int         src_nitems;
    1793              :     int         tgt_nitems;
    1794              :     char       *sql;
    1795              : 
    1796              :     /*
    1797              :      * Open target relation.
    1798              :      */
    1799            6 :     rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
    1800              : 
    1801              :     /*
    1802              :      * Process pkattnums argument.
    1803              :      */
    1804            6 :     validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
    1805              :                        &pkattnums, &pknumatts);
    1806              : 
    1807              :     /*
    1808              :      * Source array is made up of key values that will be used to locate the
    1809              :      * tuple of interest from the local system.
    1810              :      */
    1811            4 :     src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
    1812              : 
    1813              :     /*
    1814              :      * There should be one source array key value for each key attnum
    1815              :      */
    1816            4 :     if (src_nitems != pknumatts)
    1817            0 :         ereport(ERROR,
    1818              :                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
    1819              :                  errmsg("source key array length must match number of key attributes")));
    1820              : 
    1821              :     /*
    1822              :      * Target array is made up of key values that will be used to build the
    1823              :      * SQL string for use on the remote system.
    1824              :      */
    1825            4 :     tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
    1826              : 
    1827              :     /*
    1828              :      * There should be one target array key value for each key attnum
    1829              :      */
    1830            4 :     if (tgt_nitems != pknumatts)
    1831            0 :         ereport(ERROR,
    1832              :                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
    1833              :                  errmsg("target key array length must match number of key attributes")));
    1834              : 
    1835              :     /*
    1836              :      * Prep work is finally done. Go get the SQL string.
    1837              :      */
    1838            4 :     sql = get_sql_update(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
    1839              : 
    1840              :     /*
    1841              :      * Now we can close the relation.
    1842              :      */
    1843            4 :     relation_close(rel, AccessShareLock);
    1844              : 
    1845              :     /*
    1846              :      * And send it
    1847              :      */
    1848            4 :     PG_RETURN_TEXT_P(cstring_to_text(sql));
    1849              : }
    1850              : 
    1851              : /*
    1852              :  * dblink_current_query
    1853              :  * return the current query string
    1854              :  * to allow its use in (among other things)
    1855              :  * rewrite rules
    1856              :  */
    1857            2 : PG_FUNCTION_INFO_V1(dblink_current_query);
    1858              : Datum
    1859            0 : dblink_current_query(PG_FUNCTION_ARGS)
    1860              : {
    1861              :     /* This is now just an alias for the built-in function current_query() */
    1862            0 :     PG_RETURN_DATUM(current_query(fcinfo));
    1863              : }
    1864              : 
    1865              : /*
    1866              :  * Retrieve async notifications for a connection.
    1867              :  *
    1868              :  * Returns a setof record of notifications, or an empty set if none received.
    1869              :  * Can optionally take a named connection as parameter, but uses the unnamed
    1870              :  * connection per default.
    1871              :  *
    1872              :  */
    1873              : #define DBLINK_NOTIFY_COLS      3
    1874              : 
    1875            5 : PG_FUNCTION_INFO_V1(dblink_get_notify);
    1876              : Datum
    1877            2 : dblink_get_notify(PG_FUNCTION_ARGS)
    1878              : {
    1879              :     PGconn     *conn;
    1880              :     PGnotify   *notify;
    1881            2 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    1882              : 
    1883            2 :     dblink_init();
    1884            2 :     if (PG_NARGS() == 1)
    1885            0 :         conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
    1886              :     else
    1887            2 :         conn = pconn->conn;
    1888              : 
    1889            2 :     InitMaterializedSRF(fcinfo, 0);
    1890              : 
    1891            2 :     PQconsumeInput(conn);
    1892            4 :     while ((notify = PQnotifies(conn)) != NULL)
    1893              :     {
    1894              :         Datum       values[DBLINK_NOTIFY_COLS];
    1895              :         bool        nulls[DBLINK_NOTIFY_COLS];
    1896              : 
    1897            2 :         memset(values, 0, sizeof(values));
    1898            2 :         memset(nulls, 0, sizeof(nulls));
    1899              : 
    1900            2 :         if (notify->relname != NULL)
    1901            2 :             values[0] = CStringGetTextDatum(notify->relname);
    1902              :         else
    1903            0 :             nulls[0] = true;
    1904              : 
    1905            2 :         values[1] = Int32GetDatum(notify->be_pid);
    1906              : 
    1907            2 :         if (notify->extra != NULL)
    1908            2 :             values[2] = CStringGetTextDatum(notify->extra);
    1909              :         else
    1910            0 :             nulls[2] = true;
    1911              : 
    1912            2 :         tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
    1913              : 
    1914            2 :         PQfreemem(notify);
    1915            2 :         PQconsumeInput(conn);
    1916              :     }
    1917              : 
    1918            2 :     return (Datum) 0;
    1919              : }
    1920              : 
    1921              : /*
    1922              :  * Validate the options given to a dblink foreign server or user mapping.
    1923              :  * Raise an error if any option is invalid.
    1924              :  *
    1925              :  * We just check the names of options here, so semantic errors in options,
    1926              :  * such as invalid numeric format, will be detected at the attempt to connect.
    1927              :  */
    1928           14 : PG_FUNCTION_INFO_V1(dblink_fdw_validator);
    1929              : Datum
    1930           19 : dblink_fdw_validator(PG_FUNCTION_ARGS)
    1931              : {
    1932           19 :     List       *options_list = untransformRelOptions(PG_GETARG_DATUM(0));
    1933           19 :     Oid         context = PG_GETARG_OID(1);
    1934              :     ListCell   *cell;
    1935              : 
    1936              :     static const PQconninfoOption *options = NULL;
    1937              : 
    1938              :     /*
    1939              :      * Get list of valid libpq options.
    1940              :      *
    1941              :      * To avoid unnecessary work, we get the list once and use it throughout
    1942              :      * the lifetime of this backend process.  We don't need to care about
    1943              :      * memory context issues, because PQconndefaults allocates with malloc.
    1944              :      */
    1945           19 :     if (!options)
    1946              :     {
    1947           12 :         options = PQconndefaults();
    1948           12 :         if (!options)           /* assume reason for failure is OOM */
    1949            0 :             ereport(ERROR,
    1950              :                     (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
    1951              :                      errmsg("out of memory"),
    1952              :                      errdetail("Could not get libpq's default connection options.")));
    1953              :     }
    1954              : 
    1955              :     /* Validate each supplied option. */
    1956           50 :     foreach(cell, options_list)
    1957              :     {
    1958           39 :         DefElem    *def = (DefElem *) lfirst(cell);
    1959              : 
    1960           39 :         if (!is_valid_dblink_fdw_option(options, def->defname, context))
    1961              :         {
    1962              :             /*
    1963              :              * Unknown option, or invalid option for the context specified, so
    1964              :              * complain about it.  Provide a hint with a valid option that
    1965              :              * looks similar, if there is one.
    1966              :              */
    1967              :             const PQconninfoOption *opt;
    1968              :             const char *closest_match;
    1969              :             ClosestMatchState match_state;
    1970            8 :             bool        has_valid_options = false;
    1971              : 
    1972            8 :             initClosestMatch(&match_state, def->defname, 4);
    1973          416 :             for (opt = options; opt->keyword; opt++)
    1974              :             {
    1975          408 :                 if (is_valid_dblink_option(options, opt->keyword, context))
    1976              :                 {
    1977           93 :                     has_valid_options = true;
    1978           93 :                     updateClosestMatch(&match_state, opt->keyword);
    1979              :                 }
    1980              :             }
    1981              : 
    1982            8 :             closest_match = getClosestMatch(&match_state);
    1983            8 :             ereport(ERROR,
    1984              :                     (errcode(ERRCODE_FDW_OPTION_NAME_NOT_FOUND),
    1985              :                      errmsg("invalid option \"%s\"", def->defname),
    1986              :                      has_valid_options ? closest_match ?
    1987              :                      errhint("Perhaps you meant the option \"%s\".",
    1988              :                              closest_match) : 0 :
    1989              :                      errhint("There are no valid options in this context.")));
    1990              :         }
    1991              :     }
    1992              : 
    1993           11 :     PG_RETURN_VOID();
    1994              : }
    1995              : 
    1996              : 
    1997              : /*************************************************************
    1998              :  * internal functions
    1999              :  */
    2000              : 
    2001              : 
    2002              : /*
    2003              :  * get_pkey_attnames
    2004              :  *
    2005              :  * Get the primary key attnames for the given relation.
    2006              :  * Return NULL, and set indnkeyatts = 0, if no primary key exists.
    2007              :  */
    2008              : static char **
    2009            3 : get_pkey_attnames(Relation rel, int16 *indnkeyatts)
    2010              : {
    2011              :     Relation    indexRelation;
    2012              :     ScanKeyData skey;
    2013              :     SysScanDesc scan;
    2014              :     HeapTuple   indexTuple;
    2015              :     int         i;
    2016            3 :     char      **result = NULL;
    2017              :     TupleDesc   tupdesc;
    2018              : 
    2019              :     /* initialize indnkeyatts to 0 in case no primary key exists */
    2020            3 :     *indnkeyatts = 0;
    2021              : 
    2022            3 :     tupdesc = rel->rd_att;
    2023              : 
    2024              :     /* Prepare to scan pg_index for entries having indrelid = this rel. */
    2025            3 :     indexRelation = table_open(IndexRelationId, AccessShareLock);
    2026            3 :     ScanKeyInit(&skey,
    2027              :                 Anum_pg_index_indrelid,
    2028              :                 BTEqualStrategyNumber, F_OIDEQ,
    2029              :                 ObjectIdGetDatum(RelationGetRelid(rel)));
    2030              : 
    2031            3 :     scan = systable_beginscan(indexRelation, IndexIndrelidIndexId, true,
    2032              :                               NULL, 1, &skey);
    2033              : 
    2034            3 :     while (HeapTupleIsValid(indexTuple = systable_getnext(scan)))
    2035              :     {
    2036            3 :         Form_pg_index index = (Form_pg_index) GETSTRUCT(indexTuple);
    2037              : 
    2038              :         /* we're only interested if it is the primary key */
    2039            3 :         if (index->indisprimary)
    2040              :         {
    2041            3 :             *indnkeyatts = index->indnkeyatts;
    2042            3 :             if (*indnkeyatts > 0)
    2043              :             {
    2044            3 :                 result = palloc_array(char *, *indnkeyatts);
    2045              : 
    2046            9 :                 for (i = 0; i < *indnkeyatts; i++)
    2047            6 :                     result[i] = SPI_fname(tupdesc, index->indkey.values[i]);
    2048              :             }
    2049            3 :             break;
    2050              :         }
    2051              :     }
    2052              : 
    2053            3 :     systable_endscan(scan);
    2054            3 :     table_close(indexRelation, AccessShareLock);
    2055              : 
    2056            3 :     return result;
    2057              : }
    2058              : 
    2059              : /*
    2060              :  * Deconstruct a text[] into C-strings (note any NULL elements will be
    2061              :  * returned as NULL pointers)
    2062              :  */
    2063              : static char **
    2064           20 : get_text_array_contents(ArrayType *array, int *numitems)
    2065              : {
    2066           20 :     int         ndim = ARR_NDIM(array);
    2067           20 :     int        *dims = ARR_DIMS(array);
    2068              :     int         nitems;
    2069              :     int16       typlen;
    2070              :     bool        typbyval;
    2071              :     char        typalign;
    2072              :     uint8       typalignby;
    2073              :     char      **values;
    2074              :     char       *ptr;
    2075              :     bits8      *bitmap;
    2076              :     int         bitmask;
    2077              :     int         i;
    2078              : 
    2079              :     Assert(ARR_ELEMTYPE(array) == TEXTOID);
    2080              : 
    2081           20 :     *numitems = nitems = ArrayGetNItems(ndim, dims);
    2082              : 
    2083           20 :     get_typlenbyvalalign(ARR_ELEMTYPE(array),
    2084              :                          &typlen, &typbyval, &typalign);
    2085           20 :     typalignby = typalign_to_alignby(typalign);
    2086              : 
    2087           20 :     values = palloc_array(char *, nitems);
    2088              : 
    2089           20 :     ptr = ARR_DATA_PTR(array);
    2090           20 :     bitmap = ARR_NULLBITMAP(array);
    2091           20 :     bitmask = 1;
    2092              : 
    2093           55 :     for (i = 0; i < nitems; i++)
    2094              :     {
    2095           35 :         if (bitmap && (*bitmap & bitmask) == 0)
    2096              :         {
    2097            0 :             values[i] = NULL;
    2098              :         }
    2099              :         else
    2100              :         {
    2101           35 :             values[i] = TextDatumGetCString(PointerGetDatum(ptr));
    2102           35 :             ptr = att_addlength_pointer(ptr, typlen, ptr);
    2103           35 :             ptr = (char *) att_nominal_alignby(ptr, typalignby);
    2104              :         }
    2105              : 
    2106              :         /* advance bitmap pointer if any */
    2107           35 :         if (bitmap)
    2108              :         {
    2109            0 :             bitmask <<= 1;
    2110            0 :             if (bitmask == 0x100)
    2111              :             {
    2112            0 :                 bitmap++;
    2113            0 :                 bitmask = 1;
    2114              :             }
    2115              :         }
    2116              :     }
    2117              : 
    2118           20 :     return values;
    2119              : }
    2120              : 
    2121              : static char *
    2122            4 : get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
    2123              : {
    2124              :     char       *relname;
    2125              :     HeapTuple   tuple;
    2126              :     TupleDesc   tupdesc;
    2127              :     int         natts;
    2128              :     StringInfoData buf;
    2129              :     char       *val;
    2130              :     int         key;
    2131              :     int         i;
    2132              :     bool        needComma;
    2133              : 
    2134            4 :     initStringInfo(&buf);
    2135              : 
    2136              :     /* get relation name including any needed schema prefix and quoting */
    2137            4 :     relname = generate_relation_name(rel);
    2138              : 
    2139            4 :     tupdesc = rel->rd_att;
    2140            4 :     natts = tupdesc->natts;
    2141              : 
    2142            4 :     tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
    2143            4 :     if (!tuple)
    2144            0 :         ereport(ERROR,
    2145              :                 (errcode(ERRCODE_CARDINALITY_VIOLATION),
    2146              :                  errmsg("source row not found")));
    2147              : 
    2148            4 :     appendStringInfo(&buf, "INSERT INTO %s(", relname);
    2149              : 
    2150            4 :     needComma = false;
    2151           19 :     for (i = 0; i < natts; i++)
    2152              :     {
    2153           15 :         Form_pg_attribute att = TupleDescAttr(tupdesc, i);
    2154              : 
    2155           15 :         if (att->attisdropped)
    2156            2 :             continue;
    2157              : 
    2158           13 :         if (needComma)
    2159            9 :             appendStringInfoChar(&buf, ',');
    2160              : 
    2161           13 :         appendStringInfoString(&buf,
    2162           13 :                                quote_ident_cstr(NameStr(att->attname)));
    2163           13 :         needComma = true;
    2164              :     }
    2165              : 
    2166            4 :     appendStringInfoString(&buf, ") VALUES(");
    2167              : 
    2168              :     /*
    2169              :      * Note: i is physical column number (counting from 0).
    2170              :      */
    2171            4 :     needComma = false;
    2172           19 :     for (i = 0; i < natts; i++)
    2173              :     {
    2174           15 :         if (TupleDescAttr(tupdesc, i)->attisdropped)
    2175            2 :             continue;
    2176              : 
    2177           13 :         if (needComma)
    2178            9 :             appendStringInfoChar(&buf, ',');
    2179              : 
    2180           13 :         key = get_attnum_pk_pos(pkattnums, pknumatts, i);
    2181              : 
    2182           13 :         if (key >= 0)
    2183            7 :             val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
    2184              :         else
    2185            6 :             val = SPI_getvalue(tuple, tupdesc, i + 1);
    2186              : 
    2187           13 :         if (val != NULL)
    2188              :         {
    2189           13 :             appendStringInfoString(&buf, quote_literal_cstr(val));
    2190           13 :             pfree(val);
    2191              :         }
    2192              :         else
    2193            0 :             appendStringInfoString(&buf, "NULL");
    2194           13 :         needComma = true;
    2195              :     }
    2196            4 :     appendStringInfoChar(&buf, ')');
    2197              : 
    2198            4 :     return buf.data;
    2199              : }
    2200              : 
    2201              : static char *
    2202            4 : get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals)
    2203              : {
    2204              :     char       *relname;
    2205              :     TupleDesc   tupdesc;
    2206              :     StringInfoData buf;
    2207              :     int         i;
    2208              : 
    2209            4 :     initStringInfo(&buf);
    2210              : 
    2211              :     /* get relation name including any needed schema prefix and quoting */
    2212            4 :     relname = generate_relation_name(rel);
    2213              : 
    2214            4 :     tupdesc = rel->rd_att;
    2215              : 
    2216            4 :     appendStringInfo(&buf, "DELETE FROM %s WHERE ", relname);
    2217           11 :     for (i = 0; i < pknumatts; i++)
    2218              :     {
    2219            7 :         int         pkattnum = pkattnums[i];
    2220            7 :         Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
    2221              : 
    2222            7 :         if (i > 0)
    2223            3 :             appendStringInfoString(&buf, " AND ");
    2224              : 
    2225            7 :         appendStringInfoString(&buf,
    2226            7 :                                quote_ident_cstr(NameStr(attr->attname)));
    2227              : 
    2228            7 :         if (tgt_pkattvals[i] != NULL)
    2229            7 :             appendStringInfo(&buf, " = %s",
    2230            7 :                              quote_literal_cstr(tgt_pkattvals[i]));
    2231              :         else
    2232            0 :             appendStringInfoString(&buf, " IS NULL");
    2233              :     }
    2234              : 
    2235            4 :     return buf.data;
    2236              : }
    2237              : 
    2238              : static char *
    2239            4 : get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
    2240              : {
    2241              :     char       *relname;
    2242              :     HeapTuple   tuple;
    2243              :     TupleDesc   tupdesc;
    2244              :     int         natts;
    2245              :     StringInfoData buf;
    2246              :     char       *val;
    2247              :     int         key;
    2248              :     int         i;
    2249              :     bool        needComma;
    2250              : 
    2251            4 :     initStringInfo(&buf);
    2252              : 
    2253              :     /* get relation name including any needed schema prefix and quoting */
    2254            4 :     relname = generate_relation_name(rel);
    2255              : 
    2256            4 :     tupdesc = rel->rd_att;
    2257            4 :     natts = tupdesc->natts;
    2258              : 
    2259            4 :     tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
    2260            4 :     if (!tuple)
    2261            0 :         ereport(ERROR,
    2262              :                 (errcode(ERRCODE_CARDINALITY_VIOLATION),
    2263              :                  errmsg("source row not found")));
    2264              : 
    2265            4 :     appendStringInfo(&buf, "UPDATE %s SET ", relname);
    2266              : 
    2267              :     /*
    2268              :      * Note: i is physical column number (counting from 0).
    2269              :      */
    2270            4 :     needComma = false;
    2271           19 :     for (i = 0; i < natts; i++)
    2272              :     {
    2273           15 :         Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
    2274              : 
    2275           15 :         if (attr->attisdropped)
    2276            2 :             continue;
    2277              : 
    2278           13 :         if (needComma)
    2279            9 :             appendStringInfoString(&buf, ", ");
    2280              : 
    2281           13 :         appendStringInfo(&buf, "%s = ",
    2282           13 :                          quote_ident_cstr(NameStr(attr->attname)));
    2283              : 
    2284           13 :         key = get_attnum_pk_pos(pkattnums, pknumatts, i);
    2285              : 
    2286           13 :         if (key >= 0)
    2287            7 :             val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
    2288              :         else
    2289            6 :             val = SPI_getvalue(tuple, tupdesc, i + 1);
    2290              : 
    2291           13 :         if (val != NULL)
    2292              :         {
    2293           13 :             appendStringInfoString(&buf, quote_literal_cstr(val));
    2294           13 :             pfree(val);
    2295              :         }
    2296              :         else
    2297            0 :             appendStringInfoString(&buf, "NULL");
    2298           13 :         needComma = true;
    2299              :     }
    2300              : 
    2301            4 :     appendStringInfoString(&buf, " WHERE ");
    2302              : 
    2303           11 :     for (i = 0; i < pknumatts; i++)
    2304              :     {
    2305            7 :         int         pkattnum = pkattnums[i];
    2306            7 :         Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
    2307              : 
    2308            7 :         if (i > 0)
    2309            3 :             appendStringInfoString(&buf, " AND ");
    2310              : 
    2311            7 :         appendStringInfoString(&buf,
    2312            7 :                                quote_ident_cstr(NameStr(attr->attname)));
    2313              : 
    2314            7 :         val = tgt_pkattvals[i];
    2315              : 
    2316            7 :         if (val != NULL)
    2317            7 :             appendStringInfo(&buf, " = %s", quote_literal_cstr(val));
    2318              :         else
    2319            0 :             appendStringInfoString(&buf, " IS NULL");
    2320              :     }
    2321              : 
    2322            4 :     return buf.data;
    2323              : }
    2324              : 
    2325              : /*
    2326              :  * Return a properly quoted identifier.
    2327              :  * Uses quote_ident in quote.c
    2328              :  */
    2329              : static char *
    2330           80 : quote_ident_cstr(char *rawstr)
    2331              : {
    2332              :     text       *rawstr_text;
    2333              :     text       *result_text;
    2334              :     char       *result;
    2335              : 
    2336           80 :     rawstr_text = cstring_to_text(rawstr);
    2337           80 :     result_text = DatumGetTextPP(DirectFunctionCall1(quote_ident,
    2338              :                                                      PointerGetDatum(rawstr_text)));
    2339           80 :     result = text_to_cstring(result_text);
    2340              : 
    2341           80 :     return result;
    2342              : }
    2343              : 
    /*
     * Find "key" among the first pknumatts entries of pkattnums.
     *
     * Returns the zero-based position of the first matching entry, or -1 if
     * there is none.
     */
    static int
    get_attnum_pk_pos(int *pkattnums, int pknumatts, int key)
    {
        int         pos = 0;

        /* The list is not likely to be long, so a plain linear scan will do */
        while (pos < pknumatts)
        {
            if (pkattnums[pos] == key)
                return pos;
            pos++;
        }

        return -1;
    }
    2358              : 
    2359              : static HeapTuple
    2360            8 : get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals)
    2361              : {
    2362              :     char       *relname;
    2363              :     TupleDesc   tupdesc;
    2364              :     int         natts;
    2365              :     StringInfoData buf;
    2366              :     int         ret;
    2367              :     HeapTuple   tuple;
    2368              :     int         i;
    2369              : 
    2370              :     /*
    2371              :      * Connect to SPI manager
    2372              :      */
    2373            8 :     SPI_connect();
    2374              : 
    2375            8 :     initStringInfo(&buf);
    2376              : 
    2377              :     /* get relation name including any needed schema prefix and quoting */
    2378            8 :     relname = generate_relation_name(rel);
    2379              : 
    2380            8 :     tupdesc = rel->rd_att;
    2381            8 :     natts = tupdesc->natts;
    2382              : 
    2383              :     /*
    2384              :      * Build sql statement to look up tuple of interest, ie, the one matching
    2385              :      * src_pkattvals.  We used to use "SELECT *" here, but it's simpler to
    2386              :      * generate a result tuple that matches the table's physical structure,
    2387              :      * with NULLs for any dropped columns.  Otherwise we have to deal with two
    2388              :      * different tupdescs and everything's very confusing.
    2389              :      */
    2390            8 :     appendStringInfoString(&buf, "SELECT ");
    2391              : 
    2392           38 :     for (i = 0; i < natts; i++)
    2393              :     {
    2394           30 :         Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
    2395              : 
    2396           30 :         if (i > 0)
    2397           22 :             appendStringInfoString(&buf, ", ");
    2398              : 
    2399           30 :         if (attr->attisdropped)
    2400            4 :             appendStringInfoString(&buf, "NULL");
    2401              :         else
    2402           26 :             appendStringInfoString(&buf,
    2403           26 :                                    quote_ident_cstr(NameStr(attr->attname)));
    2404              :     }
    2405              : 
    2406            8 :     appendStringInfo(&buf, " FROM %s WHERE ", relname);
    2407              : 
    2408           22 :     for (i = 0; i < pknumatts; i++)
    2409              :     {
    2410           14 :         int         pkattnum = pkattnums[i];
    2411           14 :         Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
    2412              : 
    2413           14 :         if (i > 0)
    2414            6 :             appendStringInfoString(&buf, " AND ");
    2415              : 
    2416           14 :         appendStringInfoString(&buf,
    2417           14 :                                quote_ident_cstr(NameStr(attr->attname)));
    2418              : 
    2419           14 :         if (src_pkattvals[i] != NULL)
    2420           14 :             appendStringInfo(&buf, " = %s",
    2421           14 :                              quote_literal_cstr(src_pkattvals[i]));
    2422              :         else
    2423            0 :             appendStringInfoString(&buf, " IS NULL");
    2424              :     }
    2425              : 
    2426              :     /*
    2427              :      * Retrieve the desired tuple
    2428              :      */
    2429            8 :     ret = SPI_exec(buf.data, 0);
    2430            8 :     pfree(buf.data);
    2431              : 
    2432              :     /*
    2433              :      * Only allow one qualifying tuple
    2434              :      */
    2435            8 :     if ((ret == SPI_OK_SELECT) && (SPI_processed > 1))
    2436            0 :         ereport(ERROR,
    2437              :                 (errcode(ERRCODE_CARDINALITY_VIOLATION),
    2438              :                  errmsg("source criteria matched more than one record")));
    2439              : 
    2440            8 :     else if (ret == SPI_OK_SELECT && SPI_processed == 1)
    2441              :     {
    2442            8 :         SPITupleTable *tuptable = SPI_tuptable;
    2443              : 
    2444            8 :         tuple = SPI_copytuple(tuptable->vals[0]);
    2445            8 :         SPI_finish();
    2446              : 
    2447            8 :         return tuple;
    2448              :     }
    2449              :     else
    2450              :     {
    2451              :         /*
    2452              :          * no qualifying tuples
    2453              :          */
    2454            0 :         SPI_finish();
    2455              : 
    2456            0 :         return NULL;
    2457              :     }
    2458              : 
    2459              :     /*
    2460              :      * never reached, but keep compiler quiet
    2461              :      */
    2462              :     return NULL;
    2463              : }
    2464              : 
    2465              : static void
    2466           21 : RangeVarCallbackForDblink(const RangeVar *relation,
    2467              :                           Oid relId, Oid oldRelId, void *arg)
    2468              : {
    2469              :     AclResult   aclresult;
    2470              : 
    2471           21 :     if (!OidIsValid(relId))
    2472            0 :         return;
    2473              : 
    2474           21 :     aclresult = pg_class_aclcheck(relId, GetUserId(), *((AclMode *) arg));
    2475           21 :     if (aclresult != ACLCHECK_OK)
    2476            0 :         aclcheck_error(aclresult, get_relkind_objtype(get_rel_relkind(relId)),
    2477            0 :                        relation->relname);
    2478              : }
    2479              : 
    2480              : /*
    2481              :  * Open the relation named by relname_text, acquire specified type of lock,
    2482              :  * verify we have specified permissions.
    2483              :  * Caller must close rel when done with it.
    2484              :  */
    2485              : static Relation
    2486           21 : get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode)
    2487              : {
    2488              :     RangeVar   *relvar;
    2489              :     Oid         relid;
    2490              : 
    2491           21 :     relvar = makeRangeVarFromNameList(textToQualifiedNameList(relname_text));
    2492           21 :     relid = RangeVarGetRelidExtended(relvar, lockmode, 0,
    2493              :                                      RangeVarCallbackForDblink, &aclmode);
    2494              : 
    2495           21 :     return table_open(relid, NoLock);
    2496              : }
    2497              : 
    2498              : /*
    2499              :  * generate_relation_name - copied from ruleutils.c
    2500              :  *      Compute the name to display for a relation
    2501              :  *
    2502              :  * The result includes all necessary quoting and schema-prefixing.
    2503              :  */
    2504              : static char *
    2505           20 : generate_relation_name(Relation rel)
    2506              : {
    2507              :     char       *nspname;
    2508              :     char       *result;
    2509              : 
    2510              :     /* Qualify the name if not visible in search path */
    2511           20 :     if (RelationIsVisible(RelationGetRelid(rel)))
    2512           15 :         nspname = NULL;
    2513              :     else
    2514            5 :         nspname = get_namespace_name(rel->rd_rel->relnamespace);
    2515              : 
    2516           20 :     result = quote_qualified_identifier(nspname, RelationGetRelationName(rel));
    2517              : 
    2518           20 :     return result;
    2519              : }
    2520              : 
    2521              : 
    2522              : static remoteConn *
    2523           78 : getConnectionByName(const char *name)
    2524              : {
    2525              :     remoteConnHashEnt *hentry;
    2526              :     char       *key;
    2527              : 
    2528           78 :     if (!remoteConnHash)
    2529            5 :         remoteConnHash = createConnHash();
    2530              : 
    2531           78 :     key = pstrdup(name);
    2532           78 :     truncate_identifier(key, strlen(key), false);
    2533           78 :     hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
    2534              :                                                key, HASH_FIND, NULL);
    2535              : 
    2536           78 :     if (hentry && hentry->rconn.conn != NULL)
    2537           68 :         return &hentry->rconn;
    2538              : 
    2539           10 :     return NULL;
    2540              : }
    2541              : 
    2542              : static HTAB *
    2543            6 : createConnHash(void)
    2544              : {
    2545              :     HASHCTL     ctl;
    2546              : 
    2547            6 :     ctl.keysize = NAMEDATALEN;
    2548            6 :     ctl.entrysize = sizeof(remoteConnHashEnt);
    2549              : 
    2550            6 :     return hash_create("Remote Con hash", NUMCONN, &ctl,
    2551              :                        HASH_ELEM | HASH_STRINGS);
    2552              : }
    2553              : 
    2554              : static remoteConn *
    2555           10 : createNewConnection(const char *name)
    2556              : {
    2557              :     remoteConnHashEnt *hentry;
    2558              :     bool        found;
    2559              :     char       *key;
    2560              : 
    2561           10 :     if (!remoteConnHash)
    2562            1 :         remoteConnHash = createConnHash();
    2563              : 
    2564           10 :     key = pstrdup(name);
    2565           10 :     truncate_identifier(key, strlen(key), true);
    2566           10 :     hentry = (remoteConnHashEnt *) hash_search(remoteConnHash, key,
    2567              :                                                HASH_ENTER, &found);
    2568              : 
    2569           10 :     if (found && hentry->rconn.conn != NULL)
    2570            1 :         ereport(ERROR,
    2571              :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
    2572              :                  errmsg("duplicate connection name")));
    2573              : 
    2574              :     /* New, or reusable, so initialize the rconn struct to zeroes */
    2575            9 :     memset(&hentry->rconn, 0, sizeof(remoteConn));
    2576              : 
    2577            9 :     return &hentry->rconn;
    2578              : }
    2579              : 
    2580              : static void
    2581            8 : deleteConnection(const char *name)
    2582              : {
    2583              :     remoteConnHashEnt *hentry;
    2584              :     bool        found;
    2585              :     char       *key;
    2586              : 
    2587            8 :     if (!remoteConnHash)
    2588            0 :         remoteConnHash = createConnHash();
    2589              : 
    2590            8 :     key = pstrdup(name);
    2591            8 :     truncate_identifier(key, strlen(key), false);
    2592            8 :     hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
    2593              :                                                key, HASH_REMOVE, &found);
    2594              : 
    2595            8 :     if (!hentry)
    2596            0 :         ereport(ERROR,
    2597              :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    2598              :                  errmsg("undefined connection name")));
    2599            8 : }
    2600              : 
    2601              :  /*
    2602              :   * Ensure that require_auth and SCRAM keys are correctly set on connstr.
    2603              :   * SCRAM keys used to pass-through are coming from the initial connection
    2604              :   * from the client with the server.
    2605              :   *
    2606              :   * All required SCRAM options are set by dblink, so we just need to ensure
    2607              :   * that these options are not overwritten by the user.
    2608              :   *
    2609              :   * See appendSCRAMKeysInfo and its usage for more.
    2610              :   */
    2611              : bool
    2612            5 : dblink_connstr_has_required_scram_options(const char *connstr)
    2613              : {
    2614              :     PQconninfoOption *options;
    2615            5 :     bool        has_scram_server_key = false;
    2616            5 :     bool        has_scram_client_key = false;
    2617            5 :     bool        has_require_auth = false;
    2618            5 :     bool        has_scram_keys = false;
    2619              : 
    2620            5 :     options = PQconninfoParse(connstr, NULL);
    2621            5 :     if (options)
    2622              :     {
    2623              :         /*
    2624              :          * Continue iterating even if we found the keys that we need to
    2625              :          * validate to make sure that there is no other declaration of these
    2626              :          * keys that can overwrite the first.
    2627              :          */
    2628          260 :         for (PQconninfoOption *option = options; option->keyword != NULL; option++)
    2629              :         {
    2630          255 :             if (strcmp(option->keyword, "require_auth") == 0)
    2631              :             {
    2632            5 :                 if (option->val != NULL && strcmp(option->val, "scram-sha-256") == 0)
    2633            4 :                     has_require_auth = true;
    2634              :                 else
    2635            1 :                     has_require_auth = false;
    2636              :             }
    2637              : 
    2638          255 :             if (strcmp(option->keyword, "scram_client_key") == 0)
    2639              :             {
    2640            5 :                 if (option->val != NULL && option->val[0] != '\0')
    2641            5 :                     has_scram_client_key = true;
    2642              :                 else
    2643            0 :                     has_scram_client_key = false;
    2644              :             }
    2645              : 
    2646          255 :             if (strcmp(option->keyword, "scram_server_key") == 0)
    2647              :             {
    2648            5 :                 if (option->val != NULL && option->val[0] != '\0')
    2649            5 :                     has_scram_server_key = true;
    2650              :                 else
    2651            0 :                     has_scram_server_key = false;
    2652              :             }
    2653              :         }
    2654            5 :         PQconninfoFree(options);
    2655              :     }
    2656              : 
    2657            5 :     has_scram_keys = has_scram_client_key && has_scram_server_key && MyProcPort != NULL && MyProcPort->has_scram_keys;
    2658              : 
    2659            5 :     return (has_scram_keys && has_require_auth);
    2660              : }
    2661              : 
    2662              : /*
    2663              :  * We need to make sure that the connection made used credentials
    2664              :  * which were provided by the user, so check what credentials were
    2665              :  * used to connect and then make sure that they came from the user.
    2666              :  *
    2667              :  * On failure, we close "conn" and also delete the hashtable entry
    2668              :  * identified by "connname" (if that's not NULL).
    2669              :  */
    2670              : static void
    2671           20 : dblink_security_check(PGconn *conn, const char *connname, const char *connstr)
    2672              : {
    2673              :     /* Superuser bypasses security check */
    2674           20 :     if (superuser())
    2675           18 :         return;
    2676              : 
    2677              :     /* If password was used to connect, make sure it was one provided */
    2678            2 :     if (PQconnectionUsedPassword(conn) && dblink_connstr_has_pw(connstr))
    2679            0 :         return;
    2680              : 
    2681              :     /*
    2682              :      * Password was not used to connect, check if SCRAM pass-through is in
    2683              :      * use.
    2684              :      *
    2685              :      * If dblink_connstr_has_required_scram_options is true we assume that
    2686              :      * UseScramPassthrough is also true because the required SCRAM keys are
    2687              :      * only added if UseScramPassthrough is set, and the user is not allowed
    2688              :      * to add the SCRAM keys on fdw and user mapping options.
    2689              :      */
    2690            2 :     if (MyProcPort != NULL && MyProcPort->has_scram_keys && dblink_connstr_has_required_scram_options(connstr))
    2691            2 :         return;
    2692              : 
    2693              : #ifdef ENABLE_GSS
    2694              :     /* If GSSAPI creds used to connect, make sure it was one delegated */
    2695              :     if (PQconnectionUsedGSSAPI(conn) && be_gssapi_get_delegation(MyProcPort))
    2696              :         return;
    2697              : #endif
    2698              : 
    2699              :     /* Otherwise, fail out */
    2700            0 :     libpqsrv_disconnect(conn);
    2701            0 :     if (connname)
    2702            0 :         deleteConnection(connname);
    2703              : 
    2704            0 :     ereport(ERROR,
    2705              :             (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
    2706              :              errmsg("password or GSSAPI delegated credentials required"),
    2707              :              errdetail("Non-superusers may only connect using credentials they provide, eg: password in connection string or delegated GSSAPI credentials"),
    2708              :              errhint("Ensure provided credentials match target server's authentication method.")));
    2709              : }
    2710              : 
    2711              : /*
    2712              :  * Function to check if the connection string includes an explicit
    2713              :  * password, needed to ensure that non-superuser password-based auth
    2714              :  * is using a provided password and not one picked up from the
    2715              :  * environment.
    2716              :  */
    2717              : static bool
    2718            6 : dblink_connstr_has_pw(const char *connstr)
    2719              : {
    2720              :     PQconninfoOption *options;
    2721              :     PQconninfoOption *option;
    2722            6 :     bool        connstr_gives_password = false;
    2723              : 
    2724            6 :     options = PQconninfoParse(connstr, NULL);
    2725            6 :     if (options)
    2726              :     {
    2727          312 :         for (option = options; option->keyword != NULL; option++)
    2728              :         {
    2729          306 :             if (strcmp(option->keyword, "password") == 0)
    2730              :             {
    2731            6 :                 if (option->val != NULL && option->val[0] != '\0')
    2732              :                 {
    2733            0 :                     connstr_gives_password = true;
    2734            0 :                     break;
    2735              :                 }
    2736              :             }
    2737              :         }
    2738            6 :         PQconninfoFree(options);
    2739              :     }
    2740              : 
    2741            6 :     return connstr_gives_password;
    2742              : }
    2743              : 
    2744              : /*
    2745              :  * For non-superusers, insist that the connstr specify a password, except if
    2746              :  * GSSAPI credentials have been delegated (and we check that they are used for
    2747              :  * the connection in dblink_security_check later) or if SCRAM pass-through is
    2748              :  * being used.  This prevents a password or GSSAPI credentials from being
    2749              :  * picked up from .pgpass, a service file, the environment, etc.  We don't want
    2750              :  * the postgres user's passwords or Kerberos credentials to be accessible to
    2751              :  * non-superusers. In case of SCRAM pass-through insist that the connstr
    2752              :  * has the required SCRAM pass-through options.
    2753              :  */
    2754              : static void
    2755           25 : dblink_connstr_check(const char *connstr)
    2756              : {
    2757           25 :     if (superuser())
    2758           21 :         return;
    2759              : 
    2760            4 :     if (dblink_connstr_has_pw(connstr))
    2761            0 :         return;
    2762              : 
    2763            4 :     if (MyProcPort != NULL && MyProcPort->has_scram_keys && dblink_connstr_has_required_scram_options(connstr))
    2764            2 :         return;
    2765              : 
    2766              : #ifdef ENABLE_GSS
    2767              :     if (be_gssapi_get_delegation(MyProcPort))
    2768              :         return;
    2769              : #endif
    2770              : 
    2771            2 :     ereport(ERROR,
    2772              :             (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
    2773              :              errmsg("password or GSSAPI delegated credentials required"),
    2774              :              errdetail("Non-superusers must provide a password in the connection string or send delegated GSSAPI credentials.")));
    2775              : }
    2776              : 
    2777              : /*
    2778              :  * Report an error received from the remote server
    2779              :  *
    2780              :  * res: the received error result
    2781              :  * fail: true for ERROR ereport, false for NOTICE
    2782              :  * fmt and following args: sprintf-style format and values for errcontext;
    2783              :  * the resulting string should be worded like "while <some action>"
    2784              :  *
    2785              :  * If "res" is not NULL, it'll be PQclear'ed here (unless we throw error,
    2786              :  * in which case memory context cleanup will clear it eventually).
    2787              :  */
    2788              : static void
    2789           12 : dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
    2790              :                  bool fail, const char *fmt,...)
    2791              : {
    2792              :     int         level;
    2793           12 :     char       *pg_diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
    2794           12 :     char       *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
    2795           12 :     char       *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
    2796           12 :     char       *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
    2797           12 :     char       *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
    2798              :     int         sqlstate;
    2799              :     va_list     ap;
    2800              :     char        dblink_context_msg[512];
    2801              : 
    2802           12 :     if (fail)
    2803            3 :         level = ERROR;
    2804              :     else
    2805            9 :         level = NOTICE;
    2806              : 
    2807           12 :     if (pg_diag_sqlstate)
    2808           12 :         sqlstate = MAKE_SQLSTATE(pg_diag_sqlstate[0],
    2809              :                                  pg_diag_sqlstate[1],
    2810              :                                  pg_diag_sqlstate[2],
    2811              :                                  pg_diag_sqlstate[3],
    2812              :                                  pg_diag_sqlstate[4]);
    2813              :     else
    2814            0 :         sqlstate = ERRCODE_CONNECTION_FAILURE;
    2815              : 
    2816              :     /*
    2817              :      * If we don't get a message from the PGresult, try the PGconn.  This is
    2818              :      * needed because for connection-level failures, PQgetResult may just
    2819              :      * return NULL, not a PGresult at all.
    2820              :      */
    2821           12 :     if (message_primary == NULL)
    2822            0 :         message_primary = pchomp(PQerrorMessage(conn));
    2823              : 
    2824              :     /*
    2825              :      * Format the basic errcontext string.  Below, we'll add on something
    2826              :      * about the connection name.  That's a violation of the translatability
    2827              :      * guidelines about constructing error messages out of parts, but since
    2828              :      * there's no translation support for dblink, there's no need to worry
    2829              :      * about that (yet).
    2830              :      */
    2831           12 :     va_start(ap, fmt);
    2832           12 :     vsnprintf(dblink_context_msg, sizeof(dblink_context_msg), fmt, ap);
    2833           12 :     va_end(ap);
    2834              : 
    2835           12 :     ereport(level,
    2836              :             (errcode(sqlstate),
    2837              :              (message_primary != NULL && message_primary[0] != '\0') ?
    2838              :              errmsg_internal("%s", message_primary) :
    2839              :              errmsg("could not obtain message string for remote error"),
    2840              :              message_detail ? errdetail_internal("%s", message_detail) : 0,
    2841              :              message_hint ? errhint("%s", message_hint) : 0,
    2842              :              message_context ? (errcontext("%s", message_context)) : 0,
    2843              :              conname ?
    2844              :              (errcontext("%s on dblink connection named \"%s\"",
    2845              :                          dblink_context_msg, conname)) :
    2846              :              (errcontext("%s on unnamed dblink connection",
    2847              :                          dblink_context_msg))));
    2848            9 :     PQclear(res);
    2849            9 : }
    2850              : 
    2851              : /*
    2852              :  * Obtain connection string for a foreign server
    2853              :  */
    2854              : static char *
    2855           25 : get_connect_string(const char *servername)
    2856              : {
    2857           25 :     ForeignServer *foreign_server = NULL;
    2858              :     UserMapping *user_mapping;
    2859              :     ListCell   *cell;
    2860              :     StringInfoData buf;
    2861              :     ForeignDataWrapper *fdw;
    2862              :     AclResult   aclresult;
    2863              :     char       *srvname;
    2864              : 
    2865              :     static const PQconninfoOption *options = NULL;
    2866              : 
    2867           25 :     initStringInfo(&buf);
    2868              : 
    2869              :     /*
    2870              :      * Get list of valid libpq options.
    2871              :      *
    2872              :      * To avoid unnecessary work, we get the list once and use it throughout
    2873              :      * the lifetime of this backend process.  We don't need to care about
    2874              :      * memory context issues, because PQconndefaults allocates with malloc.
    2875              :      */
    2876           25 :     if (!options)
    2877              :     {
    2878            6 :         options = PQconndefaults();
    2879            6 :         if (!options)           /* assume reason for failure is OOM */
    2880            0 :             ereport(ERROR,
    2881              :                     (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
    2882              :                      errmsg("out of memory"),
    2883              :                      errdetail("Could not get libpq's default connection options.")));
    2884              :     }
    2885              : 
    2886              :     /* first gather the server connstr options */
    2887           25 :     srvname = pstrdup(servername);
    2888           25 :     truncate_identifier(srvname, strlen(srvname), false);
    2889           25 :     foreign_server = GetForeignServerByName(srvname, true);
    2890              : 
    2891           25 :     if (foreign_server)
    2892              :     {
    2893            5 :         Oid         serverid = foreign_server->serverid;
    2894            5 :         Oid         fdwid = foreign_server->fdwid;
    2895            5 :         Oid         userid = GetUserId();
    2896              : 
    2897            5 :         user_mapping = GetUserMapping(userid, serverid);
    2898            5 :         fdw = GetForeignDataWrapper(fdwid);
    2899              : 
    2900              :         /* Check permissions, user must have usage on the server. */
    2901            5 :         aclresult = object_aclcheck(ForeignServerRelationId, serverid, userid, ACL_USAGE);
    2902            5 :         if (aclresult != ACLCHECK_OK)
    2903            0 :             aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER, foreign_server->servername);
    2904              : 
    2905              :         /*
    2906              :          * First append hardcoded options needed for SCRAM pass-through, so if
    2907              :          * the user overwrites these options we can ereport on
    2908              :          * dblink_connstr_check and dblink_security_check.
    2909              :          */
    2910            5 :         if (MyProcPort != NULL && MyProcPort->has_scram_keys && UseScramPassthrough(foreign_server, user_mapping))
    2911            3 :             appendSCRAMKeysInfo(&buf);
    2912              : 
    2913            5 :         foreach(cell, fdw->options)
    2914              :         {
    2915            0 :             DefElem    *def = lfirst(cell);
    2916              : 
    2917            0 :             if (is_valid_dblink_option(options, def->defname, ForeignDataWrapperRelationId))
    2918            0 :                 appendStringInfo(&buf, "%s='%s' ", def->defname,
    2919            0 :                                  escape_param_str(strVal(def->arg)));
    2920              :         }
    2921              : 
    2922           22 :         foreach(cell, foreign_server->options)
    2923              :         {
    2924           17 :             DefElem    *def = lfirst(cell);
    2925              : 
    2926           17 :             if (is_valid_dblink_option(options, def->defname, ForeignServerRelationId))
    2927           14 :                 appendStringInfo(&buf, "%s='%s' ", def->defname,
    2928           14 :                                  escape_param_str(strVal(def->arg)));
    2929              :         }
    2930              : 
    2931           10 :         foreach(cell, user_mapping->options)
    2932              :         {
    2933              : 
    2934            5 :             DefElem    *def = lfirst(cell);
    2935              : 
    2936            5 :             if (is_valid_dblink_option(options, def->defname, UserMappingRelationId))
    2937            5 :                 appendStringInfo(&buf, "%s='%s' ", def->defname,
    2938            5 :                                  escape_param_str(strVal(def->arg)));
    2939              :         }
    2940              : 
    2941            5 :         return buf.data;
    2942              :     }
    2943              :     else
    2944           20 :         return NULL;
    2945              : }
    2946              : 
    2947              : /*
    2948              :  * Escaping libpq connect parameter strings.
    2949              :  *
    2950              :  * Replaces "'" with "\'" and "\" with "\\".
    2951              :  */
    2952              : static char *
    2953           19 : escape_param_str(const char *str)
    2954              : {
    2955              :     const char *cp;
    2956              :     StringInfoData buf;
    2957              : 
    2958           19 :     initStringInfo(&buf);
    2959              : 
    2960          172 :     for (cp = str; *cp; cp++)
    2961              :     {
    2962          153 :         if (*cp == '\\' || *cp == '\'')
    2963            0 :             appendStringInfoChar(&buf, '\\');
    2964          153 :         appendStringInfoChar(&buf, *cp);
    2965              :     }
    2966              : 
    2967           19 :     return buf.data;
    2968              : }
    2969              : 
    2970              : /*
    2971              :  * Validate the PK-attnums argument for dblink_build_sql_insert() and related
    2972              :  * functions, and translate to the internal representation.
    2973              :  *
    2974              :  * The user supplies an int2vector of 1-based logical attnums, plus a count
    2975              :  * argument (the need for the separate count argument is historical, but we
    2976              :  * still check it).  We check that each attnum corresponds to a valid,
    2977              :  * non-dropped attribute of the rel.  We do *not* prevent attnums from being
    2978              :  * listed twice, though the actual use-case for such things is dubious.
    2979              :  * Note that before Postgres 9.0, the user's attnums were interpreted as
    2980              :  * physical not logical column numbers; this was changed for future-proofing.
    2981              :  *
    2982              :  * The internal representation is a palloc'd int array of 0-based physical
    2983              :  * attnums.
    2984              :  */
    2985              : static void
    2986           18 : validate_pkattnums(Relation rel,
    2987              :                    int2vector *pkattnums_arg, int32 pknumatts_arg,
    2988              :                    int **pkattnums, int *pknumatts)
    2989              : {
    2990           18 :     TupleDesc   tupdesc = rel->rd_att;
    2991           18 :     int         natts = tupdesc->natts;
    2992              :     int         i;
    2993              : 
    2994              :     /* Don't take more array elements than there are */
    2995           18 :     pknumatts_arg = Min(pknumatts_arg, pkattnums_arg->dim1);
    2996              : 
    2997              :     /* Must have at least one pk attnum selected */
    2998           18 :     if (pknumatts_arg <= 0)
    2999            0 :         ereport(ERROR,
    3000              :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    3001              :                  errmsg("number of key attributes must be > 0")));
    3002              : 
    3003              :     /* Allocate output array */
    3004           18 :     *pkattnums = palloc_array(int, pknumatts_arg);
    3005           18 :     *pknumatts = pknumatts_arg;
    3006              : 
    3007              :     /* Validate attnums and convert to internal form */
    3008           57 :     for (i = 0; i < pknumatts_arg; i++)
    3009              :     {
    3010           45 :         int         pkattnum = pkattnums_arg->values[i];
    3011              :         int         lnum;
    3012              :         int         j;
    3013              : 
    3014              :         /* Can throw error immediately if out of range */
    3015           45 :         if (pkattnum <= 0 || pkattnum > natts)
    3016            6 :             ereport(ERROR,
    3017              :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    3018              :                      errmsg("invalid attribute number %d", pkattnum)));
    3019              : 
    3020              :         /* Identify which physical column has this logical number */
    3021           39 :         lnum = 0;
    3022           69 :         for (j = 0; j < natts; j++)
    3023              :         {
    3024              :             /* dropped columns don't count */
    3025           69 :             if (TupleDescCompactAttr(tupdesc, j)->attisdropped)
    3026            3 :                 continue;
    3027              : 
    3028           66 :             if (++lnum == pkattnum)
    3029           39 :                 break;
    3030              :         }
    3031              : 
    3032           39 :         if (j < natts)
    3033           39 :             (*pkattnums)[i] = j;
    3034              :         else
    3035            0 :             ereport(ERROR,
    3036              :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    3037              :                      errmsg("invalid attribute number %d", pkattnum)));
    3038              :     }
    3039           12 : }
    3040              : 
    3041              : /*
    3042              :  * Check if the specified connection option is valid.
    3043              :  *
    3044              :  * We basically allow whatever libpq thinks is an option, with these
    3045              :  * restrictions:
    3046              :  *      debug options: disallowed
    3047              :  *      "client_encoding": disallowed
    3048              :  *      "user": valid only in USER MAPPING options
    3049              :  *      secure options (eg password): valid only in USER MAPPING options
    3050              :  *      others: valid only in FOREIGN SERVER options
    3051              :  *
    3052              :  * We disallow client_encoding because it would be overridden anyway via
    3053              :  * PQclientEncoding; allowing it to be specified would merely promote
    3054              :  * confusion.
    3055              :  */
    3056              : static bool
    3057          465 : is_valid_dblink_option(const PQconninfoOption *options, const char *option,
    3058              :                        Oid context)
    3059              : {
    3060              :     const PQconninfoOption *opt;
    3061              : 
    3062              :     /* Look up the option in libpq result */
    3063        11545 :     for (opt = options; opt->keyword; opt++)
    3064              :     {
    3065        11540 :         if (strcmp(opt->keyword, option) == 0)
    3066          460 :             break;
    3067              :     }
    3068          465 :     if (opt->keyword == NULL)
    3069            5 :         return false;
    3070              : 
    3071              :     /* Disallow debug options (particularly "replication") */
    3072          460 :     if (strchr(opt->dispchar, 'D'))
    3073           34 :         return false;
    3074              : 
    3075              :     /* Disallow "client_encoding" */
    3076          426 :     if (strcmp(opt->keyword, "client_encoding") == 0)
    3077            8 :         return false;
    3078              : 
    3079              :     /*
    3080              :      * Disallow OAuth options for now, since the builtin flow communicates on
    3081              :      * stderr by default and can't cache tokens yet.
    3082              :      */
    3083          418 :     if (strncmp(opt->keyword, "oauth_", strlen("oauth_")) == 0)
    3084           36 :         return false;
    3085              : 
    3086              :     /*
    3087              :      * If the option is "user" or marked secure, it should be specified only
    3088              :      * in USER MAPPING.  Others should be specified only in SERVER.
    3089              :      */
    3090          382 :     if (strcmp(opt->keyword, "user") == 0 || strchr(opt->dispchar, '*'))
    3091              :     {
    3092           37 :         if (context != UserMappingRelationId)
    3093            9 :             return false;
    3094              :     }
    3095              :     else
    3096              :     {
    3097          345 :         if (context != ForeignServerRelationId)
    3098          234 :             return false;
    3099              :     }
    3100              : 
    3101          139 :     return true;
    3102              : }
    3103              : 
    3104              : /*
    3105              :  * Same as is_valid_dblink_option but also check for only dblink_fdw specific
    3106              :  * options.
    3107              :  */
    3108              : static bool
    3109           39 : is_valid_dblink_fdw_option(const PQconninfoOption *options, const char *option,
    3110              :                            Oid context)
    3111              : {
    3112           39 :     if (strcmp(option, "use_scram_passthrough") == 0)
    3113            4 :         return true;
    3114              : 
    3115           35 :     return is_valid_dblink_option(options, option, context);
    3116              : }
    3117              : 
    3118              : /*
    3119              :  * Copy the remote session's values of GUCs that affect datatype I/O
    3120              :  * and apply them locally in a new GUC nesting level.  Returns the new
    3121              :  * nestlevel (which is needed by restoreLocalGucs to undo the settings),
    3122              :  * or -1 if no new nestlevel was needed.
    3123              :  *
    3124              :  * We use the equivalent of a function SET option to allow the settings to
    3125              :  * persist only until the caller calls restoreLocalGucs.  If an error is
    3126              :  * thrown in between, guc.c will take care of undoing the settings.
    3127              :  */
    3128              : static int
    3129           34 : applyRemoteGucs(PGconn *conn)
    3130              : {
    3131              :     static const char *const GUCsAffectingIO[] = {
    3132              :         "DateStyle",
    3133              :         "IntervalStyle"
    3134              :     };
    3135              : 
    3136           34 :     int         nestlevel = -1;
    3137              :     int         i;
    3138              : 
    3139          102 :     for (i = 0; i < lengthof(GUCsAffectingIO); i++)
    3140              :     {
    3141           68 :         const char *gucName = GUCsAffectingIO[i];
    3142           68 :         const char *remoteVal = PQparameterStatus(conn, gucName);
    3143              :         const char *localVal;
    3144              : 
    3145              :         /*
    3146              :          * If the remote server is pre-8.4, it won't have IntervalStyle, but
    3147              :          * that's okay because its output format won't be ambiguous.  So just
    3148              :          * skip the GUC if we don't get a value for it.  (We might eventually
    3149              :          * need more complicated logic with remote-version checks here.)
    3150              :          */
    3151           68 :         if (remoteVal == NULL)
    3152            0 :             continue;
    3153              : 
    3154              :         /*
    3155              :          * Avoid GUC-setting overhead if the remote and local GUCs already
    3156              :          * have the same value.
    3157              :          */
    3158           68 :         localVal = GetConfigOption(gucName, false, false);
    3159              :         Assert(localVal != NULL);
    3160              : 
    3161           68 :         if (strcmp(remoteVal, localVal) == 0)
    3162           51 :             continue;
    3163              : 
    3164              :         /* Create new GUC nest level if we didn't already */
    3165           17 :         if (nestlevel < 0)
    3166            9 :             nestlevel = NewGUCNestLevel();
    3167              : 
    3168              :         /* Apply the option (this will throw error on failure) */
    3169           17 :         (void) set_config_option(gucName, remoteVal,
    3170              :                                  PGC_USERSET, PGC_S_SESSION,
    3171              :                                  GUC_ACTION_SAVE, true, 0, false);
    3172              :     }
    3173              : 
    3174           34 :     return nestlevel;
    3175              : }
    3176              : 
    3177              : /*
    3178              :  * Restore local GUCs after they have been overlaid with remote settings.
    3179              :  */
    3180              : static void
    3181           35 : restoreLocalGucs(int nestlevel)
    3182              : {
    3183              :     /* Do nothing if no new nestlevel was created */
    3184           35 :     if (nestlevel > 0)
    3185            8 :         AtEOXact_GUC(true, nestlevel);
    3186           35 : }
    3187              : 
    3188              : /*
    3189              :  * Append SCRAM client key and server key information from the global
    3190              :  * MyProcPort into the given StringInfo buffer.
    3191              :  */
    3192              : static void
    3193            3 : appendSCRAMKeysInfo(StringInfo buf)
    3194              : {
    3195              :     int         len;
    3196              :     int         encoded_len;
    3197              :     char       *client_key;
    3198              :     char       *server_key;
    3199              : 
    3200            3 :     len = pg_b64_enc_len(sizeof(MyProcPort->scram_ClientKey));
    3201              :     /* don't forget the zero-terminator */
    3202            3 :     client_key = palloc0(len + 1);
    3203            3 :     encoded_len = pg_b64_encode(MyProcPort->scram_ClientKey,
    3204              :                                 sizeof(MyProcPort->scram_ClientKey),
    3205              :                                 client_key, len);
    3206            3 :     if (encoded_len < 0)
    3207            0 :         elog(ERROR, "could not encode SCRAM client key");
    3208              : 
    3209            3 :     len = pg_b64_enc_len(sizeof(MyProcPort->scram_ServerKey));
    3210              :     /* don't forget the zero-terminator */
    3211            3 :     server_key = palloc0(len + 1);
    3212            3 :     encoded_len = pg_b64_encode(MyProcPort->scram_ServerKey,
    3213              :                                 sizeof(MyProcPort->scram_ServerKey),
    3214              :                                 server_key, len);
    3215            3 :     if (encoded_len < 0)
    3216            0 :         elog(ERROR, "could not encode SCRAM server key");
    3217              : 
    3218            3 :     appendStringInfo(buf, "scram_client_key='%s' ", client_key);
    3219            3 :     appendStringInfo(buf, "scram_server_key='%s' ", server_key);
    3220            3 :     appendStringInfoString(buf, "require_auth='scram-sha-256' ");
    3221              : 
    3222            3 :     pfree(client_key);
    3223            3 :     pfree(server_key);
    3224            3 : }
    3225              : 
    3226              : 
    3227              : static bool
    3228            3 : UseScramPassthrough(ForeignServer *foreign_server, UserMapping *user)
    3229              : {
    3230              :     ListCell   *cell;
    3231              : 
    3232           12 :     foreach(cell, foreign_server->options)
    3233              :     {
    3234           12 :         DefElem    *def = lfirst(cell);
    3235              : 
    3236           12 :         if (strcmp(def->defname, "use_scram_passthrough") == 0)
    3237            3 :             return defGetBoolean(def);
    3238              :     }
    3239              : 
    3240            0 :     foreach(cell, user->options)
    3241              :     {
    3242            0 :         DefElem    *def = (DefElem *) lfirst(cell);
    3243              : 
    3244            0 :         if (strcmp(def->defname, "use_scram_passthrough") == 0)
    3245            0 :             return defGetBoolean(def);
    3246              :     }
    3247              : 
    3248            0 :     return false;
    3249              : }
        

Generated by: LCOV version 2.0-1