LCOV - code coverage report
Current view: top level - contrib/dblink - dblink.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19devel Lines: 86.7 % 1120 971
Test Date: 2026-04-06 21:16:29 Functions: 97.5 % 81 79
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*
       2              :  * dblink.c
       3              :  *
       4              :  * Functions returning results from a remote database
       5              :  *
       6              :  * Joe Conway <mail@joeconway.com>
       7              :  * And contributors:
       8              :  * Darko Prenosil <Darko.Prenosil@finteh.hr>
       9              :  * Shridhar Daithankar <shridhar_daithankar@persistent.co.in>
      10              :  *
      11              :  * contrib/dblink/dblink.c
      12              :  * Copyright (c) 2001-2026, PostgreSQL Global Development Group
      13              :  * ALL RIGHTS RESERVED;
      14              :  *
      15              :  * Permission to use, copy, modify, and distribute this software and its
      16              :  * documentation for any purpose, without fee, and without a written agreement
      17              :  * is hereby granted, provided that the above copyright notice and this
      18              :  * paragraph and the following two paragraphs appear in all copies.
      19              :  *
      20              :  * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
      21              :  * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
      22              :  * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
      23              :  * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
      24              :  * POSSIBILITY OF SUCH DAMAGE.
      25              :  *
      26              :  * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
      27              :  * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
      28              :  * AND FITNESS FOR A PARTICULAR PURPOSE.  THE SOFTWARE PROVIDED HEREUNDER IS
      29              :  * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
      30              :  * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
      31              :  *
      32              :  */
      33              : #include "postgres.h"
      34              : 
      35              : #include <limits.h>
      36              : 
      37              : #include "access/htup_details.h"
      38              : #include "access/relation.h"
      39              : #include "access/reloptions.h"
      40              : #include "access/table.h"
      41              : #include "catalog/namespace.h"
      42              : #include "catalog/pg_foreign_data_wrapper.h"
      43              : #include "catalog/pg_foreign_server.h"
      44              : #include "catalog/pg_type.h"
      45              : #include "catalog/pg_user_mapping.h"
      46              : #include "commands/defrem.h"
      47              : #include "common/base64.h"
      48              : #include "executor/spi.h"
      49              : #include "foreign/foreign.h"
      50              : #include "funcapi.h"
      51              : #include "lib/stringinfo.h"
      52              : #include "libpq-fe.h"
      53              : #include "libpq/libpq-be.h"
      54              : #include "libpq/libpq-be-fe-helpers.h"
      55              : #include "mb/pg_wchar.h"
      56              : #include "miscadmin.h"
      57              : #include "parser/scansup.h"
      58              : #include "utils/acl.h"
      59              : #include "utils/builtins.h"
      60              : #include "utils/fmgroids.h"
      61              : #include "utils/guc.h"
      62              : #include "utils/hsearch.h"
      63              : #include "utils/lsyscache.h"
      64              : #include "utils/memutils.h"
      65              : #include "utils/rel.h"
      66              : #include "utils/tuplestore.h"
      67              : #include "utils/varlena.h"
      68              : #include "utils/wait_event.h"
      69              : 
      70           17 : PG_MODULE_MAGIC_EXT(
      71              :                     .name = "dblink",
      72              :                     .version = PG_VERSION
      73              : );
      74              : 
      75              : typedef struct remoteConn
      76              : {
      77              :     PGconn     *conn;           /* Hold the remote connection */
      78              :     int         openCursorCount;    /* The number of open cursors */
      79              :     bool        newXactForCursor;   /* Opened a transaction for a cursor */
      80              : } remoteConn;
      81              : 
      82              : typedef struct storeInfo
      83              : {
      84              :     FunctionCallInfo fcinfo;
      85              :     Tuplestorestate *tuplestore;
      86              :     AttInMetadata *attinmeta;
      87              :     MemoryContext tmpcontext;
      88              :     char      **cstrs;
      89              :     /* temp storage for results to avoid leaks on exception */
      90              :     PGresult   *last_res;
      91              :     PGresult   *cur_res;
      92              : } storeInfo;
      93              : 
      94              : /*
      95              :  * Internal declarations
      96              :  */
      97              : static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
      98              : static void prepTuplestoreResult(FunctionCallInfo fcinfo);
      99              : static void materializeResult(FunctionCallInfo fcinfo, PGconn *conn,
     100              :                               PGresult *res);
     101              : static void materializeQueryResult(FunctionCallInfo fcinfo,
     102              :                                    PGconn *conn,
     103              :                                    const char *conname,
     104              :                                    const char *sql,
     105              :                                    bool fail);
     106              : static PGresult *storeQueryResult(storeInfo *sinfo, PGconn *conn, const char *sql);
     107              : static void storeRow(storeInfo *sinfo, PGresult *res, bool first);
     108              : static remoteConn *getConnectionByName(const char *name);
     109              : static HTAB *createConnHash(void);
     110              : static remoteConn *createNewConnection(const char *name);
     111              : static void deleteConnection(const char *name);
     112              : static char **get_pkey_attnames(Relation rel, int16 *indnkeyatts);
     113              : static char **get_text_array_contents(ArrayType *array, int *numitems);
     114              : static char *get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
     115              : static char *get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals);
     116              : static char *get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
     117              : static char *quote_ident_cstr(char *rawstr);
     118              : static int  get_attnum_pk_pos(int *pkattnums, int pknumatts, int key);
     119              : static HeapTuple get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals);
     120              : static Relation get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode);
     121              : static char *generate_relation_name(Relation rel);
     122              : static void dblink_connstr_check(const char *connstr);
     123              : static bool dblink_connstr_has_pw(const char *connstr);
     124              : static void dblink_security_check(PGconn *conn, const char *connname,
     125              :                                   const char *connstr);
     126              : static void dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
     127              :                              bool fail, const char *fmt,...) pg_attribute_printf(5, 6);
     128              : static char *get_connect_string(const char *servername);
     129              : static char *escape_param_str(const char *str);
     130              : static void validate_pkattnums(Relation rel,
     131              :                                int2vector *pkattnums_arg, int32 pknumatts_arg,
     132              :                                int **pkattnums, int *pknumatts);
     133              : static bool is_valid_dblink_option(const PQconninfoOption *options,
     134              :                                    const char *option, Oid context);
     135              : static int  applyRemoteGucs(PGconn *conn);
     136              : static void restoreLocalGucs(int nestlevel);
     137              : static bool UseScramPassthrough(ForeignServer *foreign_server, UserMapping *user);
     138              : static void appendSCRAMKeysInfo(StringInfo buf);
     139              : static bool is_valid_dblink_fdw_option(const PQconninfoOption *options, const char *option,
     140              :                                        Oid context);
     141              : static bool dblink_connstr_has_required_scram_options(const char *connstr);
     142              : 
     143              : /* Global */
     144              : static remoteConn *pconn = NULL;
     145              : static HTAB *remoteConnHash = NULL;
     146              : 
     147              : /* custom wait event values, retrieved from shared memory */
     148              : static uint32 dblink_we_connect = 0;
     149              : static uint32 dblink_we_get_conn = 0;
     150              : static uint32 dblink_we_get_result = 0;
     151              : 
     152              : /*
     153              :  *  Following is hash that holds multiple remote connections.
     154              :  *  Calling convention of each dblink function changes to accept
     155              :  *  connection name as the first parameter. The connection hash is
     156              :  *  much like ecpg e.g. a mapping between a name and a PGconn object.
     157              :  *
     158              :  *  To avoid potentially leaking a PGconn object in case of out-of-memory
     159              :  *  errors, we first create the hash entry, then open the PGconn.
     160              :  *  Hence, a hash entry whose rconn.conn pointer is NULL must be
     161              :  *  understood as a leftover from a failed create; it should be ignored
     162              :  *  by lookup operations, and silently replaced by create operations.
     163              :  */
     164              : 
     165              : typedef struct remoteConnHashEnt
     166              : {
     167              :     char        name[NAMEDATALEN];
     168              :     remoteConn  rconn;
     169              : } remoteConnHashEnt;
     170              : 
     171              : /* initial number of connection hashes */
     172              : #define NUMCONN 16
     173              : 
     174              : pg_noreturn static void
     175            0 : dblink_res_internalerror(PGconn *conn, PGresult *res, const char *p2)
     176              : {
     177            0 :     char       *msg = pchomp(PQerrorMessage(conn));
     178              : 
     179            0 :     PQclear(res);
     180            0 :     elog(ERROR, "%s: %s", p2, msg);
     181              : }
     182              : 
     183              : pg_noreturn static void
     184            3 : dblink_conn_not_avail(const char *conname)
     185              : {
     186            3 :     if (conname)
     187            1 :         ereport(ERROR,
     188              :                 (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
     189              :                  errmsg("connection \"%s\" not available", conname)));
     190              :     else
     191            2 :         ereport(ERROR,
     192              :                 (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
     193              :                  errmsg("connection not available")));
     194              : }
     195              : 
     196              : static void
     197           37 : dblink_get_conn(char *conname_or_str,
     198              :                 PGconn *volatile *conn_p, char **conname_p, volatile bool *freeconn_p)
     199              : {
     200           37 :     remoteConn *rconn = getConnectionByName(conname_or_str);
     201              :     PGconn     *conn;
     202              :     char       *conname;
     203              :     bool        freeconn;
     204              : 
     205           37 :     if (rconn)
     206              :     {
     207           27 :         conn = rconn->conn;
     208           27 :         conname = conname_or_str;
     209           27 :         freeconn = false;
     210              :     }
     211              :     else
     212              :     {
     213              :         const char *connstr;
     214              : 
     215           10 :         connstr = get_connect_string(conname_or_str);
     216           10 :         if (connstr == NULL)
     217            6 :             connstr = conname_or_str;
     218           10 :         dblink_connstr_check(connstr);
     219              : 
     220              :         /* first time, allocate or get the custom wait event */
     221            9 :         if (dblink_we_get_conn == 0)
     222            5 :             dblink_we_get_conn = WaitEventExtensionNew("DblinkGetConnect");
     223              : 
     224              :         /* OK to make connection */
     225            9 :         conn = libpqsrv_connect(connstr, dblink_we_get_conn);
     226              : 
     227            9 :         if (PQstatus(conn) == CONNECTION_BAD)
     228              :         {
     229            3 :             char       *msg = pchomp(PQerrorMessage(conn));
     230              : 
     231            3 :             libpqsrv_disconnect(conn);
     232            3 :             ereport(ERROR,
     233              :                     (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
     234              :                      errmsg("could not establish connection"),
     235              :                      errdetail_internal("%s", msg)));
     236              :         }
     237              : 
     238            6 :         PQsetNoticeReceiver(conn, libpqsrv_notice_receiver,
     239              :                             "received message via remote connection");
     240              : 
     241            6 :         dblink_security_check(conn, NULL, connstr);
     242            6 :         if (PQclientEncoding(conn) != GetDatabaseEncoding())
     243            0 :             PQsetClientEncoding(conn, GetDatabaseEncodingName());
     244            6 :         freeconn = true;
     245            6 :         conname = NULL;
     246              :     }
     247              : 
     248           33 :     *conn_p = conn;
     249           33 :     *conname_p = conname;
     250           33 :     *freeconn_p = freeconn;
     251           33 : }
     252              : 
     253              : static PGconn *
     254           17 : dblink_get_named_conn(const char *conname)
     255              : {
     256           17 :     remoteConn *rconn = getConnectionByName(conname);
     257              : 
     258           17 :     if (rconn)
     259           17 :         return rconn->conn;
     260              : 
     261            0 :     dblink_conn_not_avail(conname);
     262              :     return NULL;                /* keep compiler quiet */
     263              : }
     264              : 
     265              : static void
     266          124 : dblink_init(void)
     267              : {
     268          124 :     if (!pconn)
     269              :     {
     270            7 :         if (dblink_we_get_result == 0)
     271            7 :             dblink_we_get_result = WaitEventExtensionNew("DblinkGetResult");
     272              : 
     273            7 :         pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn));
     274            7 :         pconn->conn = NULL;
     275            7 :         pconn->openCursorCount = 0;
     276            7 :         pconn->newXactForCursor = false;
     277              :     }
     278          124 : }
     279              : 
     280              : /*
     281              :  * Create a persistent connection to another database
     282              :  */
     283           13 : PG_FUNCTION_INFO_V1(dblink_connect);
     284              : Datum
     285           16 : dblink_connect(PG_FUNCTION_ARGS)
     286              : {
     287           16 :     char       *conname_or_str = NULL;
     288           16 :     char       *connstr = NULL;
     289           16 :     char       *connname = NULL;
     290              :     char       *msg;
     291           16 :     PGconn     *conn = NULL;
     292           16 :     remoteConn *rconn = NULL;
     293              : 
     294           16 :     dblink_init();
     295              : 
     296           16 :     if (PG_NARGS() == 2)
     297              :     {
     298           11 :         conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(1));
     299           11 :         connname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     300              :     }
     301            5 :     else if (PG_NARGS() == 1)
     302            5 :         conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0));
     303              : 
     304              :     /* first check for valid foreign data server */
     305           16 :     connstr = get_connect_string(conname_or_str);
     306           16 :     if (connstr == NULL)
     307           14 :         connstr = conname_or_str;
     308              : 
     309              :     /* check password in connection string if not superuser */
     310           16 :     dblink_connstr_check(connstr);
     311              : 
     312              :     /* first time, allocate or get the custom wait event */
     313           15 :     if (dblink_we_connect == 0)
     314            2 :         dblink_we_connect = WaitEventExtensionNew("DblinkConnect");
     315              : 
     316              :     /* if we need a hashtable entry, make that first, since it might fail */
     317           15 :     if (connname)
     318              :     {
     319           10 :         rconn = createNewConnection(connname);
     320              :         Assert(rconn->conn == NULL);
     321              :     }
     322              : 
     323              :     /* OK to make connection */
     324           14 :     conn = libpqsrv_connect(connstr, dblink_we_connect);
     325              : 
     326           14 :     if (PQstatus(conn) == CONNECTION_BAD)
     327              :     {
     328            0 :         msg = pchomp(PQerrorMessage(conn));
     329            0 :         libpqsrv_disconnect(conn);
     330            0 :         if (connname)
     331            0 :             deleteConnection(connname);
     332              : 
     333            0 :         ereport(ERROR,
     334              :                 (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
     335              :                  errmsg("could not establish connection"),
     336              :                  errdetail_internal("%s", msg)));
     337              :     }
     338              : 
     339           14 :     PQsetNoticeReceiver(conn, libpqsrv_notice_receiver,
     340              :                         "received message via remote connection");
     341              : 
     342              :     /* check password actually used if not superuser */
     343           14 :     dblink_security_check(conn, connname, connstr);
     344              : 
     345              :     /* attempt to set client encoding to match server encoding, if needed */
     346           14 :     if (PQclientEncoding(conn) != GetDatabaseEncoding())
     347            0 :         PQsetClientEncoding(conn, GetDatabaseEncodingName());
     348              : 
     349              :     /* all OK, save away the conn */
     350           14 :     if (connname)
     351              :     {
     352            9 :         rconn->conn = conn;
     353              :     }
     354              :     else
     355              :     {
     356            5 :         if (pconn->conn)
     357            1 :             libpqsrv_disconnect(pconn->conn);
     358            5 :         pconn->conn = conn;
     359              :     }
     360              : 
     361           14 :     PG_RETURN_TEXT_P(cstring_to_text("OK"));
     362              : }
     363              : 
     364              : /*
     365              :  * Clear a persistent connection to another database
     366              :  */
     367            8 : PG_FUNCTION_INFO_V1(dblink_disconnect);
     368              : Datum
     369           13 : dblink_disconnect(PG_FUNCTION_ARGS)
     370              : {
     371           13 :     char       *conname = NULL;
     372           13 :     remoteConn *rconn = NULL;
     373           13 :     PGconn     *conn = NULL;
     374              : 
     375           13 :     dblink_init();
     376              : 
     377           13 :     if (PG_NARGS() == 1)
     378              :     {
     379            9 :         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     380            9 :         rconn = getConnectionByName(conname);
     381            9 :         if (rconn)
     382            8 :             conn = rconn->conn;
     383              :     }
     384              :     else
     385            4 :         conn = pconn->conn;
     386              : 
     387           13 :     if (!conn)
     388            1 :         dblink_conn_not_avail(conname);
     389              : 
     390           12 :     libpqsrv_disconnect(conn);
     391           12 :     if (rconn)
     392            8 :         deleteConnection(conname);
     393              :     else
     394            4 :         pconn->conn = NULL;
     395              : 
     396           12 :     PG_RETURN_TEXT_P(cstring_to_text("OK"));
     397              : }
     398              : 
     399              : /*
     400              :  * opens a cursor using a persistent connection
     401              :  */
     402           13 : PG_FUNCTION_INFO_V1(dblink_open);
     403              : Datum
     404            9 : dblink_open(PG_FUNCTION_ARGS)
     405              : {
     406            9 :     PGresult   *res = NULL;
     407              :     PGconn     *conn;
     408            9 :     char       *curname = NULL;
     409            9 :     char       *sql = NULL;
     410            9 :     char       *conname = NULL;
     411              :     StringInfoData buf;
     412            9 :     remoteConn *rconn = NULL;
     413            9 :     bool        fail = true;    /* default to backward compatible behavior */
     414              : 
     415            9 :     dblink_init();
     416            9 :     initStringInfo(&buf);
     417              : 
     418            9 :     if (PG_NARGS() == 2)
     419              :     {
     420              :         /* text,text */
     421            2 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     422            2 :         sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
     423            2 :         rconn = pconn;
     424              :     }
     425            7 :     else if (PG_NARGS() == 3)
     426              :     {
     427              :         /* might be text,text,text or text,text,bool */
     428            6 :         if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
     429              :         {
     430            1 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     431            1 :             sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
     432            1 :             fail = PG_GETARG_BOOL(2);
     433            1 :             rconn = pconn;
     434              :         }
     435              :         else
     436              :         {
     437            5 :             conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     438            5 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
     439            5 :             sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
     440            5 :             rconn = getConnectionByName(conname);
     441              :         }
     442              :     }
     443            1 :     else if (PG_NARGS() == 4)
     444              :     {
     445              :         /* text,text,text,bool */
     446            1 :         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     447            1 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
     448            1 :         sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
     449            1 :         fail = PG_GETARG_BOOL(3);
     450            1 :         rconn = getConnectionByName(conname);
     451              :     }
     452              : 
     453            9 :     if (!rconn || !rconn->conn)
     454            0 :         dblink_conn_not_avail(conname);
     455              : 
     456            9 :     conn = rconn->conn;
     457              : 
     458              :     /* If we are not in a transaction, start one */
     459            9 :     if (PQtransactionStatus(conn) == PQTRANS_IDLE)
     460              :     {
     461            7 :         res = libpqsrv_exec(conn, "BEGIN", dblink_we_get_result);
     462            7 :         if (PQresultStatus(res) != PGRES_COMMAND_OK)
     463            0 :             dblink_res_internalerror(conn, res, "begin error");
     464            7 :         PQclear(res);
     465            7 :         rconn->newXactForCursor = true;
     466              : 
     467              :         /*
     468              :          * Since transaction state was IDLE, we force cursor count to
     469              :          * initially be 0. This is needed as a previous ABORT might have wiped
     470              :          * out our transaction without maintaining the cursor count for us.
     471              :          */
     472            7 :         rconn->openCursorCount = 0;
     473              :     }
     474              : 
     475              :     /* if we started a transaction, increment cursor count */
     476            9 :     if (rconn->newXactForCursor)
     477            9 :         (rconn->openCursorCount)++;
     478              : 
     479            9 :     appendStringInfo(&buf, "DECLARE %s CURSOR FOR %s", curname, sql);
     480            9 :     res = libpqsrv_exec(conn, buf.data, dblink_we_get_result);
     481            9 :     if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
     482              :     {
     483            2 :         dblink_res_error(conn, conname, res, fail,
     484              :                          "while opening cursor \"%s\"", curname);
     485            2 :         PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
     486              :     }
     487              : 
     488            7 :     PQclear(res);
     489            7 :     PG_RETURN_TEXT_P(cstring_to_text("OK"));
     490              : }
     491              : 
     492              : /*
     493              :  * closes a cursor
     494              :  */
     495           10 : PG_FUNCTION_INFO_V1(dblink_close);
     496              : Datum
     497            5 : dblink_close(PG_FUNCTION_ARGS)
     498              : {
     499              :     PGconn     *conn;
     500            5 :     PGresult   *res = NULL;
     501            5 :     char       *curname = NULL;
     502            5 :     char       *conname = NULL;
     503              :     StringInfoData buf;
     504            5 :     remoteConn *rconn = NULL;
     505            5 :     bool        fail = true;    /* default to backward compatible behavior */
     506              : 
     507            5 :     dblink_init();
     508            5 :     initStringInfo(&buf);
     509              : 
     510            5 :     if (PG_NARGS() == 1)
     511              :     {
     512              :         /* text */
     513            0 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     514            0 :         rconn = pconn;
     515              :     }
     516            5 :     else if (PG_NARGS() == 2)
     517              :     {
     518              :         /* might be text,text or text,bool */
     519            5 :         if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
     520              :         {
     521            2 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     522            2 :             fail = PG_GETARG_BOOL(1);
     523            2 :             rconn = pconn;
     524              :         }
     525              :         else
     526              :         {
     527            3 :             conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     528            3 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
     529            3 :             rconn = getConnectionByName(conname);
     530              :         }
     531              :     }
     532            5 :     if (PG_NARGS() == 3)
     533              :     {
     534              :         /* text,text,bool */
     535            0 :         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     536            0 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
     537            0 :         fail = PG_GETARG_BOOL(2);
     538            0 :         rconn = getConnectionByName(conname);
     539              :     }
     540              : 
     541            5 :     if (!rconn || !rconn->conn)
     542            0 :         dblink_conn_not_avail(conname);
     543              : 
     544            5 :     conn = rconn->conn;
     545              : 
     546            5 :     appendStringInfo(&buf, "CLOSE %s", curname);
     547              : 
     548              :     /* close the cursor */
     549            5 :     res = libpqsrv_exec(conn, buf.data, dblink_we_get_result);
     550            5 :     if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
     551              :     {
     552            1 :         dblink_res_error(conn, conname, res, fail,
     553              :                          "while closing cursor \"%s\"", curname);
     554            1 :         PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
     555              :     }
     556              : 
     557            4 :     PQclear(res);
     558              : 
     559              :     /* if we started a transaction, decrement cursor count */
     560            4 :     if (rconn->newXactForCursor)
     561              :     {
     562            4 :         (rconn->openCursorCount)--;
     563              : 
     564              :         /* if count is zero, commit the transaction */
     565            4 :         if (rconn->openCursorCount == 0)
     566              :         {
     567            2 :             rconn->newXactForCursor = false;
     568              : 
     569            2 :             res = libpqsrv_exec(conn, "COMMIT", dblink_we_get_result);
     570            2 :             if (PQresultStatus(res) != PGRES_COMMAND_OK)
     571            0 :                 dblink_res_internalerror(conn, res, "commit error");
     572            2 :             PQclear(res);
     573              :         }
     574              :     }
     575              : 
     576            4 :     PG_RETURN_TEXT_P(cstring_to_text("OK"));
     577              : }
     578              : 
     579              : /*
     580              :  * Fetch results from an open cursor
     581              :  */
     582           13 : PG_FUNCTION_INFO_V1(dblink_fetch);
     583              : Datum
     584           13 : dblink_fetch(PG_FUNCTION_ARGS)
     585              : {
     586           13 :     PGresult   *res = NULL;
     587           13 :     char       *conname = NULL;
     588           13 :     remoteConn *rconn = NULL;
     589           13 :     PGconn     *conn = NULL;
     590              :     StringInfoData buf;
     591           13 :     char       *curname = NULL;
     592           13 :     int         howmany = 0;
     593           13 :     bool        fail = true;    /* default to backward compatible */
     594              : 
     595           13 :     prepTuplestoreResult(fcinfo);
     596              : 
     597           13 :     dblink_init();
     598              : 
     599           13 :     if (PG_NARGS() == 4)
     600              :     {
     601              :         /* text,text,int,bool */
     602            1 :         conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     603            1 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
     604            1 :         howmany = PG_GETARG_INT32(2);
     605            1 :         fail = PG_GETARG_BOOL(3);
     606              : 
     607            1 :         rconn = getConnectionByName(conname);
     608            1 :         if (rconn)
     609            1 :             conn = rconn->conn;
     610              :     }
     611           12 :     else if (PG_NARGS() == 3)
     612              :     {
     613              :         /* text,text,int or text,int,bool */
     614            8 :         if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
     615              :         {
     616            2 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     617            2 :             howmany = PG_GETARG_INT32(1);
     618            2 :             fail = PG_GETARG_BOOL(2);
     619            2 :             conn = pconn->conn;
     620              :         }
     621              :         else
     622              :         {
     623            6 :             conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     624            6 :             curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
     625            6 :             howmany = PG_GETARG_INT32(2);
     626              : 
     627            6 :             rconn = getConnectionByName(conname);
     628            6 :             if (rconn)
     629            6 :                 conn = rconn->conn;
     630              :         }
     631              :     }
     632            4 :     else if (PG_NARGS() == 2)
     633              :     {
     634              :         /* text,int */
     635            4 :         curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     636            4 :         howmany = PG_GETARG_INT32(1);
     637            4 :         conn = pconn->conn;
     638              :     }
     639              : 
     640           13 :     if (!conn)
     641            0 :         dblink_conn_not_avail(conname);
     642              : 
     643           13 :     initStringInfo(&buf);
     644           13 :     appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
     645              : 
     646              :     /*
     647              :      * Try to execute the query.  Note that since libpq uses malloc, the
     648              :      * PGresult will be long-lived even though we are still in a short-lived
     649              :      * memory context.
     650              :      */
     651           13 :     res = libpqsrv_exec(conn, buf.data, dblink_we_get_result);
     652           26 :     if (!res ||
     653           26 :         (PQresultStatus(res) != PGRES_COMMAND_OK &&
     654           13 :          PQresultStatus(res) != PGRES_TUPLES_OK))
     655              :     {
     656            5 :         dblink_res_error(conn, conname, res, fail,
     657              :                          "while fetching from cursor \"%s\"", curname);
     658            3 :         return (Datum) 0;
     659              :     }
     660            8 :     else if (PQresultStatus(res) == PGRES_COMMAND_OK)
     661              :     {
     662              :         /* cursor does not exist - closed already or bad name */
     663            0 :         PQclear(res);
     664            0 :         ereport(ERROR,
     665              :                 (errcode(ERRCODE_INVALID_CURSOR_NAME),
     666              :                  errmsg("cursor \"%s\" does not exist", curname)));
     667              :     }
     668              : 
     669            8 :     materializeResult(fcinfo, conn, res);
     670            7 :     return (Datum) 0;
     671              : }
     672              : 
     673              : /*
     674              :  * Note: this is the new preferred version of dblink
     675              :  */
     676           19 : PG_FUNCTION_INFO_V1(dblink_record);
     677              : Datum
     678           29 : dblink_record(PG_FUNCTION_ARGS)
     679              : {
     680           29 :     return dblink_record_internal(fcinfo, false);
     681              : }
     682              : 
     683            4 : PG_FUNCTION_INFO_V1(dblink_send_query);
     684              : Datum
     685            6 : dblink_send_query(PG_FUNCTION_ARGS)
     686              : {
     687              :     PGconn     *conn;
     688              :     char       *sql;
     689              :     int         retval;
     690              : 
     691            6 :     if (PG_NARGS() == 2)
     692              :     {
     693            6 :         conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
     694            6 :         sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
     695              :     }
     696              :     else
     697              :         /* shouldn't happen */
     698            0 :         elog(ERROR, "wrong number of arguments");
     699              : 
     700              :     /* async query send */
     701            6 :     retval = PQsendQuery(conn, sql);
     702            6 :     if (retval != 1)
     703            0 :         elog(NOTICE, "could not send query: %s", pchomp(PQerrorMessage(conn)));
     704              : 
     705            6 :     PG_RETURN_INT32(retval);
     706              : }
     707              : 
     708            6 : PG_FUNCTION_INFO_V1(dblink_get_result);
     709              : Datum
     710            8 : dblink_get_result(PG_FUNCTION_ARGS)
     711              : {
     712            8 :     return dblink_record_internal(fcinfo, true);
     713              : }
     714              : 
     715              : static Datum
     716           37 : dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
     717              : {
     718           37 :     PGconn     *volatile conn = NULL;
     719           37 :     volatile bool freeconn = false;
     720              : 
     721           37 :     prepTuplestoreResult(fcinfo);
     722              : 
     723           37 :     dblink_init();
     724              : 
     725           37 :     PG_TRY();
     726              :     {
     727           37 :         char       *sql = NULL;
     728           37 :         char       *conname = NULL;
     729           37 :         bool        fail = true;    /* default to backward compatible */
     730              : 
     731           37 :         if (!is_async)
     732              :         {
     733           29 :             if (PG_NARGS() == 3)
     734              :             {
     735              :                 /* text,text,bool */
     736            1 :                 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     737            1 :                 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
     738            1 :                 fail = PG_GETARG_BOOL(2);
     739            1 :                 dblink_get_conn(conname, &conn, &conname, &freeconn);
     740              :             }
     741           28 :             else if (PG_NARGS() == 2)
     742              :             {
     743              :                 /* text,text or text,bool */
     744           21 :                 if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
     745              :                 {
     746            1 :                     sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
     747            1 :                     fail = PG_GETARG_BOOL(1);
     748            1 :                     conn = pconn->conn;
     749              :                 }
     750              :                 else
     751              :                 {
     752           20 :                     conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     753           20 :                     sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
     754           20 :                     dblink_get_conn(conname, &conn, &conname, &freeconn);
     755              :                 }
     756              :             }
     757            7 :             else if (PG_NARGS() == 1)
     758              :             {
     759              :                 /* text */
     760            7 :                 conn = pconn->conn;
     761            7 :                 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
     762              :             }
     763              :             else
     764              :                 /* shouldn't happen */
     765            0 :                 elog(ERROR, "wrong number of arguments");
     766              :         }
     767              :         else                    /* is_async */
     768              :         {
     769              :             /* get async result */
     770            8 :             conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
     771              : 
     772            8 :             if (PG_NARGS() == 2)
     773              :             {
     774              :                 /* text,bool */
     775            0 :                 fail = PG_GETARG_BOOL(1);
     776            0 :                 conn = dblink_get_named_conn(conname);
     777              :             }
     778            8 :             else if (PG_NARGS() == 1)
     779              :             {
     780              :                 /* text */
     781            8 :                 conn = dblink_get_named_conn(conname);
     782              :             }
     783              :             else
     784              :                 /* shouldn't happen */
     785            0 :                 elog(ERROR, "wrong number of arguments");
     786              :         }
     787              : 
     788           33 :         if (!conn)
     789            2 :             dblink_conn_not_avail(conname);
     790              : 
     791           31 :         if (!is_async)
     792              :         {
     793              :             /* synchronous query, use efficient tuple collection method */
     794           23 :             materializeQueryResult(fcinfo, conn, conname, sql, fail);
     795              :         }
     796              :         else
     797              :         {
     798              :             /* async result retrieval, do it the old way */
     799            8 :             PGresult   *res = libpqsrv_get_result(conn, dblink_we_get_result);
     800              : 
     801              :             /* NULL means we're all done with the async results */
     802            8 :             if (res)
     803              :             {
     804            5 :                 if (PQresultStatus(res) != PGRES_COMMAND_OK &&
     805            5 :                     PQresultStatus(res) != PGRES_TUPLES_OK)
     806              :                 {
     807            0 :                     dblink_res_error(conn, conname, res, fail,
     808              :                                      "while executing query");
     809              :                     /* if fail isn't set, we'll return an empty query result */
     810              :                 }
     811              :                 else
     812              :                 {
     813            5 :                     materializeResult(fcinfo, conn, res);
     814              :                 }
     815              :             }
     816              :         }
     817              :     }
     818            6 :     PG_FINALLY();
     819              :     {
     820              :         /* if needed, close the connection to the database */
     821           37 :         if (freeconn)
     822            5 :             libpqsrv_disconnect(conn);
     823              :     }
     824           37 :     PG_END_TRY();
     825              : 
     826           31 :     return (Datum) 0;
     827              : }
     828              : 
     829              : /*
     830              :  * Verify function caller can handle a tuplestore result, and set up for that.
     831              :  *
     832              :  * Note: if the caller returns without actually creating a tuplestore, the
     833              :  * executor will treat the function result as an empty set.
     834              :  */
     835              : static void
     836           50 : prepTuplestoreResult(FunctionCallInfo fcinfo)
     837              : {
     838           50 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
     839              : 
     840              :     /* check to see if query supports us returning a tuplestore */
     841           50 :     if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
     842            0 :         ereport(ERROR,
     843              :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     844              :                  errmsg("set-valued function called in context that cannot accept a set")));
     845           50 :     if (!(rsinfo->allowedModes & SFRM_Materialize))
     846            0 :         ereport(ERROR,
     847              :                 (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     848              :                  errmsg("materialize mode required, but it is not allowed in this context")));
     849              : 
     850              :     /* let the executor know we're sending back a tuplestore */
     851           50 :     rsinfo->returnMode = SFRM_Materialize;
     852              : 
     853              :     /* caller must fill these to return a non-empty result */
     854           50 :     rsinfo->setResult = NULL;
     855           50 :     rsinfo->setDesc = NULL;
     856           50 : }
     857              : 
     858              : /*
     859              :  * Copy the contents of the PGresult into a tuplestore to be returned
     860              :  * as the result of the current function.
     861              :  * The PGresult will be released in this function.
     862              :  */
     863              : static void
     864           13 : materializeResult(FunctionCallInfo fcinfo, PGconn *conn, PGresult *res)
     865              : {
     866           13 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
     867              :     TupleDesc   tupdesc;
     868              :     bool        is_sql_cmd;
     869              :     int         ntuples;
     870              :     int         nfields;
     871              : 
     872              :     /* prepTuplestoreResult must have been called previously */
     873              :     Assert(rsinfo->returnMode == SFRM_Materialize);
     874              : 
     875           13 :     if (PQresultStatus(res) == PGRES_COMMAND_OK)
     876              :     {
     877            0 :         is_sql_cmd = true;
     878              : 
     879              :         /*
     880              :          * need a tuple descriptor representing one TEXT column to return the
     881              :          * command status string as our result tuple
     882              :          */
     883            0 :         tupdesc = CreateTemplateTupleDesc(1);
     884            0 :         TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
     885              :                            TEXTOID, -1, 0);
     886            0 :         TupleDescFinalize(tupdesc);
     887            0 :         ntuples = 1;
     888            0 :         nfields = 1;
     889              :     }
     890              :     else
     891              :     {
     892              :         Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
     893              : 
     894           13 :         is_sql_cmd = false;
     895              : 
     896              :         /* get a tuple descriptor for our result type */
     897           13 :         switch (get_call_result_type(fcinfo, NULL, &tupdesc))
     898              :         {
     899           13 :             case TYPEFUNC_COMPOSITE:
     900              :                 /* success */
     901           13 :                 break;
     902            0 :             case TYPEFUNC_RECORD:
     903              :                 /* failed to determine actual type of RECORD */
     904            0 :                 ereport(ERROR,
     905              :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
     906              :                          errmsg("function returning record called in context "
     907              :                                 "that cannot accept type record")));
     908              :                 break;
     909            0 :             default:
     910              :                 /* result type isn't composite */
     911            0 :                 elog(ERROR, "return type must be a row type");
     912              :                 break;
     913              :         }
     914              : 
     915              :         /* make sure we have a persistent copy of the tupdesc */
     916           13 :         tupdesc = CreateTupleDescCopy(tupdesc);
     917           13 :         ntuples = PQntuples(res);
     918           13 :         nfields = PQnfields(res);
     919              :     }
     920              : 
     921              :     /*
     922              :      * check result and tuple descriptor have the same number of columns
     923              :      */
     924           13 :     if (nfields != tupdesc->natts)
     925            0 :         ereport(ERROR,
     926              :                 (errcode(ERRCODE_DATATYPE_MISMATCH),
     927              :                  errmsg("remote query result rowtype does not match "
     928              :                         "the specified FROM clause rowtype")));
     929              : 
     930           13 :     if (ntuples > 0)
     931              :     {
     932              :         AttInMetadata *attinmeta;
     933           13 :         int         nestlevel = -1;
     934              :         Tuplestorestate *tupstore;
     935              :         MemoryContext oldcontext;
     936              :         int         row;
     937              :         char      **values;
     938              : 
     939           13 :         attinmeta = TupleDescGetAttInMetadata(tupdesc);
     940              : 
     941              :         /* Set GUCs to ensure we read GUC-sensitive data types correctly */
     942           13 :         if (!is_sql_cmd)
     943           13 :             nestlevel = applyRemoteGucs(conn);
     944              : 
     945           13 :         oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
     946           13 :         tupstore = tuplestore_begin_heap(true, false, work_mem);
     947           13 :         rsinfo->setResult = tupstore;
     948           13 :         rsinfo->setDesc = tupdesc;
     949           13 :         MemoryContextSwitchTo(oldcontext);
     950              : 
     951           13 :         values = palloc_array(char *, nfields);
     952              : 
     953              :         /* put all tuples into the tuplestore */
     954           49 :         for (row = 0; row < ntuples; row++)
     955              :         {
     956              :             HeapTuple   tuple;
     957              : 
     958           37 :             if (!is_sql_cmd)
     959              :             {
     960              :                 int         i;
     961              : 
     962          138 :                 for (i = 0; i < nfields; i++)
     963              :                 {
     964          101 :                     if (PQgetisnull(res, row, i))
     965            0 :                         values[i] = NULL;
     966              :                     else
     967          101 :                         values[i] = PQgetvalue(res, row, i);
     968              :                 }
     969              :             }
     970              :             else
     971              :             {
     972            0 :                 values[0] = PQcmdStatus(res);
     973              :             }
     974              : 
     975              :             /* build the tuple and put it into the tuplestore. */
     976           37 :             tuple = BuildTupleFromCStrings(attinmeta, values);
     977           36 :             tuplestore_puttuple(tupstore, tuple);
     978              :         }
     979              : 
     980              :         /* clean up GUC settings, if we changed any */
     981           12 :         restoreLocalGucs(nestlevel);
     982              :     }
     983              : 
     984           12 :     PQclear(res);
     985           12 : }
     986              : 
     987              : /*
     988              :  * Execute the given SQL command and store its results into a tuplestore
     989              :  * to be returned as the result of the current function.
     990              :  *
     991              :  * This is equivalent to PQexec followed by materializeResult, but we make
     992              :  * use of libpq's single-row mode to avoid accumulating the whole result
     993              :  * inside libpq before it gets transferred to the tuplestore.
     994              :  */
     995              : static void
     996           23 : materializeQueryResult(FunctionCallInfo fcinfo,
     997              :                        PGconn *conn,
     998              :                        const char *conname,
     999              :                        const char *sql,
    1000              :                        bool fail)
    1001              : {
    1002           23 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    1003              : 
    1004              :     /* prepTuplestoreResult must have been called previously */
    1005              :     Assert(rsinfo->returnMode == SFRM_Materialize);
    1006              : 
    1007              :     /* Use a PG_TRY block to ensure we pump libpq dry of results */
    1008           23 :     PG_TRY();
    1009              :     {
    1010           23 :         storeInfo   sinfo = {0};
    1011              :         PGresult   *res;
    1012              : 
    1013           23 :         sinfo.fcinfo = fcinfo;
    1014              :         /* Create short-lived memory context for data conversions */
    1015           23 :         sinfo.tmpcontext = AllocSetContextCreate(CurrentMemoryContext,
    1016              :                                                  "dblink temporary context",
    1017              :                                                  ALLOCSET_DEFAULT_SIZES);
    1018              : 
    1019              :         /* execute query, collecting any tuples into the tuplestore */
    1020           23 :         res = storeQueryResult(&sinfo, conn, sql);
    1021              : 
    1022           23 :         if (!res ||
    1023           23 :             (PQresultStatus(res) != PGRES_COMMAND_OK &&
    1024           23 :              PQresultStatus(res) != PGRES_TUPLES_OK))
    1025              :         {
    1026            2 :             dblink_res_error(conn, conname, res, fail,
    1027              :                              "while executing query");
    1028              :             /* if fail isn't set, we'll return an empty query result */
    1029              :         }
    1030           21 :         else if (PQresultStatus(res) == PGRES_COMMAND_OK)
    1031              :         {
    1032              :             /*
    1033              :              * storeRow didn't get called, so we need to convert the command
    1034              :              * status string to a tuple manually
    1035              :              */
    1036              :             TupleDesc   tupdesc;
    1037              :             AttInMetadata *attinmeta;
    1038              :             Tuplestorestate *tupstore;
    1039              :             HeapTuple   tuple;
    1040              :             char       *values[1];
    1041              :             MemoryContext oldcontext;
    1042              : 
    1043              :             /*
    1044              :              * need a tuple descriptor representing one TEXT column to return
    1045              :              * the command status string as our result tuple
    1046              :              */
    1047            0 :             tupdesc = CreateTemplateTupleDesc(1);
    1048            0 :             TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
    1049              :                                TEXTOID, -1, 0);
    1050            0 :             TupleDescFinalize(tupdesc);
    1051            0 :             attinmeta = TupleDescGetAttInMetadata(tupdesc);
    1052              : 
    1053            0 :             oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
    1054            0 :             tupstore = tuplestore_begin_heap(true, false, work_mem);
    1055            0 :             rsinfo->setResult = tupstore;
    1056            0 :             rsinfo->setDesc = tupdesc;
    1057            0 :             MemoryContextSwitchTo(oldcontext);
    1058              : 
    1059            0 :             values[0] = PQcmdStatus(res);
    1060              : 
    1061              :             /* build the tuple and put it into the tuplestore. */
    1062            0 :             tuple = BuildTupleFromCStrings(attinmeta, values);
    1063            0 :             tuplestore_puttuple(tupstore, tuple);
    1064              : 
    1065            0 :             PQclear(res);
    1066              :         }
    1067              :         else
    1068              :         {
    1069              :             Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
    1070              :             /* storeRow should have created a tuplestore */
    1071              :             Assert(rsinfo->setResult != NULL);
    1072              : 
    1073           21 :             PQclear(res);
    1074              :         }
    1075              : 
    1076              :         /* clean up data conversion short-lived memory context */
    1077           23 :         if (sinfo.tmpcontext != NULL)
    1078           23 :             MemoryContextDelete(sinfo.tmpcontext);
    1079              : 
    1080           23 :         PQclear(sinfo.last_res);
    1081           23 :         PQclear(sinfo.cur_res);
    1082              :     }
    1083            0 :     PG_CATCH();
    1084              :     {
    1085              :         PGresult   *res;
    1086              : 
    1087              :         /* be sure to clear out any pending data in libpq */
    1088            0 :         while ((res = libpqsrv_get_result(conn, dblink_we_get_result)) !=
    1089              :                NULL)
    1090            0 :             PQclear(res);
    1091            0 :         PG_RE_THROW();
    1092              :     }
    1093           23 :     PG_END_TRY();
    1094           23 : }
    1095              : 
    1096              : /*
    1097              :  * Execute query, and send any result rows to sinfo->tuplestore.
    1098              :  */
    1099              : static PGresult *
    1100           23 : storeQueryResult(storeInfo *sinfo, PGconn *conn, const char *sql)
    1101              : {
    1102           23 :     bool        first = true;
    1103           23 :     int         nestlevel = -1;
    1104              :     PGresult   *res;
    1105              : 
    1106           23 :     if (!PQsendQuery(conn, sql))
    1107            0 :         elog(ERROR, "could not send query: %s", pchomp(PQerrorMessage(conn)));
    1108              : 
    1109           23 :     if (!PQsetSingleRowMode(conn))  /* shouldn't fail */
    1110            0 :         elog(ERROR, "failed to set single-row mode for dblink query");
    1111              : 
    1112              :     for (;;)
    1113              :     {
    1114          198 :         CHECK_FOR_INTERRUPTS();
    1115              : 
    1116          198 :         sinfo->cur_res = libpqsrv_get_result(conn, dblink_we_get_result);
    1117          198 :         if (!sinfo->cur_res)
    1118           23 :             break;
    1119              : 
    1120          175 :         if (PQresultStatus(sinfo->cur_res) == PGRES_SINGLE_TUPLE)
    1121              :         {
    1122              :             /* got one row from possibly-bigger resultset */
    1123              : 
    1124              :             /*
    1125              :              * Set GUCs to ensure we read GUC-sensitive data types correctly.
    1126              :              * We shouldn't do this until we have a row in hand, to ensure
    1127              :              * libpq has seen any earlier ParameterStatus protocol messages.
    1128              :              */
    1129          152 :             if (first && nestlevel < 0)
    1130           21 :                 nestlevel = applyRemoteGucs(conn);
    1131              : 
    1132          152 :             storeRow(sinfo, sinfo->cur_res, first);
    1133              : 
    1134          152 :             PQclear(sinfo->cur_res);
    1135          152 :             sinfo->cur_res = NULL;
    1136          152 :             first = false;
    1137              :         }
    1138              :         else
    1139              :         {
    1140              :             /* if empty resultset, fill tuplestore header */
    1141           23 :             if (first && PQresultStatus(sinfo->cur_res) == PGRES_TUPLES_OK)
    1142            0 :                 storeRow(sinfo, sinfo->cur_res, first);
    1143              : 
    1144              :             /* store completed result at last_res */
    1145           23 :             PQclear(sinfo->last_res);
    1146           23 :             sinfo->last_res = sinfo->cur_res;
    1147           23 :             sinfo->cur_res = NULL;
    1148           23 :             first = true;
    1149              :         }
    1150              :     }
    1151              : 
    1152              :     /* clean up GUC settings, if we changed any */
    1153           23 :     restoreLocalGucs(nestlevel);
    1154              : 
    1155              :     /* return last_res */
    1156           23 :     res = sinfo->last_res;
    1157           23 :     sinfo->last_res = NULL;
    1158           23 :     return res;
    1159              : }
    1160              : 
    1161              : /*
    1162              :  * Send single row to sinfo->tuplestore.
    1163              :  *
    1164              :  * If "first" is true, create the tuplestore using PGresult's metadata
    1165              :  * (in this case the PGresult might contain either zero or one row).
    1166              :  */
    1167              : static void
    1168          152 : storeRow(storeInfo *sinfo, PGresult *res, bool first)
    1169              : {
    1170          152 :     int         nfields = PQnfields(res);
    1171              :     HeapTuple   tuple;
    1172              :     int         i;
    1173              :     MemoryContext oldcontext;
    1174              : 
    1175          152 :     if (first)
    1176              :     {
    1177              :         /* Prepare for new result set */
    1178           21 :         ReturnSetInfo *rsinfo = (ReturnSetInfo *) sinfo->fcinfo->resultinfo;
    1179              :         TupleDesc   tupdesc;
    1180              : 
    1181              :         /*
    1182              :          * It's possible to get more than one result set if the query string
    1183              :          * contained multiple SQL commands.  In that case, we follow PQexec's
    1184              :          * traditional behavior of throwing away all but the last result.
    1185              :          */
    1186           21 :         if (sinfo->tuplestore)
    1187            0 :             tuplestore_end(sinfo->tuplestore);
    1188           21 :         sinfo->tuplestore = NULL;
    1189              : 
    1190              :         /* get a tuple descriptor for our result type */
    1191           21 :         switch (get_call_result_type(sinfo->fcinfo, NULL, &tupdesc))
    1192              :         {
    1193           21 :             case TYPEFUNC_COMPOSITE:
    1194              :                 /* success */
    1195           21 :                 break;
    1196            0 :             case TYPEFUNC_RECORD:
    1197              :                 /* failed to determine actual type of RECORD */
    1198            0 :                 ereport(ERROR,
    1199              :                         (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1200              :                          errmsg("function returning record called in context "
    1201              :                                 "that cannot accept type record")));
    1202              :                 break;
    1203            0 :             default:
    1204              :                 /* result type isn't composite */
    1205            0 :                 elog(ERROR, "return type must be a row type");
    1206              :                 break;
    1207              :         }
    1208              : 
    1209              :         /* make sure we have a persistent copy of the tupdesc */
    1210           21 :         tupdesc = CreateTupleDescCopy(tupdesc);
    1211              : 
    1212              :         /* check result and tuple descriptor have the same number of columns */
    1213           21 :         if (nfields != tupdesc->natts)
    1214            0 :             ereport(ERROR,
    1215              :                     (errcode(ERRCODE_DATATYPE_MISMATCH),
    1216              :                      errmsg("remote query result rowtype does not match "
    1217              :                             "the specified FROM clause rowtype")));
    1218              : 
    1219              :         /* Prepare attinmeta for later data conversions */
    1220           21 :         sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
    1221              : 
    1222              :         /* Create a new, empty tuplestore */
    1223           21 :         oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
    1224           21 :         sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
    1225           21 :         rsinfo->setResult = sinfo->tuplestore;
    1226           21 :         rsinfo->setDesc = tupdesc;
    1227           21 :         MemoryContextSwitchTo(oldcontext);
    1228              : 
    1229              :         /* Done if empty resultset */
    1230           21 :         if (PQntuples(res) == 0)
    1231            0 :             return;
    1232              : 
    1233              :         /*
    1234              :          * Set up sufficiently-wide string pointers array; this won't change
    1235              :          * in size so it's easy to preallocate.
    1236              :          */
    1237           21 :         if (sinfo->cstrs)
    1238            0 :             pfree(sinfo->cstrs);
    1239           21 :         sinfo->cstrs = palloc_array(char *, nfields);
    1240              :     }
    1241              : 
    1242              :     /* Should have a single-row result if we get here */
    1243              :     Assert(PQntuples(res) == 1);
    1244              : 
    1245              :     /*
    1246              :      * Do the following work in a temp context that we reset after each tuple.
    1247              :      * This cleans up not only the data we have direct access to, but any
    1248              :      * cruft the I/O functions might leak.
    1249              :      */
    1250          152 :     oldcontext = MemoryContextSwitchTo(sinfo->tmpcontext);
    1251              : 
    1252              :     /*
    1253              :      * Fill cstrs with null-terminated strings of column values.
    1254              :      */
    1255          570 :     for (i = 0; i < nfields; i++)
    1256              :     {
    1257          418 :         if (PQgetisnull(res, 0, i))
    1258            0 :             sinfo->cstrs[i] = NULL;
    1259              :         else
    1260          418 :             sinfo->cstrs[i] = PQgetvalue(res, 0, i);
    1261              :     }
    1262              : 
    1263              :     /* Convert row to a tuple, and add it to the tuplestore */
    1264          152 :     tuple = BuildTupleFromCStrings(sinfo->attinmeta, sinfo->cstrs);
    1265              : 
    1266          152 :     tuplestore_puttuple(sinfo->tuplestore, tuple);
    1267              : 
    1268              :     /* Clean up */
    1269          152 :     MemoryContextSwitchTo(oldcontext);
    1270          152 :     MemoryContextReset(sinfo->tmpcontext);
    1271              : }
    1272              : 
    1273              : /*
    1274              :  * List all open dblink connections by name.
    1275              :  * Returns an array of all connection names.
    1276              :  * Takes no params
    1277              :  */
    1278            3 : PG_FUNCTION_INFO_V1(dblink_get_connections);
    1279              : Datum
    1280            1 : dblink_get_connections(PG_FUNCTION_ARGS)
    1281              : {
    1282              :     HASH_SEQ_STATUS status;
    1283              :     remoteConnHashEnt *hentry;
    1284            1 :     ArrayBuildState *astate = NULL;
    1285              : 
    1286            1 :     if (remoteConnHash)
    1287              :     {
    1288            1 :         hash_seq_init(&status, remoteConnHash);
    1289            4 :         while ((hentry = (remoteConnHashEnt *) hash_seq_search(&status)) != NULL)
    1290              :         {
    1291              :             /* ignore it if it's not an open connection */
    1292            3 :             if (hentry->rconn.conn == NULL)
    1293            0 :                 continue;
    1294              :             /* stash away current value */
    1295            3 :             astate = accumArrayResult(astate,
    1296            3 :                                       CStringGetTextDatum(hentry->name),
    1297              :                                       false, TEXTOID, CurrentMemoryContext);
    1298              :         }
    1299              :     }
    1300              : 
    1301            1 :     if (astate)
    1302            1 :         PG_RETURN_DATUM(makeArrayResult(astate,
    1303              :                                         CurrentMemoryContext));
    1304              :     else
    1305            0 :         PG_RETURN_NULL();
    1306              : }
    1307              : 
    1308              : /*
    1309              :  * Checks if a given remote connection is busy
    1310              :  *
    1311              :  * Returns 1 if the connection is busy, 0 otherwise
    1312              :  * Params:
    1313              :  *  text connection_name - name of the connection to check
    1314              :  *
    1315              :  */
    1316            3 : PG_FUNCTION_INFO_V1(dblink_is_busy);
    1317              : Datum
    1318            1 : dblink_is_busy(PG_FUNCTION_ARGS)
    1319              : {
    1320              :     PGconn     *conn;
    1321              : 
    1322            1 :     dblink_init();
    1323            1 :     conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
    1324              : 
    1325            1 :     PQconsumeInput(conn);
    1326            1 :     PG_RETURN_INT32(PQisBusy(conn));
    1327              : }
    1328              : 
    1329              : /*
    1330              :  * Cancels a running request on a connection
    1331              :  *
    1332              :  * Returns text:
    1333              :  *  "OK" if the cancel request has been sent correctly,
    1334              :  *      an error message otherwise
    1335              :  *
    1336              :  * Params:
    1337              :  *  text connection_name - name of the connection to check
    1338              :  *
    1339              :  */
    1340            3 : PG_FUNCTION_INFO_V1(dblink_cancel_query);
    1341              : Datum
    1342            1 : dblink_cancel_query(PG_FUNCTION_ARGS)
    1343              : {
    1344              :     PGconn     *conn;
    1345              :     const char *msg;
    1346              :     TimestampTz endtime;
    1347              : 
    1348            1 :     dblink_init();
    1349            1 :     conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
    1350            1 :     endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
    1351              :                                           30000);
    1352            1 :     msg = libpqsrv_cancel(conn, endtime);
    1353            1 :     if (msg == NULL)
    1354            1 :         msg = "OK";
    1355              : 
    1356            1 :     PG_RETURN_TEXT_P(cstring_to_text(msg));
    1357              : }
    1358              : 
    1359              : 
    1360              : /*
    1361              :  * Get error message from a connection
    1362              :  *
    1363              :  * Returns text:
    1364              :  *  "OK" if no error, an error message otherwise
    1365              :  *
    1366              :  * Params:
    1367              :  *  text connection_name - name of the connection to check
    1368              :  *
    1369              :  */
    1370            3 : PG_FUNCTION_INFO_V1(dblink_error_message);
    1371              : Datum
    1372            1 : dblink_error_message(PG_FUNCTION_ARGS)
    1373              : {
    1374              :     char       *msg;
    1375              :     PGconn     *conn;
    1376              : 
    1377            1 :     dblink_init();
    1378            1 :     conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
    1379              : 
    1380            1 :     msg = PQerrorMessage(conn);
    1381            1 :     if (msg == NULL || msg[0] == '\0')
    1382            1 :         PG_RETURN_TEXT_P(cstring_to_text("OK"));
    1383              :     else
    1384            0 :         PG_RETURN_TEXT_P(cstring_to_text(pchomp(msg)));
    1385              : }
    1386              : 
    1387              : /*
    1388              :  * Execute an SQL non-SELECT command
    1389              :  */
    1390           13 : PG_FUNCTION_INFO_V1(dblink_exec);
    1391              : Datum
    1392           26 : dblink_exec(PG_FUNCTION_ARGS)
    1393              : {
    1394           26 :     text       *volatile sql_cmd_status = NULL;
    1395           26 :     PGconn     *volatile conn = NULL;
    1396           26 :     volatile bool freeconn = false;
    1397              : 
    1398           26 :     dblink_init();
    1399              : 
    1400           26 :     PG_TRY();
    1401              :     {
    1402           26 :         PGresult   *res = NULL;
    1403           26 :         char       *sql = NULL;
    1404           26 :         char       *conname = NULL;
    1405           26 :         bool        fail = true;    /* default to backward compatible behavior */
    1406              : 
    1407           26 :         if (PG_NARGS() == 3)
    1408              :         {
    1409              :             /* must be text,text,bool */
    1410            0 :             conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
    1411            0 :             sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
    1412            0 :             fail = PG_GETARG_BOOL(2);
    1413            0 :             dblink_get_conn(conname, &conn, &conname, &freeconn);
    1414              :         }
    1415           26 :         else if (PG_NARGS() == 2)
    1416              :         {
    1417              :             /* might be text,text or text,bool */
    1418           17 :             if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
    1419              :             {
    1420            1 :                 sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
    1421            1 :                 fail = PG_GETARG_BOOL(1);
    1422            1 :                 conn = pconn->conn;
    1423              :             }
    1424              :             else
    1425              :             {
    1426           16 :                 conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
    1427           16 :                 sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
    1428           16 :                 dblink_get_conn(conname, &conn, &conname, &freeconn);
    1429              :             }
    1430              :         }
    1431            9 :         else if (PG_NARGS() == 1)
    1432              :         {
    1433              :             /* must be single text argument */
    1434            9 :             conn = pconn->conn;
    1435            9 :             sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
    1436              :         }
    1437              :         else
    1438              :             /* shouldn't happen */
    1439            0 :             elog(ERROR, "wrong number of arguments");
    1440              : 
    1441           26 :         if (!conn)
    1442            0 :             dblink_conn_not_avail(conname);
    1443              : 
    1444           26 :         res = libpqsrv_exec(conn, sql, dblink_we_get_result);
    1445           26 :         if (!res ||
    1446           26 :             (PQresultStatus(res) != PGRES_COMMAND_OK &&
    1447            2 :              PQresultStatus(res) != PGRES_TUPLES_OK))
    1448              :         {
    1449            2 :             dblink_res_error(conn, conname, res, fail,
    1450              :                              "while executing command");
    1451              : 
    1452              :             /*
    1453              :              * and save a copy of the command status string to return as our
    1454              :              * result tuple
    1455              :              */
    1456            1 :             sql_cmd_status = cstring_to_text("ERROR");
    1457              :         }
    1458           24 :         else if (PQresultStatus(res) == PGRES_COMMAND_OK)
    1459              :         {
    1460              :             /*
    1461              :              * and save a copy of the command status string to return as our
    1462              :              * result tuple
    1463              :              */
    1464           24 :             sql_cmd_status = cstring_to_text(PQcmdStatus(res));
    1465           24 :             PQclear(res);
    1466              :         }
    1467              :         else
    1468              :         {
    1469            0 :             PQclear(res);
    1470            0 :             ereport(ERROR,
    1471              :                     (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
    1472              :                      errmsg("statement returning results not allowed")));
    1473              :         }
    1474              :     }
    1475            1 :     PG_FINALLY();
    1476              :     {
    1477              :         /* if needed, close the connection to the database */
    1478           26 :         if (freeconn)
    1479            1 :             libpqsrv_disconnect(conn);
    1480              :     }
    1481           26 :     PG_END_TRY();
    1482              : 
    1483           25 :     PG_RETURN_TEXT_P(sql_cmd_status);
    1484              : }
    1485              : 
    1486              : 
    1487              : /*
    1488              :  * dblink_get_pkey
    1489              :  *
    1490              :  * Return list of primary key fields for the supplied relation,
    1491              :  * or NULL if none exists.
    1492              :  */
    1493            3 : PG_FUNCTION_INFO_V1(dblink_get_pkey);
    1494              : Datum
    1495            9 : dblink_get_pkey(PG_FUNCTION_ARGS)
    1496              : {
    1497              :     int16       indnkeyatts;
    1498              :     char      **results;
    1499              :     FuncCallContext *funcctx;
    1500              :     int32       call_cntr;
    1501              :     int32       max_calls;
    1502              :     AttInMetadata *attinmeta;
    1503              :     MemoryContext oldcontext;
    1504              : 
    1505              :     /* stuff done only on the first call of the function */
    1506            9 :     if (SRF_IS_FIRSTCALL())
    1507              :     {
    1508              :         Relation    rel;
    1509              :         TupleDesc   tupdesc;
    1510              : 
    1511              :         /* create a function context for cross-call persistence */
    1512            3 :         funcctx = SRF_FIRSTCALL_INIT();
    1513              : 
    1514              :         /*
    1515              :          * switch to memory context appropriate for multiple function calls
    1516              :          */
    1517            3 :         oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
    1518              : 
    1519              :         /* open target relation */
    1520            3 :         rel = get_rel_from_relname(PG_GETARG_TEXT_PP(0), AccessShareLock, ACL_SELECT);
    1521              : 
    1522              :         /* get the array of attnums */
    1523            3 :         results = get_pkey_attnames(rel, &indnkeyatts);
    1524              : 
    1525            3 :         relation_close(rel, AccessShareLock);
    1526              : 
    1527              :         /*
    1528              :          * need a tuple descriptor representing one INT and one TEXT column
    1529              :          */
    1530            3 :         tupdesc = CreateTemplateTupleDesc(2);
    1531            3 :         TupleDescInitEntry(tupdesc, (AttrNumber) 1, "position",
    1532              :                            INT4OID, -1, 0);
    1533            3 :         TupleDescInitEntry(tupdesc, (AttrNumber) 2, "colname",
    1534              :                            TEXTOID, -1, 0);
    1535              : 
    1536            3 :         TupleDescFinalize(tupdesc);
    1537              : 
    1538              :         /*
    1539              :          * Generate attribute metadata needed later to produce tuples from raw
    1540              :          * C strings
    1541              :          */
    1542            3 :         attinmeta = TupleDescGetAttInMetadata(tupdesc);
    1543            3 :         funcctx->attinmeta = attinmeta;
    1544              : 
    1545            3 :         if ((results != NULL) && (indnkeyatts > 0))
    1546              :         {
    1547            3 :             funcctx->max_calls = indnkeyatts;
    1548              : 
    1549              :             /* got results, keep track of them */
    1550            3 :             funcctx->user_fctx = results;
    1551              :         }
    1552              :         else
    1553              :         {
    1554              :             /* fast track when no results */
    1555            0 :             MemoryContextSwitchTo(oldcontext);
    1556            0 :             SRF_RETURN_DONE(funcctx);
    1557              :         }
    1558              : 
    1559            3 :         MemoryContextSwitchTo(oldcontext);
    1560              :     }
    1561              : 
    1562              :     /* stuff done on every call of the function */
    1563            9 :     funcctx = SRF_PERCALL_SETUP();
    1564              : 
    1565              :     /*
    1566              :      * initialize per-call variables
    1567              :      */
    1568            9 :     call_cntr = funcctx->call_cntr;
    1569            9 :     max_calls = funcctx->max_calls;
    1570              : 
    1571            9 :     results = (char **) funcctx->user_fctx;
    1572            9 :     attinmeta = funcctx->attinmeta;
    1573              : 
    1574            9 :     if (call_cntr < max_calls)   /* do when there is more left to send */
    1575              :     {
    1576              :         char      **values;
    1577              :         HeapTuple   tuple;
    1578              :         Datum       result;
    1579              : 
    1580            6 :         values = palloc_array(char *, 2);
    1581            6 :         values[0] = psprintf("%d", call_cntr + 1);
    1582            6 :         values[1] = results[call_cntr];
    1583              : 
    1584              :         /* build the tuple */
    1585            6 :         tuple = BuildTupleFromCStrings(attinmeta, values);
    1586              : 
    1587              :         /* make the tuple into a datum */
    1588            6 :         result = HeapTupleGetDatum(tuple);
    1589              : 
    1590            6 :         SRF_RETURN_NEXT(funcctx, result);
    1591              :     }
    1592              :     else
    1593              :     {
    1594              :         /* do when there is no more left */
    1595            3 :         SRF_RETURN_DONE(funcctx);
    1596              :     }
    1597              : }
    1598              : 
    1599              : 
    1600              : /*
    1601              :  * dblink_build_sql_insert
    1602              :  *
    1603              :  * Used to generate an SQL insert statement
    1604              :  * based on an existing tuple in a local relation.
    1605              :  * This is useful for selectively replicating data
    1606              :  * to another server via dblink.
    1607              :  *
    1608              :  * API:
    1609              :  * <relname> - name of local table of interest
    1610              :  * <pkattnums> - an int2vector of attnums which will be used
    1611              :  * to identify the local tuple of interest
    1612              :  * <pknumatts> - number of attnums in pkattnums
    1613              :  * <src_pkattvals_arry> - text array of key values which will be used
    1614              :  * to identify the local tuple of interest
    1615              :  * <tgt_pkattvals_arry> - text array of key values which will be used
    1616              :  * to build the string for execution remotely. These are substituted
    1617              :  * for their counterparts in src_pkattvals_arry
    1618              :  */
    1619            4 : PG_FUNCTION_INFO_V1(dblink_build_sql_insert);
    1620              : Datum
    1621            6 : dblink_build_sql_insert(PG_FUNCTION_ARGS)
    1622              : {
    1623            6 :     text       *relname_text = PG_GETARG_TEXT_PP(0);
    1624            6 :     int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
    1625            6 :     int32       pknumatts_arg = PG_GETARG_INT32(2);
    1626            6 :     ArrayType  *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
    1627            6 :     ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
    1628              :     Relation    rel;
    1629              :     int        *pkattnums;
    1630              :     int         pknumatts;
    1631              :     char      **src_pkattvals;
    1632              :     char      **tgt_pkattvals;
    1633              :     int         src_nitems;
    1634              :     int         tgt_nitems;
    1635              :     char       *sql;
    1636              : 
    1637              :     /*
    1638              :      * Open target relation.
    1639              :      */
    1640            6 :     rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
    1641              : 
    1642              :     /*
    1643              :      * Process pkattnums argument.
    1644              :      */
    1645            6 :     validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
    1646              :                        &pkattnums, &pknumatts);
    1647              : 
    1648              :     /*
    1649              :      * Source array is made up of key values that will be used to locate the
    1650              :      * tuple of interest from the local system.
    1651              :      */
    1652            4 :     src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
    1653              : 
    1654              :     /*
    1655              :      * There should be one source array key value for each key attnum
    1656              :      */
    1657            4 :     if (src_nitems != pknumatts)
    1658            0 :         ereport(ERROR,
    1659              :                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
    1660              :                  errmsg("source key array length must match number of key attributes")));
    1661              : 
    1662              :     /*
    1663              :      * Target array is made up of key values that will be used to build the
    1664              :      * SQL string for use on the remote system.
    1665              :      */
    1666            4 :     tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
    1667              : 
    1668              :     /*
    1669              :      * There should be one target array key value for each key attnum
    1670              :      */
    1671            4 :     if (tgt_nitems != pknumatts)
    1672            0 :         ereport(ERROR,
    1673              :                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
    1674              :                  errmsg("target key array length must match number of key attributes")));
    1675              : 
    1676              :     /*
    1677              :      * Prep work is finally done. Go get the SQL string.
    1678              :      */
    1679            4 :     sql = get_sql_insert(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
    1680              : 
    1681              :     /*
    1682              :      * Now we can close the relation.
    1683              :      */
    1684            4 :     relation_close(rel, AccessShareLock);
    1685              : 
    1686              :     /*
    1687              :      * And send it
    1688              :      */
    1689            4 :     PG_RETURN_TEXT_P(cstring_to_text(sql));
    1690              : }
    1691              : 
    1692              : 
    1693              : /*
    1694              :  * dblink_build_sql_delete
    1695              :  *
    1696              :  * Used to generate an SQL delete statement.
    1697              :  * This is useful for selectively replicating a
    1698              :  * delete to another server via dblink.
    1699              :  *
    1700              :  * API:
    1701              :  * <relname> - name of remote table of interest
    1702              :  * <pkattnums> - an int2vector of attnums which will be used
    1703              :  * to identify the remote tuple of interest
    1704              :  * <pknumatts> - number of attnums in pkattnums
    1705              :  * <tgt_pkattvals_arry> - text array of key values which will be used
    1706              :  * to build the string for execution remotely.
    1707              :  */
    1708            4 : PG_FUNCTION_INFO_V1(dblink_build_sql_delete);
    1709              : Datum
    1710            6 : dblink_build_sql_delete(PG_FUNCTION_ARGS)
    1711              : {
    1712            6 :     text       *relname_text = PG_GETARG_TEXT_PP(0);
    1713            6 :     int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
    1714            6 :     int32       pknumatts_arg = PG_GETARG_INT32(2);
    1715            6 :     ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
    1716              :     Relation    rel;
    1717              :     int        *pkattnums;
    1718              :     int         pknumatts;
    1719              :     char      **tgt_pkattvals;
    1720              :     int         tgt_nitems;
    1721              :     char       *sql;
    1722              : 
    1723              :     /*
    1724              :      * Open target relation.
    1725              :      */
    1726            6 :     rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
    1727              : 
    1728              :     /*
    1729              :      * Process pkattnums argument.
    1730              :      */
    1731            6 :     validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
    1732              :                        &pkattnums, &pknumatts);
    1733              : 
    1734              :     /*
    1735              :      * Target array is made up of key values that will be used to build the
    1736              :      * SQL string for use on the remote system.
    1737              :      */
    1738            4 :     tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
    1739              : 
    1740              :     /*
    1741              :      * There should be one target array key value for each key attnum
    1742              :      */
    1743            4 :     if (tgt_nitems != pknumatts)
    1744            0 :         ereport(ERROR,
    1745              :                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
    1746              :                  errmsg("target key array length must match number of key attributes")));
    1747              : 
    1748              :     /*
    1749              :      * Prep work is finally done. Go get the SQL string.
    1750              :      */
    1751            4 :     sql = get_sql_delete(rel, pkattnums, pknumatts, tgt_pkattvals);
    1752              : 
    1753              :     /*
    1754              :      * Now we can close the relation.
    1755              :      */
    1756            4 :     relation_close(rel, AccessShareLock);
    1757              : 
    1758              :     /*
    1759              :      * And send it
    1760              :      */
    1761            4 :     PG_RETURN_TEXT_P(cstring_to_text(sql));
    1762              : }
    1763              : 
    1764              : 
    1765              : /*
    1766              :  * dblink_build_sql_update
    1767              :  *
    1768              :  * Used to generate an SQL update statement
    1769              :  * based on an existing tuple in a local relation.
    1770              :  * This is useful for selectively replicating data
    1771              :  * to another server via dblink.
    1772              :  *
    1773              :  * API:
    1774              :  * <relname> - name of local table of interest
    1775              :  * <pkattnums> - an int2vector of attnums which will be used
    1776              :  * to identify the local tuple of interest
    1777              :  * <pknumatts> - number of attnums in pkattnums
    1778              :  * <src_pkattvals_arry> - text array of key values which will be used
    1779              :  * to identify the local tuple of interest
    1780              :  * <tgt_pkattvals_arry> - text array of key values which will be used
    1781              :  * to build the string for execution remotely. These are substituted
    1782              :  * for their counterparts in src_pkattvals_arry
    1783              :  */
    1784            4 : PG_FUNCTION_INFO_V1(dblink_build_sql_update);
    1785              : Datum
    1786            6 : dblink_build_sql_update(PG_FUNCTION_ARGS)
    1787              : {
    1788            6 :     text       *relname_text = PG_GETARG_TEXT_PP(0);
    1789            6 :     int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
    1790            6 :     int32       pknumatts_arg = PG_GETARG_INT32(2);
    1791            6 :     ArrayType  *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
    1792            6 :     ArrayType  *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
    1793              :     Relation    rel;
    1794              :     int        *pkattnums;
    1795              :     int         pknumatts;
    1796              :     char      **src_pkattvals;
    1797              :     char      **tgt_pkattvals;
    1798              :     int         src_nitems;
    1799              :     int         tgt_nitems;
    1800              :     char       *sql;
    1801              : 
    1802              :     /*
    1803              :      * Open target relation.
    1804              :      */
    1805            6 :     rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
    1806              : 
    1807              :     /*
    1808              :      * Process pkattnums argument.
    1809              :      */
    1810            6 :     validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
    1811              :                        &pkattnums, &pknumatts);
    1812              : 
    1813              :     /*
    1814              :      * Source array is made up of key values that will be used to locate the
    1815              :      * tuple of interest from the local system.
    1816              :      */
    1817            4 :     src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
    1818              : 
    1819              :     /*
    1820              :      * There should be one source array key value for each key attnum
    1821              :      */
    1822            4 :     if (src_nitems != pknumatts)
    1823            0 :         ereport(ERROR,
    1824              :                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
    1825              :                  errmsg("source key array length must match number of key attributes")));
    1826              : 
    1827              :     /*
    1828              :      * Target array is made up of key values that will be used to build the
    1829              :      * SQL string for use on the remote system.
    1830              :      */
    1831            4 :     tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
    1832              : 
    1833              :     /*
    1834              :      * There should be one target array key value for each key attnum
    1835              :      */
    1836            4 :     if (tgt_nitems != pknumatts)
    1837            0 :         ereport(ERROR,
    1838              :                 (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
    1839              :                  errmsg("target key array length must match number of key attributes")));
    1840              : 
    1841              :     /*
    1842              :      * Prep work is finally done. Go get the SQL string.
    1843              :      */
    1844            4 :     sql = get_sql_update(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
    1845              : 
    1846              :     /*
    1847              :      * Now we can close the relation.
    1848              :      */
    1849            4 :     relation_close(rel, AccessShareLock);
    1850              : 
    1851              :     /*
    1852              :      * And send it
    1853              :      */
    1854            4 :     PG_RETURN_TEXT_P(cstring_to_text(sql));
    1855              : }
    1856              : 
    1857              : /*
    1858              :  * dblink_current_query
    1859              :  * return the current query string
    1860              :  * to allow its use in (among other things)
    1861              :  * rewrite rules
    1862              :  */
    1863            2 : PG_FUNCTION_INFO_V1(dblink_current_query);
    1864              : Datum
    1865            0 : dblink_current_query(PG_FUNCTION_ARGS)
    1866              : {
    1867              :     /* This is now just an alias for the built-in function current_query() */
    1868            0 :     PG_RETURN_DATUM(current_query(fcinfo));
    1869              : }
    1870              : 
    1871              : /*
    1872              :  * Retrieve async notifications for a connection.
    1873              :  *
    1874              :  * Returns a setof record of notifications, or an empty set if none received.
    1875              :  * Can optionally take a named connection as parameter, but uses the unnamed
    1876              :  * connection per default.
    1877              :  *
    1878              :  */
    1879              : #define DBLINK_NOTIFY_COLS      3
    1880              : 
    1881            5 : PG_FUNCTION_INFO_V1(dblink_get_notify);
    1882              : Datum
    1883            2 : dblink_get_notify(PG_FUNCTION_ARGS)
    1884              : {
    1885              :     PGconn     *conn;
    1886              :     PGnotify   *notify;
    1887            2 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    1888              : 
    1889            2 :     dblink_init();
    1890            2 :     if (PG_NARGS() == 1)
    1891            0 :         conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
    1892              :     else
    1893            2 :         conn = pconn->conn;
    1894              : 
    1895            2 :     InitMaterializedSRF(fcinfo, 0);
    1896              : 
    1897            2 :     PQconsumeInput(conn);
    1898            4 :     while ((notify = PQnotifies(conn)) != NULL)
    1899              :     {
    1900              :         Datum       values[DBLINK_NOTIFY_COLS];
    1901              :         bool        nulls[DBLINK_NOTIFY_COLS];
    1902              : 
    1903            2 :         memset(values, 0, sizeof(values));
    1904            2 :         memset(nulls, 0, sizeof(nulls));
    1905              : 
    1906            2 :         if (notify->relname != NULL)
    1907            2 :             values[0] = CStringGetTextDatum(notify->relname);
    1908              :         else
    1909            0 :             nulls[0] = true;
    1910              : 
    1911            2 :         values[1] = Int32GetDatum(notify->be_pid);
    1912              : 
    1913            2 :         if (notify->extra != NULL)
    1914            2 :             values[2] = CStringGetTextDatum(notify->extra);
    1915              :         else
    1916            0 :             nulls[2] = true;
    1917              : 
    1918            2 :         tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
    1919              : 
    1920            2 :         PQfreemem(notify);
    1921            2 :         PQconsumeInput(conn);
    1922              :     }
    1923              : 
    1924            2 :     return (Datum) 0;
    1925              : }
    1926              : 
    1927              : /*
    1928              :  * Validate the options given to a dblink foreign server or user mapping.
    1929              :  * Raise an error if any option is invalid.
    1930              :  *
    1931              :  * We just check the names of options here, so semantic errors in options,
    1932              :  * such as invalid numeric format, will be detected at the attempt to connect.
    1933              :  */
    1934           14 : PG_FUNCTION_INFO_V1(dblink_fdw_validator);
    1935              : Datum
    1936           19 : dblink_fdw_validator(PG_FUNCTION_ARGS)
    1937              : {
    1938           19 :     List       *options_list = untransformRelOptions(PG_GETARG_DATUM(0));
    1939           19 :     Oid         context = PG_GETARG_OID(1);
    1940              :     ListCell   *cell;
    1941              : 
    1942              :     static const PQconninfoOption *options = NULL;
    1943              : 
    1944              :     /*
    1945              :      * Get list of valid libpq options.
    1946              :      *
    1947              :      * To avoid unnecessary work, we get the list once and use it throughout
    1948              :      * the lifetime of this backend process.  We don't need to care about
    1949              :      * memory context issues, because PQconndefaults allocates with malloc.
    1950              :      */
    1951           19 :     if (!options)
    1952              :     {
    1953           12 :         options = PQconndefaults();
    1954           12 :         if (!options)           /* assume reason for failure is OOM */
    1955            0 :             ereport(ERROR,
    1956              :                     (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
    1957              :                      errmsg("out of memory"),
    1958              :                      errdetail("Could not get libpq's default connection options.")));
    1959              :     }
    1960              : 
    1961              :     /* Validate each supplied option. */
    1962           50 :     foreach(cell, options_list)
    1963              :     {
    1964           39 :         DefElem    *def = (DefElem *) lfirst(cell);
    1965              : 
    1966           39 :         if (!is_valid_dblink_fdw_option(options, def->defname, context))
    1967              :         {
    1968              :             /*
    1969              :              * Unknown option, or invalid option for the context specified, so
    1970              :              * complain about it.  Provide a hint with a valid option that
    1971              :              * looks similar, if there is one.
    1972              :              */
    1973              :             const PQconninfoOption *opt;
    1974              :             const char *closest_match;
    1975              :             ClosestMatchState match_state;
    1976            8 :             bool        has_valid_options = false;
    1977              : 
    1978            8 :             initClosestMatch(&match_state, def->defname, 4);
    1979          424 :             for (opt = options; opt->keyword; opt++)
    1980              :             {
    1981          416 :                 if (is_valid_dblink_option(options, opt->keyword, context))
    1982              :                 {
    1983           93 :                     has_valid_options = true;
    1984           93 :                     updateClosestMatch(&match_state, opt->keyword);
    1985              :                 }
    1986              :             }
    1987              : 
    1988            8 :             closest_match = getClosestMatch(&match_state);
    1989            8 :             ereport(ERROR,
    1990              :                     (errcode(ERRCODE_FDW_OPTION_NAME_NOT_FOUND),
    1991              :                      errmsg("invalid option \"%s\"", def->defname),
    1992              :                      has_valid_options ? closest_match ?
    1993              :                      errhint("Perhaps you meant the option \"%s\".",
    1994              :                              closest_match) : 0 :
    1995              :                      errhint("There are no valid options in this context.")));
    1996              :         }
    1997              :     }
    1998              : 
    1999           11 :     PG_RETURN_VOID();
    2000              : }
    2001              : 
    2002              : 
    2003              : /*************************************************************
    2004              :  * internal functions
    2005              :  */
    2006              : 
    2007              : 
    2008              : /*
    2009              :  * get_pkey_attnames
    2010              :  *
    2011              :  * Get the primary key attnames for the given relation.
    2012              :  * Return NULL, and set indnkeyatts = 0, if no primary key exists.
    2013              :  */
    2014              : static char **
    2015            3 : get_pkey_attnames(Relation rel, int16 *indnkeyatts)
    2016              : {
    2017              :     Relation    indexRelation;
    2018              :     ScanKeyData skey;
    2019              :     SysScanDesc scan;
    2020              :     HeapTuple   indexTuple;
    2021              :     int         i;
    2022            3 :     char      **result = NULL;
    2023              :     TupleDesc   tupdesc;
    2024              : 
    2025              :     /* initialize indnkeyatts to 0 in case no primary key exists */
    2026            3 :     *indnkeyatts = 0;
    2027              : 
    2028            3 :     tupdesc = rel->rd_att;
    2029              : 
    2030              :     /* Prepare to scan pg_index for entries having indrelid = this rel. */
    2031            3 :     indexRelation = table_open(IndexRelationId, AccessShareLock);
    2032            3 :     ScanKeyInit(&skey,
    2033              :                 Anum_pg_index_indrelid,
    2034              :                 BTEqualStrategyNumber, F_OIDEQ,
    2035              :                 ObjectIdGetDatum(RelationGetRelid(rel)));
    2036              : 
    2037            3 :     scan = systable_beginscan(indexRelation, IndexIndrelidIndexId, true,
    2038              :                               NULL, 1, &skey);
    2039              : 
    2040            3 :     while (HeapTupleIsValid(indexTuple = systable_getnext(scan)))
    2041              :     {
    2042            3 :         Form_pg_index index = (Form_pg_index) GETSTRUCT(indexTuple);
    2043              : 
    2044              :         /* we're only interested if it is the primary key */
    2045            3 :         if (index->indisprimary)
    2046              :         {
    2047            3 :             *indnkeyatts = index->indnkeyatts;
    2048            3 :             if (*indnkeyatts > 0)
    2049              :             {
    2050            3 :                 result = palloc_array(char *, *indnkeyatts);
    2051              : 
    2052            9 :                 for (i = 0; i < *indnkeyatts; i++)
    2053            6 :                     result[i] = SPI_fname(tupdesc, index->indkey.values[i]);
    2054              :             }
    2055            3 :             break;
    2056              :         }
    2057              :     }
    2058              : 
    2059            3 :     systable_endscan(scan);
    2060            3 :     table_close(indexRelation, AccessShareLock);
    2061              : 
    2062            3 :     return result;
    2063              : }
    2064              : 
    2065              : /*
    2066              :  * Deconstruct a text[] into C-strings (note any NULL elements will be
    2067              :  * returned as NULL pointers)
    2068              :  */
    2069              : static char **
    2070           20 : get_text_array_contents(ArrayType *array, int *numitems)
    2071              : {
    2072           20 :     int         ndim = ARR_NDIM(array);
    2073           20 :     int        *dims = ARR_DIMS(array);
    2074              :     int         nitems;
    2075              :     int16       typlen;
    2076              :     bool        typbyval;
    2077              :     char        typalign;
    2078              :     uint8       typalignby;
    2079              :     char      **values;
    2080              :     char       *ptr;
    2081              :     uint8      *bitmap;
    2082              :     int         bitmask;
    2083              :     int         i;
    2084              : 
    2085              :     Assert(ARR_ELEMTYPE(array) == TEXTOID);
    2086              : 
    2087           20 :     *numitems = nitems = ArrayGetNItems(ndim, dims);
    2088              : 
    2089           20 :     get_typlenbyvalalign(ARR_ELEMTYPE(array),
    2090              :                          &typlen, &typbyval, &typalign);
    2091           20 :     typalignby = typalign_to_alignby(typalign);
    2092              : 
    2093           20 :     values = palloc_array(char *, nitems);
    2094              : 
    2095           20 :     ptr = ARR_DATA_PTR(array);
    2096           20 :     bitmap = ARR_NULLBITMAP(array);
    2097           20 :     bitmask = 1;
    2098              : 
    2099           55 :     for (i = 0; i < nitems; i++)
    2100              :     {
    2101           35 :         if (bitmap && (*bitmap & bitmask) == 0)
    2102              :         {
    2103            0 :             values[i] = NULL;
    2104              :         }
    2105              :         else
    2106              :         {
    2107           35 :             values[i] = TextDatumGetCString(PointerGetDatum(ptr));
    2108           35 :             ptr = att_addlength_pointer(ptr, typlen, ptr);
    2109           35 :             ptr = (char *) att_nominal_alignby(ptr, typalignby);
    2110              :         }
    2111              : 
    2112              :         /* advance bitmap pointer if any */
    2113           35 :         if (bitmap)
    2114              :         {
    2115            0 :             bitmask <<= 1;
    2116            0 :             if (bitmask == 0x100)
    2117              :             {
    2118            0 :                 bitmap++;
    2119            0 :                 bitmask = 1;
    2120              :             }
    2121              :         }
    2122              :     }
    2123              : 
    2124           20 :     return values;
    2125              : }
    2126              : 
    2127              : static char *
    2128            4 : get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
    2129              : {
    2130              :     char       *relname;
    2131              :     HeapTuple   tuple;
    2132              :     TupleDesc   tupdesc;
    2133              :     int         natts;
    2134              :     StringInfoData buf;
    2135              :     char       *val;
    2136              :     int         key;
    2137              :     int         i;
    2138              :     bool        needComma;
    2139              : 
    2140            4 :     initStringInfo(&buf);
    2141              : 
    2142              :     /* get relation name including any needed schema prefix and quoting */
    2143            4 :     relname = generate_relation_name(rel);
    2144              : 
    2145            4 :     tupdesc = rel->rd_att;
    2146            4 :     natts = tupdesc->natts;
    2147              : 
    2148            4 :     tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
    2149            4 :     if (!tuple)
    2150            0 :         ereport(ERROR,
    2151              :                 (errcode(ERRCODE_CARDINALITY_VIOLATION),
    2152              :                  errmsg("source row not found")));
    2153              : 
    2154            4 :     appendStringInfo(&buf, "INSERT INTO %s(", relname);
    2155              : 
    2156            4 :     needComma = false;
    2157           19 :     for (i = 0; i < natts; i++)
    2158              :     {
    2159           15 :         Form_pg_attribute att = TupleDescAttr(tupdesc, i);
    2160              : 
    2161           15 :         if (att->attisdropped)
    2162            2 :             continue;
    2163              : 
    2164           13 :         if (needComma)
    2165            9 :             appendStringInfoChar(&buf, ',');
    2166              : 
    2167           13 :         appendStringInfoString(&buf,
    2168           13 :                                quote_ident_cstr(NameStr(att->attname)));
    2169           13 :         needComma = true;
    2170              :     }
    2171              : 
    2172            4 :     appendStringInfoString(&buf, ") VALUES(");
    2173              : 
    2174              :     /*
    2175              :      * Note: i is physical column number (counting from 0).
    2176              :      */
    2177            4 :     needComma = false;
    2178           19 :     for (i = 0; i < natts; i++)
    2179              :     {
    2180           15 :         if (TupleDescAttr(tupdesc, i)->attisdropped)
    2181            2 :             continue;
    2182              : 
    2183           13 :         if (needComma)
    2184            9 :             appendStringInfoChar(&buf, ',');
    2185              : 
    2186           13 :         key = get_attnum_pk_pos(pkattnums, pknumatts, i);
    2187              : 
    2188           13 :         if (key >= 0)
    2189            7 :             val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
    2190              :         else
    2191            6 :             val = SPI_getvalue(tuple, tupdesc, i + 1);
    2192              : 
    2193           13 :         if (val != NULL)
    2194              :         {
    2195           13 :             appendStringInfoString(&buf, quote_literal_cstr(val));
    2196           13 :             pfree(val);
    2197              :         }
    2198              :         else
    2199            0 :             appendStringInfoString(&buf, "NULL");
    2200           13 :         needComma = true;
    2201              :     }
    2202            4 :     appendStringInfoChar(&buf, ')');
    2203              : 
    2204            4 :     return buf.data;
    2205              : }
    2206              : 
    2207              : static char *
    2208            4 : get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals)
    2209              : {
    2210              :     char       *relname;
    2211              :     TupleDesc   tupdesc;
    2212              :     StringInfoData buf;
    2213              :     int         i;
    2214              : 
    2215            4 :     initStringInfo(&buf);
    2216              : 
    2217              :     /* get relation name including any needed schema prefix and quoting */
    2218            4 :     relname = generate_relation_name(rel);
    2219              : 
    2220            4 :     tupdesc = rel->rd_att;
    2221              : 
    2222            4 :     appendStringInfo(&buf, "DELETE FROM %s WHERE ", relname);
    2223           11 :     for (i = 0; i < pknumatts; i++)
    2224              :     {
    2225            7 :         int         pkattnum = pkattnums[i];
    2226            7 :         Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
    2227              : 
    2228            7 :         if (i > 0)
    2229            3 :             appendStringInfoString(&buf, " AND ");
    2230              : 
    2231            7 :         appendStringInfoString(&buf,
    2232            7 :                                quote_ident_cstr(NameStr(attr->attname)));
    2233              : 
    2234            7 :         if (tgt_pkattvals[i] != NULL)
    2235            7 :             appendStringInfo(&buf, " = %s",
    2236            7 :                              quote_literal_cstr(tgt_pkattvals[i]));
    2237              :         else
    2238            0 :             appendStringInfoString(&buf, " IS NULL");
    2239              :     }
    2240              : 
    2241            4 :     return buf.data;
    2242              : }
    2243              : 
    2244              : static char *
    2245            4 : get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
    2246              : {
    2247              :     char       *relname;
    2248              :     HeapTuple   tuple;
    2249              :     TupleDesc   tupdesc;
    2250              :     int         natts;
    2251              :     StringInfoData buf;
    2252              :     char       *val;
    2253              :     int         key;
    2254              :     int         i;
    2255              :     bool        needComma;
    2256              : 
    2257            4 :     initStringInfo(&buf);
    2258              : 
    2259              :     /* get relation name including any needed schema prefix and quoting */
    2260            4 :     relname = generate_relation_name(rel);
    2261              : 
    2262            4 :     tupdesc = rel->rd_att;
    2263            4 :     natts = tupdesc->natts;
    2264              : 
    2265            4 :     tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
    2266            4 :     if (!tuple)
    2267            0 :         ereport(ERROR,
    2268              :                 (errcode(ERRCODE_CARDINALITY_VIOLATION),
    2269              :                  errmsg("source row not found")));
    2270              : 
    2271            4 :     appendStringInfo(&buf, "UPDATE %s SET ", relname);
    2272              : 
    2273              :     /*
    2274              :      * Note: i is physical column number (counting from 0).
    2275              :      */
    2276            4 :     needComma = false;
    2277           19 :     for (i = 0; i < natts; i++)
    2278              :     {
    2279           15 :         Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
    2280              : 
    2281           15 :         if (attr->attisdropped)
    2282            2 :             continue;
    2283              : 
    2284           13 :         if (needComma)
    2285            9 :             appendStringInfoString(&buf, ", ");
    2286              : 
    2287           13 :         appendStringInfo(&buf, "%s = ",
    2288           13 :                          quote_ident_cstr(NameStr(attr->attname)));
    2289              : 
    2290           13 :         key = get_attnum_pk_pos(pkattnums, pknumatts, i);
    2291              : 
    2292           13 :         if (key >= 0)
    2293            7 :             val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
    2294              :         else
    2295            6 :             val = SPI_getvalue(tuple, tupdesc, i + 1);
    2296              : 
    2297           13 :         if (val != NULL)
    2298              :         {
    2299           13 :             appendStringInfoString(&buf, quote_literal_cstr(val));
    2300           13 :             pfree(val);
    2301              :         }
    2302              :         else
    2303            0 :             appendStringInfoString(&buf, "NULL");
    2304           13 :         needComma = true;
    2305              :     }
    2306              : 
    2307            4 :     appendStringInfoString(&buf, " WHERE ");
    2308              : 
    2309           11 :     for (i = 0; i < pknumatts; i++)
    2310              :     {
    2311            7 :         int         pkattnum = pkattnums[i];
    2312            7 :         Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
    2313              : 
    2314            7 :         if (i > 0)
    2315            3 :             appendStringInfoString(&buf, " AND ");
    2316              : 
    2317            7 :         appendStringInfoString(&buf,
    2318            7 :                                quote_ident_cstr(NameStr(attr->attname)));
    2319              : 
    2320            7 :         val = tgt_pkattvals[i];
    2321              : 
    2322            7 :         if (val != NULL)
    2323            7 :             appendStringInfo(&buf, " = %s", quote_literal_cstr(val));
    2324              :         else
    2325            0 :             appendStringInfoString(&buf, " IS NULL");
    2326              :     }
    2327              : 
    2328            4 :     return buf.data;
    2329              : }
    2330              : 
    2331              : /*
    2332              :  * Return a properly quoted identifier.
    2333              :  * Uses quote_ident in quote.c
    2334              :  */
    2335              : static char *
    2336           80 : quote_ident_cstr(char *rawstr)
    2337              : {
    2338              :     text       *rawstr_text;
    2339              :     text       *result_text;
    2340              :     char       *result;
    2341              : 
    2342           80 :     rawstr_text = cstring_to_text(rawstr);
    2343           80 :     result_text = DatumGetTextPP(DirectFunctionCall1(quote_ident,
    2344              :                                                      PointerGetDatum(rawstr_text)));
    2345           80 :     result = text_to_cstring(result_text);
    2346              : 
    2347           80 :     return result;
    2348              : }
    2349              : 
    2350              : static int
    2351           26 : get_attnum_pk_pos(int *pkattnums, int pknumatts, int key)
    2352              : {
    2353              :     int         i;
    2354              : 
    2355              :     /*
    2356              :      * Not likely a long list anyway, so just scan for the value
    2357              :      */
    2358           50 :     for (i = 0; i < pknumatts; i++)
    2359           38 :         if (key == pkattnums[i])
    2360           14 :             return i;
    2361              : 
    2362           12 :     return -1;
    2363              : }
    2364              : 
    2365              : static HeapTuple
    2366            8 : get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals)
    2367              : {
    2368              :     char       *relname;
    2369              :     TupleDesc   tupdesc;
    2370              :     int         natts;
    2371              :     StringInfoData buf;
    2372              :     int         ret;
    2373              :     HeapTuple   tuple;
    2374              :     int         i;
    2375              : 
    2376              :     /*
    2377              :      * Connect to SPI manager
    2378              :      */
    2379            8 :     SPI_connect();
    2380              : 
    2381            8 :     initStringInfo(&buf);
    2382              : 
    2383              :     /* get relation name including any needed schema prefix and quoting */
    2384            8 :     relname = generate_relation_name(rel);
    2385              : 
    2386            8 :     tupdesc = rel->rd_att;
    2387            8 :     natts = tupdesc->natts;
    2388              : 
    2389              :     /*
    2390              :      * Build sql statement to look up tuple of interest, ie, the one matching
    2391              :      * src_pkattvals.  We used to use "SELECT *" here, but it's simpler to
    2392              :      * generate a result tuple that matches the table's physical structure,
    2393              :      * with NULLs for any dropped columns.  Otherwise we have to deal with two
    2394              :      * different tupdescs and everything's very confusing.
    2395              :      */
    2396            8 :     appendStringInfoString(&buf, "SELECT ");
    2397              : 
    2398           38 :     for (i = 0; i < natts; i++)
    2399              :     {
    2400           30 :         Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
    2401              : 
    2402           30 :         if (i > 0)
    2403           22 :             appendStringInfoString(&buf, ", ");
    2404              : 
    2405           30 :         if (attr->attisdropped)
    2406            4 :             appendStringInfoString(&buf, "NULL");
    2407              :         else
    2408           26 :             appendStringInfoString(&buf,
    2409           26 :                                    quote_ident_cstr(NameStr(attr->attname)));
    2410              :     }
    2411              : 
    2412            8 :     appendStringInfo(&buf, " FROM %s WHERE ", relname);
    2413              : 
    2414           22 :     for (i = 0; i < pknumatts; i++)
    2415              :     {
    2416           14 :         int         pkattnum = pkattnums[i];
    2417           14 :         Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
    2418              : 
    2419           14 :         if (i > 0)
    2420            6 :             appendStringInfoString(&buf, " AND ");
    2421              : 
    2422           14 :         appendStringInfoString(&buf,
    2423           14 :                                quote_ident_cstr(NameStr(attr->attname)));
    2424              : 
    2425           14 :         if (src_pkattvals[i] != NULL)
    2426           14 :             appendStringInfo(&buf, " = %s",
    2427           14 :                              quote_literal_cstr(src_pkattvals[i]));
    2428              :         else
    2429            0 :             appendStringInfoString(&buf, " IS NULL");
    2430              :     }
    2431              : 
    2432              :     /*
    2433              :      * Retrieve the desired tuple
    2434              :      */
    2435            8 :     ret = SPI_exec(buf.data, 0);
    2436            8 :     pfree(buf.data);
    2437              : 
    2438              :     /*
    2439              :      * Only allow one qualifying tuple
    2440              :      */
    2441            8 :     if ((ret == SPI_OK_SELECT) && (SPI_processed > 1))
    2442            0 :         ereport(ERROR,
    2443              :                 (errcode(ERRCODE_CARDINALITY_VIOLATION),
    2444              :                  errmsg("source criteria matched more than one record")));
    2445              : 
    2446            8 :     else if (ret == SPI_OK_SELECT && SPI_processed == 1)
    2447              :     {
    2448            8 :         SPITupleTable *tuptable = SPI_tuptable;
    2449              : 
    2450            8 :         tuple = SPI_copytuple(tuptable->vals[0]);
    2451            8 :         SPI_finish();
    2452              : 
    2453            8 :         return tuple;
    2454              :     }
    2455              :     else
    2456              :     {
    2457              :         /*
    2458              :          * no qualifying tuples
    2459              :          */
    2460            0 :         SPI_finish();
    2461              : 
    2462            0 :         return NULL;
    2463              :     }
    2464              : 
    2465              :     /*
    2466              :      * never reached, but keep compiler quiet
    2467              :      */
    2468              :     return NULL;
    2469              : }
    2470              : 
    2471              : static void
    2472           21 : RangeVarCallbackForDblink(const RangeVar *relation,
    2473              :                           Oid relId, Oid oldRelId, void *arg)
    2474              : {
    2475              :     AclResult   aclresult;
    2476              : 
    2477           21 :     if (!OidIsValid(relId))
    2478            0 :         return;
    2479              : 
    2480           21 :     aclresult = pg_class_aclcheck(relId, GetUserId(), *((AclMode *) arg));
    2481           21 :     if (aclresult != ACLCHECK_OK)
    2482            0 :         aclcheck_error(aclresult, get_relkind_objtype(get_rel_relkind(relId)),
    2483            0 :                        relation->relname);
    2484              : }
    2485              : 
    2486              : /*
    2487              :  * Open the relation named by relname_text, acquire specified type of lock,
    2488              :  * verify we have specified permissions.
    2489              :  * Caller must close rel when done with it.
    2490              :  */
    2491              : static Relation
    2492           21 : get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode)
    2493              : {
    2494              :     RangeVar   *relvar;
    2495              :     Oid         relid;
    2496              : 
    2497           21 :     relvar = makeRangeVarFromNameList(textToQualifiedNameList(relname_text));
    2498           21 :     relid = RangeVarGetRelidExtended(relvar, lockmode, 0,
    2499              :                                      RangeVarCallbackForDblink, &aclmode);
    2500              : 
    2501           21 :     return table_open(relid, NoLock);
    2502              : }
    2503              : 
    2504              : /*
    2505              :  * generate_relation_name - copied from ruleutils.c
    2506              :  *      Compute the name to display for a relation
    2507              :  *
    2508              :  * The result includes all necessary quoting and schema-prefixing.
    2509              :  */
    2510              : static char *
    2511           20 : generate_relation_name(Relation rel)
    2512              : {
    2513              :     char       *nspname;
    2514              :     char       *result;
    2515              : 
    2516              :     /* Qualify the name if not visible in search path */
    2517           20 :     if (RelationIsVisible(RelationGetRelid(rel)))
    2518           15 :         nspname = NULL;
    2519              :     else
    2520            5 :         nspname = get_namespace_name(rel->rd_rel->relnamespace);
    2521              : 
    2522           20 :     result = quote_qualified_identifier(nspname, RelationGetRelationName(rel));
    2523              : 
    2524           20 :     return result;
    2525              : }
    2526              : 
    2527              : 
    2528              : static remoteConn *
    2529           79 : getConnectionByName(const char *name)
    2530              : {
    2531              :     remoteConnHashEnt *hentry;
    2532              :     char       *key;
    2533              : 
    2534           79 :     if (!remoteConnHash)
    2535            6 :         remoteConnHash = createConnHash();
    2536              : 
    2537           79 :     key = pstrdup(name);
    2538           79 :     truncate_identifier(key, strlen(key), false);
    2539           79 :     hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
    2540              :                                                key, HASH_FIND, NULL);
    2541              : 
    2542           79 :     if (hentry && hentry->rconn.conn != NULL)
    2543           68 :         return &hentry->rconn;
    2544              : 
    2545           11 :     return NULL;
    2546              : }
    2547              : 
    2548              : static HTAB *
    2549            7 : createConnHash(void)
    2550              : {
    2551              :     HASHCTL     ctl;
    2552              : 
    2553            7 :     ctl.keysize = NAMEDATALEN;
    2554            7 :     ctl.entrysize = sizeof(remoteConnHashEnt);
    2555              : 
    2556            7 :     return hash_create("Remote Con hash", NUMCONN, &ctl,
    2557              :                        HASH_ELEM | HASH_STRINGS);
    2558              : }
    2559              : 
    2560              : static remoteConn *
    2561           10 : createNewConnection(const char *name)
    2562              : {
    2563              :     remoteConnHashEnt *hentry;
    2564              :     bool        found;
    2565              :     char       *key;
    2566              : 
    2567           10 :     if (!remoteConnHash)
    2568            1 :         remoteConnHash = createConnHash();
    2569              : 
    2570           10 :     key = pstrdup(name);
    2571           10 :     truncate_identifier(key, strlen(key), true);
    2572           10 :     hentry = (remoteConnHashEnt *) hash_search(remoteConnHash, key,
    2573              :                                                HASH_ENTER, &found);
    2574              : 
    2575           10 :     if (found && hentry->rconn.conn != NULL)
    2576            1 :         ereport(ERROR,
    2577              :                 (errcode(ERRCODE_DUPLICATE_OBJECT),
    2578              :                  errmsg("duplicate connection name")));
    2579              : 
    2580              :     /* New, or reusable, so initialize the rconn struct to zeroes */
    2581            9 :     memset(&hentry->rconn, 0, sizeof(remoteConn));
    2582              : 
    2583            9 :     return &hentry->rconn;
    2584              : }
    2585              : 
    2586              : static void
    2587            8 : deleteConnection(const char *name)
    2588              : {
    2589              :     remoteConnHashEnt *hentry;
    2590              :     bool        found;
    2591              :     char       *key;
    2592              : 
    2593            8 :     if (!remoteConnHash)
    2594            0 :         remoteConnHash = createConnHash();
    2595              : 
    2596            8 :     key = pstrdup(name);
    2597            8 :     truncate_identifier(key, strlen(key), false);
    2598            8 :     hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
    2599              :                                                key, HASH_REMOVE, &found);
    2600              : 
    2601            8 :     if (!hentry)
    2602            0 :         ereport(ERROR,
    2603              :                 (errcode(ERRCODE_UNDEFINED_OBJECT),
    2604              :                  errmsg("undefined connection name")));
    2605            8 : }
    2606              : 
    2607              :  /*
    2608              :   * Ensure that require_auth and SCRAM keys are correctly set on connstr.
    2609              :   * SCRAM keys used to pass-through are coming from the initial connection
    2610              :   * from the client with the server.
    2611              :   *
    2612              :   * All required SCRAM options are set by dblink, so we just need to ensure
    2613              :   * that these options are not overwritten by the user.
    2614              :   *
    2615              :   * See appendSCRAMKeysInfo and its usage for more.
    2616              :   */
    2617              : bool
    2618            6 : dblink_connstr_has_required_scram_options(const char *connstr)
    2619              : {
    2620              :     PQconninfoOption *options;
    2621            6 :     bool        has_scram_server_key = false;
    2622            6 :     bool        has_scram_client_key = false;
    2623            6 :     bool        has_require_auth = false;
    2624            6 :     bool        has_scram_keys = false;
    2625              : 
    2626            6 :     options = PQconninfoParse(connstr, NULL);
    2627            6 :     if (options)
    2628              :     {
    2629              :         /*
    2630              :          * Continue iterating even if we found the keys that we need to
    2631              :          * validate to make sure that there is no other declaration of these
    2632              :          * keys that can overwrite the first.
    2633              :          */
    2634          318 :         for (PQconninfoOption *option = options; option->keyword != NULL; option++)
    2635              :         {
    2636          312 :             if (strcmp(option->keyword, "require_auth") == 0)
    2637              :             {
    2638            6 :                 if (option->val != NULL && strcmp(option->val, "scram-sha-256") == 0)
    2639            5 :                     has_require_auth = true;
    2640              :                 else
    2641            1 :                     has_require_auth = false;
    2642              :             }
    2643              : 
    2644          312 :             if (strcmp(option->keyword, "scram_client_key") == 0)
    2645              :             {
    2646            6 :                 if (option->val != NULL && option->val[0] != '\0')
    2647            6 :                     has_scram_client_key = true;
    2648              :                 else
    2649            0 :                     has_scram_client_key = false;
    2650              :             }
    2651              : 
    2652          312 :             if (strcmp(option->keyword, "scram_server_key") == 0)
    2653              :             {
    2654            6 :                 if (option->val != NULL && option->val[0] != '\0')
    2655            6 :                     has_scram_server_key = true;
    2656              :                 else
    2657            0 :                     has_scram_server_key = false;
    2658              :             }
    2659              :         }
    2660            6 :         PQconninfoFree(options);
    2661              :     }
    2662              : 
    2663            6 :     has_scram_keys = has_scram_client_key && has_scram_server_key && MyProcPort != NULL && MyProcPort->has_scram_keys;
    2664              : 
    2665            6 :     return (has_scram_keys && has_require_auth);
    2666              : }
    2667              : 
    2668              : /*
    2669              :  * We need to make sure that the connection made used credentials
    2670              :  * which were provided by the user, so check what credentials were
    2671              :  * used to connect and then make sure that they came from the user.
    2672              :  *
    2673              :  * On failure, we close "conn" and also delete the hashtable entry
    2674              :  * identified by "connname" (if that's not NULL).
    2675              :  */
    2676              : static void
    2677           20 : dblink_security_check(PGconn *conn, const char *connname, const char *connstr)
    2678              : {
    2679              :     /* Superuser bypasses security check */
    2680           20 :     if (superuser())
    2681           18 :         return;
    2682              : 
    2683              :     /* If password was used to connect, make sure it was one provided */
    2684            2 :     if (PQconnectionUsedPassword(conn) && dblink_connstr_has_pw(connstr))
    2685            0 :         return;
    2686              : 
    2687              :     /*
    2688              :      * Password was not used to connect, check if SCRAM pass-through is in
    2689              :      * use.
    2690              :      *
    2691              :      * If dblink_connstr_has_required_scram_options is true we assume that
    2692              :      * UseScramPassthrough is also true because the required SCRAM keys are
    2693              :      * only added if UseScramPassthrough is set, and the user is not allowed
    2694              :      * to add the SCRAM keys on fdw and user mapping options.
    2695              :      */
    2696            2 :     if (MyProcPort != NULL && MyProcPort->has_scram_keys && dblink_connstr_has_required_scram_options(connstr))
    2697            2 :         return;
    2698              : 
    2699              : #ifdef ENABLE_GSS
    2700              :     /* If GSSAPI creds used to connect, make sure it was one delegated */
    2701              :     if (PQconnectionUsedGSSAPI(conn) && be_gssapi_get_delegation(MyProcPort))
    2702              :         return;
    2703              : #endif
    2704              : 
    2705              :     /* Otherwise, fail out */
    2706            0 :     libpqsrv_disconnect(conn);
    2707            0 :     if (connname)
    2708            0 :         deleteConnection(connname);
    2709              : 
    2710            0 :     ereport(ERROR,
    2711              :             (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
    2712              :              errmsg("password or GSSAPI delegated credentials required"),
    2713              :              errdetail("Non-superusers may only connect using credentials they provide, eg: password in connection string or delegated GSSAPI credentials"),
    2714              :              errhint("Ensure provided credentials match target server's authentication method.")));
    2715              : }
    2716              : 
    2717              : /*
    2718              :  * Function to check if the connection string includes an explicit
    2719              :  * password, needed to ensure that non-superuser password-based auth
    2720              :  * is using a provided password and not one picked up from the
    2721              :  * environment.
    2722              :  */
    2723              : static bool
    2724            7 : dblink_connstr_has_pw(const char *connstr)
    2725              : {
    2726              :     PQconninfoOption *options;
    2727              :     PQconninfoOption *option;
    2728            7 :     bool        connstr_gives_password = false;
    2729              : 
    2730            7 :     options = PQconninfoParse(connstr, NULL);
    2731            7 :     if (options)
    2732              :     {
    2733          371 :         for (option = options; option->keyword != NULL; option++)
    2734              :         {
    2735          364 :             if (strcmp(option->keyword, "password") == 0)
    2736              :             {
    2737            7 :                 if (option->val != NULL && option->val[0] != '\0')
    2738              :                 {
    2739            0 :                     connstr_gives_password = true;
    2740            0 :                     break;
    2741              :                 }
    2742              :             }
    2743              :         }
    2744            7 :         PQconninfoFree(options);
    2745              :     }
    2746              : 
    2747            7 :     return connstr_gives_password;
    2748              : }
    2749              : 
    2750              : /*
    2751              :  * For non-superusers, insist that the connstr specify a password, except if
    2752              :  * GSSAPI credentials have been delegated (and we check that they are used for
    2753              :  * the connection in dblink_security_check later) or if SCRAM pass-through is
    2754              :  * being used.  This prevents a password or GSSAPI credentials from being
    2755              :  * picked up from .pgpass, a service file, the environment, etc.  We don't want
    2756              :  * the postgres user's passwords or Kerberos credentials to be accessible to
    2757              :  * non-superusers. In case of SCRAM pass-through insist that the connstr
    2758              :  * has the required SCRAM pass-through options.
    2759              :  */
    2760              : static void
    2761           26 : dblink_connstr_check(const char *connstr)
    2762              : {
    2763           26 :     if (superuser())
    2764           21 :         return;
    2765              : 
    2766            5 :     if (dblink_connstr_has_pw(connstr))
    2767            0 :         return;
    2768              : 
    2769            5 :     if (MyProcPort != NULL && MyProcPort->has_scram_keys && dblink_connstr_has_required_scram_options(connstr))
    2770            3 :         return;
    2771              : 
    2772              : #ifdef ENABLE_GSS
    2773              :     if (be_gssapi_get_delegation(MyProcPort))
    2774              :         return;
    2775              : #endif
    2776              : 
    2777            2 :     ereport(ERROR,
    2778              :             (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
    2779              :              errmsg("password or GSSAPI delegated credentials required"),
    2780              :              errdetail("Non-superusers must provide a password in the connection string or send delegated GSSAPI credentials.")));
    2781              : }
    2782              : 
    2783              : /*
    2784              :  * Report an error received from the remote server
    2785              :  *
    2786              :  * res: the received error result
    2787              :  * fail: true for ERROR ereport, false for NOTICE
    2788              :  * fmt and following args: sprintf-style format and values for errcontext;
    2789              :  * the resulting string should be worded like "while <some action>"
    2790              :  *
    2791              :  * If "res" is not NULL, it'll be PQclear'ed here (unless we throw error,
    2792              :  * in which case memory context cleanup will clear it eventually).
    2793              :  */
    2794              : static void
    2795           12 : dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
    2796              :                  bool fail, const char *fmt,...)
    2797              : {
    2798              :     int         level;
    2799           12 :     char       *pg_diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
    2800           12 :     char       *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
    2801           12 :     char       *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
    2802           12 :     char       *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
    2803           12 :     char       *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
    2804              :     int         sqlstate;
    2805              :     va_list     ap;
    2806              :     char        dblink_context_msg[512];
    2807              : 
    2808           12 :     if (fail)
    2809            3 :         level = ERROR;
    2810              :     else
    2811            9 :         level = NOTICE;
    2812              : 
    2813           12 :     if (pg_diag_sqlstate)
    2814           12 :         sqlstate = MAKE_SQLSTATE(pg_diag_sqlstate[0],
    2815              :                                  pg_diag_sqlstate[1],
    2816              :                                  pg_diag_sqlstate[2],
    2817              :                                  pg_diag_sqlstate[3],
    2818              :                                  pg_diag_sqlstate[4]);
    2819              :     else
    2820            0 :         sqlstate = ERRCODE_CONNECTION_FAILURE;
    2821              : 
    2822              :     /*
    2823              :      * If we don't get a message from the PGresult, try the PGconn.  This is
    2824              :      * needed because for connection-level failures, PQgetResult may just
    2825              :      * return NULL, not a PGresult at all.
    2826              :      */
    2827           12 :     if (message_primary == NULL)
    2828            0 :         message_primary = pchomp(PQerrorMessage(conn));
    2829              : 
    2830              :     /*
    2831              :      * Format the basic errcontext string.  Below, we'll add on something
    2832              :      * about the connection name.  That's a violation of the translatability
    2833              :      * guidelines about constructing error messages out of parts, but since
    2834              :      * there's no translation support for dblink, there's no need to worry
    2835              :      * about that (yet).
    2836              :      */
    2837           12 :     va_start(ap, fmt);
    2838           12 :     vsnprintf(dblink_context_msg, sizeof(dblink_context_msg), fmt, ap);
    2839           12 :     va_end(ap);
    2840              : 
    2841           12 :     ereport(level,
    2842              :             (errcode(sqlstate),
    2843              :              (message_primary != NULL && message_primary[0] != '\0') ?
    2844              :              errmsg_internal("%s", message_primary) :
    2845              :              errmsg("could not obtain message string for remote error"),
    2846              :              message_detail ? errdetail_internal("%s", message_detail) : 0,
    2847              :              message_hint ? errhint("%s", message_hint) : 0,
    2848              :              message_context ? (errcontext("%s", message_context)) : 0,
    2849              :              conname ?
    2850              :              (errcontext("%s on dblink connection named \"%s\"",
    2851              :                          dblink_context_msg, conname)) :
    2852              :              (errcontext("%s on unnamed dblink connection",
    2853              :                          dblink_context_msg))));
    2854            9 :     PQclear(res);
    2855            9 : }
    2856              : 
    2857              : /*
    2858              :  * Obtain connection string for a foreign server
    2859              :  */
    2860              : static char *
    2861           26 : get_connect_string(const char *servername)
    2862              : {
    2863           26 :     ForeignServer *foreign_server = NULL;
    2864              :     UserMapping *user_mapping;
    2865              :     ListCell   *cell;
    2866              :     StringInfoData buf;
    2867              :     ForeignDataWrapper *fdw;
    2868              :     AclResult   aclresult;
    2869              :     char       *srvname;
    2870              : 
    2871              :     static const PQconninfoOption *options = NULL;
    2872              : 
    2873           26 :     initStringInfo(&buf);
    2874              : 
    2875              :     /*
    2876              :      * Get list of valid libpq options.
    2877              :      *
    2878              :      * To avoid unnecessary work, we get the list once and use it throughout
    2879              :      * the lifetime of this backend process.  We don't need to care about
    2880              :      * memory context issues, because PQconndefaults allocates with malloc.
    2881              :      */
    2882           26 :     if (!options)
    2883              :     {
    2884            7 :         options = PQconndefaults();
    2885            7 :         if (!options)           /* assume reason for failure is OOM */
    2886            0 :             ereport(ERROR,
    2887              :                     (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
    2888              :                      errmsg("out of memory"),
    2889              :                      errdetail("Could not get libpq's default connection options.")));
    2890              :     }
    2891              : 
    2892              :     /* first gather the server connstr options */
    2893           26 :     srvname = pstrdup(servername);
    2894           26 :     truncate_identifier(srvname, strlen(srvname), false);
    2895           26 :     foreign_server = GetForeignServerByName(srvname, true);
    2896              : 
    2897           26 :     if (foreign_server)
    2898              :     {
    2899            6 :         Oid         serverid = foreign_server->serverid;
    2900            6 :         Oid         fdwid = foreign_server->fdwid;
    2901            6 :         Oid         userid = GetUserId();
    2902              : 
    2903            6 :         user_mapping = GetUserMapping(userid, serverid);
    2904            6 :         fdw = GetForeignDataWrapper(fdwid);
    2905              : 
    2906              :         /* Check permissions, user must have usage on the server. */
    2907            6 :         aclresult = object_aclcheck(ForeignServerRelationId, serverid, userid, ACL_USAGE);
    2908            6 :         if (aclresult != ACLCHECK_OK)
    2909            0 :             aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER, foreign_server->servername);
    2910              : 
    2911              :         /*
    2912              :          * First append hardcoded options needed for SCRAM pass-through, so if
    2913              :          * the user overwrites these options we can ereport on
    2914              :          * dblink_connstr_check and dblink_security_check.
    2915              :          */
    2916            6 :         if (MyProcPort != NULL && MyProcPort->has_scram_keys && UseScramPassthrough(foreign_server, user_mapping))
    2917            4 :             appendSCRAMKeysInfo(&buf);
    2918              : 
    2919            6 :         foreach(cell, fdw->options)
    2920              :         {
    2921            0 :             DefElem    *def = lfirst(cell);
    2922              : 
    2923            0 :             if (is_valid_dblink_option(options, def->defname, ForeignDataWrapperRelationId))
    2924            0 :                 appendStringInfo(&buf, "%s='%s' ", def->defname,
    2925            0 :                                  escape_param_str(strVal(def->arg)));
    2926              :         }
    2927              : 
    2928           27 :         foreach(cell, foreign_server->options)
    2929              :         {
    2930           21 :             DefElem    *def = lfirst(cell);
    2931              : 
    2932           21 :             if (is_valid_dblink_option(options, def->defname, ForeignServerRelationId))
    2933           17 :                 appendStringInfo(&buf, "%s='%s' ", def->defname,
    2934           17 :                                  escape_param_str(strVal(def->arg)));
    2935              :         }
    2936              : 
    2937           12 :         foreach(cell, user_mapping->options)
    2938              :         {
    2939              : 
    2940            6 :             DefElem    *def = lfirst(cell);
    2941              : 
    2942            6 :             if (is_valid_dblink_option(options, def->defname, UserMappingRelationId))
    2943            6 :                 appendStringInfo(&buf, "%s='%s' ", def->defname,
    2944            6 :                                  escape_param_str(strVal(def->arg)));
    2945              :         }
    2946              : 
    2947            6 :         return buf.data;
    2948              :     }
    2949              :     else
    2950           20 :         return NULL;
    2951              : }
    2952              : 
    2953              : /*
    2954              :  * Escaping libpq connect parameter strings.
    2955              :  *
    2956              :  * Replaces "'" with "\'" and "\" with "\\".
    2957              :  */
    2958              : static char *
    2959           23 : escape_param_str(const char *str)
    2960              : {
    2961              :     const char *cp;
    2962              :     StringInfoData buf;
    2963              : 
    2964           23 :     initStringInfo(&buf);
    2965              : 
    2966          205 :     for (cp = str; *cp; cp++)
    2967              :     {
    2968          182 :         if (*cp == '\\' || *cp == '\'')
    2969            0 :             appendStringInfoChar(&buf, '\\');
    2970          182 :         appendStringInfoChar(&buf, *cp);
    2971              :     }
    2972              : 
    2973           23 :     return buf.data;
    2974              : }
    2975              : 
    2976              : /*
    2977              :  * Validate the PK-attnums argument for dblink_build_sql_insert() and related
    2978              :  * functions, and translate to the internal representation.
    2979              :  *
    2980              :  * The user supplies an int2vector of 1-based logical attnums, plus a count
    2981              :  * argument (the need for the separate count argument is historical, but we
    2982              :  * still check it).  We check that each attnum corresponds to a valid,
    2983              :  * non-dropped attribute of the rel.  We do *not* prevent attnums from being
    2984              :  * listed twice, though the actual use-case for such things is dubious.
    2985              :  * Note that before Postgres 9.0, the user's attnums were interpreted as
    2986              :  * physical not logical column numbers; this was changed for future-proofing.
    2987              :  *
    2988              :  * The internal representation is a palloc'd int array of 0-based physical
    2989              :  * attnums.
    2990              :  */
    2991              : static void
    2992           18 : validate_pkattnums(Relation rel,
    2993              :                    int2vector *pkattnums_arg, int32 pknumatts_arg,
    2994              :                    int **pkattnums, int *pknumatts)
    2995              : {
    2996           18 :     TupleDesc   tupdesc = rel->rd_att;
    2997           18 :     int         natts = tupdesc->natts;
    2998              :     int         i;
    2999              : 
    3000              :     /* Don't take more array elements than there are */
    3001           18 :     pknumatts_arg = Min(pknumatts_arg, pkattnums_arg->dim1);
    3002              : 
    3003              :     /* Must have at least one pk attnum selected */
    3004           18 :     if (pknumatts_arg <= 0)
    3005            0 :         ereport(ERROR,
    3006              :                 (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    3007              :                  errmsg("number of key attributes must be > 0")));
    3008              : 
    3009              :     /* Allocate output array */
    3010           18 :     *pkattnums = palloc_array(int, pknumatts_arg);
    3011           18 :     *pknumatts = pknumatts_arg;
    3012              : 
    3013              :     /* Validate attnums and convert to internal form */
    3014           57 :     for (i = 0; i < pknumatts_arg; i++)
    3015              :     {
    3016           45 :         int         pkattnum = pkattnums_arg->values[i];
    3017              :         int         lnum;
    3018              :         int         j;
    3019              : 
    3020              :         /* Can throw error immediately if out of range */
    3021           45 :         if (pkattnum <= 0 || pkattnum > natts)
    3022            6 :             ereport(ERROR,
    3023              :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    3024              :                      errmsg("invalid attribute number %d", pkattnum)));
    3025              : 
    3026              :         /* Identify which physical column has this logical number */
    3027           39 :         lnum = 0;
    3028           69 :         for (j = 0; j < natts; j++)
    3029              :         {
    3030              :             /* dropped columns don't count */
    3031           69 :             if (TupleDescCompactAttr(tupdesc, j)->attisdropped)
    3032            3 :                 continue;
    3033              : 
    3034           66 :             if (++lnum == pkattnum)
    3035           39 :                 break;
    3036              :         }
    3037              : 
    3038           39 :         if (j < natts)
    3039           39 :             (*pkattnums)[i] = j;
    3040              :         else
    3041            0 :             ereport(ERROR,
    3042              :                     (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
    3043              :                      errmsg("invalid attribute number %d", pkattnum)));
    3044              :     }
    3045           12 : }
    3046              : 
    3047              : /*
    3048              :  * Check if the specified connection option is valid.
    3049              :  *
    3050              :  * We basically allow whatever libpq thinks is an option, with these
    3051              :  * restrictions:
    3052              :  *      debug options: disallowed
    3053              :  *      "client_encoding": disallowed
    3054              :  *      "user": valid only in USER MAPPING options
    3055              :  *      secure options (eg password): valid only in USER MAPPING options
    3056              :  *      others: valid only in FOREIGN SERVER options
    3057              :  *
    3058              :  * We disallow client_encoding because it would be overridden anyway via
    3059              :  * PQclientEncoding; allowing it to be specified would merely promote
    3060              :  * confusion.
    3061              :  */
    3062              : static bool
    3063          478 : is_valid_dblink_option(const PQconninfoOption *options, const char *option,
    3064              :                        Oid context)
    3065              : {
    3066              :     const PQconninfoOption *opt;
    3067              : 
    3068              :     /* Look up the option in libpq result */
    3069        12050 :     for (opt = options; opt->keyword; opt++)
    3070              :     {
    3071        12044 :         if (strcmp(opt->keyword, option) == 0)
    3072          472 :             break;
    3073              :     }
    3074          478 :     if (opt->keyword == NULL)
    3075            6 :         return false;
    3076              : 
    3077              :     /* Disallow debug options (particularly "replication") */
    3078          472 :     if (strchr(opt->dispchar, 'D'))
    3079           34 :         return false;
    3080              : 
    3081              :     /* Disallow "client_encoding" */
    3082          438 :     if (strcmp(opt->keyword, "client_encoding") == 0)
    3083            8 :         return false;
    3084              : 
    3085              :     /*
    3086              :      * Disallow OAuth options for now, since the builtin flow communicates on
    3087              :      * stderr by default and can't cache tokens yet.
    3088              :      */
    3089          430 :     if (strncmp(opt->keyword, "oauth_", strlen("oauth_")) == 0)
    3090           44 :         return false;
    3091              : 
    3092              :     /*
    3093              :      * If the option is "user" or marked secure, it should be specified only
    3094              :      * in USER MAPPING.  Others should be specified only in SERVER.
    3095              :      */
    3096          386 :     if (strcmp(opt->keyword, "user") == 0 || strchr(opt->dispchar, '*'))
    3097              :     {
    3098           38 :         if (context != UserMappingRelationId)
    3099            9 :             return false;
    3100              :     }
    3101              :     else
    3102              :     {
    3103          348 :         if (context != ForeignServerRelationId)
    3104          234 :             return false;
    3105              :     }
    3106              : 
    3107          143 :     return true;
    3108              : }
    3109              : 
    3110              : /*
    3111              :  * Same as is_valid_dblink_option but also check for only dblink_fdw specific
    3112              :  * options.
    3113              :  */
    3114              : static bool
    3115           39 : is_valid_dblink_fdw_option(const PQconninfoOption *options, const char *option,
    3116              :                            Oid context)
    3117              : {
    3118           39 :     if (strcmp(option, "use_scram_passthrough") == 0)
    3119            4 :         return true;
    3120              : 
    3121           35 :     return is_valid_dblink_option(options, option, context);
    3122              : }
    3123              : 
    3124              : /*
    3125              :  * Copy the remote session's values of GUCs that affect datatype I/O
    3126              :  * and apply them locally in a new GUC nesting level.  Returns the new
    3127              :  * nestlevel (which is needed by restoreLocalGucs to undo the settings),
    3128              :  * or -1 if no new nestlevel was needed.
    3129              :  *
    3130              :  * We use the equivalent of a function SET option to allow the settings to
    3131              :  * persist only until the caller calls restoreLocalGucs.  If an error is
    3132              :  * thrown in between, guc.c will take care of undoing the settings.
    3133              :  */
    3134              : static int
    3135           34 : applyRemoteGucs(PGconn *conn)
    3136              : {
    3137              :     static const char *const GUCsAffectingIO[] = {
    3138              :         "DateStyle",
    3139              :         "IntervalStyle"
    3140              :     };
    3141              : 
    3142           34 :     int         nestlevel = -1;
    3143              :     int         i;
    3144              : 
    3145          102 :     for (i = 0; i < lengthof(GUCsAffectingIO); i++)
    3146              :     {
    3147           68 :         const char *gucName = GUCsAffectingIO[i];
    3148           68 :         const char *remoteVal = PQparameterStatus(conn, gucName);
    3149              :         const char *localVal;
    3150              : 
    3151              :         /*
    3152              :          * If the remote server is pre-8.4, it won't have IntervalStyle, but
    3153              :          * that's okay because its output format won't be ambiguous.  So just
    3154              :          * skip the GUC if we don't get a value for it.  (We might eventually
    3155              :          * need more complicated logic with remote-version checks here.)
    3156              :          */
    3157           68 :         if (remoteVal == NULL)
    3158            0 :             continue;
    3159              : 
    3160              :         /*
    3161              :          * Avoid GUC-setting overhead if the remote and local GUCs already
    3162              :          * have the same value.
    3163              :          */
    3164           68 :         localVal = GetConfigOption(gucName, false, false);
    3165              :         Assert(localVal != NULL);
    3166              : 
    3167           68 :         if (strcmp(remoteVal, localVal) == 0)
    3168           51 :             continue;
    3169              : 
    3170              :         /* Create new GUC nest level if we didn't already */
    3171           17 :         if (nestlevel < 0)
    3172            9 :             nestlevel = NewGUCNestLevel();
    3173              : 
    3174              :         /* Apply the option (this will throw error on failure) */
    3175           17 :         (void) set_config_option(gucName, remoteVal,
    3176              :                                  PGC_USERSET, PGC_S_SESSION,
    3177              :                                  GUC_ACTION_SAVE, true, 0, false);
    3178              :     }
    3179              : 
    3180           34 :     return nestlevel;
    3181              : }
    3182              : 
    3183              : /*
    3184              :  * Restore local GUCs after they have been overlaid with remote settings.
    3185              :  */
    3186              : static void
    3187           35 : restoreLocalGucs(int nestlevel)
    3188              : {
    3189              :     /* Do nothing if no new nestlevel was created */
    3190           35 :     if (nestlevel > 0)
    3191            8 :         AtEOXact_GUC(true, nestlevel);
    3192           35 : }
    3193              : 
    3194              : /*
    3195              :  * Append SCRAM client key and server key information from the global
    3196              :  * MyProcPort into the given StringInfo buffer.
    3197              :  */
    3198              : static void
    3199            4 : appendSCRAMKeysInfo(StringInfo buf)
    3200              : {
    3201              :     int         len;
    3202              :     int         encoded_len;
    3203              :     char       *client_key;
    3204              :     char       *server_key;
    3205              : 
    3206            4 :     len = pg_b64_enc_len(sizeof(MyProcPort->scram_ClientKey));
    3207              :     /* don't forget the zero-terminator */
    3208            4 :     client_key = palloc0(len + 1);
    3209            4 :     encoded_len = pg_b64_encode(MyProcPort->scram_ClientKey,
    3210              :                                 sizeof(MyProcPort->scram_ClientKey),
    3211              :                                 client_key, len);
    3212            4 :     if (encoded_len < 0)
    3213            0 :         elog(ERROR, "could not encode SCRAM client key");
    3214              : 
    3215            4 :     len = pg_b64_enc_len(sizeof(MyProcPort->scram_ServerKey));
    3216              :     /* don't forget the zero-terminator */
    3217            4 :     server_key = palloc0(len + 1);
    3218            4 :     encoded_len = pg_b64_encode(MyProcPort->scram_ServerKey,
    3219              :                                 sizeof(MyProcPort->scram_ServerKey),
    3220              :                                 server_key, len);
    3221            4 :     if (encoded_len < 0)
    3222            0 :         elog(ERROR, "could not encode SCRAM server key");
    3223              : 
    3224            4 :     appendStringInfo(buf, "scram_client_key='%s' ", client_key);
    3225            4 :     appendStringInfo(buf, "scram_server_key='%s' ", server_key);
    3226            4 :     appendStringInfoString(buf, "require_auth='scram-sha-256' ");
    3227              : 
    3228            4 :     pfree(client_key);
    3229            4 :     pfree(server_key);
    3230            4 : }
    3231              : 
    3232              : 
    3233              : static bool
    3234            4 : UseScramPassthrough(ForeignServer *foreign_server, UserMapping *user)
    3235              : {
    3236              :     ListCell   *cell;
    3237              : 
    3238           16 :     foreach(cell, foreign_server->options)
    3239              :     {
    3240           16 :         DefElem    *def = lfirst(cell);
    3241              : 
    3242           16 :         if (strcmp(def->defname, "use_scram_passthrough") == 0)
    3243            4 :             return defGetBoolean(def);
    3244              :     }
    3245              : 
    3246            0 :     foreach(cell, user->options)
    3247              :     {
    3248            0 :         DefElem    *def = (DefElem *) lfirst(cell);
    3249              : 
    3250            0 :         if (strcmp(def->defname, "use_scram_passthrough") == 0)
    3251            0 :             return defGetBoolean(def);
    3252              :     }
    3253              : 
    3254            0 :     return false;
    3255              : }
        

Generated by: LCOV version 2.0-1