LCOV - code coverage report
Current view: top level - contrib/postgres_fdw - connection.c (source / functions) Hit Total Coverage
Test: PostgreSQL 18devel Lines: 569 683 83.3 %
Date: 2025-01-18 04:15:08 Functions: 47 48 97.9 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * connection.c
       4             :  *        Connection management functions for postgres_fdw
       5             :  *
       6             :  * Portions Copyright (c) 2012-2025, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *        contrib/postgres_fdw/connection.c
      10             :  *
      11             :  *-------------------------------------------------------------------------
      12             :  */
      13             : #include "postgres.h"
      14             : 
      15             : #if HAVE_POLL_H
      16             : #include <poll.h>
      17             : #endif
      18             : 
      19             : #include "access/xact.h"
      20             : #include "catalog/pg_user_mapping.h"
      21             : #include "commands/defrem.h"
      22             : #include "common/base64.h"
      23             : #include "funcapi.h"
      24             : #include "libpq/libpq-be.h"
      25             : #include "libpq/libpq-be-fe-helpers.h"
      26             : #include "mb/pg_wchar.h"
      27             : #include "miscadmin.h"
      28             : #include "pgstat.h"
      29             : #include "postgres_fdw.h"
      30             : #include "storage/latch.h"
      31             : #include "utils/builtins.h"
      32             : #include "utils/hsearch.h"
      33             : #include "utils/inval.h"
      34             : #include "utils/syscache.h"
      35             : 
      36             : /*
      37             :  * Connection cache hash table entry
      38             :  *
      39             :  * The lookup key in this hash table is the user mapping OID. We use just one
      40             :  * connection per user mapping ID, which ensures that all the scans use the
      41             :  * same snapshot during a query.  Using the user mapping OID rather than
      42             :  * the foreign server OID + user OID avoids creating multiple connections when
      43             :  * the public user mapping applies to all user OIDs.
      44             :  *
      45             :  * The "conn" pointer can be NULL if we don't currently have a live connection.
      46             :  * When we do have a connection, xact_depth tracks the current depth of
      47             :  * transactions and subtransactions open on the remote side.  We need to issue
      48             :  * commands at the same nesting depth on the remote as we're executing at
      49             :  * ourselves, so that rolling back a subtransaction will kill the right
      50             :  * queries and not the wrong ones.
      51             :  */
      52             : typedef Oid ConnCacheKey;
      53             : 
      54             : typedef struct ConnCacheEntry
      55             : {
      56             :     ConnCacheKey key;           /* hash key: user mapping OID (must be first) */
      57             :     PGconn     *conn;           /* connection to foreign server, or NULL */
      58             :     /* Remaining fields are invalid when conn is NULL: */
      59             :     int         xact_depth;     /* 0 = no xact open, 1 = main xact open, 2 =
      60             :                                  * one level of subxact open, etc */
      61             :     bool        have_prep_stmt; /* has any GetConnection caller said it will
                     :                                  * prepare stmts in this xact? */
      62             :     bool        have_error;     /* have any subxacts aborted in this xact? */
      63             :     bool        changing_xact_state;    /* xact state change in progress */
      64             :     bool        parallel_commit;    /* do we commit (sub)xacts in parallel?
                     :                                      * (parallel_commit server option,
                     :                                      * false unless set) */
      65             :     bool        parallel_abort; /* do we abort (sub)xacts in parallel?
                     :                                  * (parallel_abort server option, false
                     :                                  * unless set) */
      66             :     bool        invalidated;    /* true if reconnect is pending; GetConnection
                     :                                  * closes the connection once xact_depth
                     :                                  * is 0 */
      67             :     bool        keep_connections;   /* setting value of keep_connections
      68             :                                      * server option (true unless set) */
      69             :     Oid         serverid;       /* foreign server OID used to get server name */
      70             :     uint32      server_hashvalue;   /* syscache hash value of foreign server OID */
      71             :     uint32      mapping_hashvalue;  /* syscache hash value of user mapping OID */
      72             :     PgFdwConnState state;       /* extra per-connection state */
      73             : } ConnCacheEntry;
      74             : 
      75             : /*
      76             :  * Connection cache (initialized on first use)
      77             :  */
      78             : static HTAB *ConnectionHash = NULL;
      79             : 
      80             : /* for assigning cursor numbers and prepared statement numbers */
      81             : static unsigned int cursor_number = 0;
      82             : static unsigned int prep_stmt_number = 0;
      83             : 
      84             : /* tracks whether any work is needed in callback functions */
      85             : static bool xact_got_connection = false;
      86             : 
      87             : /* custom wait event values, retrieved from shared memory */
      88             : static uint32 pgfdw_we_cleanup_result = 0;
      89             : static uint32 pgfdw_we_connect = 0;
      90             : static uint32 pgfdw_we_get_result = 0;
      91             : 
      92             : /*
      93             :  * Milliseconds to wait to cancel an in-progress query or execute a cleanup
      94             :  * query; if it takes longer than 30 seconds to do these, we assume the
      95             :  * connection is dead.
      96             :  */
      97             : #define CONNECTION_CLEANUP_TIMEOUT  30000
      98             : 
      99             : /*
     100             :  * Milliseconds to wait before issuing another cancel request.  This covers
     101             :  * the race condition where the remote session ignored our cancel request
     102             :  * because it arrived while idle.
     103             :  */
     104             : #define RETRY_CANCEL_TIMEOUT    1000
     105             : 
     106             : /*
                     :  * Macro for constructing abort command to be sent.
                     :  *
                     :  * "sql" must be a char array (not a pointer), because the buffer size is
                     :  * taken with sizeof(sql).  If "toplevel" is true the command is
                     :  * "ABORT TRANSACTION"; otherwise it rolls back to and releases the savepoint
                     :  * named after (entry)->xact_depth.
                     :  */
     107             : #define CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel) \
     108             :     do { \
     109             :         if (toplevel) \
     110             :             snprintf((sql), sizeof(sql), \
     111             :                      "ABORT TRANSACTION"); \
     112             :         else \
     113             :             snprintf((sql), sizeof(sql), \
     114             :                      "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d", \
     115             :                      (entry)->xact_depth, (entry)->xact_depth); \
     116             :     } while(0)
     117             : 
     118             : /*
     119             :  * Extension version number, for supporting older extension versions' objects.
                     :  * Passed as api_version to postgres_fdw_get_connections_internal().
     120             :  */
     121             : enum pgfdwVersion
     122             : {
     123             :     PGFDW_V1_1 = 0,
     124             :     PGFDW_V1_2,
     125             : };
     126             : 
     127             : /*
     128             :  * SQL functions
     129             :  */
     130           4 : PG_FUNCTION_INFO_V1(postgres_fdw_get_connections);
     131           6 : PG_FUNCTION_INFO_V1(postgres_fdw_get_connections_1_2);
     132           6 : PG_FUNCTION_INFO_V1(postgres_fdw_disconnect);
     133           6 : PG_FUNCTION_INFO_V1(postgres_fdw_disconnect_all);
     134             : 
     135             : /* prototypes of private functions */
     136             : static void make_new_connection(ConnCacheEntry *entry, UserMapping *user);
     137             : static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
     138             : static void disconnect_pg_server(ConnCacheEntry *entry);
     139             : static void check_conn_params(const char **keywords, const char **values, UserMapping *user);
     140             : static void configure_remote_session(PGconn *conn);
     141             : static void do_sql_command_begin(PGconn *conn, const char *sql);
     142             : static void do_sql_command_end(PGconn *conn, const char *sql,
     143             :                                bool consume_input);
     144             : static void begin_remote_xact(ConnCacheEntry *entry);
     145             : static void pgfdw_xact_callback(XactEvent event, void *arg);
     146             : static void pgfdw_subxact_callback(SubXactEvent event,
     147             :                                    SubTransactionId mySubid,
     148             :                                    SubTransactionId parentSubid,
     149             :                                    void *arg);
     150             : static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue);
     151             : static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry);
     152             : static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel);
     153             : static bool pgfdw_cancel_query(PGconn *conn);
     154             : static bool pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime);
     155             : static bool pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime,
     156             :                                    TimestampTz retrycanceltime,
     157             :                                    bool consume_input);
     158             : static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
     159             :                                      bool ignore_errors);
     160             : static bool pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query);
     161             : static bool pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
     162             :                                          TimestampTz endtime,
     163             :                                          bool consume_input,
     164             :                                          bool ignore_errors);
     165             : static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
     166             :                                      TimestampTz retrycanceltime,
     167             :                                      PGresult **result, bool *timed_out);
     168             : static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel);
     169             : static bool pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel,
     170             :                                       List **pending_entries,
     171             :                                       List **cancel_requested);
     172             : static void pgfdw_finish_pre_commit_cleanup(List *pending_entries);
     173             : static void pgfdw_finish_pre_subcommit_cleanup(List *pending_entries,
     174             :                                                int curlevel);
     175             : static void pgfdw_finish_abort_cleanup(List *pending_entries,
     176             :                                        List *cancel_requested,
     177             :                                        bool toplevel);
     178             : static void pgfdw_security_check(const char **keywords, const char **values,
     179             :                                  UserMapping *user, PGconn *conn);
     180             : static bool UserMappingPasswordRequired(UserMapping *user);
     181             : static bool UseScramPassthrough(ForeignServer *server, UserMapping *user);
     182             : static bool disconnect_cached_connections(Oid serverid);
     183             : static void postgres_fdw_get_connections_internal(FunctionCallInfo fcinfo,
     184             :                                                   enum pgfdwVersion api_version);
     185             : static int  pgfdw_conn_check(PGconn *conn);
     186             : static bool pgfdw_conn_checkable(void);
     187             : 
     188             : /*
     189             :  * Get a PGconn which can be used to execute queries on the remote PostgreSQL
     190             :  * server with the user's authorization.  A new connection is established
     191             :  * if we don't already have a suitable one, and a transaction is opened at
     192             :  * the right subtransaction nesting depth if we didn't do that already.
                     :  * Connections are cached in ConnectionHash, one per user mapping OID.
     193             :  *
     194             :  * will_prep_stmt must be true if caller intends to create any prepared
     195             :  * statements.  Since those don't go away automatically at transaction end
     196             :  * (not even on error), we need this flag to cue manual cleanup.
     197             :  *
     198             :  * If state is not NULL, *state receives the per-connection state associated
     199             :  * with the PGconn.
                     :  *
                     :  * If starting the remote transaction fails with a connection failure on
                     :  * a broken connection while no remote transaction is open, the connection
                     :  * is closed, remade, and the attempt is retried once.  Any other error is
                     :  * re-thrown to the caller.
                     :  *
                     :  * Returns the cached PGconn for the user mapping.
     200             :  */
     201             : PGconn *
     202        4262 : GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state)
     203             : {
     204             :     bool        found;
     205        4262 :     bool        retry = false;
     206             :     ConnCacheEntry *entry;
     207             :     ConnCacheKey key;
     208        4262 :     MemoryContext ccxt = CurrentMemoryContext;  /* used in PG_CATCH to copy error data */
     209             : 
     210             :     /* First time through, initialize connection cache hashtable */
     211        4262 :     if (ConnectionHash == NULL)
     212             :     {
     213             :         HASHCTL     ctl;
     214             : 
     215          18 :         if (pgfdw_we_get_result == 0)
     216          18 :             pgfdw_we_get_result =
     217          18 :                 WaitEventExtensionNew("PostgresFdwGetResult");
     218             : 
     219          18 :         ctl.keysize = sizeof(ConnCacheKey);
     220          18 :         ctl.entrysize = sizeof(ConnCacheEntry);
     221          18 :         ConnectionHash = hash_create("postgres_fdw connections", 8,
     222             :                                      &ctl,
     223             :                                      HASH_ELEM | HASH_BLOBS);
     224             : 
     225             :         /*
     226             :          * Register some callback functions that manage connection cleanup.
     227             :          * This should be done just once in each backend.
     228             :          */
     229          18 :         RegisterXactCallback(pgfdw_xact_callback, NULL);
     230          18 :         RegisterSubXactCallback(pgfdw_subxact_callback, NULL);
     231          18 :         CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
     232             :                                       pgfdw_inval_callback, (Datum) 0);
     233          18 :         CacheRegisterSyscacheCallback(USERMAPPINGOID,
     234             :                                       pgfdw_inval_callback, (Datum) 0);
     235             :     }
     236             : 
     237             :     /* Set flag that we did GetConnection during the current transaction */
     238        4262 :     xact_got_connection = true;
     239             : 
     240             :     /* The hash key is simply the user mapping OID (ConnCacheKey is an Oid) */
     241        4262 :     key = user->umid;
     242             : 
     243             :     /*
     244             :      * Find or create cached entry for requested connection.
     245             :      */
     246        4262 :     entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
     247        4262 :     if (!found)
     248             :     {
     249             :         /*
     250             :          * We need only clear "conn" here; remaining fields will be filled
     251             :          * later when "conn" is set.
     252             :          */
     253          34 :         entry->conn = NULL;
     254             :     }
     255             : 
     256             :     /* Reject further use of connections which failed abort cleanup. */
     257        4262 :     pgfdw_reject_incomplete_xact_state_change(entry);
     258             : 
     259             :     /*
     260             :      * If the connection needs to be remade due to invalidation, disconnect as
     261             :      * soon as we're out of all transactions.
     262             :      */
     263        4262 :     if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
     264             :     {
     265           0 :         elog(DEBUG3, "closing connection %p for option changes to take effect",
     266             :              entry->conn);
     267           0 :         disconnect_pg_server(entry);
     268             :     }
     269             : 
     270             :     /*
     271             :      * If cache entry doesn't have a connection, we have to establish a new
     272             :      * connection.  (If connect_pg_server throws an error, the cache entry
     273             :      * will remain in a valid empty state, ie conn == NULL.)
     274             :      */
     275        4262 :     if (entry->conn == NULL)
     276         140 :         make_new_connection(entry, user);
     277             : 
     278             :     /*
     279             :      * We check the health of the cached connection here when using it.  In
     280             :      * cases where we're out of all transactions, if a broken connection is
     281             :      * detected, we try to reestablish a new connection later.
     282             :      */
     283        4250 :     PG_TRY();
     284             :     {
     285             :         /* Process a pending asynchronous request if any. */
     286        4250 :         if (entry->state.pendingAreq)
     287           0 :             process_pending_request(entry->state.pendingAreq);
     288             :         /* Start a new transaction or subtransaction if needed. */
     289        4250 :         begin_remote_xact(entry);
     290             :     }
     291           4 :     PG_CATCH();
     292             :     {
     293           4 :         MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
     294           4 :         ErrorData  *errdata = CopyErrorData();
     295             : 
     296             :         /*
     297             :          * Determine whether to try to reestablish the connection.
     298             :          *
     299             :          * After a broken connection is detected in libpq, any error other
     300             :          * than connection failure (e.g., out-of-memory) can be thrown
     301             :          * somewhere between return from libpq and the expected ereport() call
     302             :          * in pgfdw_report_error(). In this case, since PQstatus() indicates
     303             :          * CONNECTION_BAD, checking only PQstatus() causes the false detection
     304             :          * of connection failure. To avoid this, we also verify that the
     305             :          * error's sqlstate is ERRCODE_CONNECTION_FAILURE. Note that also
     306             :          * checking only the sqlstate can cause another false detection
     307             :          * because pgfdw_report_error() may report ERRCODE_CONNECTION_FAILURE
     308             :          * for any libpq-originated error condition.
                     :          *
                     :          * A retry is also ruled out while a remote transaction is open
                     :          * (xact_depth > 0); in all these cases the error is re-thrown.
     309             :          */
     310           4 :         if (errdata->sqlerrcode != ERRCODE_CONNECTION_FAILURE ||
     311           4 :             PQstatus(entry->conn) != CONNECTION_BAD ||
     312           4 :             entry->xact_depth > 0)
     313             :         {
     314           2 :             MemoryContextSwitchTo(ecxt);
     315           2 :             PG_RE_THROW();
     316             :         }
     317             : 
     318             :         /* Clean up the error state */
     319           2 :         FlushErrorState();
     320           2 :         FreeErrorData(errdata);
     321           2 :         errdata = NULL;
     322             : 
     323           2 :         retry = true;
     324             :     }
     325        4248 :     PG_END_TRY();
     326             : 
     327             :     /*
     328             :      * If a broken connection is detected, disconnect it, reestablish a new
     329             :      * connection and retry a new remote transaction. If connection failure is
     330             :      * reported again, we give up getting a connection.
     331             :      */
     332        4248 :     if (retry)
     333             :     {
     334             :         Assert(entry->xact_depth == 0);
     335             : 
     336           2 :         ereport(DEBUG3,
     337             :                 (errmsg_internal("could not start remote transaction on connection %p",
     338             :                                  entry->conn)),
     339             :                 errdetail_internal("%s", pchomp(PQerrorMessage(entry->conn))));
     340             : 
     341           2 :         elog(DEBUG3, "closing connection %p to reestablish a new one",
     342             :              entry->conn);
     343           2 :         disconnect_pg_server(entry);
     344             : 
     345           2 :         make_new_connection(entry, user);
     346             : 
     347           2 :         begin_remote_xact(entry);
     348             :     }
     349             : 
     350             :     /* Remember if caller will prepare statements */
     351        4248 :     entry->have_prep_stmt |= will_prep_stmt;
     352             : 
     353             :     /* If caller needs access to the per-connection state, return it. */
     354        4248 :     if (state)
     355        1454 :         *state = &entry->state;
     356             : 
     357        4248 :     return entry->conn;
     358             : }
     359             : 
     360             : /*
     361             :  * Reset all transient state fields in the cached connection entry and
     362             :  * establish a new connection to the remote server that "user" maps to.
                     :  *
                     :  * entry->conn must be NULL on entry.  The per-connection settings
                     :  * keep_connections, parallel_commit and parallel_abort are read from the
                     :  * foreign server's options before connecting.  entry->conn is assigned
                     :  * only after connect_pg_server() returns.
     363             :  */
     364             : static void
     365         142 : make_new_connection(ConnCacheEntry *entry, UserMapping *user)
     366             : {
     367         142 :     ForeignServer *server = GetForeignServer(user->serverid);
     368             :     ListCell   *lc;
     369             : 
     370             :     Assert(entry->conn == NULL);
     371             : 
     372             :     /* Reset all transient state fields, to be sure all are clean */
     373         142 :     entry->xact_depth = 0;
     374         142 :     entry->have_prep_stmt = false;
     375         142 :     entry->have_error = false;
     376         142 :     entry->changing_xact_state = false;
     377         142 :     entry->invalidated = false;
     378         142 :     entry->serverid = server->serverid;
     379         142 :     entry->server_hashvalue =
     380         142 :         GetSysCacheHashValue1(FOREIGNSERVEROID,
     381             :                               ObjectIdGetDatum(server->serverid));
     382         142 :     entry->mapping_hashvalue =
     383         142 :         GetSysCacheHashValue1(USERMAPPINGOID,
     384             :                               ObjectIdGetDatum(user->umid));
     385         142 :     memset(&entry->state, 0, sizeof(entry->state));
     386             : 
     387             :     /*
     388             :      * Determine whether to keep the connection that we're about to make here
     389             :      * open even after the transaction using it ends, so that the subsequent
     390             :      * transactions can re-use it.
     391             :      *
     392             :      * By default, all the connections to any foreign servers are kept open.
     393             :      *
     394             :      * Also determine whether to commit/abort (sub)transactions opened on the
     395             :      * remote server in parallel at (sub)transaction end, which is disabled by
     396             :      * default.
     397             :      *
     398             :      * Note: it's enough to determine these only when making a new connection
     399             :      * because if these settings for it are changed, it will be closed and
     400             :      * re-made later.
     401             :      */
     402         142 :     entry->keep_connections = true;
     403         142 :     entry->parallel_commit = false;
     404         142 :     entry->parallel_abort = false;
     405         630 :     foreach(lc, server->options)
     406             :     {
     407         488 :         DefElem    *def = (DefElem *) lfirst(lc);
     408             : 
     409         488 :         if (strcmp(def->defname, "keep_connections") == 0)
     410          16 :             entry->keep_connections = defGetBoolean(def);
     411         472 :         else if (strcmp(def->defname, "parallel_commit") == 0)
     412           4 :             entry->parallel_commit = defGetBoolean(def);
     413         468 :         else if (strcmp(def->defname, "parallel_abort") == 0)
     414           4 :             entry->parallel_abort = defGetBoolean(def);
     415             :     }
     416             : 
     417             :     /* Now try to make the connection */
     418         142 :     entry->conn = connect_pg_server(server, user);
     419             : 
     420         130 :     elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
     421             :          entry->conn, server->servername, user->umid, user->userid);
     422         130 : }
     423             : 
     424             : /*
     425             :  * Check that a non-superuser has used a password or delegated credentials
     426             :  * to establish the connection; otherwise, they would be piggybacking on the
     427             :  * postgres server's user identity.  See also dblink_security_check()
     428             :  * in contrib/dblink and check_conn_params().
                     :  *
                     :  * keywords/values are the parallel connection-parameter arrays, with
                     :  * keywords terminated by a NULL entry; conn is the connection to check.
                     :  * Raises an ERROR if the check fails; returns normally otherwise.
     429             :  */
     430             : static void
     431         126 : pgfdw_security_check(const char **keywords, const char **values, UserMapping *user, PGconn *conn)
     432             : {
     433             :     /* Superusers bypass the check */
     434         126 :     if (superuser_arg(user->userid))
     435         118 :         return;
     436             : 
     437             : #ifdef ENABLE_GSS
     438             :     /* Connected via GSSAPI with delegated credentials -- all good. */
     439             :     if (PQconnectionUsedGSSAPI(conn) && be_gssapi_get_delegation(MyProcPort))
     440             :         return;
     441             : #endif
     442             : 
     443             :     /* Ok if superuser set PW required false. */
     444           8 :     if (!UserMappingPasswordRequired(user))
     445           4 :         return;
     446             : 
     447             :     /* Connected via PW, with PW required true, and provided non-empty PW. */
     448           4 :     if (PQconnectionUsedPassword(conn))
     449             :     {
     450             :         /* ok if params contain a non-empty password */
     451           0 :         for (int i = 0; keywords[i] != NULL; i++)
     452             :         {
     453           0 :             if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
     454           0 :                 return;
     455             :         }
     456             :     }
     457             : 
     458           4 :     ereport(ERROR,
     459             :             (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
     460             :              errmsg("password or GSSAPI delegated credentials required"),
     461             :              errdetail("Non-superuser cannot connect if the server does not request a password or use GSSAPI with delegated credentials."),
     462             :              errhint("Target server's authentication method must be changed or password_required=false set in the user mapping attributes.")));
     463             : }
     464             : 
     465             : /*
     466             :  * Connect to remote server using specified server and user mapping properties.
     467             :  */
     468             : static PGconn *
     469         142 : connect_pg_server(ForeignServer *server, UserMapping *user)
     470             : {
     471         142 :     PGconn     *volatile conn = NULL;
     472             : 
     473             :     /*
     474             :      * Use PG_TRY block to ensure closing connection on error.
     475             :      */
     476         142 :     PG_TRY();
     477             :     {
     478             :         const char **keywords;
     479             :         const char **values;
     480         142 :         char       *appname = NULL;
     481             :         int         n;
     482             : 
     483             :         /*
     484             :          * Construct connection params from generic options of ForeignServer
     485             :          * and UserMapping.  (Some of them might not be libpq options, in
     486             :          * which case we'll just waste a few array slots.)  Add 4 extra slots
     487             :          * for application_name, fallback_application_name, client_encoding,
     488             :          * end marker; the sum below reserves 2 more (presumably for SCRAM pass-through keys -- TODO confirm).
     489             :          */
     490         142 :         n = list_length(server->options) + list_length(user->options) + 4 + 2;
     491         142 :         keywords = (const char **) palloc(n * sizeof(char *));
     492         142 :         values = (const char **) palloc(n * sizeof(char *));
     493             : 
     494         142 :         n = 0;
     495         284 :         n += ExtractConnectionOptions(server->options,
     496         142 :                                       keywords + n, values + n);
     497         284 :         n += ExtractConnectionOptions(user->options,
     498         142 :                                       keywords + n, values + n);
     499             : 
     500             :         /*
     501             :          * Use pgfdw_application_name as application_name if set.
     502             :          *
     503             :          * PQconnectdbParams() processes the parameter arrays from start to
     504             :          * end. If any key word is repeated, the last value is used. Therefore
     505             :          * note that pgfdw_application_name must be added to the arrays after
     506             :          * options of ForeignServer are, so that it can override
     507             :          * application_name set in ForeignServer.
     508             :          */
     509         142 :         if (pgfdw_application_name && *pgfdw_application_name != '\0')
     510             :         {
     511           2 :             keywords[n] = "application_name";
     512           2 :             values[n] = pgfdw_application_name;
     513           2 :             n++;
     514             :         }
     515             : 
     516             :         /*
     517             :          * Search the parameter arrays to find application_name setting, and
     518             :          * replace escape sequences in it with status information if found.
     519             :          * The arrays are searched backwards because the last value is used if
     520             :          * application_name is repeatedly set.
     521             :          */
     522         378 :         for (int i = n - 1; i >= 0; i--)
     523             :         {
     524         272 :             if (strcmp(keywords[i], "application_name") == 0 &&
     525          36 :                 *(values[i]) != '\0')
     526             :             {
     527             :                 /*
     528             :                  * Use this application_name setting if it's not empty string
     529             :                  * even after any escape sequences in it are replaced.
     530             :                  */
     531          36 :                 appname = process_pgfdw_appname(values[i]);
     532          36 :                 if (appname[0] != '\0')
     533             :                 {
     534          36 :                     values[i] = appname;
     535          36 :                     break;
     536             :                 }
     537             : 
     538             :                 /*
     539             :                  * This empty application_name is not used, so we set
     540             :                  * values[i] to NULL and keep searching the array to find the
     541             :                  * next one.
     542             :                  */
     543           0 :                 values[i] = NULL;
     544           0 :                 pfree(appname);
     545           0 :                 appname = NULL;
     546             :             }
     547             :         }
     548             : 
     549             :         /* Use "postgres_fdw" as fallback_application_name */
     550         142 :         keywords[n] = "fallback_application_name";
     551         142 :         values[n] = "postgres_fdw";
     552         142 :         n++;
     553             : 
     554             :         /* Set client_encoding so that libpq can convert encoding properly. */
     555         142 :         keywords[n] = "client_encoding";
     556         142 :         values[n] = GetDatabaseEncodingName();
     557         142 :         n++;
     558             : 
     559         142 :         if (MyProcPort->has_scram_keys && UseScramPassthrough(server, user))
     560             :         {
     561             :             int         len;
     562             :             int         encoded_len;
     563             : 
     564           8 :             keywords[n] = "scram_client_key";
     565           8 :             len = pg_b64_enc_len(sizeof(MyProcPort->scram_ClientKey));
     566             :             /* don't forget the zero-terminator */
     567           8 :             values[n] = palloc0(len + 1);
     568          16 :             encoded_len = pg_b64_encode((const char *) MyProcPort->scram_ClientKey,
     569             :                                         sizeof(MyProcPort->scram_ClientKey),
     570           8 :                                         (char *) values[n], len);
     571           8 :             if (encoded_len < 0)
     572           0 :                 elog(ERROR, "could not encode SCRAM client key");
     573           8 :             n++;
     574             : 
     575           8 :             keywords[n] = "scram_server_key";
     576           8 :             len = pg_b64_enc_len(sizeof(MyProcPort->scram_ServerKey));
     577             :             /* don't forget the zero-terminator */
     578           8 :             values[n] = palloc0(len + 1);
     579          16 :             encoded_len = pg_b64_encode((const char *) MyProcPort->scram_ServerKey,
     580             :                                         sizeof(MyProcPort->scram_ServerKey),
     581           8 :                                         (char *) values[n], len);
     582           8 :             if (encoded_len < 0)
     583           0 :                 elog(ERROR, "could not encode SCRAM server key");
     584           8 :             n++;
     585             :         }
     586             : 
     587         142 :         keywords[n] = values[n] = NULL;
     588             : 
     589             :         /*
     590             :          * Verify the set of connection parameters only if scram pass-through
     591             :          * is not being used because the password is not necessary.
     592             :          */
     593         142 :         if (!(MyProcPort->has_scram_keys && UseScramPassthrough(server, user)))
     594         134 :             check_conn_params(keywords, values, user);
     595             : 
     596             :         /* first time, allocate or get the custom wait event */
     597         138 :         if (pgfdw_we_connect == 0)
     598          18 :             pgfdw_we_connect = WaitEventExtensionNew("PostgresFdwConnect");
     599             : 
     600             :         /* OK to make connection */
     601         138 :         conn = libpqsrv_connect_params(keywords, values,
     602             :                                        false,   /* expand_dbname */
     603             :                                        pgfdw_we_connect);
     604             : 
     605         138 :         if (!conn || PQstatus(conn) != CONNECTION_OK)
     606           4 :             ereport(ERROR,
     607             :                     (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
     608             :                      errmsg("could not connect to server \"%s\"",
     609             :                             server->servername),
     610             :                      errdetail_internal("%s", pchomp(PQerrorMessage(conn)))));
     611             : 
     612             :         /*
     613             :          * Perform post-connection security checks only if scram pass-through
     614             :          * is not being used because the password is not necessary.
     615             :          */
     616         134 :         if (!(MyProcPort->has_scram_keys && UseScramPassthrough(server, user)))
     617         126 :             pgfdw_security_check(keywords, values, user, conn);
     618             : 
     619             :         /* Prepare new session for use */
     620         130 :         configure_remote_session(conn);
     621             : 
     622         130 :         if (appname != NULL)
     623          36 :             pfree(appname);
     624         130 :         pfree(keywords);
     625         130 :         pfree(values);
     626             :     }
     627          12 :     PG_CATCH();
     628             :     {
     629          12 :         libpqsrv_disconnect(conn);
     630          12 :         PG_RE_THROW();
     631             :     }
     632         130 :     PG_END_TRY();
     633             : 
     634         130 :     return conn;
     635             : }
     636             : 
     637             : /*
     638             :  * Disconnect any open connection for a connection cache entry.
                     :  *
                     :  * entry: connection cache entry whose conn field is examined
                     :  *
                     :  * Does nothing if entry->conn is already NULL.  Otherwise the connection
                     :  * is closed with libpqsrv_disconnect() and entry->conn is reset to NULL.
     639             :  */
     640             : static void
     641         112 : disconnect_pg_server(ConnCacheEntry *entry)
     642             : {
     643         112 :     if (entry->conn != NULL)
     644             :     {
     645         112 :         libpqsrv_disconnect(entry->conn);
     646         112 :         entry->conn = NULL;
     647             :     }
     648         112 : }
     649             : 
     650             : /*
     651             :  * Return the value of the password_required option of this user mapping,
     652             :  * or true if the option is not set.  The mapping has been pre-validated.
                     :  *
                     :  * user: user mapping whose options list is searched
                     :  *
                     :  * Only the first password_required entry found in user->options is
                     :  * consulted; its value is interpreted with defGetBoolean().
     653             :  */
     654             : static bool
     655          14 : UserMappingPasswordRequired(UserMapping *user)
     656             : {
     657             :     ListCell   *cell;
     658             : 
     659          20 :     foreach(cell, user->options)
     660             :     {
     661          12 :         DefElem    *def = (DefElem *) lfirst(cell);
     662             : 
     663          12 :         if (strcmp(def->defname, "password_required") == 0)
     664           6 :             return defGetBoolean(def);
     665             :     }
     666             : 
     667           8 :     return true;
     668             : }
     669             : /*
                     :  * Return the value of the use_scram_passthrough option for this foreign
                     :  * server and user mapping, or false if neither of them sets it.
                     :  *
                     :  * server: foreign server whose options are searched first
                     :  * user: user mapping whose options are searched second
                     :  *
                     :  * The first matching entry wins, so a setting in the server options takes
                     :  * precedence over one in the user mapping options.
                     :  */
     670             : static bool
     671          24 : UseScramPassthrough(ForeignServer *server, UserMapping *user)
     672             : {
     673             :     ListCell   *cell;
     674             : 
     675          96 :     foreach(cell, server->options)
     676             :     {
     677          96 :         DefElem    *def = (DefElem *) lfirst(cell);
     678             : 
     679          96 :         if (strcmp(def->defname, "use_scram_passthrough") == 0)
     680          24 :             return defGetBoolean(def);
     681             :     }
     682             : 
     683           0 :     foreach(cell, user->options)
     684             :     {
     685           0 :         DefElem    *def = (DefElem *) lfirst(cell);
     686             : 
     687           0 :         if (strcmp(def->defname, "use_scram_passthrough") == 0)
     688           0 :             return defGetBoolean(def);
     689             :     }
     690             : 
     691           0 :     return false;
     692             : }
     693             : 
     694             : /*
     695             :  * For non-superusers, insist that the connstr specify a password or that the
     696             :  * user provided their own GSSAPI delegated credentials.  This
     697             :  * prevents a password from being picked up from .pgpass, a service file, the
     698             :  * environment, etc.  We don't want the postgres user's passwords,
     699             :  * certificates, etc to be accessible to non-superusers.  (See also
     700             :  * dblink_connstr_check in contrib/dblink.)
                     :  *
                     :  * keywords, values: parallel arrays of connection parameters; scanning
                     :  * stops at the first NULL keyword
                     :  * user: user mapping; user->userid is the role tested for superuser status
                     :  *
                     :  * Returns normally if any one of these holds, checked in this order: the
                     :  * mapped user is a superuser; GSSAPI credentials were delegated (only when
                     :  * built with ENABLE_GSS); the parameters contain a non-empty password; the
                     :  * user mapping sets password_required to false.  Otherwise raises an ERROR.
     701             :  */
     702             : static void
     703         134 : check_conn_params(const char **keywords, const char **values, UserMapping *user)
     704             : {
     705             :     int         i;
     706             : 
     707             :     /* no check required if superuser */
     708         134 :     if (superuser_arg(user->userid))
     709         122 :         return;
     710             : 
     711             : #ifdef ENABLE_GSS
     712             :     /* ok if the user provided their own delegated credentials */
     713             :     if (be_gssapi_get_delegation(MyProcPort))
     714             :         return;
     715             : #endif
     716             : 
     717             :     /* ok if params contain a non-empty password */
     718          48 :     for (i = 0; keywords[i] != NULL; i++)
     719             :     {
     720          42 :         if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
     721           6 :             return;
     722             :     }
     723             : 
     724             :     /* ok if the superuser explicitly said so at user mapping creation time */
     725           6 :     if (!UserMappingPasswordRequired(user))
     726           2 :         return;
     727             : 
     728           4 :     ereport(ERROR,
     729             :             (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
     730             :              errmsg("password or GSSAPI delegated credentials required"),
     731             :              errdetail("Non-superusers must delegate GSSAPI credentials, provide a password, or enable SCRAM pass-through in user mapping.")));
     732             : }
     733             : 
     734             : /*
     735             :  * Issue SET commands to make sure remote session is configured properly.
     736             :  *
     737             :  * We do this just once at connection, assuming nothing will change the
     738             :  * values later.  Since we'll never send volatile function calls to the
     739             :  * remote, there shouldn't be any way to break this assumption from our end.
     740             :  * It's possible to think of ways to break it at the remote end, eg making
     741             :  * a foreign table point to a view that includes a set_config call ---
     742             :  * but once you admit the possibility of a malicious view definition,
     743             :  * there are any number of ways to break things.
                     :  *
                     :  * conn: connection to the remote server to be configured
                     :  *
                     :  * Each SET is issued with do_sql_command(), which raises an ERROR if the
                     :  * remote server does not report the command as successful.
     744             :  */
     745             : static void
     746         130 : configure_remote_session(PGconn *conn)
     747             : {
     748         130 :     int         remoteversion = PQserverVersion(conn);
     749             : 
     750             :     /* Force the search path to contain only pg_catalog (see deparse.c) */
     751         130 :     do_sql_command(conn, "SET search_path = pg_catalog");
     752             : 
     753             :     /*
     754             :      * Set remote timezone; this is basically just cosmetic, since all
     755             :      * transmitted and returned timestamptzs should specify a zone explicitly
     756             :      * anyway.  However it makes the regression test outputs more predictable.
     757             :      *
     758             :      * We don't risk setting remote zone equal to ours, since the remote
     759             :      * server might use a different timezone database.  Instead, use GMT
     760             :      * (quoted, because very old servers are picky about case).  That's
     761             :      * guaranteed to work regardless of the remote's timezone database,
     762             :      * because pg_tzset() hard-wires it (at least in PG 9.2 and later).
     763             :      */
     764         130 :     do_sql_command(conn, "SET timezone = 'GMT'");
     765             : 
     766             :     /*
     767             :      * Set values needed to ensure unambiguous data output from remote.  (This
     768             :      * logic should match what pg_dump does.  See also set_transmission_modes
     769             :      * in postgres_fdw.c.)
                     :      *
                     :      * The commands depend on the remote server version number: intervalstyle
                     :      * is set only when it is at least 80400, and extra_float_digits is set
                     :      * to 3 when it is at least 90000 and to 2 otherwise.
     770             :      */
     771         130 :     do_sql_command(conn, "SET datestyle = ISO");
     772         130 :     if (remoteversion >= 80400)
     773         130 :         do_sql_command(conn, "SET intervalstyle = postgres");
     774         130 :     if (remoteversion >= 90000)
     775         130 :         do_sql_command(conn, "SET extra_float_digits = 3");
     776             :     else
     777           0 :         do_sql_command(conn, "SET extra_float_digits = 2");
     778         130 : }
     779             : 
     780             : /*
     781             :  * Convenience subroutine to issue a non-data-returning SQL command to remote
                     :  *
                     :  * conn: connection to send the command on
                     :  * sql: text of the command; also used when reporting an error
                     :  *
                     :  * Sends the command with do_sql_command_begin() and then collects its
                     :  * result with do_sql_command_end(), without consuming socket input first.
                     :  * Either step raises an ERROR on failure.
     782             :  */
     783             : void
     784        3520 : do_sql_command(PGconn *conn, const char *sql)
     785             : {
     786        3520 :     do_sql_command_begin(conn, sql);
     787        3520 :     do_sql_command_end(conn, sql, false);
     788        3514 : }
     789             : /*
                     :  * Send a SQL command to the remote server without waiting for its result.
                     :  *
                     :  * conn: connection to send the command on
                     :  * sql: text of the command; also used when reporting an error
                     :  *
                     :  * Raises an ERROR via pgfdw_report_error() if PQsendQuery() fails.  The
                     :  * caller collects the result afterwards, e.g. with do_sql_command_end().
                     :  */
     790             : static void
     791        3556 : do_sql_command_begin(PGconn *conn, const char *sql)
     792             : {
     793        3556 :     if (!PQsendQuery(conn, sql))
     794           0 :         pgfdw_report_error(ERROR, NULL, conn, false, sql);
     795        3556 : }
     796             : /*
                     :  * Collect the result of a command that was sent on this connection, e.g.
                     :  * with do_sql_command_begin().
                     :  *
                     :  * conn: connection the command was sent on
                     :  * sql: text of the command, used only for error reporting
                     :  * consume_input: if true, call PQconsumeInput() before fetching the result
                     :  *
                     :  * Raises an ERROR via pgfdw_report_error() unless the result status is
                     :  * PGRES_COMMAND_OK.  The PGresult is cleared on both the success path and
                     :  * the bad-status path.
                     :  */
     797             : static void
     798        3556 : do_sql_command_end(PGconn *conn, const char *sql, bool consume_input)
     799             : {
     800             :     PGresult   *res;
     801             : 
     802             :     /*
     803             :      * If requested, consume whatever data is available from the socket. (Note
     804             :      * that if all data is available, this allows pgfdw_get_result to call
     805             :      * PQgetResult without forcing the overhead of WaitLatchOrSocket, which
     806             :      * would be large compared to the overhead of PQconsumeInput.)
     807             :      */
     808        3556 :     if (consume_input && !PQconsumeInput(conn))
     809           0 :         pgfdw_report_error(ERROR, NULL, conn, false, sql);
     810        3556 :     res = pgfdw_get_result(conn);
     811        3556 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)
     812           6 :         pgfdw_report_error(ERROR, res, conn, true, sql);
     813        3550 :     PQclear(res);
     814        3550 : }
     815             : 
     816             : /*
     817             :  * Start remote transaction or subtransaction, if needed.
     818             :  *
     819             :  * Note that we always use at least REPEATABLE READ in the remote session.
     820             :  * This is so that, if a query initiates multiple scans of the same or
     821             :  * different foreign tables, we will get snapshot-consistent results from
     822             :  * those scans.  A disadvantage is that we can't provide sane emulation of
     823             :  * READ COMMITTED behavior --- it would be nice if we had some other way to
     824             :  * control which remote queries share a snapshot.
                     :  *
                     :  * entry: connection cache entry; entry->conn is the connection used and
                     :  * entry->xact_depth is advanced to the current local nesting level
                     :  *
                     :  * entry->changing_xact_state is set to true before each command is sent
                     :  * and reset to false only after the command has succeeded and xact_depth
                     :  * has been updated.  If do_sql_command() raises an ERROR, the flag is
                     :  * therefore left set to true.
     825             :  */
     826             : static void
     827        4252 : begin_remote_xact(ConnCacheEntry *entry)
     828             : {
     829        4252 :     int         curlevel = GetCurrentTransactionNestLevel();
     830             : 
     831             :     /* Start main transaction if we haven't yet */
     832        4252 :     if (entry->xact_depth <= 0)
     833             :     {
     834             :         const char *sql;
     835             : 
     836        1464 :         elog(DEBUG3, "starting remote transaction on connection %p",
     837             :              entry->conn);
     838             : 
     839        1464 :         if (IsolationIsSerializable())
     840           0 :             sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
     841             :         else
     842        1464 :             sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
     843        1464 :         entry->changing_xact_state = true;
     844        1464 :         do_sql_command(entry->conn, sql);
     845        1462 :         entry->xact_depth = 1;
     846        1462 :         entry->changing_xact_state = false;
     847             :     }
     848             : 
     849             :     /*
     850             :      * If we're in a subtransaction, stack up savepoints to match our level.
     851             :      * This ensures we can rollback just the desired effects when a
     852             :      * subtransaction aborts.
                     :      *
                     :      * Each savepoint is named "s" followed by the nesting level being
                     :      * entered, i.e. the current xact_depth plus one.
     853             :      */
     854        4278 :     while (entry->xact_depth < curlevel)
     855             :     {
     856             :         char        sql[64];
     857             : 
     858          30 :         snprintf(sql, sizeof(sql), "SAVEPOINT s%d", entry->xact_depth + 1);
     859          30 :         entry->changing_xact_state = true;
     860          30 :         do_sql_command(entry->conn, sql);
     861          28 :         entry->xact_depth++;
     862          28 :         entry->changing_xact_state = false;
     863             :     }
     864        4248 : }
     865             : 
     866             : /*
     867             :  * Release connection reference count created by calling GetConnection.
                     :  *
                     :  * conn: the connection being released; currently unused
     868             :  */
     869             : void
     870        4142 : ReleaseConnection(PGconn *conn)
     871             : {
     872             :     /*
     873             :      * Currently, we don't actually track connection references because all
     874             :      * cleanup is managed on a transaction or subtransaction basis instead. So
     875             :      * there's nothing to do here.
     876             :      */
     877        4142 : }
     878             : 
     879             : /*
     880             :  * Assign a "unique" number for a cursor.
     881             :  *
     882             :  * These really only need to be unique per connection within a transaction.
     883             :  * For the moment we ignore the per-connection point and assign them across
     884             :  * all connections in the transaction, but we ask for the connection to be
     885             :  * supplied in case we want to refine that.
     886             :  *
     887             :  * Note that even if wraparound happens in a very long transaction, actual
     888             :  * collisions are highly improbable; just be sure to use %u not %d to print.
                     :  *
                     :  * conn: connection the cursor will be used on; currently unused
                     :  *
                     :  * Increments the cursor_number counter and returns its new value.
     889             :  */
     890             : unsigned int
     891        1048 : GetCursorNumber(PGconn *conn)
     892             : {
     893        1048 :     return ++cursor_number;
     894             : }
     895             : 
     896             : /*
     897             :  * Assign a "unique" number for a prepared statement.
     898             :  *
     899             :  * This works much like GetCursorNumber, except that we never reset the counter
     900             :  * within a session.  That's because we can't be 100% sure we've gotten rid
     901             :  * of all prepared statements on all connections, and it's not really worth
     902             :  * increasing the risk of prepared-statement name collisions by resetting.
                     :  *
                     :  * conn: connection the statement will be prepared on; currently unused
                     :  *
                     :  * Increments the prep_stmt_number counter and returns its new value.
     903             :  */
     904             : unsigned int
     905         356 : GetPrepStmtNumber(PGconn *conn)
     906             : {
     907         356 :     return ++prep_stmt_number;
     908             : }
     909             : 
     910             : /*
     911             :  * Submit a query and wait for the result.
     912             :  *
     913             :  * Since we don't use non-blocking mode, this can't process interrupts while
     914             :  * pushing the query text to the server.  That risk is relatively small, so we
     915             :  * ignore that for now.
     916             :  *
     917             :  * Caller is responsible for the error handling on the result.
                     :  *
                     :  * conn: connection to run the query on
                     :  * query: text of the query
                     :  * state: connection state, or NULL; if it has a pending asynchronous
                     :  * request, that request is processed before the query is sent
                     :  *
                     :  * Returns NULL if PQsendQuery() fails, otherwise whatever
                     :  * pgfdw_get_result() returns.
     918             :  */
     919             : PGresult *
     920        7896 : pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state)
     921             : {
     922             :     /* First, process a pending asynchronous request, if any. */
     923        7896 :     if (state && state->pendingAreq)
     924           8 :         process_pending_request(state->pendingAreq);
     925             : 
     926        7896 :     if (!PQsendQuery(conn, query))
     927           0 :         return NULL;
     928        7896 :     return pgfdw_get_result(conn);
     929             : }
     930             : 
     931             : /*
     932             :  * Wrap libpqsrv_get_result_last(), adding wait event.
     933             :  *
     934             :  * Caller is responsible for the error handling on the result.
                     :  *
                     :  * conn: connection to fetch the result from
                     :  *
                     :  * The wait event passed along is pgfdw_we_get_result.
                     :  * NOTE(review): that variable is not initialized in this function;
                     :  * presumably it is allocated before the first call --- confirm.
     935             :  */
     936             : PGresult *
     937       15990 : pgfdw_get_result(PGconn *conn)
     938             : {
     939       15990 :     return libpqsrv_get_result_last(conn, pgfdw_we_get_result);
     940             : }
     941             : 
     942             : /*
     943             :  * Report an error we got from the remote server.
     944             :  *
     945             :  * elevel: error level to use (typically ERROR, but might be less)
     946             :  * res: PGresult containing the error
     947             :  * conn: connection we did the query on
     948             :  * clear: if true, PQclear the result (otherwise caller will handle it)
     949             :  * sql: NULL, or text of remote command we tried to execute
     950             :  *
     951             :  * Note: callers that choose not to throw ERROR for a remote error are
     952             :  * responsible for making sure that the associated ConnCacheEntry gets
     953             :  * marked with have_error = true.
                     :  *
                     :  * The SQLSTATE, primary message, detail, hint and context are taken from
                     :  * the PGresult's error fields.  If there is no SQLSTATE field,
                     :  * ERRCODE_CONNECTION_FAILURE is used.  If there is no primary message, the
                     :  * connection's error message is used instead; if that is empty as well, a
                     :  * generic "could not obtain message string" message is reported.
                     :  *
                     :  * When clear is true, the PQclear happens in a PG_FINALLY block, so it is
                     :  * done whether or not ereport() throws.
     954             :  */
     955             : void
     956          32 : pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
     957             :                    bool clear, const char *sql)
     958             : {
     959             :     /* If requested, PGresult must be released before leaving this function. */
     960          32 :     PG_TRY();
     961             :     {
     962          32 :         char       *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
     963          32 :         char       *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
     964          32 :         char       *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
     965          32 :         char       *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
     966          32 :         char       *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
     967             :         int         sqlstate;
     968             : 
     969          32 :         if (diag_sqlstate)
     970          28 :             sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
     971             :                                      diag_sqlstate[1],
     972             :                                      diag_sqlstate[2],
     973             :                                      diag_sqlstate[3],
     974             :                                      diag_sqlstate[4]);
     975             :         else
     976           4 :             sqlstate = ERRCODE_CONNECTION_FAILURE;
     977             : 
     978             :         /*
     979             :          * If we don't get a message from the PGresult, try the PGconn.  This
     980             :          * is needed because for connection-level failures, PQgetResult may
     981             :          * just return NULL, not a PGresult at all.
     982             :          */
     983          32 :         if (message_primary == NULL)
     984           4 :             message_primary = pchomp(PQerrorMessage(conn));
     985             : 
     986          32 :         ereport(elevel,
     987             :                 (errcode(sqlstate),
     988             :                  (message_primary != NULL && message_primary[0] != '\0') ?
     989             :                  errmsg_internal("%s", message_primary) :
     990             :                  errmsg("could not obtain message string for remote error"),
     991             :                  message_detail ? errdetail_internal("%s", message_detail) : 0,
     992             :                  message_hint ? errhint("%s", message_hint) : 0,
     993             :                  message_context ? errcontext("%s", message_context) : 0,
     994             :                  sql ? errcontext("remote SQL command: %s", sql) : 0));
     995             :     }
     996          32 :     PG_FINALLY();
     997             :     {
     998          32 :         if (clear)
     999          30 :             PQclear(res);
    1000             :     }
    1001          32 :     PG_END_TRY();
    1002           0 : }
    1003             : 
    1004             : /*
    1005             :  * pgfdw_xact_callback --- cleanup at main-transaction end.
    1006             :  *
    1007             :  * This runs just late enough that it must not enter user-defined code
    1008             :  * locally.  (Entering such code on the remote side is fine.  Its remote
    1009             :  * COMMIT TRANSACTION may run deferred triggers.)
    1010             :  */
    1011             : static void
    1012        7726 : pgfdw_xact_callback(XactEvent event, void *arg)
    1013             : {
    1014             :     HASH_SEQ_STATUS scan;
    1015             :     ConnCacheEntry *entry;
    1016        7726 :     List       *pending_entries = NIL;
    1017        7726 :     List       *cancel_requested = NIL;
    1018             : 
    1019             :     /* Quick exit if no connections were touched in this transaction. */
    1020        7726 :     if (!xact_got_connection)
    1021        6326 :         return;
    1022             : 
    1023             :     /*
    1024             :      * Scan all connection cache entries to find open remote transactions, and
    1025             :      * close them.
    1026             :      */
    1027        1400 :     hash_seq_init(&scan, ConnectionHash);
    1028        7192 :     while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
    1029             :     {
    1030             :         PGresult   *res;
    1031             : 
    1032             :         /* Ignore cache entry if no open connection right now */
    1033        5794 :         if (entry->conn == NULL)
    1034        3280 :             continue;
    1035             : 
    1036             :         /* If it has an open remote transaction, try to close it */
    1037        2514 :         if (entry->xact_depth > 0)
    1038             :         {
    1039        1464 :             elog(DEBUG3, "closing remote transaction on connection %p",
    1040             :                  entry->conn);
    1041             : 
    1042        1464 :             switch (event)
    1043             :             {
    1044        1374 :                 case XACT_EVENT_PARALLEL_PRE_COMMIT:
    1045             :                 case XACT_EVENT_PRE_COMMIT:
    1046             : 
    1047             :                     /*
    1048             :                      * If abort cleanup previously failed for this connection,
    1049             :                      * we can't issue any more commands against it.
    1050             :                      */
    1051        1374 :                     pgfdw_reject_incomplete_xact_state_change(entry);
    1052             : 
    1053             :                     /* Commit all remote transactions during pre-commit */
    1054        1374 :                     entry->changing_xact_state = true;
    1055        1374 :                     if (entry->parallel_commit)
    1056             :                     {
    1057          32 :                         do_sql_command_begin(entry->conn, "COMMIT TRANSACTION");
    1058          32 :                         pending_entries = lappend(pending_entries, entry);
    1059          32 :                         continue;
    1060             :                     }
    1061        1342 :                     do_sql_command(entry->conn, "COMMIT TRANSACTION");
    1062        1342 :                     entry->changing_xact_state = false;
    1063             : 
    1064             :                     /*
    1065             :                      * If there were any errors in subtransactions, and we
    1066             :                      * made prepared statements, do a DEALLOCATE ALL to make
    1067             :                      * sure we get rid of all prepared statements. This is
    1068             :                      * annoying and not terribly bulletproof, but it's
    1069             :                      * probably not worth trying harder.
    1070             :                      *
    1071             :                      * DEALLOCATE ALL only exists in 8.3 and later, so this
    1072             :                      * constrains how old a server postgres_fdw can
    1073             :                      * communicate with.  We intentionally ignore errors in
    1074             :                      * the DEALLOCATE, so that we can hobble along to some
    1075             :                      * extent with older servers (leaking prepared statements
    1076             :                      * as we go; but we don't really support update operations
    1077             :                      * pre-8.3 anyway).
    1078             :                      */
    1079        1342 :                     if (entry->have_prep_stmt && entry->have_error)
    1080             :                     {
    1081           0 :                         res = pgfdw_exec_query(entry->conn, "DEALLOCATE ALL",
    1082             :                                                NULL);
    1083           0 :                         PQclear(res);
    1084             :                     }
    1085        1342 :                     entry->have_prep_stmt = false;
    1086        1342 :                     entry->have_error = false;
    1087        1342 :                     break;
    1088           2 :                 case XACT_EVENT_PRE_PREPARE:
    1089             : 
    1090             :                     /*
    1091             :                      * We disallow any remote transactions, since it's not
    1092             :                      * very reasonable to hold them open until the prepared
    1093             :                      * transaction is committed.  For the moment, throw error
    1094             :                      * unconditionally; later we might allow read-only cases.
    1095             :                      * Note that the error will cause us to come right back
    1096             :                      * here with event == XACT_EVENT_ABORT, so we'll clean up
    1097             :                      * the connection state at that point.
    1098             :                      */
    1099           2 :                     ereport(ERROR,
    1100             :                             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1101             :                              errmsg("cannot PREPARE a transaction that has operated on postgres_fdw foreign tables")));
    1102             :                     break;
    1103           0 :                 case XACT_EVENT_PARALLEL_COMMIT:
    1104             :                 case XACT_EVENT_COMMIT:
    1105             :                 case XACT_EVENT_PREPARE:
    1106             :                     /* Pre-commit should have closed the open transaction */
    1107           0 :                     elog(ERROR, "missed cleaning up connection during pre-commit");
    1108             :                     break;
    1109          88 :                 case XACT_EVENT_PARALLEL_ABORT:
    1110             :                 case XACT_EVENT_ABORT:
    1111             :                     /* Rollback all remote transactions during abort */
    1112          88 :                     if (entry->parallel_abort)
    1113             :                     {
    1114           8 :                         if (pgfdw_abort_cleanup_begin(entry, true,
    1115             :                                                       &pending_entries,
    1116             :                                                       &cancel_requested))
    1117           8 :                             continue;
    1118             :                     }
    1119             :                     else
    1120          80 :                         pgfdw_abort_cleanup(entry, true);
    1121          80 :                     break;
    1122             :             }
    1123        1050 :         }
    1124             : 
    1125             :         /* Reset state to show we're out of a transaction */
    1126        2472 :         pgfdw_reset_xact_state(entry, true);
    1127             :     }
    1128             : 
    1129             :     /* If there are any pending connections, finish cleaning them up */
    1130        1398 :     if (pending_entries || cancel_requested)
    1131             :     {
    1132          30 :         if (event == XACT_EVENT_PARALLEL_PRE_COMMIT ||
    1133             :             event == XACT_EVENT_PRE_COMMIT)
    1134             :         {
    1135             :             Assert(cancel_requested == NIL);
    1136          26 :             pgfdw_finish_pre_commit_cleanup(pending_entries);
    1137             :         }
    1138             :         else
    1139             :         {
    1140             :             Assert(event == XACT_EVENT_PARALLEL_ABORT ||
    1141             :                    event == XACT_EVENT_ABORT);
    1142           4 :             pgfdw_finish_abort_cleanup(pending_entries, cancel_requested,
    1143             :                                        true);
    1144             :         }
    1145             :     }
    1146             : 
    1147             :     /*
    1148             :      * Regardless of the event type, we can now mark ourselves as out of the
    1149             :      * transaction.  (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
    1150             :      * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
    1151             :      */
    1152        1398 :     xact_got_connection = false;
    1153             : 
    1154             :     /* Also reset cursor numbering for next transaction */
    1155        1398 :     cursor_number = 0;
    1156             : }
    1157             : 
    1158             : /*
    1159             :  * pgfdw_subxact_callback --- cleanup at subtransaction end.
    1160             :  */
    1161             : static void
    1162          76 : pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
    1163             :                        SubTransactionId parentSubid, void *arg)
    1164             : {
    1165             :     HASH_SEQ_STATUS scan;
    1166             :     ConnCacheEntry *entry;
    1167             :     int         curlevel;
    1168          76 :     List       *pending_entries = NIL;
    1169          76 :     List       *cancel_requested = NIL;
    1170             : 
    1171             :     /* Only subxact pre-commit and abort need any remote-side cleanup. */
    1172          76 :     if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
    1173             :           event == SUBXACT_EVENT_ABORT_SUB))
    1174          46 :         return;
    1175             : 
    1176             :     /* Quick exit if no connections were touched in this transaction. */
    1177          30 :     if (!xact_got_connection)
    1178           0 :         return;
    1179             : 
    1180             :     /*
    1181             :      * Scan all connection cache entries to find open remote subtransactions
    1182             :      * of the current level, and close them.
    1183             :      */
    1184          30 :     curlevel = GetCurrentTransactionNestLevel();
    1185          30 :     hash_seq_init(&scan, ConnectionHash);
    1186         204 :     while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
    1187             :     {
    1188             :         char        sql[100];
    1189             : 
    1190             :         /*
    1191             :          * Skip unconnected entries and those not nested as deep as the
    1192             :          * current level; one nested deeper was missed earlier, an error.
    1193             :          */
    1194         174 :         if (entry->conn == NULL || entry->xact_depth < curlevel)
    1195         158 :             continue;
    1196             : 
    1197          28 :         if (entry->xact_depth > curlevel)
    1198           0 :             elog(ERROR, "missed cleaning up remote subtransaction at level %d",
    1199             :                  entry->xact_depth);
    1200             : 
    1201          28 :         if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
    1202             :         {
    1203             :             /*
    1204             :              * If abort cleanup previously failed for this connection, we
    1205             :              * can't issue any more commands against it.
    1206             :              */
    1207          14 :             pgfdw_reject_incomplete_xact_state_change(entry);
    1208             : 
    1209             :             /* Release the remote savepoint of this level during pre-commit */
    1210          14 :             snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
    1211          14 :             entry->changing_xact_state = true;
    1212          14 :             if (entry->parallel_commit)
    1213             :             {
    1214           4 :                 do_sql_command_begin(entry->conn, sql);
    1215           4 :                 pending_entries = lappend(pending_entries, entry);
    1216           4 :                 continue;
    1217             :             }
    1218          10 :             do_sql_command(entry->conn, sql);
    1219          10 :             entry->changing_xact_state = false;
    1220             :         }
    1221             :         else
    1222             :         {
    1223             :             /* Roll back the remote subtransaction during abort */
    1224          14 :             if (entry->parallel_abort)
    1225             :             {
    1226           8 :                 if (pgfdw_abort_cleanup_begin(entry, false,
    1227             :                                               &pending_entries,
    1228             :                                               &cancel_requested))
    1229           8 :                     continue;
    1230             :             }
    1231             :             else
    1232           6 :                 pgfdw_abort_cleanup(entry, false);
    1233             :         }
    1234             : 
    1235             :         /* Cleanup is complete here; leave this level of subtransaction */
    1236          16 :         pgfdw_reset_xact_state(entry, false);
    1237             :     }
    1238             : 
    1239             :     /* Finish cleanup for connections whose cleanup was only begun above */
    1240          30 :     if (pending_entries || cancel_requested)
    1241             :     {
    1242           6 :         if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
    1243             :         {
    1244             :             Assert(cancel_requested == NIL);
    1245           2 :             pgfdw_finish_pre_subcommit_cleanup(pending_entries, curlevel);
    1246             :         }
    1247             :         else
    1248             :         {
    1249             :             Assert(event == SUBXACT_EVENT_ABORT_SUB);
    1250           4 :             pgfdw_finish_abort_cleanup(pending_entries, cancel_requested,
    1251             :                                        false);
    1252             :         }
    1253             :     }
    1254             : }
    1255             : 
    1256             : /*
    1257             :  * Connection invalidation callback function
    1258             :  *
    1259             :  * After a change to a pg_foreign_server or pg_user_mapping catalog entry,
    1260             :  * close connections depending on that entry immediately if current transaction
    1261             :  * has not used those connections yet. Otherwise, mark those connections as
    1262             :  * invalid and then make pgfdw_xact_callback() close them at the end of current
    1263             :  * transaction, since they cannot be closed in the midst of the transaction
    1264             :  * using them. Closed connections will be remade at the next opportunity if
    1265             :  * necessary.
    1266             :  *
    1267             :  * Although most cache invalidation callbacks blow away all the related stuff
    1268             :  * regardless of the given hashvalue, connections are expensive enough that
    1269             :  * it's worth trying to avoid that.
    1270             :  *
    1271             :  * NB: We could avoid unnecessary disconnection more strictly by examining
    1272             :  * individual option values, but it seems too much effort for the gain.
    1273             :  */
    1274             : static void
    1275         344 : pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
    1276             : {
    1277             :     HASH_SEQ_STATUS scan;
    1278             :     ConnCacheEntry *entry;
    1279             : 
    1280             :     Assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID);
    1281             : 
    1282             :     /* ConnectionHash must exist already, if we're registered */
    1283         344 :     hash_seq_init(&scan, ConnectionHash);
    1284        2332 :     while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
    1285             :     {
    1286             :         /* Ignore entries that have no open connection */
    1287        1988 :         if (entry->conn == NULL)
    1288        1624 :             continue;
    1289             : 
    1290             :         /* hashvalue == 0 means a cache reset, must clear all state */
    1291         364 :         if (hashvalue == 0 ||
    1292         258 :             (cacheid == FOREIGNSERVEROID &&
    1293         364 :              entry->server_hashvalue == hashvalue) ||
    1294         106 :             (cacheid == USERMAPPINGOID &&
    1295         106 :              entry->mapping_hashvalue == hashvalue))
    1296             :         {
    1297             :             /*
    1298             :              * Close the connection immediately if it's not used yet in this
    1299             :              * transaction. Otherwise mark it as invalid so that
    1300             :              * pgfdw_xact_callback() can close it at the end of this
    1301             :              * transaction.
    1302             :              */
    1303          98 :             if (entry->xact_depth == 0)
    1304             :             {
    1305          92 :                 elog(DEBUG3, "discarding connection %p", entry->conn);
    1306          92 :                 disconnect_pg_server(entry);
    1307             :             }
    1308             :             else
    1309           6 :                 entry->invalidated = true;
    1310             :         }
    1311             :     }
    1312         344 : }
    1313             : 
    1314             : /*
    1315             :  * Raise an error if the given connection cache entry is marked as being
    1316             :  * in the middle of an xact state change.  This should be called at points
    1317             :  * where no such change is expected to be in progress; if one is found to be
    1318             :  * in progress, it means that we aborted in the middle of a previous state
    1319             :  * change and now don't know what the remote transaction state actually is.
    1320             :  * Such connections can't safely be further used.  Re-establishing the
    1321             :  * connection would change the snapshot and roll back any writes already
    1322             :  * performed, so that's not an option, either.  Thus, we must abort.
    1323             :  */
    1324             : static void
    1325        5650 : pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
    1326             : {
    1327             :     ForeignServer *server;
    1328             : 
    1329             :     /* nothing to do if there's no connection or no change was in progress */
    1330        5650 :     if (entry->conn == NULL || !entry->changing_xact_state)
    1331        5650 :         return;
    1332             : 
    1333             :     /* make sure this entry is inactive */
    1334           0 :     disconnect_pg_server(entry);
    1335             : 
    1336             :     /* find server name to be shown in the message below */
    1337           0 :     server = GetForeignServer(entry->serverid);
    1338             : 
    1339           0 :     ereport(ERROR,
    1340             :             (errcode(ERRCODE_CONNECTION_EXCEPTION),
    1341             :              errmsg("connection to server \"%s\" was lost",
    1342             :                     server->servername)));
    1343             : }
    1344             : 
    1345             : /*
    1346             :  * Reset state to show we're out of a (sub)transaction.
    1347             :  */
    1348             : static void
    1349        2540 : pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel)
    1350             : {
    1351        2540 :     if (toplevel)
    1352             :     {
    1353             :         /* Reset state to show we're out of a transaction */
    1354        2512 :         entry->xact_depth = 0;
    1355             : 
    1356             :         /*
    1357             :          * Discard the connection if it isn't in a good idle state (which
    1358             :          * includes an unfinished xact state change), if it is marked as
    1359             :          * invalid, or if the keep_connections option of its server is
    1360             :          * disabled.  The next GetConnection will open a new connection.
    1361             :          */
    1362        5022 :         if (PQstatus(entry->conn) != CONNECTION_OK ||
    1363        2510 :             PQtransactionStatus(entry->conn) != PQTRANS_IDLE ||
    1364        2510 :             entry->changing_xact_state ||
    1365        2510 :             entry->invalidated ||
    1366        2506 :             !entry->keep_connections)
    1367             :         {
    1368           8 :             elog(DEBUG3, "discarding connection %p", entry->conn);
    1369           8 :             disconnect_pg_server(entry);
    1370             :         }
    1371             :     }
    1372             :     else
    1373             :     {
    1374             :         /* Reset state to show we're out of a subtransaction */
    1375          28 :         entry->xact_depth--;
    1376             :     }
    1377        2540 : }
    1378             : 
    1379             : /*
    1380             :  * Cancel the currently-in-progress query (whose query text we do not have)
    1381             :  * and ignore the result.  Returns true if we successfully cancel the query
    1382             :  * and discard any pending result, and false if not.
    1383             :  *
    1384             :  * It's not a huge problem if we throw an ERROR here, but if we get into error
    1385             :  * recursion trouble, we'll end up slamming the connection shut, which will
    1386             :  * necessitate failing the entire toplevel transaction even if subtransactions
    1387             :  * were used.  Try to use WARNING where we can.
    1388             :  *
    1389             :  * XXX: if the query was one sent by fetch_more_data_begin(), we could get the
    1390             :  * query text from the pendingAreq saved in the per-connection state, then
    1391             :  * report the query using it.
    1392             :  */
    1393             : static bool
    1394           4 : pgfdw_cancel_query(PGconn *conn)
    1395             : {
    1396           4 :     TimestampTz now = GetCurrentTimestamp();
    1397             :     TimestampTz endtime;
    1398             :     TimestampTz retrycanceltime;
    1399             : 
    1400             :     /*
    1401             :      * Overall deadline: if it takes longer than CONNECTION_CLEANUP_TIMEOUT to
    1402             :      * cancel the query and discard the result, assume the connection is dead.
    1403             :      */
    1404           4 :     endtime = TimestampTzPlusMilliseconds(now, CONNECTION_CLEANUP_TIMEOUT);
    1405             : 
    1406             :     /*
    1407             :      * Also, lose patience and re-issue the cancel request once
    1408             :      * RETRY_CANCEL_TIMEOUT has passed.  (This closes some race conditions.)
    1409             :      */
    1410           4 :     retrycanceltime = TimestampTzPlusMilliseconds(now, RETRY_CANCEL_TIMEOUT);
    1411             : 
    1412           4 :     if (!pgfdw_cancel_query_begin(conn, endtime))
    1413           0 :         return false;
    1414           4 :     return pgfdw_cancel_query_end(conn, endtime, retrycanceltime, false);
    1415             : }
    1416             : 
    1417             : /*
    1418             :  * Submit a cancel request to the given connection, waiting only until
    1419             :  * the given time.
    1420             :  *
    1421             :  * We sleep interruptibly until we receive confirmation that the cancel
    1422             :  * request has been accepted, and if it is, return true; if the timeout
    1423             :  * lapses without that, or the request fails for whatever reason, emit a
    1424             :  * WARNING and return false.
    1425             :  */
    1426             : static bool
    1427           4 : pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime)
    1428             : {
    1429           4 :     const char *errormsg = libpqsrv_cancel(conn, endtime);
    1430             : 
    1431           4 :     if (errormsg != NULL)
    1432           0 :         ereport(WARNING,
    1433             :                 errcode(ERRCODE_CONNECTION_FAILURE),
    1434             :                 errmsg("could not send cancel request: %s", errormsg));
    1435             : 
    1436           4 :     return errormsg == NULL;
    1437             : }
    1438             : 
    1439             : static bool
    1440           4 : pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime,
    1441             :                        TimestampTz retrycanceltime, bool consume_input)
    1442             : {
    1443             :     PGresult   *result;
    1444             :     bool        timed_out;
    1445             : 
    1446             :     /*
    1447             :      * If requested, consume whatever data is available from the socket. (Note
    1448             :      * that if all data is available, this allows pgfdw_get_cleanup_result to
    1449             :      * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
    1450             :      * which would be large compared to the overhead of PQconsumeInput.)
    1451             :      */
    1452           4 :     if (consume_input && !PQconsumeInput(conn))
    1453             :     {
    1454           0 :         ereport(WARNING,
    1455             :                 (errcode(ERRCODE_CONNECTION_FAILURE),
    1456             :                  errmsg("could not get result of cancel request: %s",
    1457             :                         pchomp(PQerrorMessage(conn)))));
    1458           0 :         return false;
    1459             :     }
    1460             : 
    1461             :     /* Get and discard the result of the query being cancelled. */
    1462           4 :     if (pgfdw_get_cleanup_result(conn, endtime, retrycanceltime,
    1463             :                                  &result, &timed_out))
    1464             :     {
    1465           0 :         if (timed_out)
    1466           0 :             ereport(WARNING,
    1467             :                     (errmsg("could not get result of cancel request due to timeout")));
    1468             :         else
    1469           0 :             ereport(WARNING,
    1470             :                     (errcode(ERRCODE_CONNECTION_FAILURE),
    1471             :                      errmsg("could not get result of cancel request: %s",
    1472             :                             pchomp(PQerrorMessage(conn)))));
    1473             : 
    1474           0 :         return false;
    1475             :     }
    1476           4 :     PQclear(result);
    1477             : 
    1478           4 :     return true;
    1479             : }
    1480             : 
    1481             : /*
    1482             :  * Submit a query during (sub)abort cleanup and wait for its result until
    1483             :  * CONNECTION_CLEANUP_TIMEOUT expires.  Returns true if the query succeeds;
    1484             :  * if the remote server reports an error, returns true if and only if
    1485             :  * ignore_errors is set.  Returns false if the query can't be sent, times
    1486             :  * out, or the connection fails.
    1487             :  *
    1488             :  * It's not a huge problem if we throw an ERROR here, but if we get into error
    1489             :  * recursion trouble, we'll end up slamming the connection shut, which will
    1490             :  * necessitate failing the entire toplevel transaction even if subtransactions
    1491             :  * were used.  Try to use WARNING where we can.
    1492             :  */
    1493             : static bool
    1494         118 : pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
    1495             : {
    1496             :     TimestampTz endtime;
    1497             : 
    1498             :     /*
    1499             :      * If it takes too long to execute a cleanup query, assume the connection
    1500             :      * is dead.  It's fairly likely that this is why we aborted in the first
    1501             :      * place (e.g. statement timeout, user cancel), so the timeout shouldn't
    1502             :      * be too long.
    1503             :      */
    1504         118 :     endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
    1505             :                                           CONNECTION_CLEANUP_TIMEOUT);
    1506             : 
    1507         118 :     if (!pgfdw_exec_cleanup_query_begin(conn, query))
    1508           0 :         return false;
    1509         118 :     return pgfdw_exec_cleanup_query_end(conn, query, endtime,
    1510             :                                         false, ignore_errors);
    1511             : }
    1512             : 
    1513             : static bool
    1514         142 : pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query)
    1515             : {
    1516             :     Assert(query != NULL);
    1517             : 
    1518             :     /*
    1519             :      * Send the query without waiting for its result.  PQsendQuery can block
    1520             :      * since we don't use non-blocking mode, but we accept that small risk.
    1521             :      */
    1522         142 :     if (!PQsendQuery(conn, query))
    1523             :     {
    1524           0 :         pgfdw_report_error(WARNING, NULL, conn, false, query);
    1525           0 :         return false;
    1526             :     }
    1527             : 
    1528         142 :     return true;
    1529             : }
    1530             : 
    1531             : static bool
    1532         142 : pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
    1533             :                              TimestampTz endtime, bool consume_input,
    1534             :                              bool ignore_errors)
    1535             : {
    1536             :     PGresult   *result;
    1537             :     bool        timed_out;
    1538             : 
    1539             :     Assert(query != NULL);
    1540             : 
    1541             :     /*
    1542             :      * If requested, consume whatever data is available from the socket. (Note
    1543             :      * that if all data is available, this allows pgfdw_get_cleanup_result to
    1544             :      * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
    1545             :      * which would be large compared to the overhead of PQconsumeInput.)
    1546             :      */
    1547         142 :     if (consume_input && !PQconsumeInput(conn))
    1548             :     {
    1549           0 :         pgfdw_report_error(WARNING, NULL, conn, false, query);
    1550           0 :         return false;
    1551             :     }
    1552             : 
    1553             :     /* Get the result; endtime is passed twice as no cancel retry is wanted. */
    1554         142 :     if (pgfdw_get_cleanup_result(conn, endtime, endtime, &result, &timed_out))
    1555             :     {
    1556           0 :         if (timed_out)
    1557           0 :             ereport(WARNING,
    1558             :                     (errmsg("could not get query result due to timeout"),
    1559             :                      errcontext("remote SQL command: %s", query)));
    1560             :         else
    1561           0 :             pgfdw_report_error(WARNING, NULL, conn, false, query);
    1562             : 
    1563           0 :         return false;
    1564             :     }
    1565             : 
    1566             :     /* Warn if the command failed; ignore_errors says whether that is OK. */
    1567         142 :     if (PQresultStatus(result) != PGRES_COMMAND_OK)
    1568             :     {
    1569           0 :         pgfdw_report_error(WARNING, result, conn, true, query);
    1570           0 :         return ignore_errors;
    1571             :     }
    1572         142 :     PQclear(result);
    1573             : 
    1574         142 :     return true;
    1575             : }
    1576             : 
    1577             : /*
    1578             :  * Get, during abort cleanup, the result of a query that is in progress.
    1579             :  * This might be a query that is being interrupted by a cancel request or by
    1580             :  * transaction abort, or it might be a query that was initiated as part of
    1581             :  * transaction abort to get the remote side back to the appropriate state.
    1582             :  *
    1583             :  * endtime is the time at which we should give up and assume the remote side
    1584             :  * is dead.  retrycanceltime is the time at which we should issue a fresh
    1585             :  * cancel request (pass the same value as endtime if this is not wanted).
    1586             :  *
    1587             :  * Returns true if the timeout expired or connection trouble occurred,
    1588             :  * false otherwise.  Sets *result except in case of a true result.
    1589             :  * Sets *timed_out to true only when the timeout expired.
    1590             :  */
    1591             : static bool
    1592         146 : pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
    1593             :                          TimestampTz retrycanceltime,
    1594             :                          PGresult **result,
    1595             :                          bool *timed_out)
    1596             : {
    1597         146 :     volatile bool failed = false;
    1598         146 :     PGresult   *volatile last_res = NULL;
    1599             : 
    1600         146 :     *result = NULL;
    1601         146 :     *timed_out = false;
    1602             : 
    1603             :     /* In what follows, do not leak any PGresults on an error. */
    1604         146 :     PG_TRY();
    1605             :     {
    1606         146 :         int         canceldelta = RETRY_CANCEL_TIMEOUT * 2;
    1607             : 
    1608             :         for (;;)
    1609         160 :         {
    1610             :             PGresult   *res;
    1611             : 
    1612         442 :             while (PQisBusy(conn))
    1613             :             {
    1614             :                 int         wc;
    1615         136 :                 TimestampTz now = GetCurrentTimestamp();
    1616             :                 long        cur_timeout;
    1617             : 
    1618             :                 /* If timeout has expired, give up. */
    1619         136 :                 if (now >= endtime)
    1620             :                 {
    1621           0 :                     *timed_out = true;
    1622           0 :                     failed = true;
    1623           0 :                     goto exit;
    1624             :                 }
    1625             : 
    1626             :                 /* If we need to re-issue the cancel request, do that. */
    1627         136 :                 if (now >= retrycanceltime)
    1628             :                 {
    1629             :                     /* We ignore failure to issue the repeated request. */
    1630           0 :                     (void) libpqsrv_cancel(conn, endtime);
    1631             : 
    1632             :                     /* Recompute "now" in case that took measurable time. */
    1633           0 :                     now = GetCurrentTimestamp();
    1634             : 
    1635             :                     /* Adjust re-cancel timeout in increasing steps. */
    1636           0 :                     retrycanceltime = TimestampTzPlusMilliseconds(now,
    1637             :                                                                   canceldelta);
    1638           0 :                     canceldelta += canceldelta;
    1639             :                 }
    1640             : 
    1641             :                 /* If timeout has expired, give up, else get sleep time. */
    1642         136 :                 cur_timeout = TimestampDifferenceMilliseconds(now,
    1643             :                                                               Min(endtime,
    1644             :                                                                   retrycanceltime));
    1645         136 :                 if (cur_timeout <= 0)
    1646             :                 {
    1647           0 :                     *timed_out = true;
    1648           0 :                     failed = true;
    1649           0 :                     goto exit;
    1650             :                 }
    1651             : 
    1652             :                 /* first time, allocate or get the custom wait event */
    1653         136 :                 if (pgfdw_we_cleanup_result == 0)
    1654           4 :                     pgfdw_we_cleanup_result = WaitEventExtensionNew("PostgresFdwCleanupResult");
    1655             : 
    1656             :                 /* Sleep until there's something to do */
    1657         136 :                 wc = WaitLatchOrSocket(MyLatch,
    1658             :                                        WL_LATCH_SET | WL_SOCKET_READABLE |
    1659             :                                        WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
    1660             :                                        PQsocket(conn),
    1661             :                                        cur_timeout, pgfdw_we_cleanup_result);
    1662         136 :                 ResetLatch(MyLatch);
    1663             : 
    1664         136 :                 CHECK_FOR_INTERRUPTS();
    1665             : 
    1666             :                 /* Data available in socket? */
    1667         136 :                 if (wc & WL_SOCKET_READABLE)
    1668             :                 {
    1669         136 :                     if (!PQconsumeInput(conn))
    1670             :                     {
    1671             :                         /* connection trouble */
    1672           0 :                         failed = true;
    1673           0 :                         goto exit;
    1674             :                     }
    1675             :                 }
    1676             :             }
    1677             : 
    1678         306 :             res = PQgetResult(conn);
    1679         306 :             if (res == NULL)
    1680         146 :                 break;          /* query is complete */
    1681             : 
    1682         160 :             PQclear(last_res);
    1683         160 :             last_res = res;
    1684             :         }
    1685         146 : exit:   ;
    1686             :     }
    1687           0 :     PG_CATCH();
    1688             :     {
    1689           0 :         PQclear(last_res);
    1690           0 :         PG_RE_THROW();
    1691             :     }
    1692         146 :     PG_END_TRY();
    1693             : 
    1694         146 :     if (failed)
    1695           0 :         PQclear(last_res);
    1696             :     else
    1697         146 :         *result = last_res;
    1698         146 :     return failed;
    1699             : }
    1700             : 
    1701             : /*
    1702             :  * Abort remote transaction or subtransaction.
    1703             :  *
    1704             :  * "toplevel" should be set to true if the toplevel (main) transaction is
    1705             :  * being rolled back, false otherwise.
    1706             :  *
    1707             :  * Set entry->changing_xact_state to false on success, true on failure.
    1708             :  */
    1709             : static void
    1710          86 : pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel)
    1711             : {
    1712             :     char        sql[100];
    1713             : 
    1714             :     /*
    1715             :      * Don't try to clean up the connection if we're already in error
    1716             :      * recursion trouble.
    1717             :      */
    1718          86 :     if (in_error_recursion_trouble())
    1719           0 :         entry->changing_xact_state = true;
    1720             : 
    1721             :     /*
    1722             :      * If connection is already unsalvageable, don't touch it further.
    1723             :      */
    1724          86 :     if (entry->changing_xact_state)
    1725           2 :         return;
    1726             : 
    1727             :     /*
    1728             :      * Mark this connection as in the process of changing transaction state.
    1729             :      */
    1730          84 :     entry->changing_xact_state = true;
    1731             : 
    1732             :     /* Assume we might have lost track of prepared statements */
    1733          84 :     entry->have_error = true;
    1734             : 
    1735             :     /*
    1736             :      * If a command has been submitted to the remote server by using an
    1737             :      * asynchronous execution function, the command might not have yet
    1738             :      * completed.  Check to see if a command is still being processed by the
    1739             :      * remote server, and if so, request cancellation of the command.
    1740             :      */
    1741          84 :     if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
    1742           4 :         !pgfdw_cancel_query(entry->conn))
    1743           0 :         return;                 /* Unable to cancel running query */
    1744             : 
    1745          84 :     CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
    1746          84 :     if (!pgfdw_exec_cleanup_query(entry->conn, sql, false))
    1747           0 :         return;                 /* Unable to abort remote (sub)transaction */
    1748             : 
    1749          84 :     if (toplevel)
    1750             :     {
    1751          78 :         if (entry->have_prep_stmt && entry->have_error &&
    1752          34 :             !pgfdw_exec_cleanup_query(entry->conn,
    1753             :                                       "DEALLOCATE ALL",
    1754             :                                       true))
    1755           0 :             return;             /* Trouble clearing prepared statements */
    1756             : 
    1757          78 :         entry->have_prep_stmt = false;
    1758          78 :         entry->have_error = false;
    1759             :     }
    1760             : 
    1761             :     /*
    1762             :      * If pendingAreq of the per-connection state is not NULL, it means that
    1763             :      * an asynchronous fetch begun by fetch_more_data_begin() was not done
    1764             :      * successfully and thus the per-connection state was not reset in
    1765             :      * fetch_more_data(); in that case reset the per-connection state here.
    1766             :      */
    1767          84 :     if (entry->state.pendingAreq)
    1768           2 :         memset(&entry->state, 0, sizeof(entry->state));
    1769             : 
    1770             :     /* Disarm changing_xact_state if it all worked */
    1771          84 :     entry->changing_xact_state = false;
    1772             : }
    1773             : 
    1774             : /*
    1775             :  * Like pgfdw_abort_cleanup, submit an abort command or cancel request, but
    1776             :  * don't wait for the result.
    1777             :  *
    1778             :  * Returns true if the abort command or cancel request is successfully issued,
    1779             :  * false otherwise.  If the abort command is successfully issued, the given
    1780             :  * connection cache entry is appended to *pending_entries.  Otherwise, if the
    1781             :  * cancel request is successfully issued, it is appended to *cancel_requested.
    1782             :  */
    1783             : static bool
    1784          16 : pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel,
    1785             :                           List **pending_entries, List **cancel_requested)
    1786             : {
    1787             :     /*
    1788             :      * Don't try to clean up the connection if we're already in error
    1789             :      * recursion trouble.
    1790             :      */
    1791          16 :     if (in_error_recursion_trouble())
    1792           0 :         entry->changing_xact_state = true;
    1793             : 
    1794             :     /*
    1795             :      * If connection is already unsalvageable, don't touch it further.
    1796             :      */
    1797          16 :     if (entry->changing_xact_state)
    1798           0 :         return false;
    1799             : 
    1800             :     /*
    1801             :      * Mark this connection as in the process of changing transaction state.
    1802             :      */
    1803          16 :     entry->changing_xact_state = true;
    1804             : 
    1805             :     /* Assume we might have lost track of prepared statements */
    1806          16 :     entry->have_error = true;
    1807             : 
    1808             :     /*
    1809             :      * If a command has been submitted to the remote server by using an
    1810             :      * asynchronous execution function, the command might not have yet
    1811             :      * completed.  Check to see if a command is still being processed by the
    1812             :      * remote server, and if so, request cancellation of the command.
    1813             :      */
    1814          16 :     if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
    1815             :     {
    1816             :         TimestampTz endtime;
    1817             : 
    1818           0 :         endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
    1819             :                                               CONNECTION_CLEANUP_TIMEOUT);
    1820           0 :         if (!pgfdw_cancel_query_begin(entry->conn, endtime))
    1821           0 :             return false;       /* Unable to cancel running query */
    1822           0 :         *cancel_requested = lappend(*cancel_requested, entry);
    1823             :     }
    1824             :     else
    1825             :     {
    1826             :         char        sql[100];
    1827             : 
    1828          16 :         CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
    1829          16 :         if (!pgfdw_exec_cleanup_query_begin(entry->conn, sql))
    1830           0 :             return false;       /* Unable to abort remote transaction */
    1831          16 :         *pending_entries = lappend(*pending_entries, entry);
    1832             :     }
    1833             : 
    1834          16 :     return true;
    1835             : }
    1836             : 
    1837             : /*
    1838             :  * Finish pre-commit cleanup of connections on each of which we've sent a
    1839             :  * COMMIT command to the remote server.
    1840             :  */
    1841             : static void
    1842          26 : pgfdw_finish_pre_commit_cleanup(List *pending_entries)
    1843             : {
    1844             :     ConnCacheEntry *entry;
    1845          26 :     List       *pending_deallocs = NIL;
    1846             :     ListCell   *lc;
    1847             : 
    1848             :     Assert(pending_entries);
    1849             : 
    1850             :     /*
    1851             :      * Get the result of the COMMIT command for each of the pending entries
    1852             :      */
    1853          58 :     foreach(lc, pending_entries)
    1854             :     {
    1855          32 :         entry = (ConnCacheEntry *) lfirst(lc);
    1856             : 
    1857             :         Assert(entry->changing_xact_state);
    1858             : 
    1859             :         /*
    1860             :          * We might already have received the result on the socket, so pass
    1861             :          * consume_input=true to try to consume it first
    1862             :          */
    1863          32 :         do_sql_command_end(entry->conn, "COMMIT TRANSACTION", true);
    1864          32 :         entry->changing_xact_state = false;
    1865             : 
    1866             :         /* Do a DEALLOCATE ALL in parallel if needed */
    1867          32 :         if (entry->have_prep_stmt && entry->have_error)
    1868             :         {
    1869             :             /* Ignore errors (see notes in pgfdw_xact_callback) */
    1870           4 :             if (PQsendQuery(entry->conn, "DEALLOCATE ALL"))
    1871             :             {
    1872           4 :                 pending_deallocs = lappend(pending_deallocs, entry);
    1873           4 :                 continue;
    1874             :             }
    1875             :         }
    1876          28 :         entry->have_prep_stmt = false;
    1877          28 :         entry->have_error = false;
    1878             : 
    1879          28 :         pgfdw_reset_xact_state(entry, true);
    1880             :     }
    1881             : 
    1882             :     /* No further work if no pending entries */
    1883          26 :     if (!pending_deallocs)
    1884          24 :         return;
    1885             : 
    1886             :     /*
    1887             :      * Get the result of the DEALLOCATE command for each of the pending
    1888             :      * entries
    1889             :      */
    1890           6 :     foreach(lc, pending_deallocs)
    1891             :     {
    1892             :         PGresult   *res;
    1893             : 
    1894           4 :         entry = (ConnCacheEntry *) lfirst(lc);
    1895             : 
    1896             :         /* Ignore errors (see notes in pgfdw_xact_callback) */
    1897           8 :         while ((res = PQgetResult(entry->conn)) != NULL)
    1898             :         {
    1899           4 :             PQclear(res);
    1900             :             /* Stop if the connection is lost (else we'll loop infinitely) */
    1901           4 :             if (PQstatus(entry->conn) == CONNECTION_BAD)
    1902           0 :                 break;
    1903             :         }
    1904           4 :         entry->have_prep_stmt = false;
    1905           4 :         entry->have_error = false;
    1906             : 
    1907           4 :         pgfdw_reset_xact_state(entry, true);
    1908             :     }
    1909             : }
    1910             : 
    1911             : /*
    1912             :  * Finish pre-subcommit cleanup of connections on each of which we've sent a
    1913             :  * RELEASE command to the remote server.
    1914             :  */
    1915             : static void
    1916           2 : pgfdw_finish_pre_subcommit_cleanup(List *pending_entries, int curlevel)
    1917             : {
    1918             :     ConnCacheEntry *entry;
    1919             :     char        sql[100];
    1920             :     ListCell   *lc;
    1921             : 
    1922             :     Assert(pending_entries);
    1923             : 
    1924             :     /*
    1925             :      * Get the result of the RELEASE command for each of the pending entries
    1926             :      */
    1927           2 :     snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
    1928           6 :     foreach(lc, pending_entries)
    1929             :     {
    1930           4 :         entry = (ConnCacheEntry *) lfirst(lc);
    1931             : 
    1932             :         Assert(entry->changing_xact_state);
    1933             : 
    1934             :         /*
    1935             :          * We might already have received the result on the socket, so pass
    1936             :          * consume_input=true to try to consume it first
    1937             :          */
    1938           4 :         do_sql_command_end(entry->conn, sql, true);
    1939           4 :         entry->changing_xact_state = false;
    1940             : 
    1941           4 :         pgfdw_reset_xact_state(entry, false);
    1942             :     }
    1943           2 : }
    1944             : 
    1945             : /*
    1946             :  * Finish abort cleanup of connections on each of which we've sent an abort
    1947             :  * command or cancel request to the remote server.
    1948             :  */
    1949             : static void
    1950           8 : pgfdw_finish_abort_cleanup(List *pending_entries, List *cancel_requested,
    1951             :                            bool toplevel)
    1952             : {
    1953           8 :     List       *pending_deallocs = NIL;
    1954             :     ListCell   *lc;
    1955             : 
    1956             :     /*
    1957             :      * For each of the pending cancel requests (if any), get and discard the
    1958             :      * result of the query, and submit an abort command to the remote server.
    1959             :      */
    1960           8 :     if (cancel_requested)
    1961             :     {
    1962           0 :         foreach(lc, cancel_requested)
    1963             :         {
    1964           0 :             ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
    1965           0 :             TimestampTz now = GetCurrentTimestamp();
    1966             :             TimestampTz endtime;
    1967             :             TimestampTz retrycanceltime;
    1968             :             char        sql[100];
    1969             : 
    1970             :             Assert(entry->changing_xact_state);
    1971             : 
    1972             :             /*
    1973             :              * Set end time.  You might think we should do this before issuing
    1974             :              * cancel request like in normal mode, but that is problematic,
    1975             :              * because if, for example, it took longer than 30 seconds to
    1976             :              * process the first few entries in the cancel_requested list, it
    1977             :              * would cause a timeout error when processing each of the
    1978             :              * remaining entries in the list, leading to slamming that entry's
    1979             :              * connection shut.
    1980             :              */
    1981           0 :             endtime = TimestampTzPlusMilliseconds(now,
    1982             :                                                   CONNECTION_CLEANUP_TIMEOUT);
    1983           0 :             retrycanceltime = TimestampTzPlusMilliseconds(now,
    1984             :                                                           RETRY_CANCEL_TIMEOUT);
    1985             : 
    1986           0 :             if (!pgfdw_cancel_query_end(entry->conn, endtime,
    1987             :                                         retrycanceltime, true))
    1988             :             {
    1989             :                 /* Unable to cancel running query */
    1990           0 :                 pgfdw_reset_xact_state(entry, toplevel);
    1991           0 :                 continue;
    1992             :             }
    1993             : 
    1994             :             /* Send an abort command in parallel if needed */
    1995           0 :             CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
    1996           0 :             if (!pgfdw_exec_cleanup_query_begin(entry->conn, sql))
    1997             :             {
    1998             :                 /* Unable to abort remote (sub)transaction */
    1999           0 :                 pgfdw_reset_xact_state(entry, toplevel);
    2000             :             }
    2001             :             else
    2002           0 :                 pending_entries = lappend(pending_entries, entry);
    2003             :         }
    2004             :     }
    2005             : 
    2006             :     /* No further work if no pending entries */
    2007           8 :     if (!pending_entries)
    2008           0 :         return;
    2009             : 
    2010             :     /*
    2011             :      * Get the result of the abort command for each of the pending entries
    2012             :      */
    2013          24 :     foreach(lc, pending_entries)
    2014             :     {
    2015          16 :         ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
    2016             :         TimestampTz endtime;
    2017             :         char        sql[100];
    2018             : 
    2019             :         Assert(entry->changing_xact_state);
    2020             : 
    2021             :         /*
    2022             :          * Set end time.  We do this now, not before issuing the command like
    2023             :          * in normal mode, for the same reason as for the cancel_requested
    2024             :          * entries.
    2025             :          */
    2026          16 :         endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
    2027             :                                               CONNECTION_CLEANUP_TIMEOUT);
    2028             : 
    2029          16 :         CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
    2030          16 :         if (!pgfdw_exec_cleanup_query_end(entry->conn, sql, endtime,
    2031             :                                           true, false))
    2032             :         {
    2033             :             /* Unable to abort remote (sub)transaction */
    2034           0 :             pgfdw_reset_xact_state(entry, toplevel);
    2035           8 :             continue;
    2036             :         }
    2037             : 
    2038          16 :         if (toplevel)
    2039             :         {
    2040             :             /* Do a DEALLOCATE ALL in parallel if needed */
    2041           8 :             if (entry->have_prep_stmt && entry->have_error)
    2042             :             {
    2043           8 :                 if (!pgfdw_exec_cleanup_query_begin(entry->conn,
    2044             :                                                     "DEALLOCATE ALL"))
    2045             :                 {
    2046             :                     /* Trouble clearing prepared statements */
    2047           0 :                     pgfdw_reset_xact_state(entry, toplevel);
    2048             :                 }
    2049             :                 else
    2050           8 :                     pending_deallocs = lappend(pending_deallocs, entry);
    2051           8 :                 continue;
    2052             :             }
    2053           0 :             entry->have_prep_stmt = false;
    2054           0 :             entry->have_error = false;
    2055             :         }
    2056             : 
    2057             :         /* Reset the per-connection state if needed */
    2058           8 :         if (entry->state.pendingAreq)
    2059           0 :             memset(&entry->state, 0, sizeof(entry->state));
    2060             : 
    2061             :         /* We're done with this entry; unset the changing_xact_state flag */
    2062           8 :         entry->changing_xact_state = false;
    2063           8 :         pgfdw_reset_xact_state(entry, toplevel);
    2064             :     }
    2065             : 
    2066             :     /* No further work if no pending entries */
    2067           8 :     if (!pending_deallocs)
    2068           4 :         return;
    2069             :     Assert(toplevel);
    2070             : 
    2071             :     /*
    2072             :      * Get the result of the DEALLOCATE command for each of the pending
    2073             :      * entries
    2074             :      */
    2075          12 :     foreach(lc, pending_deallocs)
    2076             :     {
    2077           8 :         ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
    2078             :         TimestampTz endtime;
    2079             : 
    2080             :         Assert(entry->changing_xact_state);
    2081             :         Assert(entry->have_prep_stmt);
    2082             :         Assert(entry->have_error);
    2083             : 
    2084             :         /*
    2085             :          * Set end time.  We do this now, not before issuing the command like
    2086             :          * in normal mode, for the same reason as for the cancel_requested
    2087             :          * entries.
    2088             :          */
    2089           8 :         endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
    2090             :                                               CONNECTION_CLEANUP_TIMEOUT);
    2091             : 
    2092           8 :         if (!pgfdw_exec_cleanup_query_end(entry->conn, "DEALLOCATE ALL",
    2093             :                                           endtime, true, true))
    2094             :         {
    2095             :             /* Trouble clearing prepared statements */
    2096           0 :             pgfdw_reset_xact_state(entry, toplevel);
    2097           0 :             continue;
    2098             :         }
    2099           8 :         entry->have_prep_stmt = false;
    2100           8 :         entry->have_error = false;
    2101             : 
    2102             :         /* Reset the per-connection state if needed */
    2103           8 :         if (entry->state.pendingAreq)
    2104           0 :             memset(&entry->state, 0, sizeof(entry->state));
    2105             : 
    2106             :         /* We're done with this entry; unset the changing_xact_state flag */
    2107           8 :         entry->changing_xact_state = false;
    2108           8 :         pgfdw_reset_xact_state(entry, toplevel);
    2109             :     }
    2110             : }
    2111             : 
    2112             : /* Number of output arguments (columns) for various API versions */
    2113             : #define POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_1  2
    2114             : #define POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_2  5
    2115             : #define POSTGRES_FDW_GET_CONNECTIONS_COLS   5   /* maximum of above */
    2116             : 
    2117             : /*
    2118             :  * Internal function used by postgres_fdw_get_connections variants.
    2119             :  *
    2120             :  * For API version 1.1, this function takes no input parameter and
    2121             :  * returns a set of records with the following values:
    2122             :  *
    2123             :  * - server_name - server name of active connection. In case the foreign server
    2124             :  *   is dropped but still the connection is active, then the server name will
    2125             :  *   be NULL in output.
    2126             :  * - valid - true/false representing whether the connection is valid or not.
    2127             :  *   Note that connections can become invalid in pgfdw_inval_callback.
    2128             :  *
    2129             :  * For API version 1.2 and later, this function takes an input parameter
    2130             :  * to check a connection status and returns the following
    2131             :  * additional values along with the two values from version 1.1:
    2132             :  *
    2133             :  * - user_name - the local user name of the active connection. In case the
    2134             :  *   user mapping is dropped but the connection is still active, then the
    2135             :  *   user name will be NULL in the output.
    2136             :  * - used_in_xact - true if the connection is used in the current transaction.
    2137             :  * - closed - true if the connection is closed.
    2138             :  *
    2139             :  * No records are returned when there are no cached connections at all.
    2140             :  */
    2141             : static void
    2142          26 : postgres_fdw_get_connections_internal(FunctionCallInfo fcinfo,
    2143             :                                       enum pgfdwVersion api_version)
    2144             : {
    2145          26 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    2146             :     HASH_SEQ_STATUS scan;
    2147             :     ConnCacheEntry *entry;
    2148             : 
    2149          26 :     InitMaterializedSRF(fcinfo, 0);
    2150             : 
    2151             :     /* If the connection cache was never created, there is nothing to report */
    2152          26 :     if (!ConnectionHash)
    2153           0 :         return;
    2154             : 
    2155             :     /*
                     :      * Check we have the expected number of output arguments. The column
                     :      * count of the result descriptor must correspond to api_version; any
                     :      * other combination is rejected with an error.
                     :      */
    2156          26 :     switch (rsinfo->setDesc->natts)
    2157             :     {
    2158           0 :         case POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_1:
    2159           0 :             if (api_version != PGFDW_V1_1)
    2160           0 :                 elog(ERROR, "incorrect number of output arguments");
    2161           0 :             break;
    2162          26 :         case POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_2:
    2163          26 :             if (api_version != PGFDW_V1_2)
    2164           0 :                 elog(ERROR, "incorrect number of output arguments");
    2165          26 :             break;
    2166           0 :         default:
    2167           0 :             elog(ERROR, "incorrect number of output arguments");
    2168             :     }
    2169             : 
    2170          26 :     hash_seq_init(&scan, ConnectionHash);
    2171         226 :     while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
    2172             :     {
    2173             :         ForeignServer *server;
    2174         200 :         Datum       values[POSTGRES_FDW_GET_CONNECTIONS_COLS] = {0};
    2175         200 :         bool        nulls[POSTGRES_FDW_GET_CONNECTIONS_COLS] = {0};
    2176         200 :         int         i = 0;  /* index of the next output column */
    2177             : 
    2178             :         /* Skip cache entries that have no open remote connection */
    2179         200 :         if (!entry->conn)
    2180         174 :             continue;
    2181             : 
    2182          26 :         server = GetForeignServerExtended(entry->serverid, FSV_MISSING_OK);
    2183             : 
    2184             :         /*
    2185             :          * The foreign server may have been dropped in the current explicit
    2186             :          * transaction. It is not possible to drop the server from another
    2187             :          * session while the connection associated with it is in use in the
    2188             :          * current transaction; if that is attempted, the drop query in the
    2189             :          * other session blocks until the current transaction finishes.
    2190             :          *
    2191             :          * Even though the server is dropped in the current transaction, the
    2192             :          * cache can still hold an active connection entry for it; we call
    2193             :          * such connections dangling. Since we cannot fetch the server name
    2194             :          * from the system catalogs for dangling connections, we show a NULL
    2195             :          * value for the server name in the output instead.
    2196             :          *
    2197             :          * We could have done better by storing the server name in the cache
    2198             :          * entry instead of the server OID so that it could be used in the
    2199             :          * output. But the server name in each cache entry requires 64 bytes
    2200             :          * of memory, which is huge when there are many cached connections,
    2201             :          * and the use case, i.e. dropping the foreign server within the
    2202             :          * current explicit transaction, seems rare. So, we chose to show a
    2203             :          * NULL value for the server name in the output.
    2204             :          *
    2205             :          * Such dangling connections get closed either on next use or at the
    2206             :          * end of the current explicit transaction in pgfdw_xact_callback.
    2207             :          */
    2208          26 :         if (!server)
    2209             :         {
    2210             :             /*
    2211             :              * If the server has been dropped in the current explicit
    2212             :              * transaction, then this entry would have been invalidated in
    2213             :              * pgfdw_inval_callback at the end of drop server command. Note
    2214             :              * that this connection would not have been closed in
    2215             :              * pgfdw_inval_callback because it is still being used in the
    2216             :              * current explicit transaction. So, assert that here.
    2217             :              */
    2218             :             Assert(entry->conn && entry->xact_depth > 0 && entry->invalidated);
    2219             : 
    2220             :             /* Show null, if no server name was found */
    2221           2 :             nulls[i++] = true;
    2222             :         }
    2223             :         else
    2224          24 :             values[i++] = CStringGetTextDatum(server->servername);
    2225             : 
    2226          26 :         if (api_version >= PGFDW_V1_2)
    2227             :         {
    2228             :             HeapTuple   tp;
    2229             : 
    2230             :             /* Use the system cache to obtain the user mapping */
    2231          26 :             tp = SearchSysCache1(USERMAPPINGOID, ObjectIdGetDatum(entry->key));
    2232             : 
    2233             :             /*
    2234             :              * Just like in the foreign server case, user mappings can also be
    2235             :              * dropped in the current explicit transaction. Therefore, the
    2236             :              * similar check as in the server case is required.
    2237             :              */
    2238          26 :             if (!HeapTupleIsValid(tp))
    2239             :             {
    2240             :                 /*
    2241             :                  * If we reach here, this entry must have been invalidated in
    2242             :                  * pgfdw_inval_callback, same as in the server case.
    2243             :                  */
    2244             :                 Assert(entry->conn && entry->xact_depth > 0 &&
    2245             :                        entry->invalidated);
    2246             : 
    2247           2 :                 nulls[i++] = true;
    2248             :             }
    2249             :             else
    2250             :             {
    2251             :                 Oid         userid;
    2252             : 
    2253          24 :                 userid = ((Form_pg_user_mapping) GETSTRUCT(tp))->umuser;
    2254          24 :                 values[i++] = CStringGetTextDatum(MappingUserName(userid));
    2255          24 :                 ReleaseSysCache(tp);
    2256             :             }
    2257             :         }
    2258             : 
    2259          26 :         values[i++] = BoolGetDatum(!entry->invalidated);  /* false once the entry is invalidated */
    2260             : 
    2261          26 :         if (api_version >= PGFDW_V1_2)
    2262             :         {
    2263          26 :             bool        check_conn = PG_GETARG_BOOL(0);
    2264             : 
    2265             :             /* Is this connection used in the current transaction? */
    2266          26 :             values[i++] = BoolGetDatum(entry->xact_depth > 0);
    2267             : 
    2268             :             /*
    2269             :              * If a connection status check is requested and supported, return
    2270             :              * whether the connection is closed. Otherwise, return NULL.
                     :              * Any nonzero result from pgfdw_conn_check(), including -1
                     :              * (error), is reported as true.
    2271             :              */
    2272          26 :             if (check_conn && pgfdw_conn_checkable())
    2273           4 :                 values[i++] = BoolGetDatum(pgfdw_conn_check(entry->conn) != 0);
    2274             :             else
    2275          22 :                 nulls[i++] = true;
    2276             :         }
    2277             : 
    2278          26 :         tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
    2279             :     }
    2280             : }
    2281             : 
    2282             : /*
    2283             :  * List active foreign server connections.
    2284             :  *
    2285             :  * The SQL API of this function has changed multiple times, and will likely
    2286             :  * do so again in future.  To support the case where a newer version of this
    2287             :  * loadable module is being used with an old SQL declaration of the function,
    2288             :  * we continue to support the older API versions.
                     :  *
                     :  * Each entry point below only selects the API version and delegates to
                     :  * postgres_fdw_get_connections_internal, which stores the result rows in
                     :  * rsinfo->setResult: postgres_fdw_get_connections_1_2 requests PGFDW_V1_2
                     :  * and postgres_fdw_get_connections requests PGFDW_V1_1.
    2289             :  */
    2290             : Datum
    2291          26 : postgres_fdw_get_connections_1_2(PG_FUNCTION_ARGS)
    2292             : {
    2293          26 :     postgres_fdw_get_connections_internal(fcinfo, PGFDW_V1_2);
    2294             : 
    2295          26 :     PG_RETURN_VOID();
    2296             : }
    2297             : 
    2298             : Datum
    2299           0 : postgres_fdw_get_connections(PG_FUNCTION_ARGS)
    2300             : {
    2301           0 :     postgres_fdw_get_connections_internal(fcinfo, PGFDW_V1_1);
    2302             : 
    2303           0 :     PG_RETURN_VOID();
    2304             : }
    2305             : 
    2306             : /*
    2307             :  * Disconnect the specified cached connections.
    2308             :  *
    2309             :  * This function discards the open connections that are established by
    2310             :  * postgres_fdw from the local session to the foreign server with
    2311             :  * the given name. Note that there can be multiple connections to
    2312             :  * the given server using different user mappings. If the connections
    2313             :  * are used in the current local transaction, they are not disconnected
    2314             :  * and warning messages are reported. This function returns true
    2315             :  * if it disconnects at least one connection, otherwise false. If no
    2316             :  * foreign server with the given name is found, an error is reported.
                     :  *
                     :  * Argument 0 is the foreign server name (text). It is resolved to a
                     :  * server OID, which is then handed to disconnect_cached_connections.
    2317             :  */
    2318             : Datum
    2319           8 : postgres_fdw_disconnect(PG_FUNCTION_ARGS)
    2320             : {
    2321             :     ForeignServer *server;
    2322             :     char       *servername;
    2323             : 
    2324           8 :     servername = text_to_cstring(PG_GETARG_TEXT_PP(0));
    2325           8 :     server = GetForeignServerByName(servername, false);
    2326             : 
    2327           6 :     PG_RETURN_BOOL(disconnect_cached_connections(server->serverid));
    2328             : }
    2329             : 
    2330             : /*
    2331             :  * Disconnect all the cached connections.
    2332             :  *
    2333             :  * This function discards all the open connections that are established by
    2334             :  * postgres_fdw from the local session to the foreign servers.
    2335             :  * If the connections are used in the current local transaction, they are
    2336             :  * not disconnected and warning messages are reported. This function
    2337             :  * returns true if it disconnects at least one connection, otherwise false.
                     :  *
                     :  * This is done by passing InvalidOid to disconnect_cached_connections,
                     :  * which treats an invalid server OID as a request to consider every
                     :  * server.
    2338             :  */
    2339             : Datum
    2340          10 : postgres_fdw_disconnect_all(PG_FUNCTION_ARGS)
    2341             : {
    2342          10 :     PG_RETURN_BOOL(disconnect_cached_connections(InvalidOid));
    2343             : }
    2344             : 
    2345             : /*
    2346             :  * Workhorse to disconnect cached connections.
    2347             :  *
    2348             :  * This function scans all the connection cache entries and disconnects
    2349             :  * the open connections whose foreign server OID matches with
    2350             :  * the specified one. If InvalidOid is specified, it disconnects all
    2351             :  * the cached connections.
    2352             :  *
    2353             :  * This function emits a warning for each connection that's used in
    2354             :  * the current transaction and doesn't close it. It returns true if
    2355             :  * it disconnects at least one connection, otherwise false.
    2356             :  *
    2357             :  * Note that this function disconnects even the connections that are
    2358             :  * established by other users in the same local session using different
    2359             :  * user mappings. This means that even a non-superuser is able to close
    2360             :  * the connections established by superusers in the same local session.
    2361             :  *
    2362             :  * XXX As of now we don't see any security risk doing this. But we should
    2363             :  * set some restrictions on that, for example, prevent non-superuser
    2364             :  * from closing the connections established by superusers even
    2365             :  * in the same session?
    2366             :  */
    2367             : static bool
    2368          16 : disconnect_cached_connections(Oid serverid)
    2369             : {
    2370             :     HASH_SEQ_STATUS scan;
    2371             :     ConnCacheEntry *entry;
    2372          16 :     bool        all = !OidIsValid(serverid);  /* InvalidOid selects every server */
    2373          16 :     bool        result = false;
    2374             : 
    2375             :     /*
    2376             :      * Connection cache hashtable has not been initialized yet in this
    2377             :      * session, so return false.
    2378             :      */
    2379          16 :     if (!ConnectionHash)
    2380           0 :         return false;
    2381             : 
    2382          16 :     hash_seq_init(&scan, ConnectionHash);
    2383         134 :     while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
    2384             :     {
    2385             :         /* Ignore cache entry if no open connection right now. */
    2386         118 :         if (!entry->conn)
    2387          96 :             continue;
    2388             : 
    2389          22 :         if (all || entry->serverid == serverid)
    2390             :         {
    2391             :             /*
    2392             :              * Emit a warning because the connection to close is used in the
    2393             :              * current transaction and cannot be disconnected right now.
    2394             :              */
    2395          16 :             if (entry->xact_depth > 0)
    2396             :             {
    2397             :                 ForeignServer *server;
    2398             : 
    2399           6 :                 server = GetForeignServerExtended(entry->serverid,
    2400             :                                                   FSV_MISSING_OK);
    2401             : 
    2402           6 :                 if (!server)
    2403             :                 {
    2404             :                     /*
    2405             :                      * If the foreign server was dropped while its connection
    2406             :                      * was used in the current transaction, the connection
    2407             :                      * must have been marked as invalid by
    2408             :                      * pgfdw_inval_callback at the end of DROP SERVER command.
    2409             :                      */
    2410             :                     Assert(entry->invalidated);
    2411             : 
    2412           0 :                     ereport(WARNING,
    2413             :                             (errmsg("cannot close dropped server connection because it is still in use")));
    2414             :                 }
    2415             :                 else
    2416           6 :                     ereport(WARNING,
    2417             :                             (errmsg("cannot close connection for server \"%s\" because it is still in use",
    2418             :                                     server->servername)));
    2419             :             }
    2420             :             else
    2421             :             {
    2422          10 :                 elog(DEBUG3, "discarding connection %p", entry->conn);
    2423          10 :                 disconnect_pg_server(entry);
    2424          10 :                 result = true;
    2425             :             }
    2426             :         }
    2427             :     }
    2428             : 
    2429          16 :     return result;
    2430             : }
    2431             : 
    2432             : /*
    2433             :  * Check if the remote server closed the connection.
    2434             :  *
    2435             :  * Returns 1 if the connection is closed, -1 if an error occurred,
    2436             :  * and 0 if it's not closed or if the connection check is unavailable
    2437             :  * on this platform.
                     :  *
                     :  * -1 is also returned, before any polling, when PQstatus() does not
                     :  * report CONNECTION_OK or when PQsocket() returns -1. Where the check
                     :  * is available, it is a poll() with a zero timeout on the connection's
                     :  * socket, retried while it fails with EINTR.
    2438             :  */
    2439             : static int
    2440           4 : pgfdw_conn_check(PGconn *conn)
    2441             : {
    2442           4 :     int         sock = PQsocket(conn);
    2443             : 
    2444           4 :     if (PQstatus(conn) != CONNECTION_OK || sock == -1)
    2445           0 :         return -1;
    2446             : 
    2447             : #if (defined(HAVE_POLL) && defined(POLLRDHUP))
    2448             :     {
    2449             :         struct pollfd input_fd;
    2450             :         int         result;
    2451             : 
    2452           4 :         input_fd.fd = sock;
    2453           4 :         input_fd.events = POLLRDHUP;
    2454           4 :         input_fd.revents = 0;
    2455             : 
    2456             :         do
    2457           4 :             result = poll(&input_fd, 1, 0);
    2458           4 :         while (result < 0 && errno == EINTR);
    2459             : 
    2460           4 :         if (result < 0)
    2461           0 :             return -1;
    2462             : 
    2463           4 :         return (input_fd.revents &
    2464           4 :                 (POLLRDHUP | POLLHUP | POLLERR | POLLNVAL)) ? 1 : 0;
    2465             :     }
    2466             : #else
    2467             :     return 0;
    2468             : #endif
    2469             : }
    2470             : 
    2471             : /*
    2472             :  * Check if connection status checking is available on this platform.
    2473             :  *
    2474             :  * Returns true if available, false otherwise.
                     :  *
                     :  * The answer is fixed at compile time: it is true only when both
                     :  * HAVE_POLL and POLLRDHUP are defined, the same condition under which
                     :  * pgfdw_conn_check performs its poll().
    2475             :  */
    2476             : static bool
    2477           4 : pgfdw_conn_checkable(void)
    2478             : {
    2479             : #if (defined(HAVE_POLL) && defined(POLLRDHUP))
    2480           4 :     return true;
    2481             : #else
    2482             :     return false;
    2483             : #endif
    2484             : }

Generated by: LCOV version 1.14