LCOV - code coverage report
Current view: top level - contrib/dblink - dblink.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 86.7 % 1123 974
Test Date: 2026-05-24 12:17:26 Functions: 97.5 % 81 79
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*
       2              :  * dblink.c
       3              :  *
       4              :  * Functions returning results from a remote database
       5              :  *
       6              :  * Joe Conway <mail@joeconway.com>
       7              :  * And contributors:
       8              :  * Darko Prenosil <Darko.Prenosil@finteh.hr>
       9              :  * Shridhar Daithankar <shridhar_daithankar@persistent.co.in>
      10              :  *
      11              :  * contrib/dblink/dblink.c
      12              :  * Copyright (c) 2001-2026, PostgreSQL Global Development Group
      13              :  * ALL RIGHTS RESERVED;
      14              :  *
      15              :  * Permission to use, copy, modify, and distribute this software and its
      16              :  * documentation for any purpose, without fee, and without a written agreement
      17              :  * is hereby granted, provided that the above copyright notice and this
      18              :  * paragraph and the following two paragraphs appear in all copies.
      19              :  *
      20              :  * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
      21              :  * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
      22              :  * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
      23              :  * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
      24              :  * POSSIBILITY OF SUCH DAMAGE.
      25              :  *
      26              :  * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
      27              :  * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
      28              :  * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
      29              :  * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
      30              :  * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
      31              :  *
      32              :  */
      33              : #include "postgres.h"
      34              : 
      35              : #include <limits.h>
      36              : 
      37              : #include "access/htup_details.h"
      38              : #include "access/relation.h"
      39              : #include "access/reloptions.h"
      40              : #include "access/table.h"
      41              : #include "catalog/namespace.h"
      42              : #include "catalog/pg_foreign_data_wrapper.h"
      43              : #include "catalog/pg_foreign_server.h"
      44              : #include "catalog/pg_type.h"
      45              : #include "catalog/pg_user_mapping.h"
      46              : #include "commands/defrem.h"
      47              : #include "common/base64.h"
      48              : #include "executor/spi.h"
      49              : #include "foreign/foreign.h"
      50              : #include "funcapi.h"
      51              : #include "lib/stringinfo.h"
      52              : #include "libpq-fe.h"
      53              : #include "libpq/libpq-be.h"
      54              : #include "libpq/libpq-be-fe-helpers.h"
      55              : #include "mb/pg_wchar.h"
      56              : #include "miscadmin.h"
      57              : #include "parser/scansup.h"
      58              : #include "utils/acl.h"
      59              : #include "utils/builtins.h"
      60              : #include "utils/fmgroids.h"
      61              : #include "utils/guc.h"
      62              : #include "utils/hsearch.h"
      63              : #include "utils/lsyscache.h"
      64              : #include "utils/memutils.h"
      65              : #include "utils/rel.h"
      66              : #include "utils/tuplestore.h"
      67              : #include "utils/varlena.h"
      68              : #include "utils/wait_event.h"
       69              : 
                         /*
                          * Module magic block for this loadable module: gives its name ("dblink")
                          * and uses PG_VERSION as its version string.
                          */
       70           17 : PG_MODULE_MAGIC_EXT(
       71              :                     .name = "dblink",
       72              :                     .version = PG_VERSION
       73              : );
       74              : 
                         /*
                          * State kept for one remote connection.  The same struct is used for the
                          * unnamed connection (pconn) and, embedded in remoteConnHashEnt, for each
                          * named connection.
                          *
                          * dblink_open() sets newXactForCursor when it issues BEGIN itself and then
                          * counts cursors in openCursorCount; dblink_close() decrements the count
                          * and issues COMMIT when it reaches zero.
                          */
       75              : typedef struct remoteConn
       76              : {
       77              :     PGconn     *conn;           /* Hold the remote connection */
       78              :     int         openCursorCount;    /* The number of open cursors */
       79              :     bool        newXactForCursor;   /* Opened a transaction for a cursor */
       80              : } remoteConn;
       81              : 
                         /*
                          * Working state passed to storeQueryResult() and storeRow() (see their
                          * prototypes below).  How the individual fields are filled and consumed is
                          * not visible in this part of the file.
                          */
       82              : typedef struct storeInfo
       83              : {
       84              :     FunctionCallInfo fcinfo;
       85              :     Tuplestorestate *tuplestore;
       86              :     AttInMetadata *attinmeta;
       87              :     MemoryContext tmpcontext;
       88              :     char      **cstrs;
       89              :     /* temp storage for results to avoid leaks on exception */
       90              :     PGresult   *last_res;
       91              :     PGresult   *cur_res;
       92              : } storeInfo;
      93              : 
      94              : /*
      95              :  * Internal declarations
      96              :  */
      97              : static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
      98              : static void prepTuplestoreResult(FunctionCallInfo fcinfo);
      99              : static void materializeResult(FunctionCallInfo fcinfo, PGconn *conn,
     100              :                               PGresult *res);
     101              : static void materializeQueryResult(FunctionCallInfo fcinfo,
     102              :                                    PGconn *conn,
     103              :                                    const char *conname,
     104              :                                    const char *sql,
     105              :                                    bool fail);
     106              : static PGresult *storeQueryResult(storeInfo *sinfo, PGconn *conn, const char *sql);
     107              : static void storeRow(storeInfo *sinfo, PGresult *res, bool first);
     108              : static remoteConn *getConnectionByName(const char *name);
     109              : static HTAB *createConnHash(void);
     110              : static remoteConn *createNewConnection(const char *name);
     111              : static void deleteConnection(const char *name);
     112              : static char **get_pkey_attnames(Relation rel, int16 *indnkeyatts);
     113              : static char **get_text_array_contents(ArrayType *array, int *numitems);
     114              : static char *get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
     115              : static char *get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals);
     116              : static char *get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
     117              : static char *quote_ident_cstr(char *rawstr);
     118              : static int  get_attnum_pk_pos(int *pkattnums, int pknumatts, int key);
     119              : static HeapTuple get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals);
     120              : static Relation get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode);
     121              : static char *generate_relation_name(Relation rel);
     122              : static void dblink_connstr_check(const char *connstr);
     123              : static bool dblink_connstr_has_pw(const char *connstr);
     124              : static void dblink_security_check(PGconn *conn, const char *connname,
     125              :                                   const char *connstr);
     126              : static void dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
     127              :                              bool fail, const char *fmt, ...) pg_attribute_printf(5, 6);
     128              : static char *get_connect_string(const char *servername);
     129              : static char *escape_param_str(const char *str);
     130              : static void validate_pkattnums(Relation rel,
     131              :                                int2vector *pkattnums_arg, int32 pknumatts_arg,
     132              :                                int **pkattnums, int *pknumatts);
     133              : static bool is_valid_dblink_option(const PQconninfoOption *options,
     134              :                                    const char *option, Oid context);
     135              : static int  applyRemoteGucs(PGconn *conn);
     136              : static void restoreLocalGucs(int nestlevel);
     137              : static bool UseScramPassthrough(ForeignServer *foreign_server, UserMapping *user);
     138              : static void appendSCRAMKeysInfo(StringInfo buf);
     139              : static bool is_valid_dblink_fdw_option(const PQconninfoOption *options, const char *option,
     140              :                                        Oid context);
     141              : static bool dblink_connstr_has_required_scram_options(const char *connstr);
     142              : 
      143              : /* Globals: the unnamed connection and the table of named connections */
      144              : static remoteConn *pconn = NULL;    /* allocated on first dblink_init() */
      145              : static HTAB *remoteConnHash = NULL; /* name -> remoteConn, see comment below */
      146              : 
      147              : /* custom wait event values, retrieved from shared memory */
                         /* (0 means "not obtained yet"; each is set lazily on first use) */
      148              : static uint32 dblink_we_connect = 0;    /* "DblinkConnect", set in dblink_connect() */
      149              : static uint32 dblink_we_get_conn = 0;   /* "DblinkGetConnect", set in dblink_get_conn() */
      150              : static uint32 dblink_we_get_result = 0; /* "DblinkGetResult", set in dblink_init() */
     151              : 
      152              : /*
      153              :  *  The following hash table holds multiple named remote connections.
      154              :  *  To make use of it, the calling convention of each dblink function
      155              :  *  accepts a connection name as the first parameter. The connection hash
      156              :  *  is much like ecpg's, i.e. a mapping between a name and a PGconn object.
      157              :  *
      158              :  *  To avoid potentially leaking a PGconn object in case of out-of-memory
      159              :  *  errors, we first create the hash entry, then open the PGconn.
      160              :  *  Hence, a hash entry whose rconn.conn pointer is NULL must be
      161              :  *  understood as a leftover from a failed create; it should be ignored
      162              :  *  by lookup operations, and silently replaced by create operations.
      163              :  */
      164              : 
                         /* One hash entry: the connection name (the key) plus its connection state */
      165              : typedef struct remoteConnHashEnt
      166              : {
      167              :     char        name[NAMEDATALEN];
      168              :     remoteConn  rconn;
      169              : } remoteConnHashEnt;
     170              : 
      171              : /* initial number of connection hashes */
                         /* NOTE(review): the use of NUMCONN is outside this view; presumably the
                          * initial size given when remoteConnHash is created -- confirm. */
      172              : #define NUMCONN 16
      173              : 
                         /*
                          * Raise an ERROR for a failed remote command.
                          *
                          * conn: connection whose current PQerrorMessage() text is reported
                          * res:  result object to release with PQclear() before erroring out
                          * p2:   short prefix for the message (callers pass e.g. "begin error")
                          *
                          * The error text is copied with pchomp() before res is cleared.  Always
                          * leaves through elog(ERROR); the function is declared pg_noreturn.
                          */
      174              : pg_noreturn static void
      175            0 : dblink_res_internalerror(PGconn *conn, PGresult *res, const char *p2)
      176              : {
      177            0 :     char       *msg = pchomp(PQerrorMessage(conn));
      178              : 
      179            0 :     PQclear(res);
      180            0 :     elog(ERROR, "%s: %s", p2, msg);
      181              : }
      182              : 
                         /*
                          * Raise ERRCODE_CONNECTION_DOES_NOT_EXIST for a missing connection.
                          *
                          * conname: name of the connection that was requested, or NULL when the
                          *          unnamed connection was meant; it only selects which of the two
                          *          message texts is used.
                          *
                          * Both branches ereport(ERROR); the function is declared pg_noreturn.
                          */
      183              : pg_noreturn static void
      184            3 : dblink_conn_not_avail(const char *conname)
      185              : {
      186            3 :     if (conname)
      187            1 :         ereport(ERROR,
      188              :                 (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
      189              :                  errmsg("connection \"%s\" not available", conname)));
      190              :     else
      191            2 :         ereport(ERROR,
      192              :                 (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
      193              :                  errmsg("connection not available")));
      194              : }
      195              : 
                         /*
                          * Resolve conname_or_str to a usable PGconn.
                          *
                          * The argument is first looked up as the name of an existing connection.
                          * If found, that connection is returned, *conname_p is set to the name and
                          * *freeconn_p to false.
                          *
                          * Otherwise a new connection is opened: get_connect_string() is tried
                          * first and, if it returns NULL, the argument itself is used as the
                          * connection string.  In that case *conname_p is set to NULL and
                          * *freeconn_p to true.
                          *
                          * The three output parameters are written only at the very end, after
                          * every step that can raise an ERROR.
                          */
      196              : static void
      197           37 : dblink_get_conn(char *conname_or_str,
      198              :                 PGconn *volatile *conn_p, char **conname_p, volatile bool *freeconn_p)
      199              : {
      200           37 :     remoteConn *rconn = getConnectionByName(conname_or_str);
      201              :     PGconn     *conn;
      202              :     char       *conname;
      203              :     bool        freeconn;
      204              : 
      205           37 :     if (rconn)
      206              :     {
                                 /* existing named connection: hand it back as-is */
      207           27 :         conn = rconn->conn;
      208           27 :         conname = conname_or_str;
      209           27 :         freeconn = false;
      210              :     }
      211              :     else
      212              :     {
      213              :         const char *connstr;
      214              : 
      215           10 :         connstr = get_connect_string(conname_or_str);
      216           10 :         if (connstr == NULL)
      217            6 :             connstr = conname_or_str;
      218           10 :         dblink_connstr_check(connstr);
      219              : 
      220              :         /* first time, allocate or get the custom wait event */
      221            9 :         if (dblink_we_get_conn == 0)
      222            5 :             dblink_we_get_conn = WaitEventExtensionNew("DblinkGetConnect");
      223              : 
      224              :         /* OK to make connection */
                                 /*
                                  * NOTE(review): dblink_connect() guards the PQsetNoticeReceiver()
                                  * call with "conn != NULL"; this path does not.  Confirm whether
                                  * the difference is intentional.
                                  */
      225            9 :         conn = libpqsrv_connect_start(connstr);
      226            9 :         PQsetNoticeReceiver(conn, libpqsrv_notice_receiver,
      227              :                             "received message via remote connection");
      228            9 :         libpqsrv_connect_complete(conn, dblink_we_get_conn);
      229              : 
      230            9 :         if (PQstatus(conn) == CONNECTION_BAD)
      231              :         {
                                     /* copy the message before the connection is torn down */
      232            3 :             char       *msg = pchomp(PQerrorMessage(conn));
      233              : 
      234            3 :             libpqsrv_disconnect(conn);
      235            3 :             ereport(ERROR,
      236              :                     (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
      237              :                      errmsg("could not establish connection"),
      238              :                      errdetail_internal("%s", msg)));
      239              :         }
      240              : 
      241            6 :         dblink_security_check(conn, NULL, connstr);
                                 /* make the remote client encoding match the local database's */
      242            6 :         if (PQclientEncoding(conn) != GetDatabaseEncoding())
      243            0 :             PQsetClientEncoding(conn, GetDatabaseEncodingName());
      244            6 :         freeconn = true;
      245            6 :         conname = NULL;
      246              :     }
      247              : 
      248           33 :     *conn_p = conn;
      249           33 :     *conname_p = conname;
      250           33 :     *freeconn_p = freeconn;
      251           33 : }
      252              : 
                         /*
                          * Return the PGconn of the named connection conname.
                          *
                          * If getConnectionByName() finds nothing, dblink_conn_not_avail() raises
                          * an ERROR, so the function never actually returns NULL.
                          */
      253              : static PGconn *
      254           17 : dblink_get_named_conn(const char *conname)
      255              : {
      256           17 :     remoteConn *rconn = getConnectionByName(conname);
      257              : 
      258           17 :     if (rconn)
      259           17 :         return rconn->conn;
      260              : 
      261            0 :     dblink_conn_not_avail(conname);
      262              :     return NULL;                /* keep compiler quiet */
      263              : }
      264              : 
                         /*
                          * One-time setup, called at the start of the SQL-callable functions.
                          *
                          * On the first call (pconn still NULL) it obtains the "DblinkGetResult"
                          * wait event if not yet set, allocates the unnamed-connection struct in
                          * TopMemoryContext, and initializes it to "no connection, no cursors".
                          * Later calls do nothing.
                          */
      265              : static void
      266          124 : dblink_init(void)
      267              : {
      268          124 :     if (!pconn)
      269              :     {
      270            7 :         if (dblink_we_get_result == 0)
      271            7 :             dblink_we_get_result = WaitEventExtensionNew("DblinkGetResult");
      272              : 
      273            7 :         pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn));
      274            7 :         pconn->conn = NULL;
      275            7 :         pconn->openCursorCount = 0;
      276            7 :         pconn->newXactForCursor = false;
      277              :     }
      278          124 : }
     279              : 
      280              : /*
      281              :  * Create a persistent connection to another database
                          *
                          * Argument forms handled:
                          *   1 argument:  (text connstr-or-server)           unnamed connection
                          *   2 arguments: (text connname, text connstr-or-server)  named connection
                          *
                          * The last argument is first passed to get_connect_string(); if that
                          * returns NULL the argument itself is used as the connection string.
                          *
                          * For a named connection the hash entry is created before the PGconn is
                          * opened, and removed again with deleteConnection() if the connection
                          * attempt ends in CONNECTION_BAD.  For the unnamed connection, a
                          * previously stored PGconn is disconnected before being replaced.
                          *
                          * Returns the text value "OK"; failures are reported via ereport(ERROR).
      282              :  */
      283           13 : PG_FUNCTION_INFO_V1(dblink_connect);
      284              : Datum
      285           16 : dblink_connect(PG_FUNCTION_ARGS)
      286              : {
      287           16 :     char       *conname_or_str = NULL;
      288           16 :     char       *connstr = NULL;
      289           16 :     char       *connname = NULL;
      290              :     char       *msg;
      291           16 :     PGconn     *conn = NULL;
      292           16 :     remoteConn *rconn = NULL;
      293              : 
      294           16 :     dblink_init();
      295              : 
      296           16 :     if (PG_NARGS() == 2)
      297              :     {
      298           11 :         conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(1));
      299           11 :         connname = text_to_cstring(PG_GETARG_TEXT_PP(0));
      300              :     }
      301            5 :     else if (PG_NARGS() == 1)
      302            5 :         conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0));
      303              : 
      304              :     /* first check for valid foreign data server */
      305           16 :     connstr = get_connect_string(conname_or_str);
      306           16 :     if (connstr == NULL)
      307           14 :         connstr = conname_or_str;
      308              : 
      309              :     /* check password in connection string if not superuser */
      310           16 :     dblink_connstr_check(connstr);
      311              : 
      312              :     /* first time, allocate or get the custom wait event */
      313           15 :     if (dblink_we_connect == 0)
      314            2 :         dblink_we_connect = WaitEventExtensionNew("DblinkConnect");
      315              : 
      316              :     /* if we need a hashtable entry, make that first, since it might fail */
      317           15 :     if (connname)
      318              :     {
      319           10 :         rconn = createNewConnection(connname);
      320              :         Assert(rconn->conn == NULL);
      321              :     }
      322              : 
      323              :     /* OK to make connection */
      324           14 :     conn = libpqsrv_connect_start(connstr);
      325           14 :     if (conn != NULL)
      326           14 :         PQsetNoticeReceiver(conn, libpqsrv_notice_receiver,
      327              :                             "received message via remote connection");
      328           14 :     libpqsrv_connect_complete(conn, dblink_we_connect);
      329              : 
      330           14 :     if (PQstatus(conn) == CONNECTION_BAD)
      331              :     {
                                 /* save the message, then undo both the PGconn and the hash entry */
      332            0 :         msg = pchomp(PQerrorMessage(conn));
      333            0 :         libpqsrv_disconnect(conn);
      334            0 :         if (connname)
      335            0 :             deleteConnection(connname);
      336              : 
      337            0 :         ereport(ERROR,
      338              :                 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
      339              :                  errmsg("could not establish connection"),
      340              :                  errdetail_internal("%s", msg)));
      341              :     }
      342              : 
      343              :     /* check password actually used if not superuser */
      344           14 :     dblink_security_check(conn, connname, connstr);
      345              : 
      346              :     /* attempt to set client encoding to match server encoding, if needed */
      347           14 :     if (PQclientEncoding(conn) != GetDatabaseEncoding())
      348            0 :         PQsetClientEncoding(conn, GetDatabaseEncodingName());
      349              : 
      350              :     /* all OK, save away the conn */
      351           14 :     if (connname)
      352              :     {
      353            9 :         rconn->conn = conn;
      354              :     }
      355              :     else
      356              :     {
                                 /* replacing the unnamed connection: close the old one first */
      357            5 :         if (pconn->conn)
      358            1 :             libpqsrv_disconnect(pconn->conn);
      359            5 :         pconn->conn = conn;
      360              :     }
      361              : 
      362           14 :     PG_RETURN_TEXT_P(cstring_to_text("OK"));
      363              : }
     364              : 
      365              : /*
      366              :  * Clear a persistent connection to another database
                          *
                          * With one argument (text connname) the named connection is looked up;
                          * with any other argument count the unnamed connection (pconn) is used.
                          * If no open connection is found, dblink_conn_not_avail() raises an ERROR.
                          *
                          * Otherwise the PGconn is disconnected and then either the hash entry is
                          * deleted (named) or pconn->conn is reset to NULL (unnamed).
                          *
                          * Returns the text value "OK".
      367              :  */
      368            8 : PG_FUNCTION_INFO_V1(dblink_disconnect);
      369              : Datum
      370           13 : dblink_disconnect(PG_FUNCTION_ARGS)
      371              : {
      372           13 :     char       *conname = NULL;
      373           13 :     remoteConn *rconn = NULL;
      374           13 :     PGconn     *conn = NULL;
      375              : 
      376           13 :     dblink_init();
      377              : 
      378           13 :     if (PG_NARGS() == 1)
      379              :     {
      380            9 :         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
      381            9 :         rconn = getConnectionByName(conname);
      382            9 :         if (rconn)
      383            8 :             conn = rconn->conn;
      384              :     }
      385              :     else
      386            4 :         conn = pconn->conn;
      387              : 
      388           13 :     if (!conn)
      389            1 :         dblink_conn_not_avail(conname);
      390              : 
      391           12 :     libpqsrv_disconnect(conn);
                             /* rconn is non-NULL only on the named path above */
      392           12 :     if (rconn)
      393            8 :         deleteConnection(conname);
      394              :     else
      395            4 :         pconn->conn = NULL;
      396              : 
      397           12 :     PG_RETURN_TEXT_P(cstring_to_text("OK"));
      398              : }
     399              : 
      400              : /*
      401              :  * opens a cursor using a persistent connection
                          *
                          * Argument forms handled:
                          *   (text curname, text sql)                       unnamed connection
                          *   (text curname, text sql, bool fail)             unnamed connection
                          *   (text connname, text curname, text sql)         named connection
                          *   (text connname, text curname, text sql, bool fail)
                          * The two 3-argument forms are told apart by the type of argument 2.
                          *
                          * If the remote session is idle, BEGIN is issued first and the
                          * connection's cursor bookkeeping is reset.  The cursor itself is created
                          * by sending "DECLARE <curname> CURSOR FOR <sql>".
                          *
                          * Returns the text value "OK" on success.  When DECLARE fails,
                          * dblink_res_error() is called with the fail flag; if it returns, the
                          * text value "ERROR" is returned.
                          *
                          * NOTE(review): curname and sql are placed into the command text exactly
                          * as given, without quoting.
      402              :  */
      403           13 : PG_FUNCTION_INFO_V1(dblink_open);
      404              : Datum
      405            9 : dblink_open(PG_FUNCTION_ARGS)
      406              : {
      407            9 :     PGresult   *res = NULL;
      408              :     PGconn     *conn;
      409            9 :     char       *curname = NULL;
      410            9 :     char       *sql = NULL;
      411            9 :     char       *conname = NULL;
      412              :     StringInfoData buf;
      413            9 :     remoteConn *rconn = NULL;
      414            9 :     bool        fail = true;    /* default to backward compatible behavior */
      415              : 
      416            9 :     dblink_init();
      417            9 :     initStringInfo(&buf);
      418              : 
      419            9 :     if (PG_NARGS() == 2)
      420              :     {
      421              :         /* text,text */
      422            2 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
      423            2 :         sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
      424            2 :         rconn = pconn;
      425              :     }
      426            7 :     else if (PG_NARGS() == 3)
      427              :     {
      428              :         /* might be text,text,text or text,text,bool */
      429            6 :         if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
      430              :         {
      431            1 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
      432            1 :             sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
      433            1 :             fail = PG_GETARG_BOOL(2);
      434            1 :             rconn = pconn;
      435              :         }
      436              :         else
      437              :         {
      438            5 :             conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
      439            5 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
      440            5 :             sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
      441            5 :             rconn = getConnectionByName(conname);
      442              :         }
      443              :     }
      444            1 :     else if (PG_NARGS() == 4)
      445              :     {
      446              :         /* text,text,text,bool */
      447            1 :         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
      448            1 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
      449            1 :         sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
      450            1 :         fail = PG_GETARG_BOOL(3);
      451            1 :         rconn = getConnectionByName(conname);
      452              :     }
      453              : 
                             /* covers both "name not found" and "unnamed connection not open" */
      454            9 :     if (!rconn || !rconn->conn)
      455            0 :         dblink_conn_not_avail(conname);
      456              : 
      457            9 :     conn = rconn->conn;
      458              : 
      459              :     /* If we are not in a transaction, start one */
      460            9 :     if (PQtransactionStatus(conn) == PQTRANS_IDLE)
      461              :     {
      462            7 :         res = libpqsrv_exec(conn, "BEGIN", dblink_we_get_result);
      463            7 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
      464            0 :             dblink_res_internalerror(conn, res, "begin error");
      465            7 :         PQclear(res);
      466            7 :         rconn->newXactForCursor = true;
      467              : 
      468              :         /*
      469              :          * Since transaction state was IDLE, we force cursor count to
      470              :          * initially be 0. This is needed as a previous ABORT might have wiped
      471              :          * out our transaction without maintaining the cursor count for us.
      472              :          */
      473            7 :         rconn->openCursorCount = 0;
      474              :     }
      475              : 
      476              :     /* if we started a transaction, increment cursor count */
      477            9 :     if (rconn->newXactForCursor)
      478            9 :         (rconn->openCursorCount)++;
      479              : 
      480            9 :     appendStringInfo(&buf, "DECLARE %s CURSOR FOR %s", curname, sql);
      481            9 :     res = libpqsrv_exec(conn, buf.data, dblink_we_get_result);
      482            9 :     if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
      483              :     {
      484            2 :         dblink_res_error(conn, conname, res, fail,
      485              :                          "while opening cursor \"%s\"", curname);
                                 /* reached only if dblink_res_error() returned */
      486            2 :         PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
      487              :     }
      488              : 
      489            7 :     PQclear(res);
      490            7 :     PG_RETURN_TEXT_P(cstring_to_text("OK"));
      491              : }
     492              : 
      493              : /*
      494              :  * closes a cursor
                          *
                          * Argument forms handled:
                          *   (text curname)                          unnamed connection
                          *   (text curname, bool fail)               unnamed connection
                          *   (text connname, text curname)           named connection
                          *   (text connname, text curname, bool fail)
                          * The two 2-argument forms are told apart by the type of argument 1.
                          *
                          * Sends "CLOSE <curname>".  If this connection's transaction was started
                          * by dblink_open() (newXactForCursor), the open-cursor count is
                          * decremented and COMMIT is sent once it reaches zero.
                          *
                          * Returns the text value "OK" on success.  When CLOSE fails,
                          * dblink_res_error() is called with the fail flag; if it returns, the
                          * text value "ERROR" is returned and the cursor count is left unchanged.
      495              :  */
      496           10 : PG_FUNCTION_INFO_V1(dblink_close);
      497              : Datum
      498            5 : dblink_close(PG_FUNCTION_ARGS)
      499              : {
      500              :     PGconn     *conn;
      501            5 :     PGresult   *res = NULL;
      502            5 :     char       *curname = NULL;
      503            5 :     char       *conname = NULL;
      504              :     StringInfoData buf;
      505            5 :     remoteConn *rconn = NULL;
      506            5 :     bool        fail = true;    /* default to backward compatible behavior */
      507              : 
      508            5 :     dblink_init();
      509            5 :     initStringInfo(&buf);
      510              : 
      511            5 :     if (PG_NARGS() == 1)
      512              :     {
      513              :         /* text */
      514            0 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
      515            0 :         rconn = pconn;
      516              :     }
      517            5 :     else if (PG_NARGS() == 2)
      518              :     {
      519              :         /* might be text,text or text,bool */
      520            5 :         if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
      521              :         {
      522            2 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
      523            2 :             fail = PG_GETARG_BOOL(1);
      524            2 :             rconn = pconn;
      525              :         }
      526              :         else
      527              :         {
      528            3 :             conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
      529            3 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
      530            3 :             rconn = getConnectionByName(conname);
      531              :         }
      532              :     }
                             /*
                              * NOTE(review): this is a plain "if", not "else if" like the tests
                              * above.  The argument counts are mutually exclusive, so the outcome
                              * is the same either way.
                              */
      533            5 :     if (PG_NARGS() == 3)
      534              :     {
      535              :         /* text,text,bool */
      536            0 :         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
      537            0 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
      538            0 :         fail = PG_GETARG_BOOL(2);
      539            0 :         rconn = getConnectionByName(conname);
      540              :     }
      541              : 
                             /* covers both "name not found" and "unnamed connection not open" */
      542            5 :     if (!rconn || !rconn->conn)
      543            0 :         dblink_conn_not_avail(conname);
      544              : 
      545            5 :     conn = rconn->conn;
      546              : 
      547            5 :     appendStringInfo(&buf, "CLOSE %s", curname);
      548              : 
      549              :     /* close the cursor */
      550            5 :     res = libpqsrv_exec(conn, buf.data, dblink_we_get_result);
      551            5 :     if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
      552              :     {
      553            1 :         dblink_res_error(conn, conname, res, fail,
      554              :                          "while closing cursor \"%s\"", curname);
                                 /* reached only if dblink_res_error() returned */
      555            1 :         PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
      556              :     }
      557              : 
      558            4 :     PQclear(res);
      559              : 
      560              :     /* if we started a transaction, decrement cursor count */
      561            4 :     if (rconn->newXactForCursor)
      562              :     {
      563            4 :         (rconn->openCursorCount)--;
      564              : 
      565              :         /* if count is zero, commit the transaction */
      566            4 :         if (rconn->openCursorCount == 0)
      567              :         {
                                     /* the flag is cleared before COMMIT is sent */
      568            2 :             rconn->newXactForCursor = false;
      569              : 
      570            2 :             res = libpqsrv_exec(conn, "COMMIT", dblink_we_get_result);
      571            2 :             if (PQresultStatus(res) != PGRES_COMMAND_OK)
      572            0 :                 dblink_res_internalerror(conn, res, "commit error");
      573            2 :             PQclear(res);
      574              :         }
      575              :     }
      576              : 
      577            4 :     PG_RETURN_TEXT_P(cstring_to_text("OK"));
      578              : }
     579              : 
     580              : /*
     581              :  * Fetch results from an open cursor
                         *
                         * Accepted argument lists, as decoded from PG_NARGS() below:
                         *   (conname text, curname text, howmany int4, fail bool)
                         *   (conname text, curname text, howmany int4)
                         *   (curname text, howmany int4, fail bool)
                         *   (curname text, howmany int4)
                         * The forms without conname use pconn->conn; the others look the
                         * connection up with getConnectionByName().
                         *
                         * Sends "FETCH <howmany> FROM <curname>" and passes a PGRES_TUPLES_OK
                         * result to materializeResult().  The Datum return value is always
                         * (Datum) 0.
                         *
                         * A missing connection is reported through dblink_conn_not_avail(); a bad
                         * result status is reported through dblink_res_error() (passing "fail");
                         * a PGRES_COMMAND_OK status raises ERRCODE_INVALID_CURSOR_NAME.
                         *
                         * NOTE(review): curname is interpolated into the command with plain %s,
                         * i.e. without any quoting -- confirm callers are expected to pass an
                         * already-valid SQL identifier.
     582              :  */
     583           13 : PG_FUNCTION_INFO_V1(dblink_fetch);
     584              : Datum
     585           13 : dblink_fetch(PG_FUNCTION_ARGS)
     586              : {
     587           13 :     PGresult   *res = NULL;
     588           13 :     char       *conname = NULL;
     589           13 :     remoteConn *rconn = NULL;
     590           13 :     PGconn     *conn = NULL;
     591              :     StringInfoData buf;
     592           13 :     char       *curname = NULL;
     593           13 :     int         howmany = 0;
     594           13 :     bool        fail = true;    /* default to backward compatible */
     595              : 
     596           13 :     prepTuplestoreResult(fcinfo);
     597              : 
     598           13 :     dblink_init();
     599              : 
     600           13 :     if (PG_NARGS() == 4)
     601              :     {
     602              :         /* text,text,int,bool */
     603            1 :         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     604            1 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
     605            1 :         howmany = PG_GETARG_INT32(2);
     606            1 :         fail = PG_GETARG_BOOL(3);
     607              : 
     608            1 :         rconn = getConnectionByName(conname);
     609            1 :         if (rconn)
     610            1 :             conn = rconn->conn;
     611              :     }
     612           12 :     else if (PG_NARGS() == 3)
     613              :     {
     614              :         /* text,text,int or text,int,bool */
                                /* the two forms are told apart by the declared type of argument 2 */
     615            8 :         if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
     616              :         {
     617            2 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     618            2 :             howmany = PG_GETARG_INT32(1);
     619            2 :             fail = PG_GETARG_BOOL(2);
     620            2 :             conn = pconn->conn;
     621              :         }
     622              :         else
     623              :         {
     624            6 :             conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     625            6 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
     626            6 :             howmany = PG_GETARG_INT32(2);
     627              : 
     628            6 :             rconn = getConnectionByName(conname);
     629            6 :             if (rconn)
     630            6 :                 conn = rconn->conn;
     631              :         }
     632              :     }
     633            4 :     else if (PG_NARGS() == 2)
     634              :     {
     635              :         /* text,int */
     636            4 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     637            4 :         howmany = PG_GETARG_INT32(1);
     638            4 :         conn = pconn->conn;
     639              :     }
     640              : 
                            /* conname is still NULL here for the forms that used pconn */
     641           13 :     if (!conn)
     642            0 :         dblink_conn_not_avail(conname);
     643              : 
     644           13 :     initStringInfo(&buf);
     645           13 :     appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
     646              : 
     647              :     /*
     648              :      * Try to execute the query.  Note that since libpq uses malloc, the
     649              :      * PGresult will be long-lived even though we are still in a short-lived
     650              :      * memory context.
     651              :      */
     652           13 :     res = libpqsrv_exec(conn, buf.data, dblink_we_get_result);
     653           26 :     if (!res ||
     654           26 :         (PQresultStatus(res) != PGRES_COMMAND_OK &&
     655           13 :          PQresultStatus(res) != PGRES_TUPLES_OK))
     656              :     {
     657            5 :         dblink_res_error(conn, conname, res, fail,
     658              :                          "while fetching from cursor \"%s\"", curname);
                                /*
                                 * Only reached if dblink_res_error() returned.  No tuplestore has
                                 * been created, so rsinfo->setResult is still NULL.
                                 */
     659            3 :         return (Datum) 0;
     660              :     }
     661            8 :     else if (PQresultStatus(res) == PGRES_COMMAND_OK)
     662              :     {
     663              :         /* cursor does not exist - closed already or bad name */
     664            0 :         PQclear(res);
     665            0 :         ereport(ERROR,
     666              :                 (errcode(ERRCODE_INVALID_CURSOR_NAME),
     667              :                  errmsg("cursor \"%s\" does not exist", curname)));
     668              :     }
     669              : 
                            /* status is PGRES_TUPLES_OK here; materializeResult() releases res */
     670            8 :     materializeResult(fcinfo, conn, res);
     671            7 :     return (Datum) 0;
     672              : }
     673              : 
     674              : /*
     675              :  * Note: this is the new preferred version of dblink
                         *
                         * SQL-callable entry point for the synchronous case: it simply calls
                         * dblink_record_internal() with is_async = false and returns its result.
     676              :  */
     677           19 : PG_FUNCTION_INFO_V1(dblink_record);
     678              : Datum
     679           29 : dblink_record(PG_FUNCTION_ARGS)
     680              : {
     681           29 :     return dblink_record_internal(fcinfo, false);
     682              : }
     683              : 
                        /*
                         * Submit a query on a named connection using PQsendQuery(); the result
                         * is not read here.
                         *
                         * Arguments: (conname text, sql text).  The connection is obtained from
                         * dblink_get_named_conn().  Any other argument count raises an ERROR.
                         *
                         * Returns PQsendQuery()'s return value as int4.  A value other than 1
                         * only produces a NOTICE carrying the libpq error message; it is not
                         * raised as an ERROR here.
                         */
     684            4 : PG_FUNCTION_INFO_V1(dblink_send_query);
     685              : Datum
     686            6 : dblink_send_query(PG_FUNCTION_ARGS)
     687              : {
     688              :     PGconn     *conn;
     689              :     char       *sql;
     690              :     int         retval;
     691              : 
     692            6 :     if (PG_NARGS() == 2)
     693              :     {
     694            6 :         conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
     695            6 :         sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
     696              :     }
     697              :     else
     698              :         /* shouldn't happen */
     699            0 :         elog(ERROR, "wrong number of arguments");
     700              : 
     701              :     /* async query send */
     702            6 :     retval = PQsendQuery(conn, sql);
                            /* report a failed send as a NOTICE and still return retval */
     703            6 :     if (retval != 1)
     704            0 :         elog(NOTICE, "could not send query: %s", pchomp(PQerrorMessage(conn)));
     705              : 
     706            6 :     PG_RETURN_INT32(retval);
     707              : }
     708              : 
                        /*
                         * SQL-callable entry point for the asynchronous case: it simply calls
                         * dblink_record_internal() with is_async = true and returns its result.
                         */
     709            6 : PG_FUNCTION_INFO_V1(dblink_get_result);
     710              : Datum
     711            8 : dblink_get_result(PG_FUNCTION_ARGS)
     712              : {
     713            8 :     return dblink_record_internal(fcinfo, true);
     714              : }
     715              : 
                        /*
                         * Common implementation of dblink_record() and dblink_get_result().
                         *
                         * is_async = false: run a query now.  Argument lists handled below:
                         *   (conname text, sql text, fail bool)
                         *   (conname text, sql text)
                         *   (sql text, fail bool)
                         *   (sql text)
                         * The forms without conname use pconn->conn; the others call
                         * dblink_get_conn().  The query is run by materializeQueryResult().
                         *
                         * is_async = true: collect one result.  Argument lists handled below:
                         *   (conname text, fail bool)
                         *   (conname text)
                         * The connection comes from dblink_get_named_conn(); one result is read
                         * with libpqsrv_get_result() and, if its status is PGRES_COMMAND_OK or
                         * PGRES_TUPLES_OK, handed to materializeResult().  A NULL result stores
                         * nothing.
                         *
                         * "fail" defaults to true and is passed to dblink_res_error() /
                         * materializeQueryResult().  The Datum return value is always (Datum) 0.
                         * If freeconn was set by dblink_get_conn(), the connection is closed with
                         * libpqsrv_disconnect() in the PG_FINALLY block.
                         */
     716              : static Datum
     717           37 : dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
     718              : {
                            /* volatile: both are assigned inside PG_TRY and read in PG_FINALLY */
     719           37 :     PGconn     *volatile conn = NULL;
     720           37 :     volatile bool freeconn = false;
     721              : 
     722           37 :     prepTuplestoreResult(fcinfo);
     723              : 
     724           37 :     dblink_init();
     725              : 
     726           37 :     PG_TRY();
     727              :     {
     728           37 :         char       *sql = NULL;
     729           37 :         char       *conname = NULL;
     730           37 :         bool        fail = true;    /* default to backward compatible */
     731              : 
     732           37 :         if (!is_async)
     733              :         {
     734           29 :             if (PG_NARGS() == 3)
     735              :             {
     736              :                 /* text,text,bool */
     737            1 :                 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     738            1 :                 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
     739            1 :                 fail = PG_GETARG_BOOL(2);
                                        /* conn, conname and freeconn are all outputs of this call */
     740            1 :                 dblink_get_conn(conname, &conn, &conname, &freeconn);
     741              :             }
     742           28 :             else if (PG_NARGS() == 2)
     743              :             {
     744              :                 /* text,text or text,bool */
                                        /* told apart by the declared type of argument 1 */
     745           21 :                 if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
     746              :                 {
     747            1 :                     sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
     748            1 :                     fail = PG_GETARG_BOOL(1);
     749            1 :                     conn = pconn->conn;
     750              :                 }
     751              :                 else
     752              :                 {
     753           20 :                     conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     754           20 :                     sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
     755           20 :                     dblink_get_conn(conname, &conn, &conname, &freeconn);
     756              :                 }
     757              :             }
     758            7 :             else if (PG_NARGS() == 1)
     759              :             {
     760              :                 /* text */
     761            7 :                 conn = pconn->conn;
     762            7 :                 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
     763              :             }
     764              :             else
     765              :                 /* shouldn't happen */
     766            0 :                 elog(ERROR, "wrong number of arguments");
     767              :         }
     768              :         else                    /* is_async */
     769              :         {
     770              :             /* get async result */
     771            8 :             conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     772              : 
     773            8 :             if (PG_NARGS() == 2)
     774              :             {
     775              :                 /* text,bool */
     776            0 :                 fail = PG_GETARG_BOOL(1);
     777            0 :                 conn = dblink_get_named_conn(conname);
     778              :             }
     779            8 :             else if (PG_NARGS() == 1)
     780              :             {
     781              :                 /* text */
     782            8 :                 conn = dblink_get_named_conn(conname);
     783              :             }
     784              :             else
     785              :                 /* shouldn't happen */
     786            0 :                 elog(ERROR, "wrong number of arguments");
     787              :         }
     788              : 
     789           33 :         if (!conn)
     790            2 :             dblink_conn_not_avail(conname);
     791              : 
     792           31 :         if (!is_async)
     793              :         {
     794              :             /* synchronous query, use efficient tuple collection method */
     795           23 :             materializeQueryResult(fcinfo, conn, conname, sql, fail);
     796              :         }
     797              :         else
     798              :         {
     799              :             /* async result retrieval, do it the old way */
     800            8 :             PGresult   *res = libpqsrv_get_result(conn, dblink_we_get_result);
     801              : 
     802              :             /* NULL means we're all done with the async results */
     803            8 :             if (res)
     804              :             {
     805            5 :                 if (PQresultStatus(res) != PGRES_COMMAND_OK &&
     806            5 :                     PQresultStatus(res) != PGRES_TUPLES_OK)
     807              :                 {
     808            0 :                     dblink_res_error(conn, conname, res, fail,
     809              :                                      "while executing query");
     810              :                     /* if fail isn't set, we'll return an empty query result */
     811              :                 }
     812              :                 else
     813              :                 {
     814            5 :                     materializeResult(fcinfo, conn, res);
     815              :                 }
     816              :             }
     817              :         }
     818              :     }
     819            6 :     PG_FINALLY();
     820              :     {
     821              :         /* if needed, close the connection to the database */
     822           37 :         if (freeconn)
     823            5 :             libpqsrv_disconnect(conn);
     824              :     }
     825           37 :     PG_END_TRY();
     826              : 
     827           31 :     return (Datum) 0;
     828              : }
     829              : 
     830              : /*
     831              :  * Verify function caller can handle a tuplestore result, and set up for that.
     832              :  *
     833              :  * Note: if the caller returns without actually creating a tuplestore, the
     834              :  * executor will treat the function result as an empty set.
                         *
                         * Raises ERROR (ERRCODE_FEATURE_NOT_SUPPORTED) if fcinfo->resultinfo is
                         * NULL or not a ReturnSetInfo, or if SFRM_Materialize is not among its
                         * allowedModes.  On success returnMode is set to SFRM_Materialize and
                         * setResult/setDesc are reset to NULL.
     835              :  */
     836              : static void
     837           50 : prepTuplestoreResult(FunctionCallInfo fcinfo)
     838              : {
     839           50 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
     840              : 
     841              :     /* check to see if query supports us returning a tuplestore */
     842           50 :     if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
     843            0 :         ereport(ERROR,
     844              :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     845              :                  errmsg("set-valued function called in context that cannot accept a set")));
     846           50 :     if (!(rsinfo->allowedModes & SFRM_Materialize))
     847            0 :         ereport(ERROR,
     848              :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     849              :                  errmsg("materialize mode required, but it is not allowed in this context")));
     850              : 
     851              :     /* let the executor know we're sending back a tuplestore */
     852           50 :     rsinfo->returnMode = SFRM_Materialize;
     853              : 
     854              :     /* caller must fill these to return a non-empty result */
     855           50 :     rsinfo->setResult = NULL;
     856           50 :     rsinfo->setDesc = NULL;
     857           50 : }
     858              : 
     859              : /*
     860              :  * Copy the contents of the PGresult into a tuplestore to be returned
     861              :  * as the result of the current function.
     862              :  * The PGresult will be released in this function.
                         *
                         * fcinfo: call info whose resultinfo was set up by prepTuplestoreResult().
                         * conn:   connection the result came from; passed to applyRemoteGucs().
                         * res:    result with status PGRES_COMMAND_OK or PGRES_TUPLES_OK.
                         *
                         * A PGRES_COMMAND_OK result becomes a single row with one text column
                         * named "status" holding PQcmdStatus(res).  Otherwise the row type comes
                         * from get_call_result_type() and must have as many columns as the
                         * result, else ERRCODE_DATATYPE_MISMATCH is raised.  When the result has
                         * zero rows, no tuplestore is created and rsinfo->setResult is left
                         * untouched.
                         *
                         * NOTE(review): the PQclear() at the bottom is only reached on normal
                         * return; how res gets released if an ERROR is thrown earlier is not
                         * visible in this function -- confirm against libpqsrv result handling.
     863              :  */
     864              : static void
     865           13 : materializeResult(FunctionCallInfo fcinfo, PGconn *conn, PGresult *res)
     866              : {
     867           13 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
     868              :     TupleDesc   tupdesc;
     869              :     bool        is_sql_cmd;
     870              :     int         ntuples;
     871              :     int         nfields;
     872              : 
     873              :     /* prepTuplestoreResult must have been called previously */
     874              :     Assert(rsinfo->returnMode == SFRM_Materialize);
     875              : 
     876           13 :     if (PQresultStatus(res) == PGRES_COMMAND_OK)
     877              :     {
     878            0 :         is_sql_cmd = true;
     879              : 
     880              :         /*
     881              :          * need a tuple descriptor representing one TEXT column to return the
     882              :          * command status string as our result tuple
     883              :          */
     884            0 :         tupdesc = CreateTemplateTupleDesc(1);
     885            0 :         TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
     886              :                            TEXTOID, -1, 0);
     887            0 :         TupleDescFinalize(tupdesc);
     888            0 :         ntuples = 1;
     889            0 :         nfields = 1;
     890              :     }
     891              :     else
     892              :     {
     893              :         Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
     894              : 
     895           13 :         is_sql_cmd = false;
     896              : 
     897              :         /* get a tuple descriptor for our result type */
     898           13 :         switch (get_call_result_type(fcinfo, NULL, &tupdesc))
     899              :         {
     900           13 :             case TYPEFUNC_COMPOSITE:
     901              :                 /* success */
     902           13 :                 break;
     903            0 :             case TYPEFUNC_RECORD:
     904              :                 /* failed to determine actual type of RECORD */
     905            0 :                 ereport(ERROR,
     906              :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     907              :                          errmsg("function returning record called in context "
     908              :                                 "that cannot accept type record")));
     909              :                 break;
     910            0 :             default:
     911              :                 /* result type isn't composite */
     912            0 :                 elog(ERROR, "return type must be a row type");
     913              :                 break;
     914              :         }
     915              : 
     916              :         /* make sure we have a persistent copy of the tupdesc */
     917           13 :         tupdesc = CreateTupleDescCopy(tupdesc);
     918           13 :         ntuples = PQntuples(res);
     919           13 :         nfields = PQnfields(res);
     920              :     }
     921              : 
     922              :     /*
     923              :      * check result and tuple descriptor have the same number of columns
     924              :      */
     925           13 :     if (nfields != tupdesc->natts)
     926            0 :         ereport(ERROR,
     927              :                 (errcode(ERRCODE_DATATYPE_MISMATCH),
     928              :                  errmsg("remote query result rowtype does not match "
     929              :                         "the specified FROM clause rowtype")));
     930              : 
     931           13 :     if (ntuples > 0)
     932              :     {
     933              :         AttInMetadata *attinmeta;
                                /* stays -1 unless applyRemoteGucs() is called below */
     934           13 :         int         nestlevel = -1;
     935              :         Tuplestorestate *tupstore;
     936              :         MemoryContext oldcontext;
     937              :         int         row;
     938              :         char      **values;
     939              : 
     940           13 :         attinmeta = TupleDescGetAttInMetadata(tupdesc);
     941              : 
     942              :         /* Set GUCs to ensure we read GUC-sensitive data types correctly */
     943           13 :         if (!is_sql_cmd)
     944           13 :             nestlevel = applyRemoteGucs(conn);
     945              : 
                                /* create the tuplestore in the per-query memory context */
     946           13 :         oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
     947           13 :         tupstore = tuplestore_begin_heap(true, false, work_mem);
     948           13 :         rsinfo->setResult = tupstore;
     949           13 :         rsinfo->setDesc = tupdesc;
     950           13 :         MemoryContextSwitchTo(oldcontext);
     951              : 
                                /* scratch array of column strings, reused for every row */
     952           13 :         values = palloc_array(char *, nfields);
     953              : 
     954              :         /* put all tuples into the tuplestore */
     955           49 :         for (row = 0; row < ntuples; row++)
     956              :         {
     957              :             HeapTuple   tuple;
     958              : 
     959           37 :             if (!is_sql_cmd)
     960              :             {
     961              :                 int         i;
     962              : 
                                        /* SQL NULLs are passed on as NULL pointers */
     963          138 :                 for (i = 0; i < nfields; i++)
     964              :                 {
     965          101 :                     if (PQgetisnull(res, row, i))
     966            0 :                         values[i] = NULL;
     967              :                     else
     968          101 :                         values[i] = PQgetvalue(res, row, i);
     969              :                 }
     970              :             }
     971              :             else
     972              :             {
     973            0 :                 values[0] = PQcmdStatus(res);
     974              :             }
     975              : 
     976              :             /* build the tuple and put it into the tuplestore. */
     977           37 :             tuple = BuildTupleFromCStrings(attinmeta, values);
     978           36 :             tuplestore_puttuple(tupstore, tuple);
     979              :         }
     980              : 
     981              :         /* clean up GUC settings, if we changed any */
     982           12 :         restoreLocalGucs(nestlevel);
     983              :     }
     984              : 
     985           12 :     PQclear(res);
     986           12 : }
     987              : 
     988              : /*
     989              :  * Execute the given SQL command and store its results into a tuplestore
     990              :  * to be returned as the result of the current function.
     991              :  *
     992              :  * This is equivalent to PQexec followed by materializeResult, but we make
     993              :  * use of libpq's single-row mode to avoid accumulating the whole result
     994              :  * inside libpq before it gets transferred to the tuplestore.
                         *
                         * fcinfo:  call info whose resultinfo was set up by prepTuplestoreResult().
                         * conn:    connection to run the command on.
                         * conname: passed through to dblink_res_error() for error reporting.
                         * sql:     command text handed to storeQueryResult().
                         * fail:    passed through to dblink_res_error().
                         *
                         * If an ERROR is thrown inside the PG_TRY block, every remaining result
                         * is read from the connection and discarded before the error is
                         * re-thrown.
     995              :  */
     996              : static void
     997           23 : materializeQueryResult(FunctionCallInfo fcinfo,
     998              :                        PGconn *conn,
     999              :                        const char *conname,
    1000              :                        const char *sql,
    1001              :                        bool fail)
    1002              : {
    1003           23 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    1004              : 
    1005              :     /* prepTuplestoreResult must have been called previously */
    1006              :     Assert(rsinfo->returnMode == SFRM_Materialize);
    1007              : 
    1008              :     /* Use a PG_TRY block to ensure we pump libpq dry of results */
    1009           23 :     PG_TRY();
    1010              :     {
                                /* zero-initialized, so all pointer members start out NULL */
    1011           23 :         storeInfo   sinfo = {0};
    1012              :         PGresult   *res;
    1013              : 
    1014           23 :         sinfo.fcinfo = fcinfo;
    1015              :         /* Create short-lived memory context for data conversions */
    1016           23 :         sinfo.tmpcontext = AllocSetContextCreate(CurrentMemoryContext,
    1017              :                                                  "dblink temporary context",
    1018              :                                                  ALLOCSET_DEFAULT_SIZES);
    1019              : 
    1020              :         /* execute query, collecting any tuples into the tuplestore */
    1021           23 :         res = storeQueryResult(&sinfo, conn, sql);
    1022              : 
    1023           23 :         if (!res ||
    1024           23 :             (PQresultStatus(res) != PGRES_COMMAND_OK &&
    1025           23 :              PQresultStatus(res) != PGRES_TUPLES_OK))
    1026              :         {
    1027            2 :             dblink_res_error(conn, conname, res, fail,
    1028              :                              "while executing query");
    1029              :             /* if fail isn't set, we'll return an empty query result */
    1030              :         }
    1031           21 :         else if (PQresultStatus(res) == PGRES_COMMAND_OK)
    1032              :         {
    1033              :             /*
    1034              :              * storeRow didn't get called, so we need to convert the command
    1035              :              * status string to a tuple manually
    1036              :              */
    1037              :             TupleDesc   tupdesc;
    1038              :             AttInMetadata *attinmeta;
    1039              :             Tuplestorestate *tupstore;
    1040              :             HeapTuple   tuple;
    1041              :             char       *values[1];
    1042              :             MemoryContext oldcontext;
    1043              : 
    1044              :             /*
    1045              :              * need a tuple descriptor representing one TEXT column to return
    1046              :              * the command status string as our result tuple
    1047              :              */
    1048            0 :             tupdesc = CreateTemplateTupleDesc(1);
    1049            0 :             TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
    1050              :                                TEXTOID, -1, 0);
    1051            0 :             TupleDescFinalize(tupdesc);
    1052            0 :             attinmeta = TupleDescGetAttInMetadata(tupdesc);
    1053              : 
                                    /* create the tuplestore in the per-query memory context */
    1054            0 :             oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
    1055            0 :             tupstore = tuplestore_begin_heap(true, false, work_mem);
    1056            0 :             rsinfo->setResult = tupstore;
    1057            0 :             rsinfo->setDesc = tupdesc;
    1058            0 :             MemoryContextSwitchTo(oldcontext);
    1059              : 
    1060            0 :             values[0] = PQcmdStatus(res);
    1061              : 
    1062              :             /* build the tuple and put it into the tuplestore. */
    1063            0 :             tuple = BuildTupleFromCStrings(attinmeta, values);
    1064            0 :             tuplestore_puttuple(tupstore, tuple);
    1065              : 
    1066            0 :             PQclear(res);
    1067              :         }
    1068              :         else
    1069              :         {
    1070              :             Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
    1071              :             /* storeRow should have created a tuplestore */
    1072              :             Assert(rsinfo->setResult != NULL);
    1073              : 
    1074           21 :             PQclear(res);
    1075              :         }
    1076              : 
    1077              :         /* clean up data conversion short-lived memory context */
    1078           23 :         if (sinfo.tmpcontext != NULL)
    1079           23 :             MemoryContextDelete(sinfo.tmpcontext);
    1080              : 
                                /* release any results still referenced from sinfo */
    1081           23 :         PQclear(sinfo.last_res);
    1082           23 :         PQclear(sinfo.cur_res);
    1083              :     }
    1084            0 :     PG_CATCH();
    1085              :     {
    1086              :         PGresult   *res;
    1087              : 
    1088              :         /* be sure to clear out any pending data in libpq */
    1089            0 :         while ((res = libpqsrv_get_result(conn, dblink_we_get_result)) !=
    1090              :                NULL)
    1091            0 :             PQclear(res);
    1092            0 :         PG_RE_THROW();
    1093              :     }
    1094           23 :     PG_END_TRY();
    1095           23 : }
    1096              : 
    1097              : /*
    1098              :  * Execute query, and send any result rows to sinfo->tuplestore.
                         *
                         * The query is sent with PQsendQuery() and read back in single-row mode.
                         * Each PGRES_SINGLE_TUPLE result is passed to storeRow() and then freed.
                         * A result with any other status ends the current result set: it
                         * replaces sinfo->last_res (the previous one is freed) and "first" is
                         * set back to true so that a following result set starts afresh.
                         *
                         * Returns the last completed PGresult, or NULL if none was received.
                         * sinfo->last_res is reset to NULL before returning, so the returned
                         * result is no longer referenced from sinfo.
                         *
                         * Raises ERROR if the query cannot be sent or single-row mode cannot be
                         * enabled.
    1099              :  */
    1100              : static PGresult *
    1101           23 : storeQueryResult(storeInfo *sinfo, PGconn *conn, const char *sql)
    1102              : {
                            /* true until the first row of the current result set was stored */
    1103           23 :     bool        first = true;
                            /* stays -1 unless applyRemoteGucs() is called below */
    1104           23 :     int         nestlevel = -1;
    1105              :     PGresult   *res;
    1106              : 
    1107           23 :     if (!PQsendQuery(conn, sql))
    1108            0 :         elog(ERROR, "could not send query: %s", pchomp(PQerrorMessage(conn)));
    1109              : 
    1110           23 :     if (!PQsetSingleRowMode(conn))  /* shouldn't fail */
    1111            0 :         elog(ERROR, "failed to set single-row mode for dblink query");
    1112              : 
    1113              :     for (;;)
    1114              :     {
    1115          198 :         CHECK_FOR_INTERRUPTS();
    1116              : 
                                /* a NULL result ends the loop */
    1117          198 :         sinfo->cur_res = libpqsrv_get_result(conn, dblink_we_get_result);
    1118          198 :         if (!sinfo->cur_res)
    1119           23 :             break;
    1120              : 
    1121          175 :         if (PQresultStatus(sinfo->cur_res) == PGRES_SINGLE_TUPLE)
    1122              :         {
    1123              :             /* got one row from possibly-bigger resultset */
    1124              : 
    1125              :             /*
    1126              :              * Set GUCs to ensure we read GUC-sensitive data types correctly.
    1127              :              * We shouldn't do this until we have a row in hand, to ensure
    1128              :              * libpq has seen any earlier ParameterStatus protocol messages.
    1129              :              */
                                    /* the nestlevel test keeps this to one call per query */
    1130          152 :             if (first && nestlevel < 0)
    1131           21 :                 nestlevel = applyRemoteGucs(conn);
    1132              : 
    1133          152 :             storeRow(sinfo, sinfo->cur_res, first);
    1134              : 
    1135          152 :             PQclear(sinfo->cur_res);
    1136          152 :             sinfo->cur_res = NULL;
    1137          152 :             first = false;
    1138              :         }
    1139              :         else
    1140              :         {
    1141              :             /* if empty resultset, fill tuplestore header */
    1142           23 :             if (first && PQresultStatus(sinfo->cur_res) == PGRES_TUPLES_OK)
    1143            0 :                 storeRow(sinfo, sinfo->cur_res, first);
    1144              : 
    1145              :             /* store completed result at last_res */
    1146           23 :             PQclear(sinfo->last_res);
    1147           23 :             sinfo->last_res = sinfo->cur_res;
    1148           23 :             sinfo->cur_res = NULL;
    1149           23 :             first = true;
    1150              :         }
    1151              :     }
    1152              : 
    1153              :     /* clean up GUC settings, if we changed any */
    1154           23 :     restoreLocalGucs(nestlevel);
    1155              : 
    1156              :     /* return last_res */
    1157           23 :     res = sinfo->last_res;
    1158           23 :     sinfo->last_res = NULL;
    1159           23 :     return res;
    1160              : }
    1161              : 
    1162              : /*
    1163              :  * Convert one row of "res" to a tuple and append it to sinfo->tuplestore.
    1164              :  *
    1165              :  * If "first" is true, first (re)create the tuplestore and conversion state
    1166              :  * (in this case the PGresult might contain either zero or one row).
    1167              :  */
    1168              : static void
    1169          152 : storeRow(storeInfo *sinfo, PGresult *res, bool first)
    1170              : {
    1171          152 :     int         nfields = PQnfields(res);
    1172              :     HeapTuple   tuple;
    1173              :     int         i;
    1174              :     MemoryContext oldcontext;
    1175              : 
    1176          152 :     if (first)
    1177              :     {
    1178              :         /* Prepare for new result set */
    1179           21 :         ReturnSetInfo *rsinfo = (ReturnSetInfo *) sinfo->fcinfo->resultinfo;
    1180              :         TupleDesc   tupdesc;
    1181              : 
    1182              :         /*
    1183              :          * It's possible to get more than one result set if the query string
    1184              :          * contained multiple SQL commands.  In that case, we follow PQexec's
    1185              :          * traditional behavior of throwing away all but the last result.
    1186              :          */
    1187           21 :         if (sinfo->tuplestore)
    1188            0 :             tuplestore_end(sinfo->tuplestore);
    1189           21 :         sinfo->tuplestore = NULL;
    1190              : 
    1191              :         /* get a tuple descriptor for our result type */
    1192           21 :         switch (get_call_result_type(sinfo->fcinfo, NULL, &tupdesc))
    1193              :         {
    1194           21 :             case TYPEFUNC_COMPOSITE:
    1195              :                 /* success */
    1196           21 :                 break;
    1197            0 :             case TYPEFUNC_RECORD:
    1198              :                 /* failed to determine actual type of RECORD */
    1199            0 :                 ereport(ERROR,
    1200              :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1201              :                          errmsg("function returning record called in context "
    1202              :                                 "that cannot accept type record")));
    1203              :                 break;
    1204            0 :             default:
    1205              :                 /* result type isn't composite */
    1206            0 :                 elog(ERROR, "return type must be a row type");
    1207              :                 break;
    1208              :         }
    1209              : 
    1210              :         /* make sure we have a persistent copy of the tupdesc */
    1211           21 :         tupdesc = CreateTupleDescCopy(tupdesc);
    1212              : 
    1213              :         /* check result and tuple descriptor have the same number of columns */
    1214           21 :         if (nfields != tupdesc->natts)
    1215            0 :             ereport(ERROR,
    1216              :                     (errcode(ERRCODE_DATATYPE_MISMATCH),
    1217              :                      errmsg("remote query result rowtype does not match "
    1218              :                             "the specified FROM clause rowtype")));
    1219              : 
    1220              :         /* Prepare attinmeta for later data conversions */
    1221           21 :         sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
    1222              : 
    1223              :         /* Create a new, empty tuplestore in the per-query memory context */
    1224           21 :         oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
    1225           21 :         sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
    1226           21 :         rsinfo->setResult = sinfo->tuplestore;
    1227           21 :         rsinfo->setDesc = tupdesc;
    1228           21 :         MemoryContextSwitchTo(oldcontext);
    1229              : 
    1230              :         /* Done if empty resultset; the new tuplestore simply stays empty */
    1231           21 :         if (PQntuples(res) == 0)
    1232            0 :             return;
    1233              : 
    1234              :         /*
    1235              :          * Set up sufficiently-wide string pointers array; this won't change
    1236              :          * in size so it's easy to preallocate.
    1237              :          */
    1238           21 :         if (sinfo->cstrs)
    1239            0 :             pfree(sinfo->cstrs);
    1240           21 :         sinfo->cstrs = palloc_array(char *, nfields);
    1241              :     }
    1242              : 
    1243              :     /* Should have a single-row result if we get here */
    1244              :     Assert(PQntuples(res) == 1);
    1245              : 
    1246              :     /*
    1247              :      * Do the following work in a temp context that we reset after each tuple.
    1248              :      * This cleans up not only the data we have direct access to, but any
    1249              :      * cruft the I/O functions might leak.
    1250              :      */
    1251          152 :     oldcontext = MemoryContextSwitchTo(sinfo->tmpcontext);
    1252              : 
    1253              :     /*
    1254              :      * Fill cstrs with the column value strings (NULL pointer if PQgetisnull).
    1255              :      */
    1256          570 :     for (i = 0; i < nfields; i++)
    1257              :     {
    1258          418 :         if (PQgetisnull(res, 0, i))
    1259            0 :             sinfo->cstrs[i] = NULL;
    1260              :         else
    1261          418 :             sinfo->cstrs[i] = PQgetvalue(res, 0, i);
    1262              :     }
    1263              : 
    1264              :     /* Convert row to a tuple, and add it to the tuplestore */
    1265          152 :     tuple = BuildTupleFromCStrings(sinfo->attinmeta, sinfo->cstrs);
    1266              : 
    1267          152 :     tuplestore_puttuple(sinfo->tuplestore, tuple);
    1268              : 
    1269              :     /* Clean up: switch back, then reset the per-tuple temp context */
    1270          152 :     MemoryContextSwitchTo(oldcontext);
    1271          152 :     MemoryContextReset(sinfo->tmpcontext);
    1272              : }
    1273              : 
    1274              : /*
    1275              :  * List all open dblink connections by name.
    1276              :  * Returns a text array of connection names, or NULL if none are open.
    1277              :  * Takes no params.
    1278              :  */
    1279            3 : PG_FUNCTION_INFO_V1(dblink_get_connections);
    1280              : Datum
    1281            1 : dblink_get_connections(PG_FUNCTION_ARGS)
    1282              : {
    1283              :     HASH_SEQ_STATUS status;
    1284              :     remoteConnHashEnt *hentry;
    1285            1 :     ArrayBuildState *astate = NULL;
    1286              : 
    1287            1 :     if (remoteConnHash)
    1288              :     {
    1289            1 :         hash_seq_init(&status, remoteConnHash);
    1290            4 :         while ((hentry = (remoteConnHashEnt *) hash_seq_search(&status)) != NULL)
    1291              :         {
    1292              :             /* ignore it if it's not an open connection */
    1293            3 :             if (hentry->rconn.conn == NULL)
    1294            0 :                 continue;
    1295              :             /* append this entry's name to the array being accumulated */
    1296            3 :             astate = accumArrayResult(astate,
    1297            3 :                                       CStringGetTextDatum(hentry->name),
    1298              :                                       false, TEXTOID, CurrentMemoryContext);
    1299              :         }
    1300              :     }
    1301              : 
    1302            1 :     if (astate)
    1303            1 :         PG_RETURN_DATUM(makeArrayResult(astate,
    1304              :                                         CurrentMemoryContext));
    1305              :     else
    1306            0 :         PG_RETURN_NULL();
    1307              : }
    1308              : 
    1309              : /*
    1310              :  * Checks if a given remote connection is busy
    1311              :  *
    1312              :  * Returns the result of PQisBusy() as an integer (1 if busy, 0 otherwise)
    1313              :  * Params:
    1314              :  *  text connection_name - name of the connection to check
    1315              :  *
    1316              :  * PQconsumeInput() is called first; its return value is not checked.
    1317              :  */
    1317            3 : PG_FUNCTION_INFO_V1(dblink_is_busy);
    1318              : Datum
    1319            1 : dblink_is_busy(PG_FUNCTION_ARGS)
    1320              : {
    1321              :     PGconn     *conn;
    1322              : 
    1323            1 :     dblink_init();
    1324            1 :     conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
    1325              : 
    1326            1 :     PQconsumeInput(conn);
    1327            1 :     PG_RETURN_INT32(PQisBusy(conn));
    1328              : }
    1329              : 
    1330              : /*
    1331              :  * Cancels a running request on a connection
    1332              :  *
    1333              :  * Returns text:
    1334              :  *  "OK" if libpqsrv_cancel() reports no error (returns NULL),
    1335              :  *      the error message it returned otherwise
    1336              :  *
    1337              :  * Params:
    1338              :  *  text connection_name - name of the connection whose request to cancel
    1339              :  *
    1340              :  */
    1341            3 : PG_FUNCTION_INFO_V1(dblink_cancel_query);
    1342              : Datum
    1343            1 : dblink_cancel_query(PG_FUNCTION_ARGS)
    1344              : {
    1345              :     PGconn     *conn;
    1346              :     const char *msg;
    1347              :     TimestampTz endtime;
    1348              : 
    1349            1 :     dblink_init();
    1350            1 :     conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
    1351            1 :     endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
    1352              :                                           30000);
    1353            1 :     msg = libpqsrv_cancel(conn, endtime);
    1354            1 :     if (msg == NULL)
    1355            1 :         msg = "OK";
    1356              : 
    1357            1 :     PG_RETURN_TEXT_P(cstring_to_text(msg));
    1358              : }
    1359              : 
    1360              : 
    1361              : /*
    1362              :  * Get error message from a connection
    1363              :  *
    1364              :  * Returns text:
    1365              :  *  "OK" if PQerrorMessage() is NULL or empty, else the pchomp()'d message
    1366              :  *
    1367              :  * Params:
    1368              :  *  text connection_name - name of the connection to check
    1369              :  *
    1370              :  */
    1371            3 : PG_FUNCTION_INFO_V1(dblink_error_message);
    1372              : Datum
    1373            1 : dblink_error_message(PG_FUNCTION_ARGS)
    1374              : {
    1375              :     char       *msg;
    1376              :     PGconn     *conn;
    1377              : 
    1378            1 :     dblink_init();
    1379            1 :     conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
    1380              : 
    1381            1 :     msg = PQerrorMessage(conn);
    1382            1 :     if (msg == NULL || msg[0] == '\0')
    1383            1 :         PG_RETURN_TEXT_P(cstring_to_text("OK"));
    1384              :     else
    1385            0 :         PG_RETURN_TEXT_P(cstring_to_text(pchomp(msg)));
    1386              : }
    1387              : 
    1388              : /*
    1389              :  * Execute an SQL non-SELECT command and return its command status as text
    1390              :  */
    1391           13 : PG_FUNCTION_INFO_V1(dblink_exec);
    1392              : Datum
    1393           26 : dblink_exec(PG_FUNCTION_ARGS)
    1394              : {
    1395           26 :     text       *volatile sql_cmd_status = NULL;
    1396           26 :     PGconn     *volatile conn = NULL;
    1397           26 :     volatile bool freeconn = false;
    1398              : 
    1399           26 :     dblink_init();
    1400              : 
    1401           26 :     PG_TRY();
    1402              :     {
    1403           26 :         PGresult   *res = NULL;
    1404           26 :         char       *sql = NULL;
    1405           26 :         char       *conname = NULL;
    1406           26 :         bool        fail = true;    /* default to backward compatible behavior */
    1407              : 
    1408           26 :         if (PG_NARGS() == 3)
    1409              :         {
    1410              :             /* must be text,text,bool */
    1411            0 :             conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
    1412            0 :             sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
    1413            0 :             fail = PG_GETARG_BOOL(2);
    1414            0 :             dblink_get_conn(conname, &conn, &conname, &freeconn);
    1415              :         }
    1416           26 :         else if (PG_NARGS() == 2)
    1417              :         {
    1418              :             /* might be text,text or text,bool; check the second arg's type */
    1419           17 :             if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
    1420              :             {
    1421            1 :                 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
    1422            1 :                 fail = PG_GETARG_BOOL(1);
    1423            1 :                 conn = pconn->conn;
    1424              :             }
    1425              :             else
    1426              :             {
    1427           16 :                 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
    1428           16 :                 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
    1429           16 :                 dblink_get_conn(conname, &conn, &conname, &freeconn);
    1430              :             }
    1431              :         }
    1432            9 :         else if (PG_NARGS() == 1)
    1433              :         {
    1434              :             /* must be single text argument */
    1435            9 :             conn = pconn->conn;
    1436            9 :             sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
    1437              :         }
    1438              :         else
    1439              :             /* shouldn't happen */
    1440            0 :             elog(ERROR, "wrong number of arguments");
    1441              : 
    1442           26 :         if (!conn)
    1443            0 :             dblink_conn_not_avail(conname);
    1444              : 
    1445           26 :         res = libpqsrv_exec(conn, sql, dblink_we_get_result);
    1446           26 :         if (!res ||
    1447           26 :             (PQresultStatus(res) != PGRES_COMMAND_OK &&
    1448            2 :              PQresultStatus(res) != PGRES_TUPLES_OK))
    1449              :         {
    1450            2 :             dblink_res_error(conn, conname, res, fail,
    1451              :                              "while executing command");
    1452              : 
    1453              :             /*
    1454              :              * if dblink_res_error() returned instead of raising an error,
    1455              :              * return the literal text "ERROR" as our result
    1456              :              */
    1457            1 :             sql_cmd_status = cstring_to_text("ERROR");
    1458              :         }
    1459           24 :         else if (PQresultStatus(res) == PGRES_COMMAND_OK)
    1460              :         {
    1461              :             /*
    1462              :              * save a copy of the command status string to return as our
    1463              :              * result, then free the PGresult
    1464              :              */
    1465           24 :             sql_cmd_status = cstring_to_text(PQcmdStatus(res));
    1466           24 :             PQclear(res);
    1467              :         }
    1468              :         else
    1469              :         {
    1470            0 :             PQclear(res);
    1471            0 :             ereport(ERROR,
    1472              :                     (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
    1473              :                      errmsg("statement returning results not allowed")));
    1474              :         }
    1475              :     }
    1476            1 :     PG_FINALLY();
    1477              :     {
    1478              :         /* close the connection if dblink_get_conn() set freeconn */
    1479           26 :         if (freeconn)
    1480            1 :             libpqsrv_disconnect(conn);
    1481              :     }
    1482           26 :     PG_END_TRY();
    1483              : 
    1484           25 :     PG_RETURN_TEXT_P(sql_cmd_status);
    1485              : }
    1486              : 
    1487              : 
    1488              : /*
    1489              :  * dblink_get_pkey
    1490              :  *
    1491              :  * Set-returning function: one (position, colname) row per primary key
    1492              :  * field of the supplied relation, or no rows if none exists.
    1493              :  */
    1494            3 : PG_FUNCTION_INFO_V1(dblink_get_pkey);
    1495              : Datum
    1496            9 : dblink_get_pkey(PG_FUNCTION_ARGS)
    1497              : {
    1498              :     int16       indnkeyatts;
    1499              :     char      **results;
    1500              :     FuncCallContext *funcctx;
    1501              :     int32       call_cntr;
    1502              :     int32       max_calls;
    1503              :     AttInMetadata *attinmeta;
    1504              :     MemoryContext oldcontext;
    1505              : 
    1506              :     /* stuff done only on the first call of the function */
    1507            9 :     if (SRF_IS_FIRSTCALL())
    1508              :     {
    1509              :         Relation    rel;
    1510              :         TupleDesc   tupdesc;
    1511              : 
    1512              :         /* create a function context for cross-call persistence */
    1513            3 :         funcctx = SRF_FIRSTCALL_INIT();
    1514              : 
    1515              :         /*
    1516              :          * switch to memory context appropriate for multiple function calls
    1517              :          */
    1518            3 :         oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
    1519              : 
    1520              :         /* open target relation */
    1521            3 :         rel = get_rel_from_relname(PG_GETARG_TEXT_PP(0), AccessShareLock, ACL_SELECT);
    1522              : 
    1523              :         /* get the array of key column names and the number of key columns */
    1524            3 :         results = get_pkey_attnames(rel, &indnkeyatts);
    1525              : 
    1526            3 :         relation_close(rel, AccessShareLock);
    1527              : 
    1528              :         /*
    1529              :          * need a tuple descriptor representing one INT and one TEXT column
    1530              :          */
    1531            3 :         tupdesc = CreateTemplateTupleDesc(2);
    1532            3 :         TupleDescInitEntry(tupdesc, (AttrNumber) 1, "position",
    1533              :                            INT4OID, -1, 0);
    1534            3 :         TupleDescInitEntry(tupdesc, (AttrNumber) 2, "colname",
    1535              :                            TEXTOID, -1, 0);
    1536              : 
    1537            3 :         TupleDescFinalize(tupdesc);
    1538              : 
    1539              :         /*
    1540              :          * Generate attribute metadata needed later to produce tuples from raw
    1541              :          * C strings
    1542              :          */
    1543            3 :         attinmeta = TupleDescGetAttInMetadata(tupdesc);
    1544            3 :         funcctx->attinmeta = attinmeta;
    1545              : 
    1546            3 :         if ((results != NULL) && (indnkeyatts > 0))
    1547              :         {
    1548            3 :             funcctx->max_calls = indnkeyatts;
    1549              : 
    1550              :             /* got results, keep track of them */
    1551            3 :             funcctx->user_fctx = results;
    1552              :         }
    1553              :         else
    1554              :         {
    1555              :             /* fast track when no results: return an empty set right away */
    1556            0 :             MemoryContextSwitchTo(oldcontext);
    1557            0 :             SRF_RETURN_DONE(funcctx);
    1558              :         }
    1559              : 
    1560            3 :         MemoryContextSwitchTo(oldcontext);
    1561              :     }
    1562              : 
    1563              :     /* stuff done on every call of the function */
    1564            9 :     funcctx = SRF_PERCALL_SETUP();
    1565              : 
    1566              :     /*
    1567              :      * initialize per-call variables
    1568              :      */
    1569            9 :     call_cntr = funcctx->call_cntr;
    1570            9 :     max_calls = funcctx->max_calls;
    1571              : 
    1572            9 :     results = (char **) funcctx->user_fctx;
    1573            9 :     attinmeta = funcctx->attinmeta;
    1574              : 
    1575            9 :     if (call_cntr < max_calls)   /* do when there is more left to send */
    1576              :     {
    1577              :         char      **values;
    1578              :         HeapTuple   tuple;
    1579              :         Datum       result;
    1580              : 
    1581            6 :         values = palloc_array(char *, 2);
    1582            6 :         values[0] = psprintf("%d", call_cntr + 1);
    1583            6 :         values[1] = results[call_cntr];
    1584              : 
    1585              :         /* build the (position, colname) tuple; position is 1-based */
    1586            6 :         tuple = BuildTupleFromCStrings(attinmeta, values);
    1587              : 
    1588              :         /* make the tuple into a datum */
    1589            6 :         result = HeapTupleGetDatum(tuple);
    1590              : 
    1591            6 :         SRF_RETURN_NEXT(funcctx, result);
    1592              :     }
    1593              :     else
    1594              :     {
    1595              :         /* do when there is no more left */
    1596            3 :         SRF_RETURN_DONE(funcctx);
    1597              :     }
    1598              : }
    1599              : 
    1600              : 
    1601              : /*
    1602              :  * dblink_build_sql_insert
    1603              :  *
    1604              :  * Used to generate an SQL insert statement
    1605              :  * based on an existing tuple in a local relation.
    1606              :  * This is useful for selectively replicating data
    1607              :  * to another server via dblink.  The statement is returned as text.
    1608              :  *
    1609              :  * API:
    1610              :  * <relname> - name of local table of interest
    1611              :  * <pkattnums> - an int2vector of attnums which will be used
    1612              :  * to identify the local tuple of interest
    1613              :  * <pknumatts> - number of attnums in pkattnums
    1614              :  * <src_pkattvals_arry> - text array of key values which will be used
    1615              :  * to identify the local tuple of interest
    1616              :  * <tgt_pkattvals_arry> - text array of key values which will be used
    1617              :  * to build the string for execution remotely. These are substituted
    1618              :  * for their counterparts in src_pkattvals_arry
    1619              :  */
    1620            4 : PG_FUNCTION_INFO_V1(dblink_build_sql_insert);
    1621              : Datum
    1622            6 : dblink_build_sql_insert(PG_FUNCTION_ARGS)
    1623              : {
    1624            6 :     text       *relname_text = PG_GETARG_TEXT_PP(0);
    1625            6 :     int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
    1626            6 :     int32       pknumatts_arg = PG_GETARG_INT32(2);
    1627            6 :     ArrayType  *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
    1628            6 :     ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
    1629              :     Relation    rel;
    1630              :     int        *pkattnums;
    1631              :     int         pknumatts;
    1632              :     char      **src_pkattvals;
    1633              :     char      **tgt_pkattvals;
    1634              :     int         src_nitems;
    1635              :     int         tgt_nitems;
    1636              :     char       *sql;
    1637              : 
    1638              :     /*
    1639              :      * Open target relation.
    1640              :      */
    1641            6 :     rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
    1642              : 
    1643              :     /*
    1644              :      * Process pkattnums argument; results land in pkattnums and pknumatts.
    1645              :      */
    1646            6 :     validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
    1647              :                        &pkattnums, &pknumatts);
    1648              : 
    1649              :     /*
    1650              :      * Source array is made up of key values that will be used to locate the
    1651              :      * tuple of interest from the local system.
    1652              :      */
    1653            4 :     src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
    1654              : 
    1655              :     /*
    1656              :      * There should be one source array key value for each key attnum
    1657              :      */
    1658            4 :     if (src_nitems != pknumatts)
    1659            0 :         ereport(ERROR,
    1660              :                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
    1661              :                  errmsg("source key array length must match number of key attributes")));
    1662              : 
    1663              :     /*
    1664              :      * Target array is made up of key values that will be used to build the
    1665              :      * SQL string for use on the remote system.
    1666              :      */
    1667            4 :     tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
    1668              : 
    1669              :     /*
    1670              :      * There should be one target array key value for each key attnum
    1671              :      */
    1672            4 :     if (tgt_nitems != pknumatts)
    1673            0 :         ereport(ERROR,
    1674              :                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
    1675              :                  errmsg("target key array length must match number of key attributes")));
    1676              : 
    1677              :     /*
    1678              :      * Prep work is finally done. Go get the SQL string.
    1679              :      */
    1680            4 :     sql = get_sql_insert(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
    1681              : 
    1682              :     /*
    1683              :      * Now we can close the relation.
    1684              :      */
    1685            4 :     relation_close(rel, AccessShareLock);
    1686              : 
    1687              :     /*
    1688              :      * And return the SQL string as a text datum
    1689              :      */
    1690            4 :     PG_RETURN_TEXT_P(cstring_to_text(sql));
    1691              : }
    1692              : 
    1693              : 
    1694              : /*
    1695              :  * dblink_build_sql_delete
    1696              :  *
    1697              :  * Used to generate an SQL delete statement.
    1698              :  * This is useful for selectively replicating a
    1699              :  * delete to another server via dblink.  The statement is returned as text.
    1700              :  *
    1701              :  * API:
    1702              :  * <relname> - name of remote table of interest
    1703              :  * <pkattnums> - an int2vector of attnums which will be used
    1704              :  * to identify the remote tuple of interest
    1705              :  * <pknumatts> - number of attnums in pkattnums
    1706              :  * <tgt_pkattvals_arry> - text array of key values which will be used
    1707              :  * to build the string for execution remotely.
    1708              :  */
    1709            4 : PG_FUNCTION_INFO_V1(dblink_build_sql_delete);
    1710              : Datum
    1711            6 : dblink_build_sql_delete(PG_FUNCTION_ARGS)
    1712              : {
    1713            6 :     text       *relname_text = PG_GETARG_TEXT_PP(0);
    1714            6 :     int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
    1715            6 :     int32       pknumatts_arg = PG_GETARG_INT32(2);
    1716            6 :     ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
    1717              :     Relation    rel;
    1718              :     int        *pkattnums;
    1719              :     int         pknumatts;
    1720              :     char      **tgt_pkattvals;
    1721              :     int         tgt_nitems;
    1722              :     char       *sql;
    1723              : 
    1724              :     /*
    1725              :      * Open target relation.
    1726              :      */
    1727            6 :     rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
    1728              : 
    1729              :     /*
    1730              :      * Process pkattnums argument; results land in pkattnums and pknumatts.
    1731              :      */
    1732            6 :     validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
    1733              :                        &pkattnums, &pknumatts);
    1734              : 
    1735              :     /*
    1736              :      * Target array is made up of key values that will be used to build the
    1737              :      * SQL string for use on the remote system.
    1738              :      */
    1739            4 :     tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
    1740              : 
    1741              :     /*
    1742              :      * There should be one target array key value for each key attnum
    1743              :      */
    1744            4 :     if (tgt_nitems != pknumatts)
    1745            0 :         ereport(ERROR,
    1746              :                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
    1747              :                  errmsg("target key array length must match number of key attributes")));
    1748              : 
    1749              :     /*
    1750              :      * Prep work is finally done. Go get the SQL string.
    1751              :      */
    1752            4 :     sql = get_sql_delete(rel, pkattnums, pknumatts, tgt_pkattvals);
    1753              : 
    1754              :     /*
    1755              :      * Now we can close the relation.
    1756              :      */
    1757            4 :     relation_close(rel, AccessShareLock);
    1758              : 
    1759              :     /*
    1760              :      * And return the SQL string as a text datum
    1761              :      */
    1762            4 :     PG_RETURN_TEXT_P(cstring_to_text(sql));
    1763              : }
    1764              : 
    1765              : 
    1766              : /*
    1767              :  * dblink_build_sql_update
    1768              :  *
    1769              :  * Used to generate an SQL update statement
    1770              :  * based on an existing tuple in a local relation.
    1771              :  * This is useful for selectively replicating data
    1772              :  * to another server via dblink.
    1773              :  *
    1774              :  * API:
    1775              :  * <relname> - name of local table of interest
    1776              :  * <pkattnums> - an int2vector of attnums which will be used
    1777              :  * to identify the local tuple of interest
    1778              :  * <pknumatts> - number of attnums in pkattnums
    1779              :  * <src_pkattvals_arry> - text array of key values which will be used
    1780              :  * to identify the local tuple of interest
    1781              :  * <tgt_pkattvals_arry> - text array of key values which will be used
    1782              :  * to build the string for execution remotely. These are substituted
    1783              :  * for their counterparts in src_pkattvals_arry
    1784              :  */
    1785            4 : PG_FUNCTION_INFO_V1(dblink_build_sql_update);
    1786              : Datum
    1787            6 : dblink_build_sql_update(PG_FUNCTION_ARGS)
    1788              : {
    1789            6 :     text       *relname_text = PG_GETARG_TEXT_PP(0);
    1790            6 :     int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
    1791            6 :     int32       pknumatts_arg = PG_GETARG_INT32(2);
    1792            6 :     ArrayType  *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
    1793            6 :     ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
    1794              :     Relation    rel;
    1795              :     int        *pkattnums;
    1796              :     int         pknumatts;
    1797              :     char      **src_pkattvals;
    1798              :     char      **tgt_pkattvals;
    1799              :     int         src_nitems;
    1800              :     int         tgt_nitems;
    1801              :     char       *sql;
    1802              : 
    1803              :     /*
    1804              :      * Open target relation.
    1805              :      */
    1806            6 :     rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
    1807              : 
    1808              :     /*
    1809              :      * Process pkattnums argument.
    1810              :      */
    1811            6 :     validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
    1812              :                        &pkattnums, &pknumatts);
    1813              : 
    1814              :     /*
    1815              :      * Source array is made up of key values that will be used to locate the
    1816              :      * tuple of interest from the local system.
    1817              :      */
    1818            4 :     src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
    1819              : 
    1820              :     /*
    1821              :      * There should be one source array key value for each key attnum
    1822              :      */
    1823            4 :     if (src_nitems != pknumatts)
    1824            0 :         ereport(ERROR,
    1825              :                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
    1826              :                  errmsg("source key array length must match number of key attributes")));
    1827              : 
    1828              :     /*
    1829              :      * Target array is made up of key values that will be used to build the
    1830              :      * SQL string for use on the remote system.
    1831              :      */
    1832            4 :     tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
    1833              : 
    1834              :     /*
    1835              :      * There should be one target array key value for each key attnum
    1836              :      */
    1837            4 :     if (tgt_nitems != pknumatts)
    1838            0 :         ereport(ERROR,
    1839              :                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
    1840              :                  errmsg("target key array length must match number of key attributes")));
    1841              : 
    1842              :     /*
    1843              :      * Prep work is finally done. Go get the SQL string.
    1844              :      */
    1845            4 :     sql = get_sql_update(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
    1846              : 
    1847              :     /*
    1848              :      * Now we can close the relation.
    1849              :      */
    1850            4 :     relation_close(rel, AccessShareLock);
    1851              : 
    1852              :     /*
    1853              :      * And send it
    1854              :      */
    1855            4 :     PG_RETURN_TEXT_P(cstring_to_text(sql));
    1856              : }
    1857              : 
    1858              : /*
    1859              :  * dblink_current_query
    1860              :  * return the current query string
    1861              :  * to allow its use in (among other things)
    1862              :  * rewrite rules
    1863              :  */
    1864            2 : PG_FUNCTION_INFO_V1(dblink_current_query);
    1865              : Datum
    1866            0 : dblink_current_query(PG_FUNCTION_ARGS)
    1867              : {
    1868              :     /* This is now just an alias for the built-in function current_query() */
    1869            0 :     PG_RETURN_DATUM(current_query(fcinfo));
    1870              : }
    1871              : 
    1872              : /*
    1873              :  * Retrieve async notifications for a connection.
    1874              :  *
    1875              :  * Returns a setof record of notifications, or an empty set if none received.
    1876              :  * Can optionally take a named connection as parameter, but uses the unnamed
    1877              :  * connection per default.
    1878              :  *
    1879              :  */
    1880              : #define DBLINK_NOTIFY_COLS      3
    1881              : 
    1882            5 : PG_FUNCTION_INFO_V1(dblink_get_notify);
    1883              : Datum
    1884            2 : dblink_get_notify(PG_FUNCTION_ARGS)
    1885              : {
    1886              :     PGconn     *conn;
    1887              :     PGnotify   *notify;
    1888            2 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    1889              : 
    1890            2 :     dblink_init();
    1891            2 :     if (PG_NARGS() == 1)
    1892            0 :         conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
    1893              :     else
    1894            2 :         conn = pconn->conn;
    1895              : 
    1896            2 :     InitMaterializedSRF(fcinfo, 0);
    1897              : 
    1898            2 :     PQconsumeInput(conn);
    1899            4 :     while ((notify = PQnotifies(conn)) != NULL)
    1900              :     {
    1901              :         Datum       values[DBLINK_NOTIFY_COLS];
    1902              :         bool        nulls[DBLINK_NOTIFY_COLS];
    1903              : 
    1904            2 :         memset(values, 0, sizeof(values));
    1905            2 :         memset(nulls, 0, sizeof(nulls));
    1906              : 
    1907            2 :         if (notify->relname != NULL)
    1908            2 :             values[0] = CStringGetTextDatum(notify->relname);
    1909              :         else
    1910            0 :             nulls[0] = true;
    1911              : 
    1912            2 :         values[1] = Int32GetDatum(notify->be_pid);
    1913              : 
    1914            2 :         if (notify->extra != NULL)
    1915            2 :             values[2] = CStringGetTextDatum(notify->extra);
    1916              :         else
    1917            0 :             nulls[2] = true;
    1918              : 
    1919            2 :         tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
    1920              : 
    1921            2 :         PQfreemem(notify);
    1922            2 :         PQconsumeInput(conn);
    1923              :     }
    1924              : 
    1925            2 :     return (Datum) 0;
    1926              : }
    1927              : 
    1928              : /*
    1929              :  * Validate the options given to a dblink foreign server or user mapping.
    1930              :  * Raise an error if any option is invalid.
    1931              :  *
    1932              :  * We just check the names of options here, so semantic errors in options,
    1933              :  * such as invalid numeric format, will be detected at the attempt to connect.
    1934              :  */
    1935           14 : PG_FUNCTION_INFO_V1(dblink_fdw_validator);
    1936              : Datum
    1937           19 : dblink_fdw_validator(PG_FUNCTION_ARGS)
    1938              : {
    1939           19 :     List       *options_list = untransformRelOptions(PG_GETARG_DATUM(0));
    1940           19 :     Oid         context = PG_GETARG_OID(1);
    1941              :     ListCell   *cell;
    1942              : 
    1943              :     static const PQconninfoOption *options = NULL;
    1944              : 
    1945              :     /*
    1946              :      * Get list of valid libpq options.
    1947              :      *
    1948              :      * To avoid unnecessary work, we get the list once and use it throughout
    1949              :      * the lifetime of this backend process.  We don't need to care about
    1950              :      * memory context issues, because PQconndefaults allocates with malloc.
    1951              :      */
    1952           19 :     if (!options)
    1953              :     {
    1954           12 :         options = PQconndefaults();
    1955           12 :         if (!options)           /* assume reason for failure is OOM */
    1956            0 :             ereport(ERROR,
    1957              :                     (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
    1958              :                      errmsg("out of memory"),
    1959              :                      errdetail("Could not get libpq's default connection options.")));
    1960              :     }
    1961              : 
    1962              :     /* Validate each supplied option. */
    1963           50 :     foreach(cell, options_list)
    1964              :     {
    1965           39 :         DefElem    *def = (DefElem *) lfirst(cell);
    1966              : 
    1967           39 :         if (!is_valid_dblink_fdw_option(options, def->defname, context))
    1968              :         {
    1969              :             /*
    1970              :              * Unknown option, or invalid option for the context specified, so
    1971              :              * complain about it.  Provide a hint with a valid option that
    1972              :              * looks similar, if there is one.
    1973              :              */
    1974              :             const PQconninfoOption *opt;
    1975              :             const char *closest_match;
    1976              :             ClosestMatchState match_state;
    1977            8 :             bool        has_valid_options = false;
    1978              : 
    1979            8 :             initClosestMatch(&match_state, def->defname, 4);
    1980          424 :             for (opt = options; opt->keyword; opt++)
    1981              :             {
    1982          416 :                 if (is_valid_dblink_option(options, opt->keyword, context))
    1983              :                 {
    1984           93 :                     has_valid_options = true;
    1985           93 :                     updateClosestMatch(&match_state, opt->keyword);
    1986              :                 }
    1987              :             }
    1988              : 
    1989            8 :             closest_match = getClosestMatch(&match_state);
    1990            8 :             ereport(ERROR,
    1991              :                     (errcode(ERRCODE_FDW_OPTION_NAME_NOT_FOUND),
    1992              :                      errmsg("invalid option \"%s\"", def->defname),
    1993              :                      has_valid_options ? closest_match ?
    1994              :                      errhint("Perhaps you meant the option \"%s\".",
    1995              :                              closest_match) : 0 :
    1996              :                      errhint("There are no valid options in this context.")));
    1997              :         }
    1998              :     }
    1999              : 
    2000           11 :     PG_RETURN_VOID();
    2001              : }
    2002              : 
    2003              : 
    2004              : /*************************************************************
    2005              :  * internal functions
    2006              :  */
    2007              : 
    2008              : 
    2009              : /*
    2010              :  * get_pkey_attnames
    2011              :  *
    2012              :  * Get the primary key attnames for the given relation.
    2013              :  * Return NULL, and set indnkeyatts = 0, if no primary key exists.
    2014              :  */
    2015              : static char **
    2016            3 : get_pkey_attnames(Relation rel, int16 *indnkeyatts)
    2017              : {
    2018              :     Relation    indexRelation;
    2019              :     ScanKeyData skey;
    2020              :     SysScanDesc scan;
    2021              :     HeapTuple   indexTuple;
    2022              :     int         i;
    2023            3 :     char      **result = NULL;
    2024              :     TupleDesc   tupdesc;
    2025              : 
    2026              :     /* initialize indnkeyatts to 0 in case no primary key exists */
    2027            3 :     *indnkeyatts = 0;
    2028              : 
    2029            3 :     tupdesc = rel->rd_att;
    2030              : 
    2031              :     /* Prepare to scan pg_index for entries having indrelid = this rel. */
    2032            3 :     indexRelation = table_open(IndexRelationId, AccessShareLock);
    2033            3 :     ScanKeyInit(&skey,
    2034              :                 Anum_pg_index_indrelid,
    2035              :                 BTEqualStrategyNumber, F_OIDEQ,
    2036              :                 ObjectIdGetDatum(RelationGetRelid(rel)));
    2037              : 
    2038            3 :     scan = systable_beginscan(indexRelation, IndexIndrelidIndexId, true,
    2039              :                               NULL, 1, &skey);
    2040              : 
    2041            3 :     while (HeapTupleIsValid(indexTuple = systable_getnext(scan)))
    2042              :     {
    2043            3 :         Form_pg_index index = (Form_pg_index) GETSTRUCT(indexTuple);
    2044              : 
    2045              :         /* we're only interested if it is the primary key */
    2046            3 :         if (index->indisprimary)
    2047              :         {
    2048            3 :             *indnkeyatts = index->indnkeyatts;
    2049            3 :             if (*indnkeyatts > 0)
    2050              :             {
    2051            3 :                 result = palloc_array(char *, *indnkeyatts);
    2052              : 
    2053            9 :                 for (i = 0; i < *indnkeyatts; i++)
    2054            6 :                     result[i] = SPI_fname(tupdesc, index->indkey.values[i]);
    2055              :             }
    2056            3 :             break;
    2057              :         }
    2058              :     }
    2059              : 
    2060            3 :     systable_endscan(scan);
    2061            3 :     table_close(indexRelation, AccessShareLock);
    2062              : 
    2063            3 :     return result;
    2064              : }
    2065              : 
    2066              : /*
    2067              :  * Deconstruct a text[] into C-strings (note any NULL elements will be
    2068              :  * returned as NULL pointers)
    2069              :  */
    2070              : static char **
    2071           20 : get_text_array_contents(ArrayType *array, int *numitems)
    2072              : {
    2073           20 :     int         ndim = ARR_NDIM(array);
    2074           20 :     int        *dims = ARR_DIMS(array);
    2075              :     int         nitems;
    2076              :     int16       typlen;
    2077              :     bool        typbyval;
    2078              :     char        typalign;
    2079              :     uint8       typalignby;
    2080              :     char      **values;
    2081              :     char       *ptr;
    2082              :     uint8      *bitmap;
    2083              :     int         bitmask;
    2084              :     int         i;
    2085              : 
    2086              :     Assert(ARR_ELEMTYPE(array) == TEXTOID);
    2087              : 
    2088           20 :     *numitems = nitems = ArrayGetNItems(ndim, dims);
    2089              : 
    2090           20 :     get_typlenbyvalalign(ARR_ELEMTYPE(array),
    2091              :                          &typlen, &typbyval, &typalign);
    2092           20 :     typalignby = typalign_to_alignby(typalign);
    2093              : 
    2094           20 :     values = palloc_array(char *, nitems);
    2095              : 
    2096           20 :     ptr = ARR_DATA_PTR(array);
    2097           20 :     bitmap = ARR_NULLBITMAP(array);
    2098           20 :     bitmask = 1;
    2099              : 
    2100           55 :     for (i = 0; i < nitems; i++)
    2101              :     {
    2102           35 :         if (bitmap && (*bitmap & bitmask) == 0)
    2103              :         {
    2104            0 :             values[i] = NULL;
    2105              :         }
    2106              :         else
    2107              :         {
    2108           35 :             values[i] = TextDatumGetCString(PointerGetDatum(ptr));
    2109           35 :             ptr = att_addlength_pointer(ptr, typlen, ptr);
    2110           35 :             ptr = (char *) att_nominal_alignby(ptr, typalignby);
    2111              :         }
    2112              : 
    2113              :         /* advance bitmap pointer if any */
    2114           35 :         if (bitmap)
    2115              :         {
    2116            0 :             bitmask <<= 1;
    2117            0 :             if (bitmask == 0x100)
    2118              :             {
    2119            0 :                 bitmap++;
    2120            0 :                 bitmask = 1;
    2121              :             }
    2122              :         }
    2123              :     }
    2124              : 
    2125           20 :     return values;
    2126              : }
    2127              : 
    2128              : static char *
    2129            4 : get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
    2130              : {
    2131              :     char       *relname;
    2132              :     HeapTuple   tuple;
    2133              :     TupleDesc   tupdesc;
    2134              :     int         natts;
    2135              :     StringInfoData buf;
    2136              :     char       *val;
    2137              :     int         key;
    2138              :     int         i;
    2139              :     bool        needComma;
    2140              : 
    2141            4 :     initStringInfo(&buf);
    2142              : 
    2143              :     /* get relation name including any needed schema prefix and quoting */
    2144            4 :     relname = generate_relation_name(rel);
    2145              : 
    2146            4 :     tupdesc = rel->rd_att;
    2147            4 :     natts = tupdesc->natts;
    2148              : 
    2149            4 :     tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
    2150            4 :     if (!tuple)
    2151            0 :         ereport(ERROR,
    2152              :                 (errcode(ERRCODE_CARDINALITY_VIOLATION),
    2153              :                  errmsg("source row not found")));
    2154              : 
    2155            4 :     appendStringInfo(&buf, "INSERT INTO %s(", relname);
    2156              : 
    2157            4 :     needComma = false;
    2158           19 :     for (i = 0; i < natts; i++)
    2159              :     {
    2160           15 :         Form_pg_attribute att = TupleDescAttr(tupdesc, i);
    2161              : 
    2162           15 :         if (att->attisdropped)
    2163            2 :             continue;
    2164              : 
    2165           13 :         if (needComma)
    2166            9 :             appendStringInfoChar(&buf, ',');
    2167              : 
    2168           13 :         appendStringInfoString(&buf,
    2169           13 :                                quote_ident_cstr(NameStr(att->attname)));
    2170           13 :         needComma = true;
    2171              :     }
    2172              : 
    2173            4 :     appendStringInfoString(&buf, ") VALUES(");
    2174              : 
    2175              :     /*
    2176              :      * Note: i is physical column number (counting from 0).
    2177              :      */
    2178            4 :     needComma = false;
    2179           19 :     for (i = 0; i < natts; i++)
    2180              :     {
    2181           15 :         if (TupleDescAttr(tupdesc, i)->attisdropped)
    2182            2 :             continue;
    2183              : 
    2184           13 :         if (needComma)
    2185            9 :             appendStringInfoChar(&buf, ',');
    2186              : 
    2187           13 :         key = get_attnum_pk_pos(pkattnums, pknumatts, i);
    2188              : 
    2189           13 :         if (key >= 0)
    2190            7 :             val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
    2191              :         else
    2192            6 :             val = SPI_getvalue(tuple, tupdesc, i + 1);
    2193              : 
    2194           13 :         if (val != NULL)
    2195              :         {
    2196           13 :             appendStringInfoString(&buf, quote_literal_cstr(val));
    2197           13 :             pfree(val);
    2198              :         }
    2199              :         else
    2200            0 :             appendStringInfoString(&buf, "NULL");
    2201           13 :         needComma = true;
    2202              :     }
    2203            4 :     appendStringInfoChar(&buf, ')');
    2204              : 
    2205            4 :     return buf.data;
    2206              : }
    2207              : 
    2208              : static char *
    2209            4 : get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals)
    2210              : {
    2211              :     char       *relname;
    2212              :     TupleDesc   tupdesc;
    2213              :     StringInfoData buf;
    2214              :     int         i;
    2215              : 
    2216            4 :     initStringInfo(&buf);
    2217              : 
    2218              :     /* get relation name including any needed schema prefix and quoting */
    2219            4 :     relname = generate_relation_name(rel);
    2220              : 
    2221            4 :     tupdesc = rel->rd_att;
    2222              : 
    2223            4 :     appendStringInfo(&buf, "DELETE FROM %s WHERE ", relname);
    2224           11 :     for (i = 0; i < pknumatts; i++)
    2225              :     {
    2226            7 :         int         pkattnum = pkattnums[i];
    2227            7 :         Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
    2228              : 
    2229            7 :         if (i > 0)
    2230            3 :             appendStringInfoString(&buf, " AND ");
    2231              : 
    2232            7 :         appendStringInfoString(&buf,
    2233            7 :                                quote_ident_cstr(NameStr(attr->attname)));
    2234              : 
    2235            7 :         if (tgt_pkattvals[i] != NULL)
    2236            7 :             appendStringInfo(&buf, " = %s",
    2237            7 :                              quote_literal_cstr(tgt_pkattvals[i]));
    2238              :         else
    2239            0 :             appendStringInfoString(&buf, " IS NULL");
    2240              :     }
    2241              : 
    2242            4 :     return buf.data;
    2243              : }
    2244              : 
    2245              : static char *
    2246            4 : get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
    2247              : {
    2248              :     char       *relname;
    2249              :     HeapTuple   tuple;
    2250              :     TupleDesc   tupdesc;
    2251              :     int         natts;
    2252              :     StringInfoData buf;
    2253              :     char       *val;
    2254              :     int         key;
    2255              :     int         i;
    2256              :     bool        needComma;
    2257              : 
    2258            4 :     initStringInfo(&buf);
    2259              : 
    2260              :     /* get relation name including any needed schema prefix and quoting */
    2261            4 :     relname = generate_relation_name(rel);
    2262              : 
    2263            4 :     tupdesc = rel->rd_att;
    2264            4 :     natts = tupdesc->natts;
    2265              : 
    2266            4 :     tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
    2267            4 :     if (!tuple)
    2268            0 :         ereport(ERROR,
    2269              :                 (errcode(ERRCODE_CARDINALITY_VIOLATION),
    2270              :                  errmsg("source row not found")));
    2271              : 
    2272            4 :     appendStringInfo(&buf, "UPDATE %s SET ", relname);
    2273              : 
    2274              :     /*
    2275              :      * Note: i is physical column number (counting from 0).
    2276              :      */
    2277            4 :     needComma = false;
    2278           19 :     for (i = 0; i < natts; i++)
    2279              :     {
    2280           15 :         Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
    2281              : 
    2282           15 :         if (attr->attisdropped)
    2283            2 :             continue;
    2284              : 
    2285           13 :         if (needComma)
    2286            9 :             appendStringInfoString(&buf, ", ");
    2287              : 
    2288           13 :         appendStringInfo(&buf, "%s = ",
    2289           13 :                          quote_ident_cstr(NameStr(attr->attname)));
    2290              : 
    2291           13 :         key = get_attnum_pk_pos(pkattnums, pknumatts, i);
    2292              : 
    2293           13 :         if (key >= 0)
    2294            7 :             val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
    2295              :         else
    2296            6 :             val = SPI_getvalue(tuple, tupdesc, i + 1);
    2297              : 
    2298           13 :         if (val != NULL)
    2299              :         {
    2300           13 :             appendStringInfoString(&buf, quote_literal_cstr(val));
    2301           13 :             pfree(val);
    2302              :         }
    2303              :         else
    2304            0 :             appendStringInfoString(&buf, "NULL");
    2305           13 :         needComma = true;
    2306              :     }
    2307              : 
    2308            4 :     appendStringInfoString(&buf, " WHERE ");
    2309              : 
    2310           11 :     for (i = 0; i < pknumatts; i++)
    2311              :     {
    2312            7 :         int         pkattnum = pkattnums[i];
    2313            7 :         Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
    2314              : 
    2315            7 :         if (i > 0)
    2316            3 :             appendStringInfoString(&buf, " AND ");
    2317              : 
    2318            7 :         appendStringInfoString(&buf,
    2319            7 :                                quote_ident_cstr(NameStr(attr->attname)));
    2320              : 
    2321            7 :         val = tgt_pkattvals[i];
    2322              : 
    2323            7 :         if (val != NULL)
    2324            7 :             appendStringInfo(&buf, " = %s", quote_literal_cstr(val));
    2325              :         else
    2326            0 :             appendStringInfoString(&buf, " IS NULL");
    2327              :     }
    2328              : 
    2329            4 :     return buf.data;
    2330              : }
    2331              : 
    2332              : /*
    2333              :  * Return a properly quoted identifier.
    2334              :  * Uses quote_ident in quote.c
    2335              :  */
    2336              : static char *
    2337           80 : quote_ident_cstr(char *rawstr)
    2338              : {
    2339              :     text       *rawstr_text;
    2340              :     text       *result_text;
    2341              :     char       *result;
    2342              : 
    2343           80 :     rawstr_text = cstring_to_text(rawstr);
    2344           80 :     result_text = DatumGetTextPP(DirectFunctionCall1(quote_ident,
    2345              :                                                      PointerGetDatum(rawstr_text)));
    2346           80 :     result = text_to_cstring(result_text);
    2347              : 
    2348           80 :     return result;
    2349              : }
    2350              : 
    2351              : static int
    2352           26 : get_attnum_pk_pos(int *pkattnums, int pknumatts, int key)
    2353              : {
    2354              :     int         i;
    2355              : 
    2356              :     /*
    2357              :      * Not likely a long list anyway, so just scan for the value
    2358              :      */
    2359           50 :     for (i = 0; i < pknumatts; i++)
    2360           38 :         if (key == pkattnums[i])
    2361           14 :             return i;
    2362              : 
    2363           12 :     return -1;
    2364              : }
    2365              : /*
                      :  * get_tuple_of_interest
                      :  *      Fetch, via SPI, the one row of "rel" whose key columns match
                      :  *      src_pkattvals.
                      :  *
                      :  * pkattnums: pknumatts entries, each used as a zero-based index into the
                      :  *      relation's tuple descriptor
                      :  * src_pkattvals: pknumatts strings giving the value to match for each key
                      :  *      column; a NULL entry is matched with IS NULL
                      :  *
                      :  * Returns a copy of the matching tuple made with SPI_copytuple, or NULL
                      :  * if no row qualified.  Raises ERRCODE_CARDINALITY_VIOLATION if more than
                      :  * one row matched.
                      :  */
    2366              : static HeapTuple
    2367            8 : get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals)
    2368              : {
    2369              :     char       *relname;
    2370              :     TupleDesc   tupdesc;
    2371              :     int         natts;
    2372              :     StringInfoData buf;
    2373              :     int         ret;
    2374              :     HeapTuple   tuple;
    2375              :     int         i;
    2376              : 
    2377              :     /*
    2378              :      * Connect to SPI manager
    2379              :      */
    2380            8 :     SPI_connect();
    2381              : 
    2382            8 :     initStringInfo(&buf);
    2383              : 
    2384              :     /* get relation name including any needed schema prefix and quoting */
    2385            8 :     relname = generate_relation_name(rel);
    2386              : 
    2387            8 :     tupdesc = rel->rd_att;
    2388            8 :     natts = tupdesc->natts;
    2389              : 
    2390              :     /*
    2391              :      * Build sql statement to look up tuple of interest, ie, the one matching
    2392              :      * src_pkattvals.  We used to use "SELECT *" here, but it's simpler to
    2393              :      * generate a result tuple that matches the table's physical structure,
    2394              :      * with NULLs for any dropped columns.  Otherwise we have to deal with two
    2395              :      * different tupdescs and everything's very confusing.
    2396              :      */
    2397            8 :     appendStringInfoString(&buf, "SELECT ");
    2398              : 
    2399           38 :     for (i = 0; i < natts; i++)
    2400              :     {
    2401           30 :         Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
    2402              : 
    2403           30 :         if (i > 0)
    2404           22 :             appendStringInfoString(&buf, ", ");
    2405              : 
    2406           30 :         if (attr->attisdropped)
    2407            4 :             appendStringInfoString(&buf, "NULL");
    2408              :         else
    2409           26 :             appendStringInfoString(&buf,
    2410           26 :                                    quote_ident_cstr(NameStr(attr->attname)));
    2411              :     }
    2412              : 
    2413            8 :     appendStringInfo(&buf, " FROM %s WHERE ", relname);
    2414              : 
    2415              :     /* one "col = 'val'" or "col IS NULL" term per key column, joined by AND */
    2415           22 :     for (i = 0; i < pknumatts; i++)
    2416              :     {
    2417           14 :         int         pkattnum = pkattnums[i];
    2418           14 :         Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
    2419              : 
    2420           14 :         if (i > 0)
    2421            6 :             appendStringInfoString(&buf, " AND ");
    2422              : 
    2423           14 :         appendStringInfoString(&buf,
    2424           14 :                                quote_ident_cstr(NameStr(attr->attname)));
    2425              : 
    2426           14 :         if (src_pkattvals[i] != NULL)
    2427           14 :             appendStringInfo(&buf, " = %s",
    2428           14 :                              quote_literal_cstr(src_pkattvals[i]));
    2429              :         else
    2430            0 :             appendStringInfoString(&buf, " IS NULL");
    2431              :     }
    2432              : 
    2433              :     /*
    2434              :      * Retrieve the desired tuple
    2435              :      */
    2436            8 :     ret = SPI_exec(buf.data, 0);
    2437            8 :     pfree(buf.data);
    2438              : 
    2439              :     /*
    2440              :      * Only allow one qualifying tuple
    2441              :      */
    2442            8 :     if ((ret == SPI_OK_SELECT) && (SPI_processed > 1))
    2443            0 :         ereport(ERROR,
    2444              :                 (errcode(ERRCODE_CARDINALITY_VIOLATION),
    2445              :                  errmsg("source criteria matched more than one record")));
    2446              : 
    2447            8 :     else if (ret == SPI_OK_SELECT && SPI_processed == 1)
    2448              :     {
    2449            8 :         SPITupleTable *tuptable = SPI_tuptable;
    2450              : 
    2450              :         /* copy is taken before SPI_finish -- presumably so that it outlives the SPI context; confirm */
    2451            8 :         tuple = SPI_copytuple(tuptable->vals[0]);
    2452            8 :         SPI_finish();
    2453              : 
    2454            8 :         return tuple;
    2455              :     }
    2456              :     else
    2457              :     {
    2458              :         /*
    2459              :          * no qualifying tuples
                      :          *
                      :          * NOTE(review): any SPI_exec result other than SPI_OK_SELECT also
                      :          * ends up here and is reported to the caller as "no tuple".
    2460              :          */
    2461            0 :         SPI_finish();
    2462              : 
    2463            0 :         return NULL;
    2464              :     }
    2465              : 
    2466              :     /*
    2467              :      * never reached, but keep compiler quiet
    2468              :      */
    2469              :     return NULL;
    2470              : }
    2471              : /*
                      :  * RangeVarCallbackForDblink
                      :  *      Permission-check callback handed to RangeVarGetRelidExtended by
                      :  *      get_rel_from_relname.
                      :  *
                      :  * relation: the RangeVar being resolved; its relname is used in the error
                      :  * relId: OID the name resolved to; nothing is checked if it is invalid
                      :  * oldRelId: not used here
                      :  * arg: pointer to the AclMode the current user must hold on relId
                      :  *
                      :  * Returns normally when the check passes; otherwise reports the failure
                      :  * through aclcheck_error.
                      :  */
    2472              : static void
    2473           21 : RangeVarCallbackForDblink(const RangeVar *relation,
    2474              :                           Oid relId, Oid oldRelId, void *arg)
    2475              : {
    2476              :     AclResult   aclresult;
    2477              : 
    2478           21 :     if (!OidIsValid(relId))
    2479            0 :         return;
    2480              : 
    2481           21 :     aclresult = pg_class_aclcheck(relId, GetUserId(), *((AclMode *) arg));
    2482           21 :     if (aclresult != ACLCHECK_OK)
    2483            0 :         aclcheck_error(aclresult, get_relkind_objtype(get_rel_relkind(relId)),
    2484            0 :                        relation->relname);
    2485              : }
    2486              : 
    2487              : /*
    2488              :  * Open the relation named by relname_text, acquire specified type of lock,
    2489              :  * verify we have specified permissions.
    2490              :  * Caller must close rel when done with it.
                      :  *
                      :  * relname_text: possibly-qualified relation name, parsed with
                      :  *      textToQualifiedNameList
                      :  * lockmode: lock level passed to RangeVarGetRelidExtended
                      :  * aclmode: privileges checked by RangeVarCallbackForDblink during lookup
                      :  *
                      :  * table_open is then called with NoLock, since the lock request was
                      :  * already made as part of the name lookup.
    2491              :  */
    2492              : static Relation
    2493           21 : get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode)
    2494              : {
    2495              :     RangeVar   *relvar;
    2496              :     Oid         relid;
    2497              : 
    2498           21 :     relvar = makeRangeVarFromNameList(textToQualifiedNameList(relname_text));
    2499           21 :     relid = RangeVarGetRelidExtended(relvar, lockmode, 0,
    2500              :                                      RangeVarCallbackForDblink, &aclmode);
    2501              : 
    2502           21 :     return table_open(relid, NoLock);
    2503              : }
    2504              : 
    2505              : /*
    2506              :  * generate_relation_name - copied from ruleutils.c
    2507              :  *      Compute the name to display for a relation
    2508              :  *
    2509              :  * The result includes all necessary quoting and schema-prefixing.
                      :  *
                      :  * The schema name is looked up only when RelationIsVisible reports false
                      :  * for the relation; otherwise a NULL namespace is handed to
                      :  * quote_qualified_identifier.
    2510              :  */
    2511              : static char *
    2512           20 : generate_relation_name(Relation rel)
    2513              : {
    2514              :     char       *nspname;
    2515              :     char       *result;
    2516              : 
    2517              :     /* Qualify the name if not visible in search path */
    2518           20 :     if (RelationIsVisible(RelationGetRelid(rel)))
    2519           15 :         nspname = NULL;
    2520              :     else
    2521            5 :         nspname = get_namespace_name(rel->rd_rel->relnamespace);
    2522              : 
    2523           20 :     result = quote_qualified_identifier(nspname, RelationGetRelationName(rel));
    2524              : 
    2525           20 :     return result;
    2526              : }
    2527              : 
    2528              : /*
                      :  * getConnectionByName
                      :  *      Look up a named connection in remoteConnHash.
                      :  *
                      :  * The hash table is created first if it does not exist yet.  The lookup
                      :  * key is a pstrdup'd copy of "name" run through truncate_identifier; the
                      :  * copy is not freed here.
                      :  *
                      :  * Returns a pointer to the entry's rconn if an entry exists and its conn
                      :  * field is non-NULL; otherwise returns NULL.
                      :  */
    2529              : static remoteConn *
    2530           79 : getConnectionByName(const char *name)
    2531              : {
    2532              :     remoteConnHashEnt *hentry;
    2533              :     char       *key;
    2534              : 
    2535           79 :     if (!remoteConnHash)
    2536            6 :         remoteConnHash = createConnHash();
    2537              : 
    2538           79 :     key = pstrdup(name);
    2539           79 :     truncate_identifier(key, strlen(key), false);
    2540           79 :     hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
    2541              :                                                key, HASH_FIND, NULL);
    2542              : 
    2543           79 :     if (hentry && hentry->rconn.conn != NULL)
    2544           68 :         return &hentry->rconn;
    2545              : 
    2546           11 :     return NULL;
    2547              : }
    2548              : /*
                      :  * createConnHash
                      :  *      Create the hash table that holds named connections.
                      :  *
                      :  * Keys are strings of NAMEDATALEN bytes and entries are
                      :  * remoteConnHashEnt structs.  NUMCONN is passed as hash_create's size
                      :  * argument.
                      :  *
                      :  * NOTE(review): only keysize and entrysize of the HASHCTL are filled in;
                      :  * presumably that is sufficient for HASH_ELEM | HASH_STRINGS -- confirm
                      :  * against hash_create.
                      :  */
    2549              : static HTAB *
    2550            7 : createConnHash(void)
    2551              : {
    2552              :     HASHCTL     ctl;
    2553              : 
    2554            7 :     ctl.keysize = NAMEDATALEN;
    2555            7 :     ctl.entrysize = sizeof(remoteConnHashEnt);
    2556              : 
    2557            7 :     return hash_create("Remote Con hash", NUMCONN, &ctl,
    2558              :                        HASH_ELEM | HASH_STRINGS);
    2559              : }
    2560              : /*
                      :  * createNewConnection
                      :  *      Reserve a remoteConnHash entry for a new named connection.
                      :  *
                      :  * The hash table is created first if it does not exist yet.  The key is
                      :  * a pstrdup'd copy of "name" run through truncate_identifier (called with
                      :  * "true" here, unlike the lookup and delete paths; presumably that flag
                      :  * controls a truncation notice -- confirm).
                      :  *
                      :  * Raises ERRCODE_DUPLICATE_OBJECT if an entry already exists and its conn
                      :  * field is non-NULL.  Otherwise the entry's rconn is zeroed and a pointer
                      :  * to it is returned.
                      :  */
    2561              : static remoteConn *
    2562           10 : createNewConnection(const char *name)
    2563              : {
    2564              :     remoteConnHashEnt *hentry;
    2565              :     bool        found;
    2566              :     char       *key;
    2567              : 
    2568           10 :     if (!remoteConnHash)
    2569            1 :         remoteConnHash = createConnHash();
    2570              : 
    2571           10 :     key = pstrdup(name);
    2572           10 :     truncate_identifier(key, strlen(key), true);
    2573           10 :     hentry = (remoteConnHashEnt *) hash_search(remoteConnHash, key,
    2574              :                                                HASH_ENTER, &found);
    2575              : 
    2576           10 :     if (found && hentry->rconn.conn != NULL)
    2577            1 :         ereport(ERROR,
    2578              :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
    2579              :                  errmsg("duplicate connection name")));
    2580              : 
    2581              :     /* New, or reusable, so initialize the rconn struct to zeroes */
    2582            9 :     memset(&hentry->rconn, 0, sizeof(remoteConn));
    2583              : 
    2584            9 :     return &hentry->rconn;
    2585              : }
    2586              : /*
                      :  * deleteConnection
                      :  *      Remove the remoteConnHash entry for a named connection.
                      :  *
                      :  * The hash table is created first if it does not exist yet.  The key is
                      :  * a pstrdup'd copy of "name" run through truncate_identifier.
                      :  *
                      :  * Raises ERRCODE_UNDEFINED_OBJECT if hash_search returns no entry.  The
                      :  * "found" flag is filled in by hash_search but not examined; the returned
                      :  * pointer is tested instead.
                      :  */
    2587              : static void
    2588            8 : deleteConnection(const char *name)
    2589              : {
    2590              :     remoteConnHashEnt *hentry;
    2591              :     bool        found;
    2592              :     char       *key;
    2593              : 
    2594            8 :     if (!remoteConnHash)
    2595            0 :         remoteConnHash = createConnHash();
    2596              : 
    2597            8 :     key = pstrdup(name);
    2598            8 :     truncate_identifier(key, strlen(key), false);
    2599            8 :     hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
    2600              :                                                key, HASH_REMOVE, &found);
    2601              : 
    2602            8 :     if (!hentry)
    2603            0 :         ereport(ERROR,
    2604              :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    2605              :                  errmsg("undefined connection name")));
    2606            8 : }
    2607              : 
    2608              :  /*
    2609              :   * Ensure that require_auth and SCRAM keys are correctly set on connstr.
    2610              :   * SCRAM keys used to pass-through are coming from the initial connection
    2611              :   * from the client with the server.
    2612              :   *
    2613              :   * All required SCRAM options are set by dblink, so we just need to ensure
    2614              :   * that these options are not overwritten by the user.
    2615              :   *
    2616              :   * See appendSCRAMKeysInfo and its usage for more.
                      :   *
                      :   * Returns true only when all of the following hold: the parsed
                      :   * require_auth value is exactly "scram-sha-256", scram_client_key and
                      :   * scram_server_key both have non-empty values, and MyProcPort is set
                      :   * with has_scram_keys true.  If PQconninfoParse returns NULL, the result
                      :   * is false.
    2617              :   */
    2618              : bool
    2619            6 : dblink_connstr_has_required_scram_options(const char *connstr)
    2620              : {
    2621              :     PQconninfoOption *options;
    2622            6 :     bool        has_scram_server_key = false;
    2623            6 :     bool        has_scram_client_key = false;
    2624            6 :     bool        has_require_auth = false;
    2625            6 :     bool        has_scram_keys = false;
    2626              : 
    2627            6 :     options = PQconninfoParse(connstr, NULL);
    2628            6 :     if (options)
    2629              :     {
    2630              :         /*
    2631              :          * Continue iterating even if we found the keys that we need to
    2632              :          * validate to make sure that there is no other declaration of these
    2633              :          * keys that can overwrite the first.
    2634              :          */
    2635          318 :         for (PQconninfoOption *option = options; option->keyword != NULL; option++)
    2636              :         {
    2637          312 :             if (strcmp(option->keyword, "require_auth") == 0)
    2638              :             {
    2639            6 :                 if (option->val != NULL && strcmp(option->val, "scram-sha-256") == 0)
    2640            5 :                     has_require_auth = true;
    2641              :                 else
    2642            1 :                     has_require_auth = false;
    2643              :             }
    2644              : 
    2645          312 :             if (strcmp(option->keyword, "scram_client_key") == 0)
    2646              :             {
    2647            6 :                 if (option->val != NULL && option->val[0] != '\0')
    2648            6 :                     has_scram_client_key = true;
    2649              :                 else
    2650            0 :                     has_scram_client_key = false;
    2651              :             }
    2652              : 
    2653          312 :             if (strcmp(option->keyword, "scram_server_key") == 0)
    2654              :             {
    2655            6 :                 if (option->val != NULL && option->val[0] != '\0')
    2656            6 :                     has_scram_server_key = true;
    2657              :                 else
    2658            0 :                     has_scram_server_key = false;
    2659              :             }
    2660              :         }
    2661            6 :         PQconninfoFree(options);
    2662              :     }
    2663              : 
    2663              :     /* both keys must be in connstr AND this session must itself hold SCRAM keys */
    2664            6 :     has_scram_keys = has_scram_client_key && has_scram_server_key && MyProcPort != NULL && MyProcPort->has_scram_keys;
    2665              : 
    2666            6 :     return (has_scram_keys && has_require_auth);
    2667              : }
    2668              : 
    2669              : /*
    2670              :  * We need to make sure that the connection made used credentials
    2671              :  * which were provided by the user, so check what credentials were
    2672              :  * used to connect and then make sure that they came from the user.
    2673              :  *
    2674              :  * On failure, we close "conn" and also delete the hashtable entry
    2675              :  * identified by "connname" (if that's not NULL).
                      :  *
                      :  * conn: the already-established connection being vetted
                      :  * connname: name of the hashtable entry to drop on failure, or NULL
                      :  * connstr: connection string that was used; inspected for an explicit
                      :  *      password and for the SCRAM pass-through options
                      :  *
                      :  * Returns normally for superusers and when any one of the credential
                      :  * checks below passes; otherwise raises
                      :  * ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED after the cleanup.
    2676              :  */
    2677              : static void
    2678           20 : dblink_security_check(PGconn *conn, const char *connname, const char *connstr)
    2679              : {
    2680              :     /* Superuser bypasses security check */
    2681           20 :     if (superuser())
    2682           18 :         return;
    2683              : 
    2684              :     /* If password was used to connect, make sure it was one provided */
    2685            2 :     if (PQconnectionUsedPassword(conn) && dblink_connstr_has_pw(connstr))
    2686            0 :         return;
    2687              : 
    2688              :     /*
    2689              :      * Password was not used to connect, check if SCRAM pass-through is in
    2690              :      * use.
    2691              :      *
    2692              :      * If dblink_connstr_has_required_scram_options is true we assume that
    2693              :      * UseScramPassthrough is also true because the required SCRAM keys are
    2694              :      * only added if UseScramPassthrough is set, and the user is not allowed
    2695              :      * to add the SCRAM keys on fdw and user mapping options.
    2696              :      */
    2697            2 :     if (MyProcPort != NULL && MyProcPort->has_scram_keys && dblink_connstr_has_required_scram_options(connstr))
    2698            2 :         return;
    2699              : 
    2700              : #ifdef ENABLE_GSS
    2701              :     /* If GSSAPI creds used to connect, make sure it was one delegated */
    2702              :     if (PQconnectionUsedGSSAPI(conn) && be_gssapi_get_delegation(MyProcPort))
    2703              :         return;
    2704              : #endif
    2705              : 
    2706              :     /* Otherwise, fail out */
    2707            0 :     libpqsrv_disconnect(conn);
    2708            0 :     if (connname)
    2709            0 :         deleteConnection(connname);
    2710              : 
    2711            0 :     ereport(ERROR,
    2712              :             (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
    2713              :              errmsg("password or GSSAPI delegated credentials required"),
    2714              :              errdetail("Non-superusers may only connect using credentials they provide, eg: password in connection string or delegated GSSAPI credentials"),
    2715              :              errhint("Ensure provided credentials match target server's authentication method.")));
    2716              : }
    2717              : 
    2718              : /*
    2719              :  * Function to check if the connection string includes an explicit
    2720              :  * password, needed to ensure that non-superuser password-based auth
    2721              :  * is using a provided password and not one picked up from the
    2722              :  * environment.
                      :  *
                      :  * Returns true if the parsed connstr has a "password" option whose value
                      :  * is non-NULL and non-empty.  Returns false otherwise, including when
                      :  * PQconninfoParse returns NULL.
    2723              :  */
    2724              : static bool
    2725            7 : dblink_connstr_has_pw(const char *connstr)
    2726              : {
    2727              :     PQconninfoOption *options;
    2728              :     PQconninfoOption *option;
    2729            7 :     bool        connstr_gives_password = false;
    2730              : 
    2731            7 :     options = PQconninfoParse(connstr, NULL);
    2732            7 :     if (options)
    2733              :     {
    2734          371 :         for (option = options; option->keyword != NULL; option++)
    2735              :         {
    2736          364 :             if (strcmp(option->keyword, "password") == 0)
    2737              :             {
    2738            7 :                 if (option->val != NULL && option->val[0] != '\0')
    2739              :                 {
    2740            0 :                     connstr_gives_password = true;
    2741            0 :                     break;
    2742              :                 }
    2743              :             }
    2744              :         }
    2745            7 :         PQconninfoFree(options);
    2746              :     }
    2747              : 
    2748            7 :     return connstr_gives_password;
    2749              : }
    2750              : 
    2751              : /*
    2752              :  * For non-superusers, insist that the connstr specify a password, except if
    2753              :  * GSSAPI credentials have been delegated (and we check that they are used for
    2754              :  * the connection in dblink_security_check later) or if SCRAM pass-through is
    2755              :  * being used.  This prevents a password or GSSAPI credentials from being
    2756              :  * picked up from .pgpass, a service file, the environment, etc.  We don't want
    2757              :  * the postgres user's passwords or Kerberos credentials to be accessible to
    2758              :  * non-superusers. In case of SCRAM pass-through insist that the connstr
    2759              :  * has the required SCRAM pass-through options.
                      :  *
                      :  * Only the connection string is inspected here; no connection is taken
                      :  * as an argument.  Returns normally when one of the checks passes;
                      :  * otherwise raises ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED.
    2760              :  */
    2761              : static void
    2762           26 : dblink_connstr_check(const char *connstr)
    2763              : {
    2764           26 :     if (superuser())
    2765           21 :         return;
    2766              : 
    2767            5 :     if (dblink_connstr_has_pw(connstr))
    2768            0 :         return;
    2769              : 
    2770            5 :     if (MyProcPort != NULL && MyProcPort->has_scram_keys && dblink_connstr_has_required_scram_options(connstr))
    2771            3 :         return;
    2772              : 
    2773              : #ifdef ENABLE_GSS
    2774              :     if (be_gssapi_get_delegation(MyProcPort))
    2775              :         return;
    2776              : #endif
    2777              : 
    2778            2 :     ereport(ERROR,
    2779              :             (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
    2780              :              errmsg("password or GSSAPI delegated credentials required"),
    2781              :              errdetail("Non-superusers must provide a password in the connection string or send delegated GSSAPI credentials.")));
    2782              : }
    2783              : 
    2784              : /*
    2785              :  * Report an error received from the remote server
    2786              :  *
                      :  * conn: connection the result came from; its error message is used when
                      :  *      the result carries no primary message
                      :  * conname: connection name quoted in the errcontext, or NULL for an
                      :  *      unnamed connection
    2787              :  * res: the received error result
    2788              :  * fail: true for ERROR ereport, false for NOTICE
    2789              :  * fmt and following args: sprintf-style format and values for errcontext;
    2790              :  * the resulting string should be worded like "while <some action>"
    2791              :  *
    2792              :  * If "res" is not NULL, it'll be PQclear'ed here (unless we throw error,
    2793              :  * in which case memory context cleanup will clear it eventually).
                      :  *
                      :  * The SQLSTATE reported is the remote one when the result supplies it,
                      :  * and ERRCODE_CONNECTION_FAILURE otherwise.
    2794              :  */
    2795              : static void
    2796           12 : dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
    2797              :                  bool fail, const char *fmt, ...)
    2798              : {
    2799              :     int         level;
    2800           12 :     char       *pg_diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
    2801           12 :     char       *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
    2802           12 :     char       *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
    2803           12 :     char       *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
    2804           12 :     char       *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
    2805              :     int         sqlstate;
    2806              :     va_list     ap;
    2807              :     char        dblink_context_msg[512];    /* vsnprintf below is bounded by this size */
    2808              : 
    2809           12 :     if (fail)
    2810            3 :         level = ERROR;
    2811              :     else
    2812            9 :         level = NOTICE;
    2813              : 
    2813              :     /* rebuild the packed SQLSTATE from its five characters */
    2814           12 :     if (pg_diag_sqlstate)
    2815           12 :         sqlstate = MAKE_SQLSTATE(pg_diag_sqlstate[0],
    2816              :                                  pg_diag_sqlstate[1],
    2817              :                                  pg_diag_sqlstate[2],
    2818              :                                  pg_diag_sqlstate[3],
    2819              :                                  pg_diag_sqlstate[4]);
    2820              :     else
    2821            0 :         sqlstate = ERRCODE_CONNECTION_FAILURE;
    2822              : 
    2823              :     /*
    2824              :      * If we don't get a message from the PGresult, try the PGconn.  This is
    2825              :      * needed because for connection-level failures, PQgetResult may just
    2826              :      * return NULL, not a PGresult at all.
    2827              :      */
    2828           12 :     if (message_primary == NULL)
    2829            0 :         message_primary = pchomp(PQerrorMessage(conn));
    2830              : 
    2831              :     /*
    2832              :      * Format the basic errcontext string.  Below, we'll add on something
    2833              :      * about the connection name.  That's a violation of the translatability
    2834              :      * guidelines about constructing error messages out of parts, but since
    2835              :      * there's no translation support for dblink, there's no need to worry
    2836              :      * about that (yet).
    2837              :      */
    2838           12 :     va_start(ap, fmt);
    2839           12 :     vsnprintf(dblink_context_msg, sizeof(dblink_context_msg), fmt, ap);
    2840           12 :     va_end(ap);
    2841              : 
    2842           12 :     ereport(level,
    2843              :             (errcode(sqlstate),
    2844              :              (message_primary != NULL && message_primary[0] != '\0') ?
    2845              :              errmsg_internal("%s", message_primary) :
    2846              :              errmsg("could not obtain message string for remote error"),
    2847              :              message_detail ? errdetail_internal("%s", message_detail) : 0,
    2848              :              message_hint ? errhint("%s", message_hint) : 0,
    2849              :              message_context ? (errcontext("%s", message_context)) : 0,
    2850              :              conname ?
    2851              :              (errcontext("%s on dblink connection named \"%s\"",
    2852              :                          dblink_context_msg, conname)) :
    2853              :              (errcontext("%s on unnamed dblink connection",
    2854              :                          dblink_context_msg))));
    2855            9 :     PQclear(res);
    2856            9 : }
    2857              : 
    2858              : /*
    2859              :  * Obtain connection string for a foreign server
                      :  *
                      :  * servername is copied and run through truncate_identifier before being
                      :  * looked up with GetForeignServerByName.  If no such server is found,
                      :  * NULL is returned.
                      :  *
                      :  * Otherwise the current user must have USAGE on the server (checked with
                      :  * object_aclcheck), and the result is a string of "key='value' " pairs
                      :  * built, in this order, from: the SCRAM pass-through settings (when
                      :  * applicable), the FDW's options, the server's options, and the user
                      :  * mapping's options.  Only options accepted by is_valid_dblink_option
                      :  * are included, and each value goes through escape_param_str.
    2860              :  */
    2861              : static char *
    2862           26 : get_connect_string(const char *servername)
    2863              : {
    2864           26 :     ForeignServer *foreign_server = NULL;
    2865              :     UserMapping *user_mapping;
    2866              :     ListCell   *cell;
    2867              :     StringInfoData buf;
    2868              :     ForeignDataWrapper *fdw;
    2869              :     AclResult   aclresult;
    2870              :     char       *srvname;
    2871              : 
    2872              :     static const PQconninfoOption *options = NULL;
    2873              : 
    2874           26 :     initStringInfo(&buf);
    2875              : 
    2876              :     /*
    2877              :      * Get list of valid libpq options.
    2878              :      *
    2879              :      * To avoid unnecessary work, we get the list once and use it throughout
    2880              :      * the lifetime of this backend process.  We don't need to care about
    2881              :      * memory context issues, because PQconndefaults allocates with malloc.
    2882              :      */
    2883           26 :     if (!options)
    2884              :     {
    2885            7 :         options = PQconndefaults();
    2886            7 :         if (!options)           /* assume reason for failure is OOM */
    2887            0 :             ereport(ERROR,
    2888              :                     (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
    2889              :                      errmsg("out of memory"),
    2890              :                      errdetail("Could not get libpq's default connection options.")));
    2891              :     }
    2892              : 
    2893              :     /* first gather the server connstr options */
    2894           26 :     srvname = pstrdup(servername);
    2895           26 :     truncate_identifier(srvname, strlen(srvname), false);
    2896           26 :     foreign_server = GetForeignServerByName(srvname, true);
    2897              : 
    2898           26 :     if (foreign_server)
    2899              :     {
    2900            6 :         Oid         serverid = foreign_server->serverid;
    2901            6 :         Oid         fdwid = foreign_server->fdwid;
    2902            6 :         Oid         userid = GetUserId();
    2903              : 
    2904            6 :         user_mapping = GetUserMapping(userid, serverid);
    2905            6 :         fdw = GetForeignDataWrapper(fdwid);
    2906              : 
    2907              :         /* Check permissions, user must have usage on the server. */
    2908            6 :         aclresult = object_aclcheck(ForeignServerRelationId, serverid, userid, ACL_USAGE);
    2909            6 :         if (aclresult != ACLCHECK_OK)
    2910            0 :             aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER, foreign_server->servername);
    2911              : 
    2912              :         /*
    2913              :          * First append hardcoded options needed for SCRAM pass-through, so if
    2914              :          * the user overwrites these options we can ereport on
    2915              :          * dblink_connstr_check and dblink_security_check.
    2916              :          */
    2917            6 :         if (MyProcPort != NULL && MyProcPort->has_scram_keys && UseScramPassthrough(foreign_server, user_mapping))
    2918            4 :             appendSCRAMKeysInfo(&buf);
    2919              : 
    2919              :         /* options attached to the foreign-data wrapper */
    2920            6 :         foreach(cell, fdw->options)
    2921              :         {
    2922            0 :             DefElem    *def = lfirst(cell);
    2923              : 
    2924            0 :             if (is_valid_dblink_option(options, def->defname, ForeignDataWrapperRelationId))
    2925            0 :                 appendStringInfo(&buf, "%s='%s' ", def->defname,
    2926            0 :                                  escape_param_str(strVal(def->arg)));
    2927              :         }
    2928              : 
    2928              :         /* options attached to the foreign server */
    2929           27 :         foreach(cell, foreign_server->options)
    2930              :         {
    2931           21 :             DefElem    *def = lfirst(cell);
    2932              : 
    2933           21 :             if (is_valid_dblink_option(options, def->defname, ForeignServerRelationId))
    2934           17 :                 appendStringInfo(&buf, "%s='%s' ", def->defname,
    2935           17 :                                  escape_param_str(strVal(def->arg)));
    2936              :         }
    2937              : 
    2937              :         /* options attached to the current user's mapping */
    2938           12 :         foreach(cell, user_mapping->options)
    2939              :         {
    2940              : 
    2941            6 :             DefElem    *def = lfirst(cell);
    2942              : 
    2943            6 :             if (is_valid_dblink_option(options, def->defname, UserMappingRelationId))
    2944            6 :                 appendStringInfo(&buf, "%s='%s' ", def->defname,
    2945            6 :                                  escape_param_str(strVal(def->arg)));
    2946              :         }
    2947              : 
    2948            6 :         return buf.data;
    2949              :     }
    2950              :     else
    2951           20 :         return NULL;
    2952              : }
    2953              : 
    2954              : /*
    2955              :  * Escaping libpq connect parameter strings.
    2956              :  *
    2957              :  * Replaces "'" with "\'" and "\" with "\\".
    2958              :  */
    2959              : static char *
    2960           23 : escape_param_str(const char *str)
    2961              : {
    2962              :     const char *cp;
    2963              :     StringInfoData buf;
    2964              : 
    2965           23 :     initStringInfo(&buf);
    2966              : 
    2967          205 :     for (cp = str; *cp; cp++)
    2968              :     {
    2969          182 :         if (*cp == '\\' || *cp == '\'')
    2970            0 :             appendStringInfoChar(&buf, '\\');
    2971          182 :         appendStringInfoChar(&buf, *cp);
    2972              :     }
    2973              : 
    2974           23 :     return buf.data;
    2975              : }
    2976              : 
    2977              : /*
    2978              :  * Validate the PK-attnums argument for dblink_build_sql_insert() and related
    2979              :  * functions, and translate to the internal representation.
    2980              :  *
    2981              :  * The user supplies an int2vector of 1-based logical attnums, plus a count
    2982              :  * argument (the need for the separate count argument is historical, but we
    2983              :  * still check it).  We check that each attnum corresponds to a valid,
    2984              :  * non-dropped attribute of the rel.  We do *not* prevent attnums from being
    2985              :  * listed twice, though the actual use-case for such things is dubious.
    2986              :  * Note that before Postgres 9.0, the user's attnums were interpreted as
    2987              :  * physical not logical column numbers; this was changed for future-proofing.
    2988              :  *
    2989              :  * The internal representation is a palloc'd int array of 0-based physical
    2990              :  * attnums.
    2991              :  */
    2992              : static void
    2993           18 : validate_pkattnums(Relation rel,
    2994              :                    int2vector *pkattnums_arg, int32 pknumatts_arg,
    2995              :                    int **pkattnums, int *pknumatts)
    2996              : {
    2997           18 :     TupleDesc   tupdesc = rel->rd_att;
    2998           18 :     int         natts = tupdesc->natts;
    2999              :     int         i;
    3000              : 
    3001              :     /* Don't take more array elements than there are */
    3002           18 :     pknumatts_arg = Min(pknumatts_arg, pkattnums_arg->dim1);
    3003              : 
    3004              :     /* Must have at least one pk attnum selected */
    3005           18 :     if (pknumatts_arg <= 0)
    3006            0 :         ereport(ERROR,
    3007              :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    3008              :                  errmsg("number of key attributes must be > 0")));
    3009              : 
    3010              :     /* Allocate output array */
    3011           18 :     *pkattnums = palloc_array(int, pknumatts_arg);
    3012           18 :     *pknumatts = pknumatts_arg;
    3013              : 
    3014              :     /* Validate attnums and convert to internal form */
    3015           57 :     for (i = 0; i < pknumatts_arg; i++)
    3016              :     {
    3017           45 :         int         pkattnum = pkattnums_arg->values[i];
    3018              :         int         lnum;
    3019              :         int         j;
    3020              : 
    3021              :         /* Can throw error immediately if out of range */
    3022           45 :         if (pkattnum <= 0 || pkattnum > natts)
    3023            6 :             ereport(ERROR,
    3024              :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    3025              :                      errmsg("invalid attribute number %d", pkattnum)));
    3026              : 
    3027              :         /* Identify which physical column has this logical number */
    3028           39 :         lnum = 0;
    3029           69 :         for (j = 0; j < natts; j++)
    3030              :         {
    3031              :             /* dropped columns don't count */
    3032           69 :             if (TupleDescCompactAttr(tupdesc, j)->attisdropped)
    3033            3 :                 continue;
    3034              : 
    3035           66 :             if (++lnum == pkattnum)
    3036           39 :                 break;
    3037              :         }
    3038              : 
    3039           39 :         if (j < natts)
    3040           39 :             (*pkattnums)[i] = j;
    3041              :         else
    3042            0 :             ereport(ERROR,
    3043              :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    3044              :                      errmsg("invalid attribute number %d", pkattnum)));
    3045              :     }
    3046           12 : }
    3047              : 
    3048              : /*
    3049              :  * Check if the specified connection option is valid.
    3050              :  *
    3051              :  * We basically allow whatever libpq thinks is an option, with these
    3052              :  * restrictions:
    3053              :  *      debug options: disallowed
    3054              :  *      "client_encoding": disallowed
    3055              :  *      "user": valid only in USER MAPPING options
    3056              :  *      secure options (eg password): valid only in USER MAPPING options
    3057              :  *      others: valid only in FOREIGN SERVER options
    3058              :  *
    3059              :  * We disallow client_encoding because it would be overridden anyway via
    3060              :  * PQclientEncoding; allowing it to be specified would merely promote
    3061              :  * confusion.
    3062              :  */
    3063              : static bool
    3064          478 : is_valid_dblink_option(const PQconninfoOption *options, const char *option,
    3065              :                        Oid context)
    3066              : {
    3067              :     const PQconninfoOption *opt;
    3068              : 
    3069              :     /* Look up the option in libpq result */
    3070        12050 :     for (opt = options; opt->keyword; opt++)
    3071              :     {
    3072        12044 :         if (strcmp(opt->keyword, option) == 0)
    3073          472 :             break;
    3074              :     }
    3075          478 :     if (opt->keyword == NULL)
    3076            6 :         return false;
    3077              : 
    3078              :     /* Disallow debug options (particularly "replication") */
    3079          472 :     if (strchr(opt->dispchar, 'D'))
    3080           34 :         return false;
    3081              : 
    3082              :     /* Disallow "client_encoding" */
    3083          438 :     if (strcmp(opt->keyword, "client_encoding") == 0)
    3084            8 :         return false;
    3085              : 
    3086              :     /*
    3087              :      * Disallow OAuth options for now, since the builtin flow communicates on
    3088              :      * stderr by default and can't cache tokens yet.
    3089              :      */
    3090          430 :     if (strncmp(opt->keyword, "oauth_", strlen("oauth_")) == 0)
    3091           44 :         return false;
    3092              : 
    3093              :     /*
    3094              :      * If the option is "user" or marked secure, it should be specified only
    3095              :      * in USER MAPPING.  Others should be specified only in SERVER.
    3096              :      */
    3097          386 :     if (strcmp(opt->keyword, "user") == 0 || strchr(opt->dispchar, '*'))
    3098              :     {
    3099           38 :         if (context != UserMappingRelationId)
    3100            9 :             return false;
    3101              :     }
    3102              :     else
    3103              :     {
    3104          348 :         if (context != ForeignServerRelationId)
    3105          234 :             return false;
    3106              :     }
    3107              : 
    3108          143 :     return true;
    3109              : }
    3110              : 
    3111              : /*
    3112              :  * Same as is_valid_dblink_option but also check for only dblink_fdw specific
    3113              :  * options.
    3114              :  */
    3115              : static bool
    3116           39 : is_valid_dblink_fdw_option(const PQconninfoOption *options, const char *option,
    3117              :                            Oid context)
    3118              : {
    3119           39 :     if (strcmp(option, "use_scram_passthrough") == 0)
    3120            4 :         return true;
    3121              : 
    3122           35 :     return is_valid_dblink_option(options, option, context);
    3123              : }
    3124              : 
    3125              : /*
    3126              :  * Copy the remote session's values of GUCs that affect datatype I/O
    3127              :  * and apply them locally in a new GUC nesting level.  Returns the new
    3128              :  * nestlevel (which is needed by restoreLocalGucs to undo the settings),
    3129              :  * or -1 if no new nestlevel was needed.
    3130              :  *
    3131              :  * We use the equivalent of a function SET option to allow the settings to
    3132              :  * persist only until the caller calls restoreLocalGucs.  If an error is
    3133              :  * thrown in between, guc.c will take care of undoing the settings.
    3134              :  */
    3135              : static int
    3136           34 : applyRemoteGucs(PGconn *conn)
    3137              : {
    3138              :     static const char *const GUCsAffectingIO[] = {
    3139              :         "DateStyle",
    3140              :         "IntervalStyle"
    3141              :     };
    3142              : 
    3143           34 :     int         nestlevel = -1;
    3144              :     int         i;
    3145              : 
    3146          102 :     for (i = 0; i < lengthof(GUCsAffectingIO); i++)
    3147              :     {
    3148           68 :         const char *gucName = GUCsAffectingIO[i];
    3149           68 :         const char *remoteVal = PQparameterStatus(conn, gucName);
    3150              :         const char *localVal;
    3151              : 
    3152              :         /*
    3153              :          * If the remote server is pre-8.4, it won't have IntervalStyle, but
    3154              :          * that's okay because its output format won't be ambiguous.  So just
    3155              :          * skip the GUC if we don't get a value for it.  (We might eventually
    3156              :          * need more complicated logic with remote-version checks here.)
    3157              :          */
    3158           68 :         if (remoteVal == NULL)
    3159            0 :             continue;
    3160              : 
    3161              :         /*
    3162              :          * Avoid GUC-setting overhead if the remote and local GUCs already
    3163              :          * have the same value.
    3164              :          */
    3165           68 :         localVal = GetConfigOption(gucName, false, false);
    3166              :         Assert(localVal != NULL);
    3167              : 
    3168           68 :         if (strcmp(remoteVal, localVal) == 0)
    3169           51 :             continue;
    3170              : 
    3171              :         /* Create new GUC nest level if we didn't already */
    3172           17 :         if (nestlevel < 0)
    3173            9 :             nestlevel = NewGUCNestLevel();
    3174              : 
    3175              :         /* Apply the option (this will throw error on failure) */
    3176           17 :         (void) set_config_option(gucName, remoteVal,
    3177              :                                  PGC_USERSET, PGC_S_SESSION,
    3178              :                                  GUC_ACTION_SAVE, true, 0, false);
    3179              :     }
    3180              : 
    3181           34 :     return nestlevel;
    3182              : }
    3183              : 
    3184              : /*
    3185              :  * Restore local GUCs after they have been overlaid with remote settings.
    3186              :  */
    3187              : static void
    3188           35 : restoreLocalGucs(int nestlevel)
    3189              : {
    3190              :     /* Do nothing if no new nestlevel was created */
    3191           35 :     if (nestlevel > 0)
    3192            8 :         AtEOXact_GUC(true, nestlevel);
    3193           35 : }
    3194              : 
    3195              : /*
    3196              :  * Append SCRAM client key and server key information from the global
    3197              :  * MyProcPort into the given StringInfo buffer.
    3198              :  */
    3199              : static void
    3200            4 : appendSCRAMKeysInfo(StringInfo buf)
    3201              : {
    3202              :     int         len;
    3203              :     int         encoded_len;
    3204              :     char       *client_key;
    3205              :     char       *server_key;
    3206              : 
    3207            4 :     len = pg_b64_enc_len(sizeof(MyProcPort->scram_ClientKey));
    3208              :     /* don't forget the zero-terminator */
    3209            4 :     client_key = palloc0(len + 1);
    3210            4 :     encoded_len = pg_b64_encode(MyProcPort->scram_ClientKey,
    3211              :                                 sizeof(MyProcPort->scram_ClientKey),
    3212              :                                 client_key, len);
    3213            4 :     if (encoded_len < 0)
    3214            0 :         elog(ERROR, "could not encode SCRAM client key");
    3215              : 
    3216            4 :     len = pg_b64_enc_len(sizeof(MyProcPort->scram_ServerKey));
    3217              :     /* don't forget the zero-terminator */
    3218            4 :     server_key = palloc0(len + 1);
    3219            4 :     encoded_len = pg_b64_encode(MyProcPort->scram_ServerKey,
    3220              :                                 sizeof(MyProcPort->scram_ServerKey),
    3221              :                                 server_key, len);
    3222            4 :     if (encoded_len < 0)
    3223            0 :         elog(ERROR, "could not encode SCRAM server key");
    3224              : 
    3225            4 :     appendStringInfo(buf, "scram_client_key='%s' ", client_key);
    3226            4 :     appendStringInfo(buf, "scram_server_key='%s' ", server_key);
    3227            4 :     appendStringInfoString(buf, "require_auth='scram-sha-256' ");
    3228              : 
    3229            4 :     pfree(client_key);
    3230            4 :     pfree(server_key);
    3231            4 : }
    3232              : 
    3233              : 
    3234              : static bool
    3235            4 : UseScramPassthrough(ForeignServer *foreign_server, UserMapping *user)
    3236              : {
    3237              :     ListCell   *cell;
    3238              : 
    3239           16 :     foreach(cell, foreign_server->options)
    3240              :     {
    3241           16 :         DefElem    *def = lfirst(cell);
    3242              : 
    3243           16 :         if (strcmp(def->defname, "use_scram_passthrough") == 0)
    3244            4 :             return defGetBoolean(def);
    3245              :     }
    3246              : 
    3247            0 :     foreach(cell, user->options)
    3248              :     {
    3249            0 :         DefElem    *def = (DefElem *) lfirst(cell);
    3250              : 
    3251            0 :         if (strcmp(def->defname, "use_scram_passthrough") == 0)
    3252            0 :             return defGetBoolean(def);
    3253              :     }
    3254              : 
    3255            0 :     return false;
    3256              : }
        

Generated by: LCOV version 2.0-1