LCOV - code coverage report
Current view: top level - contrib/postgres_fdw - connection.c (source / functions) Hit Total Coverage
Test: PostgreSQL 19devel Lines: 589 706 83.4 %
Date: 2025-07-31 06:18:14 Functions: 49 51 96.1 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * connection.c
       4             :  *        Connection management functions for postgres_fdw
       5             :  *
       6             :  * Portions Copyright (c) 2012-2025, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *        contrib/postgres_fdw/connection.c
      10             :  *
      11             :  *-------------------------------------------------------------------------
      12             :  */
      13             : #include "postgres.h"
      14             : 
      15             : #if HAVE_POLL_H
      16             : #include <poll.h>
      17             : #endif
      18             : 
      19             : #include "access/xact.h"
      20             : #include "catalog/pg_user_mapping.h"
      21             : #include "commands/defrem.h"
      22             : #include "common/base64.h"
      23             : #include "funcapi.h"
      24             : #include "libpq/libpq-be.h"
      25             : #include "libpq/libpq-be-fe-helpers.h"
      26             : #include "mb/pg_wchar.h"
      27             : #include "miscadmin.h"
      28             : #include "pgstat.h"
      29             : #include "postgres_fdw.h"
      30             : #include "storage/latch.h"
      31             : #include "utils/builtins.h"
      32             : #include "utils/hsearch.h"
      33             : #include "utils/inval.h"
      34             : #include "utils/syscache.h"
      35             : 
      36             : /*
      37             :  * Connection cache hash table entry
      38             :  *
      39             :  * The lookup key in this hash table is the user mapping OID. We use just one
      40             :  * connection per user mapping ID, which ensures that all the scans use the
      41             :  * same snapshot during a query.  Using the user mapping OID rather than
      42             :  * the foreign server OID + user OID avoids creating multiple connections when
      43             :  * the public user mapping applies to all user OIDs.
      44             :  *
      45             :  * The "conn" pointer can be NULL if we don't currently have a live connection.
      46             :  * When we do have a connection, xact_depth tracks the current depth of
      47             :  * transactions and subtransactions open on the remote side.  We need to issue
      48             :  * commands at the same nesting depth on the remote as we're executing at
      49             :  * ourselves, so that rolling back a subtransaction will kill the right
      50             :  * queries and not the wrong ones.
      51             :  */
      52             : typedef Oid ConnCacheKey;
      53             : 
      54             : typedef struct ConnCacheEntry
      55             : {
      56             :     ConnCacheKey key;           /* hash key (must be first) */
      57             :     PGconn     *conn;           /* connection to foreign server, or NULL */
      58             :     /* Remaining fields are invalid when conn is NULL: */
      59             :     int         xact_depth;     /* 0 = no xact open, 1 = main xact open, 2 =
      60             :                                  * one level of subxact open, etc */
      61             :     bool        have_prep_stmt; /* have we prepared any stmts in this xact? */
      62             :     bool        have_error;     /* have any subxacts aborted in this xact? */
      63             :     bool        changing_xact_state;    /* xact state change in process */
      64             :     bool        parallel_commit;    /* do we commit (sub)xacts in parallel? */
      65             :     bool        parallel_abort; /* do we abort (sub)xacts in parallel? */
      66             :     bool        invalidated;    /* true if reconnect is pending */
      67             :     bool        keep_connections;   /* setting value of keep_connections
      68             :                                      * server option */
      69             :     Oid         serverid;       /* foreign server OID used to get server name */
      70             :     uint32      server_hashvalue;   /* hash value of foreign server OID */
      71             :     uint32      mapping_hashvalue;  /* hash value of user mapping OID */
      72             :     PgFdwConnState state;       /* extra per-connection state */
      73             : } ConnCacheEntry;
      74             : 
      75             : /*
      76             :  * Connection cache (initialized on first use)
      77             :  */
      78             : static HTAB *ConnectionHash = NULL;
      79             : 
      80             : /* for assigning cursor numbers and prepared statement numbers */
      81             : static unsigned int cursor_number = 0;
      82             : static unsigned int prep_stmt_number = 0;
      83             : 
      84             : /* tracks whether any work is needed in callback functions */
      85             : static bool xact_got_connection = false;
      86             : 
      87             : /* custom wait event values, retrieved from shared memory */
      88             : static uint32 pgfdw_we_cleanup_result = 0;
      89             : static uint32 pgfdw_we_connect = 0;
      90             : static uint32 pgfdw_we_get_result = 0;
      91             : 
      92             : /*
      93             :  * Milliseconds to wait to cancel an in-progress query or execute a cleanup
      94             :  * query; if it takes longer than 30 seconds to do these, we assume the
      95             :  * connection is dead.
      96             :  */
      97             : #define CONNECTION_CLEANUP_TIMEOUT  30000
      98             : 
      99             : /*
     100             :  * Milliseconds to wait before issuing another cancel request.  This covers
     101             :  * the race condition where the remote session ignored our cancel request
     102             :  * because it arrived while idle.
     103             :  */
     104             : #define RETRY_CANCEL_TIMEOUT    1000
     105             : 
     106             : /* Macro for constructing abort command to be sent */
     107             : #define CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel) \
     108             :     do { \
     109             :         if (toplevel) \
     110             :             snprintf((sql), sizeof(sql), \
     111             :                      "ABORT TRANSACTION"); \
     112             :         else \
     113             :             snprintf((sql), sizeof(sql), \
     114             :                      "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d", \
     115             :                      (entry)->xact_depth, (entry)->xact_depth); \
     116             :     } while(0)
     117             : 
     118             : /*
     119             :  * Extension version number, for supporting older extension versions' objects
     120             :  */
     121             : enum pgfdwVersion
     122             : {
     123             :     PGFDW_V1_1 = 0,
     124             :     PGFDW_V1_2,
     125             : };
     126             : 
     127             : /*
     128             :  * SQL functions
     129             :  */
     130           4 : PG_FUNCTION_INFO_V1(postgres_fdw_get_connections);
     131           6 : PG_FUNCTION_INFO_V1(postgres_fdw_get_connections_1_2);
     132           6 : PG_FUNCTION_INFO_V1(postgres_fdw_disconnect);
     133           6 : PG_FUNCTION_INFO_V1(postgres_fdw_disconnect_all);
     134             : 
     135             : /* prototypes of private functions */
     136             : static void make_new_connection(ConnCacheEntry *entry, UserMapping *user);
     137             : static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
     138             : static void disconnect_pg_server(ConnCacheEntry *entry);
     139             : static void check_conn_params(const char **keywords, const char **values, UserMapping *user);
     140             : static void configure_remote_session(PGconn *conn);
     141             : static void do_sql_command_begin(PGconn *conn, const char *sql);
     142             : static void do_sql_command_end(PGconn *conn, const char *sql,
     143             :                                bool consume_input);
     144             : static void begin_remote_xact(ConnCacheEntry *entry);
     145             : static void pgfdw_report_internal(int elevel, PGresult *res, PGconn *conn,
     146             :                                   const char *sql);
     147             : static void pgfdw_xact_callback(XactEvent event, void *arg);
     148             : static void pgfdw_subxact_callback(SubXactEvent event,
     149             :                                    SubTransactionId mySubid,
     150             :                                    SubTransactionId parentSubid,
     151             :                                    void *arg);
     152             : static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue);
     153             : static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry);
     154             : static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel);
     155             : static bool pgfdw_cancel_query(PGconn *conn);
     156             : static bool pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime);
     157             : static bool pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime,
     158             :                                    TimestampTz retrycanceltime,
     159             :                                    bool consume_input);
     160             : static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
     161             :                                      bool ignore_errors);
     162             : static bool pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query);
     163             : static bool pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
     164             :                                          TimestampTz endtime,
     165             :                                          bool consume_input,
     166             :                                          bool ignore_errors);
     167             : static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
     168             :                                      TimestampTz retrycanceltime,
     169             :                                      PGresult **result, bool *timed_out);
     170             : static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel);
     171             : static bool pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel,
     172             :                                       List **pending_entries,
     173             :                                       List **cancel_requested);
     174             : static void pgfdw_finish_pre_commit_cleanup(List *pending_entries);
     175             : static void pgfdw_finish_pre_subcommit_cleanup(List *pending_entries,
     176             :                                                int curlevel);
     177             : static void pgfdw_finish_abort_cleanup(List *pending_entries,
     178             :                                        List *cancel_requested,
     179             :                                        bool toplevel);
     180             : static void pgfdw_security_check(const char **keywords, const char **values,
     181             :                                  UserMapping *user, PGconn *conn);
     182             : static bool UserMappingPasswordRequired(UserMapping *user);
     183             : static bool UseScramPassthrough(ForeignServer *server, UserMapping *user);
     184             : static bool disconnect_cached_connections(Oid serverid);
     185             : static void postgres_fdw_get_connections_internal(FunctionCallInfo fcinfo,
     186             :                                                   enum pgfdwVersion api_version);
     187             : static int  pgfdw_conn_check(PGconn *conn);
     188             : static bool pgfdw_conn_checkable(void);
     189             : static bool pgfdw_has_required_scram_options(const char **keywords, const char **values);
     190             : 
     191             : /*
     192             :  * Get a PGconn which can be used to execute queries on the remote PostgreSQL
     193             :  * server with the user's authorization.  A new connection is established
     194             :  * if we don't already have a suitable one, and a transaction is opened at
     195             :  * the right subtransaction nesting depth if we didn't do that already.
     196             :  *
     197             :  * will_prep_stmt must be true if caller intends to create any prepared
     198             :  * statements.  Since those don't go away automatically at transaction end
     199             :  * (not even on error), we need this flag to cue manual cleanup.
     200             :  *
     201             :  * If state is not NULL, *state receives the per-connection state associated
     202             :  * with the PGconn.
     203             :  */
     204             : PGconn *
     205        4346 : GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state)
     206             : {
     207             :     bool        found;
     208        4346 :     bool        retry = false;
     209             :     ConnCacheEntry *entry;
     210             :     ConnCacheKey key;
     211        4346 :     MemoryContext ccxt = CurrentMemoryContext;
     212             : 
     213             :     /* First time through, initialize connection cache hashtable */
     214        4346 :     if (ConnectionHash == NULL)
     215             :     {
     216             :         HASHCTL     ctl;
     217             : 
     218          18 :         if (pgfdw_we_get_result == 0)
     219          18 :             pgfdw_we_get_result =
     220          18 :                 WaitEventExtensionNew("PostgresFdwGetResult");
     221             : 
     222          18 :         ctl.keysize = sizeof(ConnCacheKey);
     223          18 :         ctl.entrysize = sizeof(ConnCacheEntry);
     224          18 :         ConnectionHash = hash_create("postgres_fdw connections", 8,
     225             :                                      &ctl,
     226             :                                      HASH_ELEM | HASH_BLOBS);
     227             : 
     228             :         /*
     229             :          * Register some callback functions that manage connection cleanup.
     230             :          * This should be done just once in each backend.
     231             :          */
     232          18 :         RegisterXactCallback(pgfdw_xact_callback, NULL);
     233          18 :         RegisterSubXactCallback(pgfdw_subxact_callback, NULL);
     234          18 :         CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
     235             :                                       pgfdw_inval_callback, (Datum) 0);
     236          18 :         CacheRegisterSyscacheCallback(USERMAPPINGOID,
     237             :                                       pgfdw_inval_callback, (Datum) 0);
     238             :     }
     239             : 
     240             :     /* Set flag that we did GetConnection during the current transaction */
     241        4346 :     xact_got_connection = true;
     242             : 
     243             :     /* Create hash key for the entry.  Assume no pad bytes in key struct */
     244        4346 :     key = user->umid;
     245             : 
     246             :     /*
     247             :      * Find or create cached entry for requested connection.
     248             :      */
     249        4346 :     entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
     250        4346 :     if (!found)
     251             :     {
     252             :         /*
     253             :          * We need only clear "conn" here; remaining fields will be filled
     254             :          * later when "conn" is set.
     255             :          */
     256          34 :         entry->conn = NULL;
     257             :     }
     258             : 
     259             :     /* Reject further use of connections which failed abort cleanup. */
     260        4346 :     pgfdw_reject_incomplete_xact_state_change(entry);
     261             : 
     262             :     /*
     263             :      * If the connection needs to be remade due to invalidation, disconnect as
     264             :      * soon as we're out of all transactions.
     265             :      */
     266        4346 :     if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
     267             :     {
     268           0 :         elog(DEBUG3, "closing connection %p for option changes to take effect",
     269             :              entry->conn);
     270           0 :         disconnect_pg_server(entry);
     271             :     }
     272             : 
     273             :     /*
     274             :      * If cache entry doesn't have a connection, we have to establish a new
     275             :      * connection.  (If connect_pg_server throws an error, the cache entry
     276             :      * will remain in a valid empty state, ie conn == NULL.)
     277             :      */
     278        4346 :     if (entry->conn == NULL)
     279         140 :         make_new_connection(entry, user);
     280             : 
     281             :     /*
     282             :      * We check the health of the cached connection here when using it.  In
     283             :      * cases where we're out of all transactions, if a broken connection is
     284             :      * detected, we try to reestablish a new connection later.
     285             :      */
     286        4334 :     PG_TRY();
     287             :     {
     288             :         /* Process a pending asynchronous request if any. */
     289        4334 :         if (entry->state.pendingAreq)
     290           0 :             process_pending_request(entry->state.pendingAreq);
     291             :         /* Start a new transaction or subtransaction if needed. */
     292        4334 :         begin_remote_xact(entry);
     293             :     }
     294           4 :     PG_CATCH();
     295             :     {
     296           4 :         MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
     297           4 :         ErrorData  *errdata = CopyErrorData();
     298             : 
     299             :         /*
     300             :          * Determine whether to try to reestablish the connection.
     301             :          *
     302             :          * After a broken connection is detected in libpq, any error other
     303             :          * than connection failure (e.g., out-of-memory) can be thrown
     304             :          * somewhere between return from libpq and the expected ereport() call
     305             :          * in pgfdw_report_error(). In this case, since PQstatus() indicates
     306             :          * CONNECTION_BAD, checking only PQstatus() causes the false detection
     307             :          * of connection failure. To avoid this, we also verify that the
     308             :          * error's sqlstate is ERRCODE_CONNECTION_FAILURE. Note that also
     309             :          * checking only the sqlstate can cause another false detection
     310             :          * because pgfdw_report_error() may report ERRCODE_CONNECTION_FAILURE
     311             :          * for any libpq-originated error condition.
     312             :          */
     313           4 :         if (errdata->sqlerrcode != ERRCODE_CONNECTION_FAILURE ||
     314           4 :             PQstatus(entry->conn) != CONNECTION_BAD ||
     315           4 :             entry->xact_depth > 0)
     316             :         {
     317           2 :             MemoryContextSwitchTo(ecxt);
     318           2 :             PG_RE_THROW();
     319             :         }
     320             : 
     321             :         /* Clean up the error state */
     322           2 :         FlushErrorState();
     323           2 :         FreeErrorData(errdata);
     324           2 :         errdata = NULL;
     325             : 
     326           2 :         retry = true;
     327             :     }
     328        4332 :     PG_END_TRY();
     329             : 
     330             :     /*
     331             :      * If a broken connection is detected, disconnect it, reestablish a new
     332             :      * connection and retry a new remote transaction. If connection failure is
     333             :      * reported again, we give up getting a connection.
     334             :      */
     335        4332 :     if (retry)
     336             :     {
     337             :         Assert(entry->xact_depth == 0);
     338             : 
     339           2 :         ereport(DEBUG3,
     340             :                 (errmsg_internal("could not start remote transaction on connection %p",
     341             :                                  entry->conn)),
     342             :                 errdetail_internal("%s", pchomp(PQerrorMessage(entry->conn))));
     343             : 
     344           2 :         elog(DEBUG3, "closing connection %p to reestablish a new one",
     345             :              entry->conn);
     346           2 :         disconnect_pg_server(entry);
     347             : 
     348           2 :         make_new_connection(entry, user);
     349             : 
     350           2 :         begin_remote_xact(entry);
     351             :     }
     352             : 
     353             :     /* Remember if caller will prepare statements */
     354        4332 :     entry->have_prep_stmt |= will_prep_stmt;
     355             : 
     356             :     /* If caller needs access to the per-connection state, return it. */
     357        4332 :     if (state)
     358        1466 :         *state = &entry->state;
     359             : 
     360        4332 :     return entry->conn;
     361             : }
     362             : 
     363             : /*
     364             :  * Reset all transient state fields in the cached connection entry and
     365             :  * establish new connection to the remote server.
     366             :  */
     367             : static void
     368         142 : make_new_connection(ConnCacheEntry *entry, UserMapping *user)
     369             : {
     370         142 :     ForeignServer *server = GetForeignServer(user->serverid);
     371             :     ListCell   *lc;
     372             : 
     373             :     Assert(entry->conn == NULL);
     374             : 
     375             :     /* Reset all transient state fields, to be sure all are clean */
     376         142 :     entry->xact_depth = 0;
     377         142 :     entry->have_prep_stmt = false;
     378         142 :     entry->have_error = false;
     379         142 :     entry->changing_xact_state = false;
     380         142 :     entry->invalidated = false;
     381         142 :     entry->serverid = server->serverid;
     382         142 :     entry->server_hashvalue =
     383         142 :         GetSysCacheHashValue1(FOREIGNSERVEROID,
     384             :                               ObjectIdGetDatum(server->serverid));
     385         142 :     entry->mapping_hashvalue =
     386         142 :         GetSysCacheHashValue1(USERMAPPINGOID,
     387             :                               ObjectIdGetDatum(user->umid));
     388         142 :     memset(&entry->state, 0, sizeof(entry->state));
     389             : 
     390             :     /*
     391             :      * Determine whether to keep the connection that we're about to make here
     392             :      * open even after the transaction using it ends, so that the subsequent
     393             :      * transactions can re-use it.
     394             :      *
     395             :      * By default, all the connections to any foreign servers are kept open.
     396             :      *
     397             :      * Also determine whether to commit/abort (sub)transactions opened on the
     398             :      * remote server in parallel at (sub)transaction end, which is disabled by
     399             :      * default.
     400             :      *
     401             :      * Note: it's enough to determine these only when making a new connection
     402             :      * because if these settings for it are changed, it will be closed and
     403             :      * re-made later.
     404             :      */
     405         142 :     entry->keep_connections = true;
     406         142 :     entry->parallel_commit = false;
     407         142 :     entry->parallel_abort = false;
     408         630 :     foreach(lc, server->options)
     409             :     {
     410         488 :         DefElem    *def = (DefElem *) lfirst(lc);
     411             : 
     412         488 :         if (strcmp(def->defname, "keep_connections") == 0)
     413          16 :             entry->keep_connections = defGetBoolean(def);
     414         472 :         else if (strcmp(def->defname, "parallel_commit") == 0)
     415           4 :             entry->parallel_commit = defGetBoolean(def);
     416         468 :         else if (strcmp(def->defname, "parallel_abort") == 0)
     417           4 :             entry->parallel_abort = defGetBoolean(def);
     418             :     }
     419             : 
     420             :     /* Now try to make the connection */
     421         142 :     entry->conn = connect_pg_server(server, user);
     422             : 
     423         130 :     elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
     424             :          entry->conn, server->servername, user->umid, user->userid);
     425         130 : }
     426             : 
     427             : /*
     428             :  * Check that non-superuser has used password or delegated credentials
     429             :  * to establish connection; otherwise, he's piggybacking on the
     430             :  * postgres server's user identity. See also dblink_security_check()
     431             :  * in contrib/dblink and check_conn_params.
     432             :  */
     433             : static void
     434         134 : pgfdw_security_check(const char **keywords, const char **values, UserMapping *user, PGconn *conn)
     435             : {
     436             :     /* Superusers bypass the check */
     437         134 :     if (superuser_arg(user->userid))
     438         118 :         return;
     439             : 
     440             : #ifdef ENABLE_GSS
     441             :     /* Connected via GSSAPI with delegated credentials- all good. */
     442             :     if (PQconnectionUsedGSSAPI(conn) && be_gssapi_get_delegation(MyProcPort))
     443             :         return;
     444             : #endif
     445             : 
     446             :     /* Ok if superuser set PW required false. */
     447          16 :     if (!UserMappingPasswordRequired(user))
     448           4 :         return;
     449             : 
     450             :     /* Connected via PW, with PW required true, and provided non-empty PW. */
     451          12 :     if (PQconnectionUsedPassword(conn))
     452             :     {
     453             :         /* ok if params contain a non-empty password */
     454          80 :         for (int i = 0; keywords[i] != NULL; i++)
     455             :         {
     456          72 :             if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
     457           0 :                 return;
     458             :         }
     459             :     }
     460             : 
     461             :     /*
     462             :      * Ok if SCRAM pass-through is being used and all required SCRAM options
     463             :      * are set correctly. If pgfdw_has_required_scram_options returns true we
     464             :      * assume that UseScramPassthrough is also true since SCRAM options are
     465             :      * only set when UseScramPassthrough is enabled.
     466             :      */
     467          12 :     if (MyProcPort->has_scram_keys && pgfdw_has_required_scram_options(keywords, values))
     468           8 :         return;
     469             : 
     470           4 :     ereport(ERROR,
     471             :             (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
     472             :              errmsg("password or GSSAPI delegated credentials required"),
     473             :              errdetail("Non-superuser cannot connect if the server does not request a password or use GSSAPI with delegated credentials."),
     474             :              errhint("Target server's authentication method must be changed or password_required=false set in the user mapping attributes.")));
     475             : }
     476             : 
     477             : /*
     478             :  * Connect to remote server using specified server and user mapping properties.
     479             :  */
     480             : static PGconn *
     481         142 : connect_pg_server(ForeignServer *server, UserMapping *user)
     482             : {
     483         142 :     PGconn     *volatile conn = NULL;
     484             : 
     485             :     /*
     486             :      * Use PG_TRY block to ensure closing connection on error.
     487             :      */
     488         142 :     PG_TRY();
     489             :     {
     490             :         const char **keywords;
     491             :         const char **values;
     492         142 :         char       *appname = NULL;
     493             :         int         n;
     494             : 
     495             :         /*
     496             :          * Construct connection params from generic options of ForeignServer
     497             :          * and UserMapping.  (Some of them might not be libpq options, in
     498             :          * which case we'll just waste a few array slots.)  Add 4 extra slots
     499             :          * for application_name, fallback_application_name, client_encoding,
     500             :          * end marker, and 3 extra slots for scram keys and required scram
     501             :          * pass-through options.
     502             :          */
     503         142 :         n = list_length(server->options) + list_length(user->options) + 4 + 3;
     504         142 :         keywords = (const char **) palloc(n * sizeof(char *));
     505         142 :         values = (const char **) palloc(n * sizeof(char *));
     506             : 
     507         142 :         n = 0;
     508         284 :         n += ExtractConnectionOptions(server->options,
     509         142 :                                       keywords + n, values + n);
     510         284 :         n += ExtractConnectionOptions(user->options,
     511         142 :                                       keywords + n, values + n);
     512             : 
     513             :         /*
     514             :          * Use pgfdw_application_name as application_name if set.
     515             :          *
     516             :          * PQconnectdbParams() processes the parameter arrays from start to
     517             :          * end. If any key word is repeated, the last value is used. Therefore
     518             :          * note that pgfdw_application_name must be added to the arrays after
     519             :          * options of ForeignServer are, so that it can override
     520             :          * application_name set in ForeignServer.
     521             :          */
     522         142 :         if (pgfdw_application_name && *pgfdw_application_name != '\0')
     523             :         {
     524           2 :             keywords[n] = "application_name";
     525           2 :             values[n] = pgfdw_application_name;
     526           2 :             n++;
     527             :         }
     528             : 
     529             :         /*
     530             :          * Search the parameter arrays to find application_name setting, and
     531             :          * replace escape sequences in it with status information if found.
     532             :          * The arrays are searched backwards because the last value is used if
     533             :          * application_name is repeatedly set.
     534             :          */
     535         378 :         for (int i = n - 1; i >= 0; i--)
     536             :         {
     537         272 :             if (strcmp(keywords[i], "application_name") == 0 &&
     538          36 :                 *(values[i]) != '\0')
     539             :             {
     540             :                 /*
     541             :                  * Use this application_name setting if it's not empty string
     542             :                  * even after any escape sequences in it are replaced.
     543             :                  */
     544          36 :                 appname = process_pgfdw_appname(values[i]);
     545          36 :                 if (appname[0] != '\0')
     546             :                 {
     547          36 :                     values[i] = appname;
     548          36 :                     break;
     549             :                 }
     550             : 
     551             :                 /*
     552             :                  * This empty application_name is not used, so we set
     553             :                  * values[i] to NULL and keep searching the array to find the
     554             :                  * next one.
     555             :                  */
     556           0 :                 values[i] = NULL;
     557           0 :                 pfree(appname);
     558           0 :                 appname = NULL;
     559             :             }
     560             :         }
     561             : 
     562             :         /* Use "postgres_fdw" as fallback_application_name */
     563         142 :         keywords[n] = "fallback_application_name";
     564         142 :         values[n] = "postgres_fdw";
     565         142 :         n++;
     566             : 
     567             :         /* Set client_encoding so that libpq can convert encoding properly. */
     568         142 :         keywords[n] = "client_encoding";
     569         142 :         values[n] = GetDatabaseEncodingName();
     570         142 :         n++;
     571             : 
     572             :         /* Add required SCRAM pass-through connection options if it's enabled. */
     573         142 :         if (MyProcPort->has_scram_keys && UseScramPassthrough(server, user))
     574             :         {
     575             :             int         len;
     576             :             int         encoded_len;
     577             : 
     578           8 :             keywords[n] = "scram_client_key";
     579           8 :             len = pg_b64_enc_len(sizeof(MyProcPort->scram_ClientKey));
     580             :             /* don't forget the zero-terminator */
     581           8 :             values[n] = palloc0(len + 1);
     582          16 :             encoded_len = pg_b64_encode(MyProcPort->scram_ClientKey,
     583             :                                         sizeof(MyProcPort->scram_ClientKey),
     584           8 :                                         (char *) values[n], len);
     585           8 :             if (encoded_len < 0)
     586           0 :                 elog(ERROR, "could not encode SCRAM client key");
     587           8 :             n++;
     588             : 
     589           8 :             keywords[n] = "scram_server_key";
     590           8 :             len = pg_b64_enc_len(sizeof(MyProcPort->scram_ServerKey));
     591             :             /* don't forget the zero-terminator */
     592           8 :             values[n] = palloc0(len + 1);
     593          16 :             encoded_len = pg_b64_encode(MyProcPort->scram_ServerKey,
     594             :                                         sizeof(MyProcPort->scram_ServerKey),
     595           8 :                                         (char *) values[n], len);
     596           8 :             if (encoded_len < 0)
     597           0 :                 elog(ERROR, "could not encode SCRAM server key");
     598           8 :             n++;
     599             : 
     600             :             /*
     601             :              * Require scram-sha-256 to ensure that no other auth method is
     602             :              * used when connecting with foreign server.
     603             :              */
     604           8 :             keywords[n] = "require_auth";
     605           8 :             values[n] = "scram-sha-256";
     606           8 :             n++;
     607             :         }
     608             : 
     609         142 :         keywords[n] = values[n] = NULL;
     610             : 
     611             :         /* Verify the set of connection parameters. */
     612         142 :         check_conn_params(keywords, values, user);
     613             : 
     614             :         /* first time, allocate or get the custom wait event */
     615         138 :         if (pgfdw_we_connect == 0)
     616          18 :             pgfdw_we_connect = WaitEventExtensionNew("PostgresFdwConnect");
     617             : 
     618             :         /* OK to make connection */
     619         138 :         conn = libpqsrv_connect_params(keywords, values,
     620             :                                        false,   /* expand_dbname */
     621             :                                        pgfdw_we_connect);
     622             : 
     623         138 :         if (!conn || PQstatus(conn) != CONNECTION_OK)
     624           4 :             ereport(ERROR,
     625             :                     (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
     626             :                      errmsg("could not connect to server \"%s\"",
     627             :                             server->servername),
     628             :                      errdetail_internal("%s", pchomp(PQerrorMessage(conn)))));
     629             : 
     630         134 :         PQsetNoticeReceiver(conn, libpqsrv_notice_receiver,
     631             :                             "received message via remote connection");
     632             : 
     633             :         /* Perform post-connection security checks. */
     634         134 :         pgfdw_security_check(keywords, values, user, conn);
     635             : 
     636             :         /* Prepare new session for use */
     637         130 :         configure_remote_session(conn);
     638             : 
     639         130 :         if (appname != NULL)
     640          36 :             pfree(appname);
     641         130 :         pfree(keywords);
     642         130 :         pfree(values);
     643             :     }
     644          12 :     PG_CATCH();
     645             :     {
     646          12 :         libpqsrv_disconnect(conn);
     647          12 :         PG_RE_THROW();
     648             :     }
     649         130 :     PG_END_TRY();
     650             : 
     651         130 :     return conn;
     652             : }
     653             : 
     654             : /*
     655             :  * Disconnect any open connection for a connection cache entry.
     656             :  */
     657             : static void
     658         112 : disconnect_pg_server(ConnCacheEntry *entry)
     659             : {
     660         112 :     if (entry->conn != NULL)
     661             :     {
     662         112 :         libpqsrv_disconnect(entry->conn);
     663         112 :         entry->conn = NULL;
     664             :     }
     665         112 : }
     666             : 
     667             : /*
     668             :  * Return true if the password_required is defined and false for this user
     669             :  * mapping, otherwise false. The mapping has been pre-validated.
     670             :  */
     671             : static bool
     672          30 : UserMappingPasswordRequired(UserMapping *user)
     673             : {
     674             :     ListCell   *cell;
     675             : 
     676          52 :     foreach(cell, user->options)
     677             :     {
     678          28 :         DefElem    *def = (DefElem *) lfirst(cell);
     679             : 
     680          28 :         if (strcmp(def->defname, "password_required") == 0)
     681           6 :             return defGetBoolean(def);
     682             :     }
     683             : 
     684          24 :     return true;
     685             : }
     686             : 
     687             : static bool
     688           8 : UseScramPassthrough(ForeignServer *server, UserMapping *user)
     689             : {
     690             :     ListCell   *cell;
     691             : 
     692          32 :     foreach(cell, server->options)
     693             :     {
     694          32 :         DefElem    *def = (DefElem *) lfirst(cell);
     695             : 
     696          32 :         if (strcmp(def->defname, "use_scram_passthrough") == 0)
     697           8 :             return defGetBoolean(def);
     698             :     }
     699             : 
     700           0 :     foreach(cell, user->options)
     701             :     {
     702           0 :         DefElem    *def = (DefElem *) lfirst(cell);
     703             : 
     704           0 :         if (strcmp(def->defname, "use_scram_passthrough") == 0)
     705           0 :             return defGetBoolean(def);
     706             :     }
     707             : 
     708           0 :     return false;
     709             : }
     710             : 
     711             : /*
     712             :  * For non-superusers, insist that the connstr specify a password or that the
     713             :  * user provided their own GSSAPI delegated credentials.  This
     714             :  * prevents a password from being picked up from .pgpass, a service file, the
     715             :  * environment, etc.  We don't want the postgres user's passwords,
     716             :  * certificates, etc to be accessible to non-superusers.  (See also
     717             :  * dblink_connstr_check in contrib/dblink.)
     718             :  */
     719             : static void
     720         142 : check_conn_params(const char **keywords, const char **values, UserMapping *user)
     721             : {
     722             :     int         i;
     723             : 
     724             :     /* no check required if superuser */
     725         142 :     if (superuser_arg(user->userid))
     726         122 :         return;
     727             : 
     728             : #ifdef ENABLE_GSS
     729             :     /* ok if the user provided their own delegated credentials */
     730             :     if (be_gssapi_get_delegation(MyProcPort))
     731             :         return;
     732             : #endif
     733             : 
     734             :     /* ok if params contain a non-empty password */
     735         128 :     for (i = 0; keywords[i] != NULL; i++)
     736             :     {
     737         114 :         if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
     738           6 :             return;
     739             :     }
     740             : 
     741             :     /* ok if the superuser explicitly said so at user mapping creation time */
     742          14 :     if (!UserMappingPasswordRequired(user))
     743           2 :         return;
     744             : 
     745             :     /*
     746             :      * Ok if SCRAM pass-through is being used and all required scram options
     747             :      * are set correctly. If pgfdw_has_required_scram_options returns true we
     748             :      * assume that UseScramPassthrough is also true since SCRAM options are
     749             :      * only set when UseScramPassthrough is enabled.
     750             :      */
     751          12 :     if (MyProcPort->has_scram_keys && pgfdw_has_required_scram_options(keywords, values))
     752           8 :         return;
     753             : 
     754           4 :     ereport(ERROR,
     755             :             (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
     756             :              errmsg("password or GSSAPI delegated credentials required"),
     757             :              errdetail("Non-superusers must delegate GSSAPI credentials, provide a password, or enable SCRAM pass-through in user mapping.")));
     758             : }
     759             : 
     760             : /*
     761             :  * Issue SET commands to make sure remote session is configured properly.
     762             :  *
     763             :  * We do this just once at connection, assuming nothing will change the
     764             :  * values later.  Since we'll never send volatile function calls to the
     765             :  * remote, there shouldn't be any way to break this assumption from our end.
     766             :  * It's possible to think of ways to break it at the remote end, eg making
     767             :  * a foreign table point to a view that includes a set_config call ---
     768             :  * but once you admit the possibility of a malicious view definition,
     769             :  * there are any number of ways to break things.
     770             :  */
     771             : static void
     772         130 : configure_remote_session(PGconn *conn)
     773             : {
     774         130 :     int         remoteversion = PQserverVersion(conn);
     775             : 
     776             :     /* Force the search path to contain only pg_catalog (see deparse.c) */
     777         130 :     do_sql_command(conn, "SET search_path = pg_catalog");
     778             : 
     779             :     /*
     780             :      * Set remote timezone; this is basically just cosmetic, since all
     781             :      * transmitted and returned timestamptzs should specify a zone explicitly
     782             :      * anyway.  However it makes the regression test outputs more predictable.
     783             :      *
     784             :      * We don't risk setting remote zone equal to ours, since the remote
     785             :      * server might use a different timezone database.  Instead, use GMT
     786             :      * (quoted, because very old servers are picky about case).  That's
     787             :      * guaranteed to work regardless of the remote's timezone database,
     788             :      * because pg_tzset() hard-wires it (at least in PG 9.2 and later).
     789             :      */
     790         130 :     do_sql_command(conn, "SET timezone = 'GMT'");
     791             : 
     792             :     /*
     793             :      * Set values needed to ensure unambiguous data output from remote.  (This
     794             :      * logic should match what pg_dump does.  See also set_transmission_modes
     795             :      * in postgres_fdw.c.)
     796             :      */
     797         130 :     do_sql_command(conn, "SET datestyle = ISO");
     798         130 :     if (remoteversion >= 80400)
     799         130 :         do_sql_command(conn, "SET intervalstyle = postgres");
     800         130 :     if (remoteversion >= 90000)
     801         130 :         do_sql_command(conn, "SET extra_float_digits = 3");
     802             :     else
     803           0 :         do_sql_command(conn, "SET extra_float_digits = 2");
     804         130 : }
     805             : 
     806             : /*
     807             :  * Convenience subroutine to issue a non-data-returning SQL command to remote
     808             :  */
     809             : void
     810        3540 : do_sql_command(PGconn *conn, const char *sql)
     811             : {
     812        3540 :     do_sql_command_begin(conn, sql);
     813        3540 :     do_sql_command_end(conn, sql, false);
     814        3534 : }
     815             : 
     816             : static void
     817        3576 : do_sql_command_begin(PGconn *conn, const char *sql)
     818             : {
     819        3576 :     if (!PQsendQuery(conn, sql))
     820           0 :         pgfdw_report_error(NULL, conn, sql);
     821        3576 : }
     822             : 
     823             : static void
     824        3576 : do_sql_command_end(PGconn *conn, const char *sql, bool consume_input)
     825             : {
     826             :     PGresult   *res;
     827             : 
     828             :     /*
     829             :      * If requested, consume whatever data is available from the socket. (Note
     830             :      * that if all data is available, this allows pgfdw_get_result to call
     831             :      * PQgetResult without forcing the overhead of WaitLatchOrSocket, which
     832             :      * would be large compared to the overhead of PQconsumeInput.)
     833             :      */
     834        3576 :     if (consume_input && !PQconsumeInput(conn))
     835           0 :         pgfdw_report_error(NULL, conn, sql);
     836        3576 :     res = pgfdw_get_result(conn);
     837        3576 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
     838           6 :         pgfdw_report_error(res, conn, sql);
     839        3570 :     PQclear(res);
     840        3570 : }
     841             : 
     842             : /*
     843             :  * Start remote transaction or subtransaction, if needed.
     844             :  *
     845             :  * Note that we always use at least REPEATABLE READ in the remote session.
     846             :  * This is so that, if a query initiates multiple scans of the same or
     847             :  * different foreign tables, we will get snapshot-consistent results from
     848             :  * those scans.  A disadvantage is that we can't provide sane emulation of
     849             :  * READ COMMITTED behavior --- it would be nice if we had some other way to
     850             :  * control which remote queries share a snapshot.
     851             :  */
     852             : static void
     853        4336 : begin_remote_xact(ConnCacheEntry *entry)
     854             : {
     855        4336 :     int         curlevel = GetCurrentTransactionNestLevel();
     856             : 
     857             :     /* Start main transaction if we haven't yet */
     858        4336 :     if (entry->xact_depth <= 0)
     859             :     {
     860             :         const char *sql;
     861             : 
     862        1474 :         elog(DEBUG3, "starting remote transaction on connection %p",
     863             :              entry->conn);
     864             : 
     865        1474 :         if (IsolationIsSerializable())
     866           0 :             sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
     867             :         else
     868        1474 :             sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
     869        1474 :         entry->changing_xact_state = true;
     870        1474 :         do_sql_command(entry->conn, sql);
     871        1472 :         entry->xact_depth = 1;
     872        1472 :         entry->changing_xact_state = false;
     873             :     }
     874             : 
     875             :     /*
     876             :      * If we're in a subtransaction, stack up savepoints to match our level.
     877             :      * This ensures we can rollback just the desired effects when a
     878             :      * subtransaction aborts.
     879             :      */
     880        4362 :     while (entry->xact_depth < curlevel)
     881             :     {
     882             :         char        sql[64];
     883             : 
     884          30 :         snprintf(sql, sizeof(sql), "SAVEPOINT s%d", entry->xact_depth + 1);
     885          30 :         entry->changing_xact_state = true;
     886          30 :         do_sql_command(entry->conn, sql);
     887          28 :         entry->xact_depth++;
     888          28 :         entry->changing_xact_state = false;
     889             :     }
     890        4332 : }
     891             : 
     892             : /*
     893             :  * Release connection reference count created by calling GetConnection.
     894             :  */
     895             : void
     896        4226 : ReleaseConnection(PGconn *conn)
     897             : {
     898             :     /*
     899             :      * Currently, we don't actually track connection references because all
     900             :      * cleanup is managed on a transaction or subtransaction basis instead. So
     901             :      * there's nothing to do here.
     902             :      */
     903        4226 : }
     904             : 
     905             : /*
     906             :  * Assign a "unique" number for a cursor.
     907             :  *
     908             :  * These really only need to be unique per connection within a transaction.
     909             :  * For the moment we ignore the per-connection point and assign them across
     910             :  * all connections in the transaction, but we ask for the connection to be
     911             :  * supplied in case we want to refine that.
     912             :  *
     913             :  * Note that even if wraparound happens in a very long transaction, actual
     914             :  * collisions are highly improbable; just be sure to use %u not %d to print.
     915             :  */
     916             : unsigned int
     917        1060 : GetCursorNumber(PGconn *conn)
     918             : {
     919        1060 :     return ++cursor_number;
     920             : }
     921             : 
     922             : /*
     923             :  * Assign a "unique" number for a prepared statement.
     924             :  *
     925             :  * This works much like GetCursorNumber, except that we never reset the counter
     926             :  * within a session.  That's because we can't be 100% sure we've gotten rid
     927             :  * of all prepared statements on all connections, and it's not really worth
     928             :  * increasing the risk of prepared-statement name collisions by resetting.
     929             :  */
     930             : unsigned int
     931         356 : GetPrepStmtNumber(PGconn *conn)
     932             : {
     933         356 :     return ++prep_stmt_number;
     934             : }
     935             : 
     936             : /*
     937             :  * Submit a query and wait for the result.
     938             :  *
     939             :  * Since we don't use non-blocking mode, this can't process interrupts while
     940             :  * pushing the query text to the server.  That risk is relatively small, so we
     941             :  * ignore that for now.
     942             :  *
     943             :  * Caller is responsible for the error handling on the result.
     944             :  */
     945             : PGresult *
     946        7980 : pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state)
     947             : {
     948             :     /* First, process a pending asynchronous request, if any. */
     949        7980 :     if (state && state->pendingAreq)
     950           8 :         process_pending_request(state->pendingAreq);
     951             : 
     952        7980 :     if (!PQsendQuery(conn, query))
     953           0 :         return NULL;
     954        7980 :     return pgfdw_get_result(conn);
     955             : }
     956             : 
     957             : /*
     958             :  * Wrap libpqsrv_get_result_last(), adding wait event.
     959             :  *
     960             :  * Caller is responsible for the error handling on the result.
     961             :  */
     962             : PGresult *
     963       16102 : pgfdw_get_result(PGconn *conn)
     964             : {
     965       16102 :     return libpqsrv_get_result_last(conn, pgfdw_we_get_result);
     966             : }
     967             : 
     968             : /*
     969             :  * Report an error we got from the remote server.
     970             :  *
     971             :  * Callers should use pgfdw_report_error() to throw an error, or use
     972             :  * pgfdw_report() for lesser message levels.  (We make this distinction
     973             :  * so that pgfdw_report_error() can be marked noreturn.)
     974             :  *
     975             :  * res: PGresult containing the error (might be NULL)
     976             :  * conn: connection we did the query on
     977             :  * sql: NULL, or text of remote command we tried to execute
     978             :  *
     979             :  * If "res" is not NULL, it'll be PQclear'ed here (unless we throw error,
     980             :  * in which case memory context cleanup will clear it eventually).
     981             :  *
     982             :  * Note: callers that choose not to throw ERROR for a remote error are
     983             :  * responsible for making sure that the associated ConnCacheEntry gets
     984             :  * marked with have_error = true.
     985             :  */
     986             : void
     987          32 : pgfdw_report_error(PGresult *res, PGconn *conn, const char *sql)
     988             : {
     989          32 :     pgfdw_report_internal(ERROR, res, conn, sql);
     990           0 :     pg_unreachable();
     991             : }
     992             : 
     993             : void
     994           0 : pgfdw_report(int elevel, PGresult *res, PGconn *conn, const char *sql)
     995             : {
     996             :     Assert(elevel < ERROR);      /* use pgfdw_report_error for that */
     997           0 :     pgfdw_report_internal(elevel, res, conn, sql);
     998           0 : }
     999             : 
    1000             : static void
    1001          32 : pgfdw_report_internal(int elevel, PGresult *res, PGconn *conn,
    1002             :                       const char *sql)
    1003             : {
    1004          32 :     char       *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
    1005          32 :     char       *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
    1006          32 :     char       *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
    1007          32 :     char       *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
    1008          32 :     char       *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
    1009             :     int         sqlstate;
    1010             : 
    1011          32 :     if (diag_sqlstate)
    1012          28 :         sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
    1013             :                                  diag_sqlstate[1],
    1014             :                                  diag_sqlstate[2],
    1015             :                                  diag_sqlstate[3],
    1016             :                                  diag_sqlstate[4]);
    1017             :     else
    1018           4 :         sqlstate = ERRCODE_CONNECTION_FAILURE;
    1019             : 
    1020             :     /*
    1021             :      * If we don't get a message from the PGresult, try the PGconn.  This is
    1022             :      * needed because for connection-level failures, PQgetResult may just
    1023             :      * return NULL, not a PGresult at all.
    1024             :      */
    1025          32 :     if (message_primary == NULL)
    1026           4 :         message_primary = pchomp(PQerrorMessage(conn));
    1027             : 
    1028          32 :     ereport(elevel,
    1029             :             (errcode(sqlstate),
    1030             :              (message_primary != NULL && message_primary[0] != '\0') ?
    1031             :              errmsg_internal("%s", message_primary) :
    1032             :              errmsg("could not obtain message string for remote error"),
    1033             :              message_detail ? errdetail_internal("%s", message_detail) : 0,
    1034             :              message_hint ? errhint("%s", message_hint) : 0,
    1035             :              message_context ? errcontext("%s", message_context) : 0,
    1036             :              sql ? errcontext("remote SQL command: %s", sql) : 0));
    1037           0 :     PQclear(res);
    1038           0 : }
    1039             : 
/*
 * pgfdw_xact_callback --- cleanup at main-transaction end.
 *
 * This runs just late enough that it must not enter user-defined code
 * locally.  (Entering such code on the remote side is fine.  Its remote
 * COMMIT TRANSACTION may run deferred triggers.)
 *
 * Every cached connection with an open remote transaction is committed
 * (PRE_COMMIT events), refused (PRE_PREPARE) or rolled back (ABORT events).
 * Entries whose parallel_commit / parallel_abort flag is set only have their
 * command started inside the scan loop.  They are collected in
 * pending_entries / cancel_requested and completed together after the scan.
 * Finally the module-level xact_got_connection flag and cursor_number
 * counter are reset.
 *
 * event: the transaction event being reported.
 * arg: not used by this callback.
 */
static void
pgfdw_xact_callback(XactEvent event, void *arg)
{
    HASH_SEQ_STATUS scan;
    ConnCacheEntry *entry;
    /* entries whose commit/abort command was sent but not yet completed */
    List       *pending_entries = NIL;
    /* entries queued by pgfdw_abort_cleanup_begin() via its cancel list */
    List       *cancel_requested = NIL;

    /* Quick exit if no connections were touched in this transaction. */
    if (!xact_got_connection)
        return;

    /*
     * Scan all connection cache entries to find open remote transactions, and
     * close them.
     */
    hash_seq_init(&scan, ConnectionHash);
    while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
    {
        PGresult   *res;

        /* Ignore cache entry if no open connection right now */
        if (entry->conn == NULL)
            continue;

        /*
         * If it has an open remote transaction, try to close it.  Entries
         * with xact_depth == 0 skip this but still reach
         * pgfdw_reset_xact_state() below, which may close them (for example
         * if they were invalidated).
         */
        if (entry->xact_depth > 0)
        {
            elog(DEBUG3, "closing remote transaction on connection %p",
                 entry->conn);

            switch (event)
            {
                case XACT_EVENT_PARALLEL_PRE_COMMIT:
                case XACT_EVENT_PRE_COMMIT:

                    /*
                     * If abort cleanup previously failed for this connection,
                     * we can't issue any more commands against it.
                     */
                    pgfdw_reject_incomplete_xact_state_change(entry);

                    /*
                     * Commit all remote transactions during pre-commit.  The
                     * flag is raised before sending, so an error thrown
                     * mid-command leaves the entry marked as being in an
                     * unknown state.
                     */
                    entry->changing_xact_state = true;
                    if (entry->parallel_commit)
                    {
                        /*
                         * Only send COMMIT now; the result is collected after
                         * the scan by pgfdw_finish_pre_commit_cleanup().  This
                         * 'continue' applies to the while loop, so the reset
                         * below is skipped for this entry here.
                         */
                        do_sql_command_begin(entry->conn, "COMMIT TRANSACTION");
                        pending_entries = lappend(pending_entries, entry);
                        continue;
                    }
                    do_sql_command(entry->conn, "COMMIT TRANSACTION");
                    entry->changing_xact_state = false;

                    /*
                     * If there were any errors in subtransactions, and we
                     * made prepared statements, do a DEALLOCATE ALL to make
                     * sure we get rid of all prepared statements. This is
                     * annoying and not terribly bulletproof, but it's
                     * probably not worth trying harder.
                     *
                     * DEALLOCATE ALL only exists in 8.3 and later, so this
                     * constrains how old a server postgres_fdw can
                     * communicate with.  We intentionally ignore errors in
                     * the DEALLOCATE, so that we can hobble along to some
                     * extent with older servers (leaking prepared statements
                     * as we go; but we don't really support update operations
                     * pre-8.3 anyway).
                     */
                    if (entry->have_prep_stmt && entry->have_error)
                    {
                        res = pgfdw_exec_query(entry->conn, "DEALLOCATE ALL",
                                               NULL);
                        PQclear(res);
                    }
                    entry->have_prep_stmt = false;
                    entry->have_error = false;
                    break;
                case XACT_EVENT_PRE_PREPARE:

                    /*
                     * We disallow any remote transactions, since it's not
                     * very reasonable to hold them open until the prepared
                     * transaction is committed.  For the moment, throw error
                     * unconditionally; later we might allow read-only cases.
                     * Note that the error will cause us to come right back
                     * here with event == XACT_EVENT_ABORT, so we'll clean up
                     * the connection state at that point.
                     */
                    ereport(ERROR,
                            (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                             errmsg("cannot PREPARE a transaction that has operated on postgres_fdw foreign tables")));
                    break;
                case XACT_EVENT_PARALLEL_COMMIT:
                case XACT_EVENT_COMMIT:
                case XACT_EVENT_PREPARE:
                    /* Pre-commit should have closed the open transaction */
                    elog(ERROR, "missed cleaning up connection during pre-commit");
                    break;
                case XACT_EVENT_PARALLEL_ABORT:
                case XACT_EVENT_ABORT:
                    /* Rollback all remote transactions during abort */
                    if (entry->parallel_abort)
                    {
                        /*
                         * A true result means the entry was queued on one of
                         * the lists for completion after the scan, so skip
                         * the reset below for now.  A false result falls
                         * through to the immediate reset.
                         */
                        if (pgfdw_abort_cleanup_begin(entry, true,
                                                      &pending_entries,
                                                      &cancel_requested))
                            continue;
                    }
                    else
                        pgfdw_abort_cleanup(entry, true);
                    break;
            }
        }

        /* Reset state to show we're out of a transaction */
        pgfdw_reset_xact_state(entry, true);
    }

    /* If there are any pending connections, finish cleaning them up */
    if (pending_entries || cancel_requested)
    {
        if (event == XACT_EVENT_PARALLEL_PRE_COMMIT ||
            event == XACT_EVENT_PRE_COMMIT)
        {
            /* the commit path never queues cancel requests */
            Assert(cancel_requested == NIL);
            pgfdw_finish_pre_commit_cleanup(pending_entries);
        }
        else
        {
            Assert(event == XACT_EVENT_PARALLEL_ABORT ||
                   event == XACT_EVENT_ABORT);
            pgfdw_finish_abort_cleanup(pending_entries, cancel_requested,
                                       true);
        }
    }

    /*
     * Regardless of the event type, we can now mark ourselves as out of the
     * transaction.  (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
     * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
     */
    xact_got_connection = false;

    /* Also reset cursor numbering for next transaction */
    cursor_number = 0;
}
    1193             : 
/*
 * pgfdw_subxact_callback --- cleanup at subtransaction end.
 *
 * Only SUBXACT_EVENT_PRE_COMMIT_SUB and SUBXACT_EVENT_ABORT_SUB are handled.
 * For each cached connection whose remote subtransaction depth equals the
 * current local nesting level, this does one of two things:
 *   - pre-commit: sends "RELEASE SAVEPOINT s<level>";
 *   - abort: runs abort cleanup.
 * Entries with parallel_commit / parallel_abort set are only started inside
 * the loop and are completed together after it.
 *
 * event: the subtransaction event being reported.
 * mySubid, parentSubid, arg: not used by this callback.
 */
static void
pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
                       SubTransactionId parentSubid, void *arg)
{
    HASH_SEQ_STATUS scan;
    ConnCacheEntry *entry;
    int         curlevel;
    /* entries whose command was sent but not yet completed */
    List       *pending_entries = NIL;
    /* entries queued by pgfdw_abort_cleanup_begin() via its cancel list */
    List       *cancel_requested = NIL;

    /* Nothing to do at subxact start, nor after commit. */
    if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
          event == SUBXACT_EVENT_ABORT_SUB))
        return;

    /* Quick exit if no connections were touched in this transaction. */
    if (!xact_got_connection)
        return;

    /*
     * Scan all connection cache entries to find open remote subtransactions
     * of the current level, and close them.
     */
    curlevel = GetCurrentTransactionNestLevel();
    hash_seq_init(&scan, ConnectionHash);
    while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
    {
        /* holds "RELEASE SAVEPOINT s<level>" */
        char        sql[100];

        /*
         * We only care about connections with open remote subtransactions of
         * the current level.
         */
        if (entry->conn == NULL || entry->xact_depth < curlevel)
            continue;

        /* A deeper remote level means an inner subxact was not cleaned up. */
        if (entry->xact_depth > curlevel)
            elog(ERROR, "missed cleaning up remote subtransaction at level %d",
                 entry->xact_depth);

        if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
        {
            /*
             * If abort cleanup previously failed for this connection, we
             * can't issue any more commands against it.
             */
            pgfdw_reject_incomplete_xact_state_change(entry);

            /*
             * Commit all remote subtransactions during pre-commit.  The flag
             * is raised before sending, so an error thrown mid-command leaves
             * the entry marked as being in an unknown state.
             */
            snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
            entry->changing_xact_state = true;
            if (entry->parallel_commit)
            {
                /*
                 * Only send the command now.  pgfdw_finish_pre_subcommit_cleanup()
                 * completes it after the scan, so skip the reset below.
                 */
                do_sql_command_begin(entry->conn, sql);
                pending_entries = lappend(pending_entries, entry);
                continue;
            }
            do_sql_command(entry->conn, sql);
            entry->changing_xact_state = false;
        }
        else
        {
            /* Rollback all remote subtransactions during abort */
            if (entry->parallel_abort)
            {
                /*
                 * A true result means the entry was queued for completion
                 * after the scan; skip the reset below for now.
                 */
                if (pgfdw_abort_cleanup_begin(entry, false,
                                              &pending_entries,
                                              &cancel_requested))
                    continue;
            }
            else
                pgfdw_abort_cleanup(entry, false);
        }

        /* OK, we're outta that level of subtransaction */
        pgfdw_reset_xact_state(entry, false);
    }

    /* If there are any pending connections, finish cleaning them up */
    if (pending_entries || cancel_requested)
    {
        if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
        {
            /* the commit path never queues cancel requests */
            Assert(cancel_requested == NIL);
            pgfdw_finish_pre_subcommit_cleanup(pending_entries, curlevel);
        }
        else
        {
            Assert(event == SUBXACT_EVENT_ABORT_SUB);
            pgfdw_finish_abort_cleanup(pending_entries, cancel_requested,
                                       false);
        }
    }
}
    1291             : 
    1292             : /*
    1293             :  * Connection invalidation callback function
    1294             :  *
    1295             :  * After a change to a pg_foreign_server or pg_user_mapping catalog entry,
    1296             :  * close connections depending on that entry immediately if current transaction
    1297             :  * has not used those connections yet. Otherwise, mark those connections as
    1298             :  * invalid and then make pgfdw_xact_callback() close them at the end of current
    1299             :  * transaction, since they cannot be closed in the midst of the transaction
    1300             :  * using them. Closed connections will be remade at the next opportunity if
    1301             :  * necessary.
    1302             :  *
    1303             :  * Although most cache invalidation callbacks blow away all the related stuff
    1304             :  * regardless of the given hashvalue, connections are expensive enough that
    1305             :  * it's worth trying to avoid that.
    1306             :  *
    1307             :  * NB: We could avoid unnecessary disconnection more strictly by examining
    1308             :  * individual option values, but it seems too much effort for the gain.
    1309             :  */
    1310             : static void
    1311         344 : pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
    1312             : {
    1313             :     HASH_SEQ_STATUS scan;
    1314             :     ConnCacheEntry *entry;
    1315             : 
    1316             :     Assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID);
    1317             : 
    1318             :     /* ConnectionHash must exist already, if we're registered */
    1319         344 :     hash_seq_init(&scan, ConnectionHash);
    1320        2332 :     while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
    1321             :     {
    1322             :         /* Ignore invalid entries */
    1323        1988 :         if (entry->conn == NULL)
    1324        1624 :             continue;
    1325             : 
    1326             :         /* hashvalue == 0 means a cache reset, must clear all state */
    1327         364 :         if (hashvalue == 0 ||
    1328         258 :             (cacheid == FOREIGNSERVEROID &&
    1329         364 :              entry->server_hashvalue == hashvalue) ||
    1330         106 :             (cacheid == USERMAPPINGOID &&
    1331         106 :              entry->mapping_hashvalue == hashvalue))
    1332             :         {
    1333             :             /*
    1334             :              * Close the connection immediately if it's not used yet in this
    1335             :              * transaction. Otherwise mark it as invalid so that
    1336             :              * pgfdw_xact_callback() can close it at the end of this
    1337             :              * transaction.
    1338             :              */
    1339          98 :             if (entry->xact_depth == 0)
    1340             :             {
    1341          92 :                 elog(DEBUG3, "discarding connection %p", entry->conn);
    1342          92 :                 disconnect_pg_server(entry);
    1343             :             }
    1344             :             else
    1345           6 :                 entry->invalidated = true;
    1346             :         }
    1347             :     }
    1348         344 : }
    1349             : 
    1350             : /*
    1351             :  * Raise an error if the given connection cache entry is marked as being
    1352             :  * in the middle of an xact state change.  This should be called at which no
    1353             :  * such change is expected to be in progress; if one is found to be in
    1354             :  * progress, it means that we aborted in the middle of a previous state change
    1355             :  * and now don't know what the remote transaction state actually is.
    1356             :  * Such connections can't safely be further used.  Re-establishing the
    1357             :  * connection would change the snapshot and roll back any writes already
    1358             :  * performed, so that's not an option, either. Thus, we must abort.
    1359             :  */
    1360             : static void
    1361        5744 : pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
    1362             : {
    1363             :     ForeignServer *server;
    1364             : 
    1365             :     /* nothing to do for inactive entries and entries of sane state */
    1366        5744 :     if (entry->conn == NULL || !entry->changing_xact_state)
    1367        5744 :         return;
    1368             : 
    1369             :     /* make sure this entry is inactive */
    1370           0 :     disconnect_pg_server(entry);
    1371             : 
    1372             :     /* find server name to be shown in the message below */
    1373           0 :     server = GetForeignServer(entry->serverid);
    1374             : 
    1375           0 :     ereport(ERROR,
    1376             :             (errcode(ERRCODE_CONNECTION_EXCEPTION),
    1377             :              errmsg("connection to server \"%s\" was lost",
    1378             :                     server->servername)));
    1379             : }
    1380             : 
    1381             : /*
    1382             :  * Reset state to show we're out of a (sub)transaction.
    1383             :  */
    1384             : static void
    1385        2556 : pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel)
    1386             : {
    1387        2556 :     if (toplevel)
    1388             :     {
    1389             :         /* Reset state to show we're out of a transaction */
    1390        2528 :         entry->xact_depth = 0;
    1391             : 
    1392             :         /*
    1393             :          * If the connection isn't in a good idle state, it is marked as
    1394             :          * invalid or keep_connections option of its server is disabled, then
    1395             :          * discard it to recover. Next GetConnection will open a new
    1396             :          * connection.
    1397             :          */
    1398        5054 :         if (PQstatus(entry->conn) != CONNECTION_OK ||
    1399        2526 :             PQtransactionStatus(entry->conn) != PQTRANS_IDLE ||
    1400        2526 :             entry->changing_xact_state ||
    1401        2526 :             entry->invalidated ||
    1402        2522 :             !entry->keep_connections)
    1403             :         {
    1404           8 :             elog(DEBUG3, "discarding connection %p", entry->conn);
    1405           8 :             disconnect_pg_server(entry);
    1406             :         }
    1407             :     }
    1408             :     else
    1409             :     {
    1410             :         /* Reset state to show we're out of a subtransaction */
    1411          28 :         entry->xact_depth--;
    1412             :     }
    1413        2556 : }
    1414             : 
    1415             : /*
    1416             :  * Cancel the currently-in-progress query (whose query text we do not have)
    1417             :  * and ignore the result.  Returns true if we successfully cancel the query
    1418             :  * and discard any pending result, and false if not.
    1419             :  *
    1420             :  * It's not a huge problem if we throw an ERROR here, but if we get into error
    1421             :  * recursion trouble, we'll end up slamming the connection shut, which will
    1422             :  * necessitate failing the entire toplevel transaction even if subtransactions
    1423             :  * were used.  Try to use WARNING where we can.
    1424             :  *
    1425             :  * XXX: if the query was one sent by fetch_more_data_begin(), we could get the
    1426             :  * query text from the pendingAreq saved in the per-connection state, then
    1427             :  * report the query using it.
    1428             :  */
    1429             : static bool
    1430           4 : pgfdw_cancel_query(PGconn *conn)
    1431             : {
    1432           4 :     TimestampTz now = GetCurrentTimestamp();
    1433             :     TimestampTz endtime;
    1434             :     TimestampTz retrycanceltime;
    1435             : 
    1436             :     /*
    1437             :      * If it takes too long to cancel the query and discard the result, assume
    1438             :      * the connection is dead.
    1439             :      */
    1440           4 :     endtime = TimestampTzPlusMilliseconds(now, CONNECTION_CLEANUP_TIMEOUT);
    1441             : 
    1442             :     /*
    1443             :      * Also, lose patience and re-issue the cancel request after a little bit.
    1444             :      * (This serves to close some race conditions.)
    1445             :      */
    1446           4 :     retrycanceltime = TimestampTzPlusMilliseconds(now, RETRY_CANCEL_TIMEOUT);
    1447             : 
    1448           4 :     if (!pgfdw_cancel_query_begin(conn, endtime))
    1449           0 :         return false;
    1450           4 :     return pgfdw_cancel_query_end(conn, endtime, retrycanceltime, false);
    1451             : }
    1452             : 
    1453             : /*
    1454             :  * Submit a cancel request to the given connection, waiting only until
    1455             :  * the given time.
    1456             :  *
    1457             :  * We sleep interruptibly until we receive confirmation that the cancel
    1458             :  * request has been accepted, and if it is, return true; if the timeout
    1459             :  * lapses without that, or the request fails for whatever reason, return
    1460             :  * false.
    1461             :  */
    1462             : static bool
    1463           4 : pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime)
    1464             : {
    1465           4 :     const char *errormsg = libpqsrv_cancel(conn, endtime);
    1466             : 
    1467           4 :     if (errormsg != NULL)
    1468           0 :         ereport(WARNING,
    1469             :                 errcode(ERRCODE_CONNECTION_FAILURE),
    1470             :                 errmsg("could not send cancel request: %s", errormsg));
    1471             : 
    1472           4 :     return errormsg == NULL;
    1473             : }
    1474             : 
    1475             : static bool
    1476           4 : pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime,
    1477             :                        TimestampTz retrycanceltime, bool consume_input)
    1478             : {
    1479             :     PGresult   *result;
    1480             :     bool        timed_out;
    1481             : 
    1482             :     /*
    1483             :      * If requested, consume whatever data is available from the socket. (Note
    1484             :      * that if all data is available, this allows pgfdw_get_cleanup_result to
    1485             :      * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
    1486             :      * which would be large compared to the overhead of PQconsumeInput.)
    1487             :      */
    1488           4 :     if (consume_input && !PQconsumeInput(conn))
    1489             :     {
    1490           0 :         ereport(WARNING,
    1491             :                 (errcode(ERRCODE_CONNECTION_FAILURE),
    1492             :                  errmsg("could not get result of cancel request: %s",
    1493             :                         pchomp(PQerrorMessage(conn)))));
    1494           0 :         return false;
    1495             :     }
    1496             : 
    1497             :     /* Get and discard the result of the query. */
    1498           4 :     if (pgfdw_get_cleanup_result(conn, endtime, retrycanceltime,
    1499             :                                  &result, &timed_out))
    1500             :     {
    1501           0 :         if (timed_out)
    1502           0 :             ereport(WARNING,
    1503             :                     (errmsg("could not get result of cancel request due to timeout")));
    1504             :         else
    1505           0 :             ereport(WARNING,
    1506             :                     (errcode(ERRCODE_CONNECTION_FAILURE),
    1507             :                      errmsg("could not get result of cancel request: %s",
    1508             :                             pchomp(PQerrorMessage(conn)))));
    1509             : 
    1510           0 :         return false;
    1511             :     }
    1512           4 :     PQclear(result);
    1513             : 
    1514           4 :     return true;
    1515             : }
    1516             : 
    1517             : /*
    1518             :  * Submit a query during (sub)abort cleanup and wait up to 30 seconds for the
    1519             :  * result.  If the query is executed without error, the return value is true.
    1520             :  * If the query is executed successfully but returns an error, the return
    1521             :  * value is true if and only if ignore_errors is set.  If the query can't be
    1522             :  * sent or times out, the return value is false.
    1523             :  *
    1524             :  * It's not a huge problem if we throw an ERROR here, but if we get into error
    1525             :  * recursion trouble, we'll end up slamming the connection shut, which will
    1526             :  * necessitate failing the entire toplevel transaction even if subtransactions
    1527             :  * were used.  Try to use WARNING where we can.
    1528             :  */
    1529             : static bool
    1530         118 : pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
    1531             : {
    1532             :     TimestampTz endtime;
    1533             : 
    1534             :     /*
    1535             :      * If it takes too long to execute a cleanup query, assume the connection
    1536             :      * is dead.  It's fairly likely that this is why we aborted in the first
    1537             :      * place (e.g. statement timeout, user cancel), so the timeout shouldn't
    1538             :      * be too long.
    1539             :      */
    1540         118 :     endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
    1541             :                                           CONNECTION_CLEANUP_TIMEOUT);
    1542             : 
    1543         118 :     if (!pgfdw_exec_cleanup_query_begin(conn, query))
    1544           0 :         return false;
    1545         118 :     return pgfdw_exec_cleanup_query_end(conn, query, endtime,
    1546             :                                         false, ignore_errors);
    1547             : }
    1548             : 
    1549             : static bool
    1550         142 : pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query)
    1551             : {
    1552             :     Assert(query != NULL);
    1553             : 
    1554             :     /*
    1555             :      * Submit a query.  Since we don't use non-blocking mode, this also can
    1556             :      * block.  But its risk is relatively small, so we ignore that for now.
    1557             :      */
    1558         142 :     if (!PQsendQuery(conn, query))
    1559             :     {
    1560           0 :         pgfdw_report(WARNING, NULL, conn, query);
    1561           0 :         return false;
    1562             :     }
    1563             : 
    1564         142 :     return true;
    1565             : }
    1566             : 
    1567             : static bool
    1568         142 : pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
    1569             :                              TimestampTz endtime, bool consume_input,
    1570             :                              bool ignore_errors)
    1571             : {
    1572             :     PGresult   *result;
    1573             :     bool        timed_out;
    1574             : 
    1575             :     Assert(query != NULL);
    1576             : 
    1577             :     /*
    1578             :      * If requested, consume whatever data is available from the socket. (Note
    1579             :      * that if all data is available, this allows pgfdw_get_cleanup_result to
    1580             :      * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
    1581             :      * which would be large compared to the overhead of PQconsumeInput.)
    1582             :      */
    1583         142 :     if (consume_input && !PQconsumeInput(conn))
    1584             :     {
    1585           0 :         pgfdw_report(WARNING, NULL, conn, query);
    1586           0 :         return false;
    1587             :     }
    1588             : 
    1589             :     /* Get the result of the query. */
    1590         142 :     if (pgfdw_get_cleanup_result(conn, endtime, endtime, &result, &timed_out))
    1591             :     {
    1592           0 :         if (timed_out)
    1593           0 :             ereport(WARNING,
    1594             :                     (errmsg("could not get query result due to timeout"),
    1595             :                      errcontext("remote SQL command: %s", query)));
    1596             :         else
    1597           0 :             pgfdw_report(WARNING, NULL, conn, query);
    1598             : 
    1599           0 :         return false;
    1600             :     }
    1601             : 
    1602             :     /* Issue a warning if not successful. */
    1603         142 :     if (PQresultStatus(result) != PGRES_COMMAND_OK)
    1604             :     {
    1605           0 :         pgfdw_report(WARNING, result, conn, query);
    1606           0 :         return ignore_errors;
    1607             :     }
    1608         142 :     PQclear(result);
    1609             : 
    1610         142 :     return true;
    1611             : }
    1612             : 
    1613             : /*
    1614             :  * Get, during abort cleanup, the result of a query that is in progress.
    1615             :  * This might be a query that is being interrupted by a cancel request or by
    1616             :  * transaction abort, or it might be a query that was initiated as part of
    1617             :  * transaction abort to get the remote side back to the appropriate state.
    1618             :  *
    1619             :  * endtime is the time at which we should give up and assume the remote side
    1620             :  * is dead.  retrycanceltime is the time at which we should issue a fresh
    1621             :  * cancel request (pass the same value as endtime if this is not wanted).
    1622             :  *
    1623             :  * Returns true if the timeout expired or connection trouble occurred,
    1624             :  * false otherwise.  Sets *result except in case of a true result.
    1625             :  * Sets *timed_out to true only when the timeout expired.
    1626             :  */
    1627             : static bool
    1628         146 : pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
    1629             :                          TimestampTz retrycanceltime,
    1630             :                          PGresult **result,
    1631             :                          bool *timed_out)
    1632             : {
    1633         146 :     bool        failed = false;
    1634         146 :     PGresult   *last_res = NULL;
    1635         146 :     int         canceldelta = RETRY_CANCEL_TIMEOUT * 2;
    1636             : 
    1637         146 :     *result = NULL;
    1638         146 :     *timed_out = false;
    1639             :     for (;;)
    1640         160 :     {
    1641             :         PGresult   *res;
    1642             : 
    1643         446 :         while (PQisBusy(conn))
    1644             :         {
    1645             :             int         wc;
    1646         140 :             TimestampTz now = GetCurrentTimestamp();
    1647             :             long        cur_timeout;
    1648             : 
    1649             :             /* If timeout has expired, give up. */
    1650         140 :             if (now >= endtime)
    1651             :             {
    1652           0 :                 *timed_out = true;
    1653           0 :                 failed = true;
    1654           0 :                 goto exit;
    1655             :             }
    1656             : 
    1657             :             /* If we need to re-issue the cancel request, do that. */
    1658         140 :             if (now >= retrycanceltime)
    1659             :             {
    1660             :                 /* We ignore failure to issue the repeated request. */
    1661           0 :                 (void) libpqsrv_cancel(conn, endtime);
    1662             : 
    1663             :                 /* Recompute "now" in case that took measurable time. */
    1664           0 :                 now = GetCurrentTimestamp();
    1665             : 
    1666             :                 /* Adjust re-cancel timeout in increasing steps. */
    1667           0 :                 retrycanceltime = TimestampTzPlusMilliseconds(now,
    1668             :                                                               canceldelta);
    1669           0 :                 canceldelta += canceldelta;
    1670             :             }
    1671             : 
    1672             :             /* If timeout has expired, give up, else get sleep time. */
    1673         140 :             cur_timeout = TimestampDifferenceMilliseconds(now,
    1674             :                                                           Min(endtime,
    1675             :                                                               retrycanceltime));
    1676         140 :             if (cur_timeout <= 0)
    1677             :             {
    1678           0 :                 *timed_out = true;
    1679           0 :                 failed = true;
    1680           0 :                 goto exit;
    1681             :             }
    1682             : 
    1683             :             /* first time, allocate or get the custom wait event */
    1684         140 :             if (pgfdw_we_cleanup_result == 0)
    1685           4 :                 pgfdw_we_cleanup_result = WaitEventExtensionNew("PostgresFdwCleanupResult");
    1686             : 
    1687             :             /* Sleep until there's something to do */
    1688         140 :             wc = WaitLatchOrSocket(MyLatch,
    1689             :                                    WL_LATCH_SET | WL_SOCKET_READABLE |
    1690             :                                    WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
    1691             :                                    PQsocket(conn),
    1692             :                                    cur_timeout, pgfdw_we_cleanup_result);
    1693         140 :             ResetLatch(MyLatch);
    1694             : 
    1695         140 :             CHECK_FOR_INTERRUPTS();
    1696             : 
    1697             :             /* Data available in socket? */
    1698         140 :             if (wc & WL_SOCKET_READABLE)
    1699             :             {
    1700         140 :                 if (!PQconsumeInput(conn))
    1701             :                 {
    1702             :                     /* connection trouble */
    1703           0 :                     failed = true;
    1704           0 :                     goto exit;
    1705             :                 }
    1706             :             }
    1707             :         }
    1708             : 
    1709         306 :         res = PQgetResult(conn);
    1710         306 :         if (res == NULL)
    1711         146 :             break;              /* query is complete */
    1712             : 
    1713         160 :         PQclear(last_res);
    1714         160 :         last_res = res;
    1715             :     }
    1716         146 : exit:
    1717         146 :     if (failed)
    1718           0 :         PQclear(last_res);
    1719             :     else
    1720         146 :         *result = last_res;
    1721         146 :     return failed;
    1722             : }
    1723             : 
    1724             : /*
    1725             :  * Abort remote transaction or subtransaction.
    1726             :  *
    1727             :  * "toplevel" should be set to true if toplevel (main) transaction is
     1728             :  * rolled back, false otherwise.
    1729             :  *
    1730             :  * Set entry->changing_xact_state to false on success, true on failure.
    1731             :  */
    1732             : static void
    1733          86 : pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel)
    1734             : {
    1735             :     char        sql[100];
    1736             : 
    1737             :     /*
    1738             :      * Don't try to clean up the connection if we're already in error
    1739             :      * recursion trouble.
    1740             :      */
    1741          86 :     if (in_error_recursion_trouble())
    1742           0 :         entry->changing_xact_state = true;
    1743             : 
    1744             :     /*
    1745             :      * If connection is already unsalvageable, don't touch it further.
    1746             :      */
    1747          86 :     if (entry->changing_xact_state)
    1748           2 :         return;
    1749             : 
    1750             :     /*
    1751             :      * Mark this connection as in the process of changing transaction state.
    1752             :      */
    1753          84 :     entry->changing_xact_state = true;
    1754             : 
    1755             :     /* Assume we might have lost track of prepared statements */
    1756          84 :     entry->have_error = true;
    1757             : 
    1758             :     /*
    1759             :      * If a command has been submitted to the remote server by using an
    1760             :      * asynchronous execution function, the command might not have yet
    1761             :      * completed.  Check to see if a command is still being processed by the
    1762             :      * remote server, and if so, request cancellation of the command.
    1763             :      */
    1764          84 :     if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
    1765           4 :         !pgfdw_cancel_query(entry->conn))
    1766           0 :         return;                 /* Unable to cancel running query */
    1767             : 
    1768          84 :     CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
    1769          84 :     if (!pgfdw_exec_cleanup_query(entry->conn, sql, false))
    1770           0 :         return;                 /* Unable to abort remote (sub)transaction */
    1771             : 
    1772          84 :     if (toplevel)
    1773             :     {
    1774          78 :         if (entry->have_prep_stmt && entry->have_error &&
    1775          34 :             !pgfdw_exec_cleanup_query(entry->conn,
    1776             :                                       "DEALLOCATE ALL",
    1777             :                                       true))
    1778           0 :             return;             /* Trouble clearing prepared statements */
    1779             : 
    1780          78 :         entry->have_prep_stmt = false;
    1781          78 :         entry->have_error = false;
    1782             :     }
    1783             : 
    1784             :     /*
    1785             :      * If pendingAreq of the per-connection state is not NULL, it means that
    1786             :      * an asynchronous fetch begun by fetch_more_data_begin() was not done
    1787             :      * successfully and thus the per-connection state was not reset in
    1788             :      * fetch_more_data(); in that case reset the per-connection state here.
    1789             :      */
    1790          84 :     if (entry->state.pendingAreq)
    1791           2 :         memset(&entry->state, 0, sizeof(entry->state));
    1792             : 
    1793             :     /* Disarm changing_xact_state if it all worked */
    1794          84 :     entry->changing_xact_state = false;
    1795             : }
    1796             : 
    1797             : /*
    1798             :  * Like pgfdw_abort_cleanup, submit an abort command or cancel request, but
    1799             :  * don't wait for the result.
    1800             :  *
    1801             :  * Returns true if the abort command or cancel request is successfully issued,
    1802             :  * false otherwise.  If the abort command is successfully issued, the given
    1803             :  * connection cache entry is appended to *pending_entries.  Otherwise, if the
    1804             :  * cancel request is successfully issued, it is appended to *cancel_requested.
    1805             :  */
    1806             : static bool
    1807          16 : pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel,
    1808             :                           List **pending_entries, List **cancel_requested)
    1809             : {
    1810             :     /*
    1811             :      * Don't try to clean up the connection if we're already in error
    1812             :      * recursion trouble.
    1813             :      */
    1814          16 :     if (in_error_recursion_trouble())
    1815           0 :         entry->changing_xact_state = true;
    1816             : 
    1817             :     /*
    1818             :      * If connection is already unsalvageable, don't touch it further.
    1819             :      */
    1820          16 :     if (entry->changing_xact_state)
    1821           0 :         return false;
    1822             : 
    1823             :     /*
    1824             :      * Mark this connection as in the process of changing transaction state.
    1825             :      */
    1826          16 :     entry->changing_xact_state = true;
    1827             : 
    1828             :     /* Assume we might have lost track of prepared statements */
    1829          16 :     entry->have_error = true;
    1830             : 
    1831             :     /*
    1832             :      * If a command has been submitted to the remote server by using an
    1833             :      * asynchronous execution function, the command might not have yet
    1834             :      * completed.  Check to see if a command is still being processed by the
    1835             :      * remote server, and if so, request cancellation of the command.
    1836             :      */
    1837          16 :     if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
    1838             :     {
    1839             :         TimestampTz endtime;
    1840             : 
    1841           0 :         endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
    1842             :                                               CONNECTION_CLEANUP_TIMEOUT);
    1843           0 :         if (!pgfdw_cancel_query_begin(entry->conn, endtime))
    1844           0 :             return false;       /* Unable to cancel running query */
    1845           0 :         *cancel_requested = lappend(*cancel_requested, entry);
    1846             :     }
    1847             :     else
    1848             :     {
    1849             :         char        sql[100];
    1850             : 
    1851          16 :         CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
    1852          16 :         if (!pgfdw_exec_cleanup_query_begin(entry->conn, sql))
    1853           0 :             return false;       /* Unable to abort remote transaction */
    1854          16 :         *pending_entries = lappend(*pending_entries, entry);
    1855             :     }
    1856             : 
    1857          16 :     return true;
    1858             : }
    1859             : 
    1860             : /*
    1861             :  * Finish pre-commit cleanup of connections on each of which we've sent a
    1862             :  * COMMIT command to the remote server.
    1863             :  */
    1864             : static void
    1865          26 : pgfdw_finish_pre_commit_cleanup(List *pending_entries)
    1866             : {
    1867             :     ConnCacheEntry *entry;
    1868          26 :     List       *pending_deallocs = NIL;
    1869             :     ListCell   *lc;
    1870             : 
    1871             :     Assert(pending_entries);
    1872             : 
    1873             :     /*
    1874             :      * Get the result of the COMMIT command for each of the pending entries
    1875             :      */
    1876          58 :     foreach(lc, pending_entries)
    1877             :     {
    1878          32 :         entry = (ConnCacheEntry *) lfirst(lc);
    1879             : 
    1880             :         Assert(entry->changing_xact_state);
    1881             : 
    1882             :         /*
    1883             :          * We might already have received the result on the socket, so pass
    1884             :          * consume_input=true to try to consume it first
    1885             :          */
    1886          32 :         do_sql_command_end(entry->conn, "COMMIT TRANSACTION", true);
    1887          32 :         entry->changing_xact_state = false;
    1888             : 
    1889             :         /* Do a DEALLOCATE ALL in parallel if needed */
    1890          32 :         if (entry->have_prep_stmt && entry->have_error)
    1891             :         {
    1892             :             /* Ignore errors (see notes in pgfdw_xact_callback) */
    1893           4 :             if (PQsendQuery(entry->conn, "DEALLOCATE ALL"))
    1894             :             {
    1895           4 :                 pending_deallocs = lappend(pending_deallocs, entry);
    1896           4 :                 continue;
    1897             :             }
    1898             :         }
    1899          28 :         entry->have_prep_stmt = false;
    1900          28 :         entry->have_error = false;
    1901             : 
    1902          28 :         pgfdw_reset_xact_state(entry, true);
    1903             :     }
    1904             : 
     1905             :     /* No further work if no DEALLOCATE commands are pending */
    1906          26 :     if (!pending_deallocs)
    1907          24 :         return;
    1908             : 
    1909             :     /*
    1910             :      * Get the result of the DEALLOCATE command for each of the pending
    1911             :      * entries
    1912             :      */
    1913           6 :     foreach(lc, pending_deallocs)
    1914             :     {
    1915             :         PGresult   *res;
    1916             : 
    1917           4 :         entry = (ConnCacheEntry *) lfirst(lc);
    1918             : 
    1919             :         /* Ignore errors (see notes in pgfdw_xact_callback) */
    1920           8 :         while ((res = PQgetResult(entry->conn)) != NULL)
    1921             :         {
    1922           4 :             PQclear(res);
    1923             :             /* Stop if the connection is lost (else we'll loop infinitely) */
    1924           4 :             if (PQstatus(entry->conn) == CONNECTION_BAD)
    1925           0 :                 break;
    1926             :         }
    1927           4 :         entry->have_prep_stmt = false;
    1928           4 :         entry->have_error = false;
    1929             : 
    1930           4 :         pgfdw_reset_xact_state(entry, true);
    1931             :     }
    1932             : }
    1933             : 
    1934             : /*
    1935             :  * Finish pre-subcommit cleanup of connections on each of which we've sent a
    1936             :  * RELEASE command to the remote server.
    1937             :  */
    1938             : static void
    1939           2 : pgfdw_finish_pre_subcommit_cleanup(List *pending_entries, int curlevel)
    1940             : {
    1941             :     ConnCacheEntry *entry;
    1942             :     char        sql[100];
    1943             :     ListCell   *lc;
    1944             : 
    1945             :     Assert(pending_entries);
    1946             : 
    1947             :     /*
    1948             :      * Get the result of the RELEASE command for each of the pending entries
    1949             :      */
    1950           2 :     snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
    1951           6 :     foreach(lc, pending_entries)
    1952             :     {
    1953           4 :         entry = (ConnCacheEntry *) lfirst(lc);
    1954             : 
    1955             :         Assert(entry->changing_xact_state);
    1956             : 
    1957             :         /*
    1958             :          * We might already have received the result on the socket, so pass
    1959             :          * consume_input=true to try to consume it first
    1960             :          */
    1961           4 :         do_sql_command_end(entry->conn, sql, true);
    1962           4 :         entry->changing_xact_state = false;
    1963             : 
    1964           4 :         pgfdw_reset_xact_state(entry, false);
    1965             :     }
    1966           2 : }
    1967             : 
    1968             : /*
    1969             :  * Finish abort cleanup of connections on each of which we've sent an abort
    1970             :  * command or cancel request to the remote server.
    1971             :  */
    1972             : static void
    1973           8 : pgfdw_finish_abort_cleanup(List *pending_entries, List *cancel_requested,
    1974             :                            bool toplevel)
    1975             : {
    1976           8 :     List       *pending_deallocs = NIL;
    1977             :     ListCell   *lc;
    1978             : 
    1979             :     /*
    1980             :      * For each of the pending cancel requests (if any), get and discard the
    1981             :      * result of the query, and submit an abort command to the remote server.
    1982             :      */
    1983           8 :     if (cancel_requested)
    1984             :     {
    1985           0 :         foreach(lc, cancel_requested)
    1986             :         {
    1987           0 :             ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
    1988           0 :             TimestampTz now = GetCurrentTimestamp();
    1989             :             TimestampTz endtime;
    1990             :             TimestampTz retrycanceltime;
    1991             :             char        sql[100];
    1992             : 
    1993             :             Assert(entry->changing_xact_state);
    1994             : 
    1995             :             /*
    1996             :              * Set end time.  You might think we should do this before issuing
    1997             :              * cancel request like in normal mode, but that is problematic,
    1998             :              * because if, for example, it took longer than 30 seconds to
    1999             :              * process the first few entries in the cancel_requested list, it
    2000             :              * would cause a timeout error when processing each of the
    2001             :              * remaining entries in the list, leading to slamming that entry's
    2002             :              * connection shut.
    2003             :              */
    2004           0 :             endtime = TimestampTzPlusMilliseconds(now,
    2005             :                                                   CONNECTION_CLEANUP_TIMEOUT);
    2006           0 :             retrycanceltime = TimestampTzPlusMilliseconds(now,
    2007             :                                                           RETRY_CANCEL_TIMEOUT);
    2008             : 
    2009           0 :             if (!pgfdw_cancel_query_end(entry->conn, endtime,
    2010             :                                         retrycanceltime, true))
    2011             :             {
    2012             :                 /* Unable to cancel running query */
    2013           0 :                 pgfdw_reset_xact_state(entry, toplevel);
    2014           0 :                 continue;
    2015             :             }
    2016             : 
    2017             :             /* Send an abort command in parallel if needed */
    2018           0 :             CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
    2019           0 :             if (!pgfdw_exec_cleanup_query_begin(entry->conn, sql))
    2020             :             {
    2021             :                 /* Unable to abort remote (sub)transaction */
    2022           0 :                 pgfdw_reset_xact_state(entry, toplevel);
    2023             :             }
    2024             :             else
    2025           0 :                 pending_entries = lappend(pending_entries, entry);
    2026             :         }
    2027             :     }
    2028             : 
    2029             :     /* No further work if no pending entries */
    2030           8 :     if (!pending_entries)
    2031           0 :         return;
    2032             : 
    2033             :     /*
    2034             :      * Get the result of the abort command for each of the pending entries
    2035             :      */
    2036          24 :     foreach(lc, pending_entries)
    2037             :     {
    2038          16 :         ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
    2039             :         TimestampTz endtime;
    2040             :         char        sql[100];
    2041             : 
    2042             :         Assert(entry->changing_xact_state);
    2043             : 
    2044             :         /*
    2045             :          * Set end time.  We do this now, not before issuing the command like
    2046             :          * in normal mode, for the same reason as for the cancel_requested
    2047             :          * entries.
    2048             :          */
    2049          16 :         endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
    2050             :                                               CONNECTION_CLEANUP_TIMEOUT);
    2051             : 
    2052          16 :         CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
    2053          16 :         if (!pgfdw_exec_cleanup_query_end(entry->conn, sql, endtime,
    2054             :                                           true, false))
    2055             :         {
    2056             :             /* Unable to abort remote (sub)transaction */
    2057           0 :             pgfdw_reset_xact_state(entry, toplevel);
    2058           8 :             continue;
    2059             :         }
    2060             : 
    2061          16 :         if (toplevel)
    2062             :         {
    2063             :             /* Do a DEALLOCATE ALL in parallel if needed */
    2064           8 :             if (entry->have_prep_stmt && entry->have_error)
    2065             :             {
    2066           8 :                 if (!pgfdw_exec_cleanup_query_begin(entry->conn,
    2067             :                                                     "DEALLOCATE ALL"))
    2068             :                 {
    2069             :                     /* Trouble clearing prepared statements */
    2070           0 :                     pgfdw_reset_xact_state(entry, toplevel);
    2071             :                 }
    2072             :                 else
    2073           8 :                     pending_deallocs = lappend(pending_deallocs, entry);
    2074           8 :                 continue;
    2075             :             }
    2076           0 :             entry->have_prep_stmt = false;
    2077           0 :             entry->have_error = false;
    2078             :         }
    2079             : 
    2080             :         /* Reset the per-connection state if needed */
    2081           8 :         if (entry->state.pendingAreq)
    2082           0 :             memset(&entry->state, 0, sizeof(entry->state));
    2083             : 
    2084             :         /* We're done with this entry; unset the changing_xact_state flag */
    2085           8 :         entry->changing_xact_state = false;
    2086           8 :         pgfdw_reset_xact_state(entry, toplevel);
    2087             :     }
    2088             : 
     2089             :     /* No further work if no DEALLOCATE commands are pending */
    2090           8 :     if (!pending_deallocs)
    2091           4 :         return;
    2092             :     Assert(toplevel);
    2093             : 
    2094             :     /*
    2095             :      * Get the result of the DEALLOCATE command for each of the pending
    2096             :      * entries
    2097             :      */
    2098          12 :     foreach(lc, pending_deallocs)
    2099             :     {
    2100           8 :         ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
    2101             :         TimestampTz endtime;
    2102             : 
    2103             :         Assert(entry->changing_xact_state);
    2104             :         Assert(entry->have_prep_stmt);
    2105             :         Assert(entry->have_error);
    2106             : 
    2107             :         /*
    2108             :          * Set end time.  We do this now, not before issuing the command like
    2109             :          * in normal mode, for the same reason as for the cancel_requested
    2110             :          * entries.
    2111             :          */
    2112           8 :         endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
    2113             :                                               CONNECTION_CLEANUP_TIMEOUT);
    2114             : 
    2115           8 :         if (!pgfdw_exec_cleanup_query_end(entry->conn, "DEALLOCATE ALL",
    2116             :                                           endtime, true, true))
    2117             :         {
    2118             :             /* Trouble clearing prepared statements */
    2119           0 :             pgfdw_reset_xact_state(entry, toplevel);
    2120           0 :             continue;
    2121             :         }
    2122           8 :         entry->have_prep_stmt = false;
    2123           8 :         entry->have_error = false;
    2124             : 
    2125             :         /* Reset the per-connection state if needed */
    2126           8 :         if (entry->state.pendingAreq)
    2127           0 :             memset(&entry->state, 0, sizeof(entry->state));
    2128             : 
    2129             :         /* We're done with this entry; unset the changing_xact_state flag */
    2130           8 :         entry->changing_xact_state = false;
    2131           8 :         pgfdw_reset_xact_state(entry, toplevel);
    2132             :     }
    2133             : }
    2134             : 
    2135             : /* Number of output arguments (columns) for various API versions */
    2136             : #define POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_1  2
    2137             : #define POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_2  6
    2138             : #define POSTGRES_FDW_GET_CONNECTIONS_COLS   6   /* maximum of above */
    2139             : 
    2140             : /*
    2141             :  * Internal function used by postgres_fdw_get_connections variants.
    2142             :  *
    2143             :  * For API version 1.1, this function takes no input parameter and
    2144             :  * returns a set of records with the following values:
    2145             :  *
    2146             :  * - server_name - server name of active connection. In case the foreign server
    2147             :  *   is dropped but still the connection is active, then the server name will
    2148             :  *   be NULL in output.
    2149             :  * - valid - true/false representing whether the connection is valid or not.
    2150             :  *   Note that connections can become invalid in pgfdw_inval_callback.
    2151             :  *
    2152             :  * For API version 1.2 and later, this function takes an input parameter
    2153             :  * to check a connection status and returns the following
     2154             :  * additional values along with the two values from version 1.1:
    2155             :  *
    2156             :  * - user_name - the local user name of the active connection. In case the
    2157             :  *   user mapping is dropped but the connection is still active, then the
    2158             :  *   user name will be NULL in the output.
    2159             :  * - used_in_xact - true if the connection is used in the current transaction.
    2160             :  * - closed - true if the connection is closed.
    2161             :  * - remote_backend_pid - process ID of the remote backend, on the foreign
    2162             :  *   server, handling the connection.
    2163             :  *
    2164             :  * No records are returned when there are no cached connections at all.
    2165             :  */
    2166             : static void
    2167          26 : postgres_fdw_get_connections_internal(FunctionCallInfo fcinfo,
    2168             :                                       enum pgfdwVersion api_version)
    2169             : {
    2170          26 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    2171             :     HASH_SEQ_STATUS scan;
    2172             :     ConnCacheEntry *entry;
    2173             : 
    2174          26 :     InitMaterializedSRF(fcinfo, 0);
    2175             : 
    2176             :     /* If cache doesn't exist, we return no records */
    2177          26 :     if (!ConnectionHash)
    2178           0 :         return;
    2179             : 
    2180             :     /* Check we have the expected number of output arguments */
    2181          26 :     switch (rsinfo->setDesc->natts)
    2182             :     {
    2183           0 :         case POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_1:
    2184           0 :             if (api_version != PGFDW_V1_1)
    2185           0 :                 elog(ERROR, "incorrect number of output arguments");
    2186           0 :             break;
    2187          26 :         case POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_2:
    2188          26 :             if (api_version != PGFDW_V1_2)
    2189           0 :                 elog(ERROR, "incorrect number of output arguments");
    2190          26 :             break;
    2191           0 :         default:
    2192           0 :             elog(ERROR, "incorrect number of output arguments");
    2193             :     }
    2194             : 
    2195          26 :     hash_seq_init(&scan, ConnectionHash);
    2196         226 :     while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
    2197             :     {
    2198             :         ForeignServer *server;
    2199         200 :         Datum       values[POSTGRES_FDW_GET_CONNECTIONS_COLS] = {0};
    2200         200 :         bool        nulls[POSTGRES_FDW_GET_CONNECTIONS_COLS] = {0};
    2201         200 :         int         i = 0;
    2202             : 
    2203             :         /* We only look for open remote connections */
    2204         200 :         if (!entry->conn)
    2205         174 :             continue;
    2206             : 
    2207          26 :         server = GetForeignServerExtended(entry->serverid, FSV_MISSING_OK);
    2208             : 
    2209             :         /*
    2210             :          * The foreign server may have been dropped in current explicit
    2211             :          * transaction. It is not possible to drop the server from another
    2212             :          * session when the connection associated with it is in use in the
    2213             :          * current transaction, if tried so, the drop query in another session
    2214             :          * blocks until the current transaction finishes.
    2215             :          *
    2216             :          * Even though the server is dropped in the current transaction, the
    2217             :          * cache can still have associated active connection entry, say we
    2218             :          * call such connections dangling. Since we can not fetch the server
    2219             :          * name from system catalogs for dangling connections, instead we show
    2220             :          * NULL value for server name in output.
    2221             :          *
    2222             :          * We could have done better by storing the server name in the cache
    2223             :          * entry instead of server oid so that it could be used in the output.
    2224             :          * But the server name in each cache entry requires 64 bytes of
    2225             :          * memory, which is huge, when there are many cached connections and
    2226             :          * the use case i.e. dropping the foreign server within the explicit
    2227             :          * current transaction seems rare. So, we chose to show NULL value for
    2228             :          * server name in output.
    2229             :          *
    2230             :          * Such dangling connections get closed either in next use or at the
    2231             :          * end of current explicit transaction in pgfdw_xact_callback.
    2232             :          */
    2233          26 :         if (!server)
    2234             :         {
    2235             :             /*
    2236             :              * If the server has been dropped in the current explicit
    2237             :              * transaction, then this entry would have been invalidated in
    2238             :              * pgfdw_inval_callback at the end of drop server command. Note
    2239             :              * that this connection would not have been closed in
    2240             :              * pgfdw_inval_callback because it is still being used in the
    2241             :              * current explicit transaction. So, assert that here.
    2242             :              */
    2243             :             Assert(entry->conn && entry->xact_depth > 0 && entry->invalidated);
    2244             : 
    2245             :             /* Show null, if no server name was found */
    2246           2 :             nulls[i++] = true;
    2247             :         }
    2248             :         else
    2249          24 :             values[i++] = CStringGetTextDatum(server->servername);
    2250             : 
    2251          26 :         if (api_version >= PGFDW_V1_2)
    2252             :         {
    2253             :             HeapTuple   tp;
    2254             : 
    2255             :             /* Use the system cache to obtain the user mapping */
    2256          26 :             tp = SearchSysCache1(USERMAPPINGOID, ObjectIdGetDatum(entry->key));
    2257             : 
    2258             :             /*
    2259             :              * Just like in the foreign server case, user mappings can also be
    2260             :              * dropped in the current explicit transaction. Therefore, the
    2261             :              * similar check as in the server case is required.
    2262             :              */
    2263          26 :             if (!HeapTupleIsValid(tp))
    2264             :             {
    2265             :                 /*
    2266             :                  * If we reach here, this entry must have been invalidated in
    2267             :                  * pgfdw_inval_callback, same as in the server case.
    2268             :                  */
    2269             :                 Assert(entry->conn && entry->xact_depth > 0 &&
    2270             :                        entry->invalidated);
    2271             : 
    2272           2 :                 nulls[i++] = true;
    2273             :             }
    2274             :             else
    2275             :             {
    2276             :                 Oid         userid;
    2277             : 
    2278          24 :                 userid = ((Form_pg_user_mapping) GETSTRUCT(tp))->umuser;
    2279          24 :                 values[i++] = CStringGetTextDatum(MappingUserName(userid));
    2280          24 :                 ReleaseSysCache(tp);
    2281             :             }
    2282             :         }
    2283             : 
    2284          26 :         values[i++] = BoolGetDatum(!entry->invalidated);
    2285             : 
    2286          26 :         if (api_version >= PGFDW_V1_2)
    2287             :         {
    2288          26 :             bool        check_conn = PG_GETARG_BOOL(0);
    2289             : 
    2290             :             /* Is this connection used in the current transaction? */
    2291          26 :             values[i++] = BoolGetDatum(entry->xact_depth > 0);
    2292             : 
    2293             :             /*
    2294             :              * If a connection status check is requested and supported, return
    2295             :              * whether the connection is closed. Otherwise, return NULL.
    2296             :              */
    2297          26 :             if (check_conn && pgfdw_conn_checkable())
    2298           4 :                 values[i++] = BoolGetDatum(pgfdw_conn_check(entry->conn) != 0);
    2299             :             else
    2300          22 :                 nulls[i++] = true;
    2301             : 
    2302             :             /* Return process ID of remote backend */
    2303          26 :             values[i++] = Int32GetDatum(PQbackendPID(entry->conn));
    2304             :         }
    2305             : 
    2306          26 :         tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
    2307             :     }
    2308             : }
    2309             : 
    2310             : /*
    2311             :  * List active foreign server connections.
    2312             :  *
    2313             :  * The SQL API of this function has changed multiple times, and will likely
    2314             :  * do so again in future.  To support the case where a newer version of this
    2315             :  * loadable module is being used with an old SQL declaration of the function,
    2316             :  * we continue to support the older API versions.
    2317             :  */
    2318             : Datum
    2319          26 : postgres_fdw_get_connections_1_2(PG_FUNCTION_ARGS)
    2320             : {
    2321          26 :     postgres_fdw_get_connections_internal(fcinfo, PGFDW_V1_2);
    2322             : 
    2323          26 :     PG_RETURN_VOID();
    2324             : }
    2325             : 
    2326             : Datum
    2327           0 : postgres_fdw_get_connections(PG_FUNCTION_ARGS)
    2328             : {
    2329           0 :     postgres_fdw_get_connections_internal(fcinfo, PGFDW_V1_1);
    2330             : 
    2331           0 :     PG_RETURN_VOID();
    2332             : }
    2333             : 
    2334             : /*
    2335             :  * Disconnect the specified cached connections.
    2336             :  *
    2337             :  * This function discards the open connections that are established by
    2338             :  * postgres_fdw from the local session to the foreign server with
    2339             :  * the given name. Note that there can be multiple connections to
    2340             :  * the given server using different user mappings. If the connections
    2341             :  * are used in the current local transaction, they are not disconnected
    2342             :  * and warning messages are reported. This function returns true
    2343             :  * if it disconnects at least one connection, otherwise false. If no
    2344             :  * foreign server with the given name is found, an error is reported.
    2345             :  */
    2346             : Datum
    2347           8 : postgres_fdw_disconnect(PG_FUNCTION_ARGS)
    2348             : {
    2349             :     ForeignServer *server;
    2350             :     char       *servername;
    2351             : 
    2352           8 :     servername = text_to_cstring(PG_GETARG_TEXT_PP(0));
    2353           8 :     server = GetForeignServerByName(servername, false);
    2354             : 
    2355           6 :     PG_RETURN_BOOL(disconnect_cached_connections(server->serverid));
    2356             : }
    2357             : 
    2358             : /*
    2359             :  * Disconnect all the cached connections.
    2360             :  *
    2361             :  * This function discards all the open connections that are established by
    2362             :  * postgres_fdw from the local session to the foreign servers.
    2363             :  * If the connections are used in the current local transaction, they are
    2364             :  * not disconnected and warning messages are reported. This function
    2365             :  * returns true if it disconnects at least one connection, otherwise false.
    2366             :  */
    2367             : Datum
    2368          10 : postgres_fdw_disconnect_all(PG_FUNCTION_ARGS)
    2369             : {
    2370          10 :     PG_RETURN_BOOL(disconnect_cached_connections(InvalidOid));
    2371             : }
    2372             : 
    2373             : /*
    2374             :  * Workhorse to disconnect cached connections.
    2375             :  *
    2376             :  * This function scans all the connection cache entries and disconnects
    2377             :  * the open connections whose foreign server OID matches with
    2378             :  * the specified one. If InvalidOid is specified, it disconnects all
    2379             :  * the cached connections.
    2380             :  *
    2381             :  * This function emits a warning for each connection that's used in
    2382             :  * the current transaction and doesn't close it. It returns true if
    2383             :  * it disconnects at least one connection, otherwise false.
    2384             :  *
    2385             :  * Note that this function disconnects even the connections that are
    2386             :  * established by other users in the same local session using different
    2387             :  * user mappings. This leads even non-superuser to be able to close
    2388             :  * the connections established by superusers in the same local session.
    2389             :  *
    2390             :  * XXX As of now we don't see any security risk doing this. But we should
    2391             :  * set some restrictions on that, for example, prevent non-superuser
    2392             :  * from closing the connections established by superusers even
    2393             :  * in the same session?
    2394             :  */
    2395             : static bool
    2396          16 : disconnect_cached_connections(Oid serverid)
    2397             : {
    2398             :     HASH_SEQ_STATUS scan;
    2399             :     ConnCacheEntry *entry;
    2400          16 :     bool        all = !OidIsValid(serverid);
    2401          16 :     bool        result = false;
    2402             : 
    2403             :     /*
    2404             :      * Connection cache hashtable has not been initialized yet in this
    2405             :      * session, so return false.
    2406             :      */
    2407          16 :     if (!ConnectionHash)
    2408           0 :         return false;
    2409             : 
    2410          16 :     hash_seq_init(&scan, ConnectionHash);
    2411         134 :     while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
    2412             :     {
    2413             :         /* Ignore cache entry if no open connection right now. */
    2414         118 :         if (!entry->conn)
    2415          96 :             continue;
    2416             : 
    2417          22 :         if (all || entry->serverid == serverid)
    2418             :         {
    2419             :             /*
    2420             :              * Emit a warning because the connection to close is used in the
    2421             :              * current transaction and cannot be disconnected right now.
    2422             :              */
    2423          16 :             if (entry->xact_depth > 0)
    2424             :             {
    2425             :                 ForeignServer *server;
    2426             : 
    2427           6 :                 server = GetForeignServerExtended(entry->serverid,
    2428             :                                                   FSV_MISSING_OK);
    2429             : 
    2430           6 :                 if (!server)
    2431             :                 {
    2432             :                     /*
    2433             :                      * If the foreign server was dropped while its connection
    2434             :                      * was used in the current transaction, the connection
    2435             :                      * must have been marked as invalid by
    2436             :                      * pgfdw_inval_callback at the end of DROP SERVER command.
    2437             :                      */
    2438             :                     Assert(entry->invalidated);
    2439             : 
    2440           0 :                     ereport(WARNING,
    2441             :                             (errmsg("cannot close dropped server connection because it is still in use")));
    2442             :                 }
    2443             :                 else
    2444           6 :                     ereport(WARNING,
    2445             :                             (errmsg("cannot close connection for server \"%s\" because it is still in use",
    2446             :                                     server->servername)));
    2447             :             }
    2448             :             else
    2449             :             {
    2450          10 :                 elog(DEBUG3, "discarding connection %p", entry->conn);
    2451          10 :                 disconnect_pg_server(entry);
    2452          10 :                 result = true;
    2453             :             }
    2454             :         }
    2455             :     }
    2456             : 
    2457          16 :     return result;
    2458             : }
    2459             : 
    2460             : /*
    2461             :  * Check if the remote server closed the connection.
    2462             :  *
    2463             :  * Returns 1 if the connection is closed, -1 if an error occurred,
    2464             :  * and 0 if it's not closed or if the connection check is unavailable
    2465             :  * on this platform.
    2466             :  */
    2467             : static int
    2468           4 : pgfdw_conn_check(PGconn *conn)
    2469             : {
    2470           4 :     int         sock = PQsocket(conn);
    2471             : 
    2472           4 :     if (PQstatus(conn) != CONNECTION_OK || sock == -1)
    2473           0 :         return -1;
    2474             : 
    2475             : #if (defined(HAVE_POLL) && defined(POLLRDHUP))
    2476             :     {
    2477             :         struct pollfd input_fd;
    2478             :         int         result;
    2479             : 
    2480           4 :         input_fd.fd = sock;
    2481           4 :         input_fd.events = POLLRDHUP;
    2482           4 :         input_fd.revents = 0;
    2483             : 
    2484             :         do
    2485           4 :             result = poll(&input_fd, 1, 0);
    2486           4 :         while (result < 0 && errno == EINTR);
    2487             : 
    2488           4 :         if (result < 0)
    2489           0 :             return -1;
    2490             : 
    2491           4 :         return (input_fd.revents &
    2492           4 :                 (POLLRDHUP | POLLHUP | POLLERR | POLLNVAL)) ? 1 : 0;
    2493             :     }
    2494             : #else
    2495             :     return 0;
    2496             : #endif
    2497             : }
    2498             : 
    2499             : /*
    2500             :  * Check if connection status checking is available on this platform.
    2501             :  *
    2502             :  * Returns true if available, false otherwise.
    2503             :  */
    2504             : static bool
    2505           4 : pgfdw_conn_checkable(void)
    2506             : {
    2507             : #if (defined(HAVE_POLL) && defined(POLLRDHUP))
    2508           4 :     return true;
    2509             : #else
    2510             :     return false;
    2511             : #endif
    2512             : }
    2513             : 
    2514             : /*
    2515             :  * Ensure that require_auth and SCRAM keys are correctly set on values. SCRAM
    2516             :  * keys used to pass-through are coming from the initial connection from the
    2517             :  * client with the server.
    2518             :  *
    2519             :  * All required SCRAM options are set by postgres_fdw, so we just need to
    2520             :  * ensure that these options are not overwritten by the user.
    2521             :  */
    2522             : static bool
    2523          16 : pgfdw_has_required_scram_options(const char **keywords, const char **values)
    2524             : {
    2525          16 :     bool        has_scram_server_key = false;
    2526          16 :     bool        has_scram_client_key = false;
    2527          16 :     bool        has_require_auth = false;
    2528          16 :     bool        has_scram_keys = false;
    2529             : 
    2530             :     /*
    2531             :      * Continue iterating even if we found the keys that we need to validate
    2532             :      * to make sure that there is no other declaration of these keys that can
    2533             :      * overwrite the first.
    2534             :      */
    2535         160 :     for (int i = 0; keywords[i] != NULL; i++)
    2536             :     {
    2537         144 :         if (strcmp(keywords[i], "scram_client_key") == 0)
    2538             :         {
    2539          16 :             if (values[i] != NULL && values[i][0] != '\0')
    2540          16 :                 has_scram_client_key = true;
    2541             :             else
    2542           0 :                 has_scram_client_key = false;
    2543             :         }
    2544             : 
    2545         144 :         if (strcmp(keywords[i], "scram_server_key") == 0)
    2546             :         {
    2547          16 :             if (values[i] != NULL && values[i][0] != '\0')
    2548          16 :                 has_scram_server_key = true;
    2549             :             else
    2550           0 :                 has_scram_server_key = false;
    2551             :         }
    2552             : 
    2553         144 :         if (strcmp(keywords[i], "require_auth") == 0)
    2554             :         {
    2555          16 :             if (values[i] != NULL && strcmp(values[i], "scram-sha-256") == 0)
    2556          16 :                 has_require_auth = true;
    2557             :             else
    2558           0 :                 has_require_auth = false;
    2559             :         }
    2560             :     }
    2561             : 
    2562          16 :     has_scram_keys = has_scram_client_key && has_scram_server_key && MyProcPort->has_scram_keys;
    2563             : 
    2564          16 :     return (has_scram_keys && has_require_auth);
    2565             : }

Generated by: LCOV version 1.16