LCOV - code coverage report
Current view: top level - contrib/postgres_fdw - connection.c (source / functions) Hit Total Coverage
Test: PostgreSQL 17devel Lines: 508 595 85.4 %
Date: 2024-05-09 05:10:41 Functions: 42 42 100.0 %
Legend: Lines: hit not hit

          Line data    Source code
       1             : /*-------------------------------------------------------------------------
       2             :  *
       3             :  * connection.c
       4             :  *        Connection management functions for postgres_fdw
       5             :  *
       6             :  * Portions Copyright (c) 2012-2024, PostgreSQL Global Development Group
       7             :  *
       8             :  * IDENTIFICATION
       9             :  *        contrib/postgres_fdw/connection.c
      10             :  *
      11             :  *-------------------------------------------------------------------------
      12             :  */
      13             : #include "postgres.h"
      14             : 
      15             : #include "access/htup_details.h"
      16             : #include "access/xact.h"
      17             : #include "catalog/pg_user_mapping.h"
      18             : #include "commands/defrem.h"
      19             : #include "funcapi.h"
      20             : #include "libpq/libpq-be.h"
      21             : #include "libpq/libpq-be-fe-helpers.h"
      22             : #include "mb/pg_wchar.h"
      23             : #include "miscadmin.h"
      24             : #include "pgstat.h"
      25             : #include "postgres_fdw.h"
      26             : #include "storage/fd.h"
      27             : #include "storage/latch.h"
      28             : #include "utils/builtins.h"
      29             : #include "utils/datetime.h"
      30             : #include "utils/hsearch.h"
      31             : #include "utils/inval.h"
      32             : #include "utils/memutils.h"
      33             : #include "utils/syscache.h"
      34             : 
      35             : /*
      36             :  * Connection cache hash table entry
      37             :  *
      38             :  * The lookup key in this hash table is the user mapping OID. We use just one
      39             :  * connection per user mapping OID, which ensures that all the scans use the
      40             :  * same snapshot during a query.  Using the user mapping OID rather than
      41             :  * the foreign server OID + user OID avoids creating multiple connections when
      42             :  * the public user mapping applies to all user OIDs.
      43             :  *
      44             :  * The "conn" pointer can be NULL if we don't currently have a live connection.
      45             :  * When we do have a connection, xact_depth tracks the current depth of
      46             :  * transactions and subtransactions open on the remote side.  We need to issue
      47             :  * commands at the same nesting depth on the remote as we're executing at
      48             :  * ourselves, so that rolling back a subtransaction will kill the right
      49             :  * queries and not the wrong ones.
      50             :  */
      51             : typedef Oid ConnCacheKey;
      52             : 
      53             : typedef struct ConnCacheEntry
      54             : {
      55             :     ConnCacheKey key;           /* hash key (must be first) */
      56             :     PGconn     *conn;           /* connection to foreign server, or NULL */
      57             :     /* Remaining fields are invalid when conn is NULL: */
      58             :     int         xact_depth;     /* 0 = no xact open, 1 = main xact open, 2 =
      59             :                                  * one level of subxact open, etc */
      60             :     bool        have_prep_stmt; /* have we prepared any stmts in this xact? */
      61             :     bool        have_error;     /* have any subxacts aborted in this xact? */
      62             :     bool        changing_xact_state;    /* xact state change in process */
      63             :     bool        parallel_commit;    /* do we commit (sub)xacts in parallel? */
      64             :     bool        parallel_abort; /* do we abort (sub)xacts in parallel? */
      65             :     bool        invalidated;    /* true if reconnect is pending */
      66             :     bool        keep_connections;   /* setting value of keep_connections
      67             :                                      * server option */
      68             :     Oid         serverid;       /* foreign server OID used to get server name */
      69             :     uint32      server_hashvalue;   /* hash value of foreign server OID */
      70             :     uint32      mapping_hashvalue;  /* hash value of user mapping OID */
      71             :     PgFdwConnState state;       /* extra per-connection state */
      72             : } ConnCacheEntry;
      73             : 
      74             : /*
      75             :  * Connection cache (initialized on first use)
      76             :  */
      77             : static HTAB *ConnectionHash = NULL;
      78             : 
      79             : /* for assigning cursor numbers and prepared statement numbers */
      80             : static unsigned int cursor_number = 0;
      81             : static unsigned int prep_stmt_number = 0;
      82             : 
      83             : /* tracks whether any work is needed in callback functions */
      84             : static bool xact_got_connection = false;
      85             : 
      86             : /* custom wait event values, retrieved from shared memory */
      87             : static uint32 pgfdw_we_cleanup_result = 0;
      88             : static uint32 pgfdw_we_connect = 0;
      89             : static uint32 pgfdw_we_get_result = 0;
      90             : 
      91             : /*
      92             :  * Milliseconds to wait to cancel an in-progress query or execute a cleanup
      93             :  * query; if it takes longer than 30 seconds to do these, we assume the
      94             :  * connection is dead.
      95             :  */
      96             : #define CONNECTION_CLEANUP_TIMEOUT  30000
      97             : 
      98             : /* Macro for constructing abort command to be sent */
      99             : #define CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel) \
     100             :     do { \
     101             :         if (toplevel) \
     102             :             snprintf((sql), sizeof(sql), \
     103             :                      "ABORT TRANSACTION"); \
     104             :         else \
     105             :             snprintf((sql), sizeof(sql), \
     106             :                      "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d", \
     107             :                      (entry)->xact_depth, (entry)->xact_depth); \
     108             :     } while(0)
     109             : 
     110             : /*
     111             :  * SQL functions
     112             :  */
     113           4 : PG_FUNCTION_INFO_V1(postgres_fdw_get_connections);
     114           4 : PG_FUNCTION_INFO_V1(postgres_fdw_disconnect);
     115           4 : PG_FUNCTION_INFO_V1(postgres_fdw_disconnect_all);
     116             : 
     117             : /* prototypes of private functions */
     118             : static void make_new_connection(ConnCacheEntry *entry, UserMapping *user);
     119             : static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
     120             : static void disconnect_pg_server(ConnCacheEntry *entry);
     121             : static void check_conn_params(const char **keywords, const char **values, UserMapping *user);
     122             : static void configure_remote_session(PGconn *conn);
     123             : static void do_sql_command_begin(PGconn *conn, const char *sql);
     124             : static void do_sql_command_end(PGconn *conn, const char *sql,
     125             :                                bool consume_input);
     126             : static void begin_remote_xact(ConnCacheEntry *entry);
     127             : static void pgfdw_xact_callback(XactEvent event, void *arg);
     128             : static void pgfdw_subxact_callback(SubXactEvent event,
     129             :                                    SubTransactionId mySubid,
     130             :                                    SubTransactionId parentSubid,
     131             :                                    void *arg);
     132             : static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue);
     133             : static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry);
     134             : static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel);
     135             : static bool pgfdw_cancel_query(PGconn *conn);
     136             : static bool pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime);
     137             : static bool pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime,
     138             :                                    bool consume_input);
     139             : static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
     140             :                                      bool ignore_errors);
     141             : static bool pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query);
     142             : static bool pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
     143             :                                          TimestampTz endtime,
     144             :                                          bool consume_input,
     145             :                                          bool ignore_errors);
     146             : static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
     147             :                                      PGresult **result, bool *timed_out);
     148             : static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel);
     149             : static bool pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel,
     150             :                                       List **pending_entries,
     151             :                                       List **cancel_requested);
     152             : static void pgfdw_finish_pre_commit_cleanup(List *pending_entries);
     153             : static void pgfdw_finish_pre_subcommit_cleanup(List *pending_entries,
     154             :                                                int curlevel);
     155             : static void pgfdw_finish_abort_cleanup(List *pending_entries,
     156             :                                        List *cancel_requested,
     157             :                                        bool toplevel);
     158             : static void pgfdw_security_check(const char **keywords, const char **values,
     159             :                                  UserMapping *user, PGconn *conn);
     160             : static bool UserMappingPasswordRequired(UserMapping *user);
     161             : static bool disconnect_cached_connections(Oid serverid);
     162             : 
     163             : /*
     164             :  * Get a PGconn which can be used to execute queries on the remote PostgreSQL
     165             :  * server with the user's authorization.  A new connection is established
     166             :  * if we don't already have a suitable one, and a transaction is opened at
     167             :  * the right subtransaction nesting depth if we didn't do that already.
     168             :  *
     169             :  * will_prep_stmt must be true if caller intends to create any prepared
     170             :  * statements.  Since those don't go away automatically at transaction end
     171             :  * (not even on error), we need this flag to cue manual cleanup.
     172             :  *
     173             :  * If state is not NULL, *state receives the per-connection state associated
     174             :  * with the PGconn.
     175             :  */
     176             : PGconn *
     177        4188 : GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state)
     178             : {
     179             :     bool        found;
     180        4188 :     bool        retry = false;
     181             :     ConnCacheEntry *entry;
     182             :     ConnCacheKey key;
     183        4188 :     MemoryContext ccxt = CurrentMemoryContext;
     184             : 
     185             :     /* First time through, initialize connection cache hashtable */
     186        4188 :     if (ConnectionHash == NULL)
     187             :     {
     188             :         HASHCTL     ctl;
     189             : 
     190           8 :         if (pgfdw_we_get_result == 0)
     191           8 :             pgfdw_we_get_result =
     192           8 :                 WaitEventExtensionNew("PostgresFdwGetResult");
     193             : 
     194           8 :         ctl.keysize = sizeof(ConnCacheKey);
     195           8 :         ctl.entrysize = sizeof(ConnCacheEntry);
     196           8 :         ConnectionHash = hash_create("postgres_fdw connections", 8,
     197             :                                      &ctl,
     198             :                                      HASH_ELEM | HASH_BLOBS);
     199             : 
     200             :         /*
     201             :          * Register some callback functions that manage connection cleanup.
     202             :          * This should be done just once in each backend.
     203             :          */
     204           8 :         RegisterXactCallback(pgfdw_xact_callback, NULL);
     205           8 :         RegisterSubXactCallback(pgfdw_subxact_callback, NULL);
     206           8 :         CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
     207             :                                       pgfdw_inval_callback, (Datum) 0);
     208           8 :         CacheRegisterSyscacheCallback(USERMAPPINGOID,
     209             :                                       pgfdw_inval_callback, (Datum) 0);
     210             :     }
     211             : 
     212             :     /* Set flag that we did GetConnection during the current transaction */
     213        4188 :     xact_got_connection = true;
     214             : 
     215             :     /* Create hash key for the entry.  The key is a bare Oid, so it has no pad bytes */
     216        4188 :     key = user->umid;
     217             : 
     218             :     /*
     219             :      * Find or create cached entry for requested connection.
     220             :      */
     221        4188 :     entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
     222        4188 :     if (!found)
     223             :     {
     224             :         /*
     225             :          * We need only clear "conn" here; remaining fields will be filled
     226             :          * later when "conn" is set.
     227             :          */
     228          24 :         entry->conn = NULL;
     229             :     }
     230             : 
     231             :     /* Reject further use of connections which failed abort cleanup. */
     232        4188 :     pgfdw_reject_incomplete_xact_state_change(entry);
     233             : 
     234             :     /*
     235             :      * If the connection needs to be remade due to invalidation, disconnect as
     236             :      * soon as we're out of all transactions.
     237             :      */
     238        4188 :     if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
     239             :     {
     240           0 :         elog(DEBUG3, "closing connection %p for option changes to take effect",
     241             :              entry->conn);
     242           0 :         disconnect_pg_server(entry);
     243             :     }
     244             : 
     245             :     /*
     246             :      * If cache entry doesn't have a connection, we have to establish a new
     247             :      * connection.  (If connect_pg_server throws an error, the cache entry
     248             :      * will remain in a valid empty state, i.e., conn == NULL.)
     249             :      */
     250        4188 :     if (entry->conn == NULL)
     251         128 :         make_new_connection(entry, user);
     252             : 
     253             :     /*
     254             :      * We check the health of the cached connection here when using it.  In
     255             :      * cases where we're out of all transactions, if a broken connection is
     256             :      * detected, we try to reestablish a new connection later.
     257             :      */
     258        4176 :     PG_TRY();
     259             :     {
     260             :         /* Process a pending asynchronous request if any. */
     261        4176 :         if (entry->state.pendingAreq)
     262           0 :             process_pending_request(entry->state.pendingAreq);
     263             :         /* Start a new transaction or subtransaction if needed. */
     264        4176 :         begin_remote_xact(entry);
     265             :     }
     266           4 :     PG_CATCH();
     267             :     {
     268           4 :         MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
     269           4 :         ErrorData  *errdata = CopyErrorData();
     270             : 
     271             :         /*
     272             :          * Determine whether to try to reestablish the connection.
     273             :          *
     274             :          * After a broken connection is detected in libpq, any error other
     275             :          * than connection failure (e.g., out-of-memory) can be thrown
     276             :          * somewhere between return from libpq and the expected ereport() call
     277             :          * in pgfdw_report_error(). In this case, since PQstatus() indicates
     278             :          * CONNECTION_BAD, checking only PQstatus() causes the false detection
     279             :          * of connection failure. To avoid this, we also verify that the
     280             :          * error's sqlstate is ERRCODE_CONNECTION_FAILURE. Note that
     281             :          * checking only the sqlstate would also cause false detections,
     282             :          * because pgfdw_report_error() may report ERRCODE_CONNECTION_FAILURE
     283             :          * for any libpq-originated error condition.
     284             :          */
     285           4 :         if (errdata->sqlerrcode != ERRCODE_CONNECTION_FAILURE ||
     286           4 :             PQstatus(entry->conn) != CONNECTION_BAD ||
     287           4 :             entry->xact_depth > 0)
     288             :         {
     289           2 :             MemoryContextSwitchTo(ecxt);
     290           2 :             PG_RE_THROW();
     291             :         }
     292             : 
     293             :         /* Clean up the error state */
     294           2 :         FlushErrorState();
     295           2 :         FreeErrorData(errdata);
     296           2 :         errdata = NULL;
     297             : 
     298           2 :         retry = true;
     299             :     }
     300        4174 :     PG_END_TRY();
     301             : 
     302             :     /*
     303             :      * If a broken connection is detected, disconnect it, reestablish a new
     304             :      * connection and retry a new remote transaction. If connection failure is
     305             :      * reported again, we give up getting a connection.
     306             :      */
     307        4174 :     if (retry)
     308             :     {
     309             :         Assert(entry->xact_depth == 0);
     310             : 
     311           2 :         ereport(DEBUG3,
     312             :                 (errmsg_internal("could not start remote transaction on connection %p",
     313             :                                  entry->conn)),
     314             :                 errdetail_internal("%s", pchomp(PQerrorMessage(entry->conn))));
     315             : 
     316           2 :         elog(DEBUG3, "closing connection %p to reestablish a new one",
     317             :              entry->conn);
     318           2 :         disconnect_pg_server(entry);
     319             : 
     320           2 :         make_new_connection(entry, user);
     321             : 
     322           2 :         begin_remote_xact(entry);
     323             :     }
     324             : 
     325             :     /* Remember if caller will prepare statements */
     326        4174 :     entry->have_prep_stmt |= will_prep_stmt;
     327             : 
     328             :     /* If caller needs access to the per-connection state, return it. */
     329        4174 :     if (state)
     330        1420 :         *state = &entry->state;
     331             : 
     332        4174 :     return entry->conn;
     333             : }
     334             : 
     335             : /*
     336             :  * Reset all transient state fields in the cached connection entry and
     337             :  * establish new connection to the remote server.
     338             :  */
     339             : static void
     340         130 : make_new_connection(ConnCacheEntry *entry, UserMapping *user)
     341             : {
     342         130 :     ForeignServer *server = GetForeignServer(user->serverid);
     343             :     ListCell   *lc;
     344             : 
     345             :     Assert(entry->conn == NULL);
     346             : 
     347             :     /* Reset all transient state fields, to be sure all are clean */
     348         130 :     entry->xact_depth = 0;
     349         130 :     entry->have_prep_stmt = false;
     350         130 :     entry->have_error = false;
     351         130 :     entry->changing_xact_state = false;
     352         130 :     entry->invalidated = false;
     353         130 :     entry->serverid = server->serverid;
     354         130 :     entry->server_hashvalue =
     355         130 :         GetSysCacheHashValue1(FOREIGNSERVEROID,
     356             :                               ObjectIdGetDatum(server->serverid));
     357         130 :     entry->mapping_hashvalue =
     358         130 :         GetSysCacheHashValue1(USERMAPPINGOID,
     359             :                               ObjectIdGetDatum(user->umid));
     360         130 :     memset(&entry->state, 0, sizeof(entry->state));
     361             : 
     362             :     /*
     363             :      * Determine whether to keep the connection that we're about to make here
     364             :      * open even after the transaction using it ends, so that the subsequent
     365             :      * transactions can re-use it.
     366             :      *
     367             :      * By default, all the connections to any foreign servers are kept open.
     368             :      *
     369             :      * Also determine whether to commit/abort (sub)transactions opened on the
     370             :      * remote server in parallel at (sub)transaction end, which is disabled by
     371             :      * default.
     372             :      *
     373             :      * Note: it's enough to determine these only when making a new connection
     374             :      * because if these settings for it are changed, it will be closed and
     375             :      * re-made later.
     376             :      */
     377         130 :     entry->keep_connections = true;
     378         130 :     entry->parallel_commit = false;
     379         130 :     entry->parallel_abort = false;
     380         562 :     foreach(lc, server->options)
     381             :     {
     382         432 :         DefElem    *def = (DefElem *) lfirst(lc);
     383             : 
     384         432 :         if (strcmp(def->defname, "keep_connections") == 0)
     385          12 :             entry->keep_connections = defGetBoolean(def);
     386         420 :         else if (strcmp(def->defname, "parallel_commit") == 0)
     387           4 :             entry->parallel_commit = defGetBoolean(def);
     388         416 :         else if (strcmp(def->defname, "parallel_abort") == 0)
     389           4 :             entry->parallel_abort = defGetBoolean(def);
     390             :     }
     391             : 
     392             :     /* Now try to make the connection */
     393         130 :     entry->conn = connect_pg_server(server, user);
     394             : 
     395         118 :     elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
     396             :          entry->conn, server->servername, user->umid, user->userid);
     397         118 : }
     398             : 
     399             : /*
     400             :  * Check that a non-superuser has used a password or delegated credentials
     401             :  * to establish the connection; otherwise, the user is piggybacking on the
     402             :  * postgres server's user identity. See also dblink_security_check()
     403             :  * in contrib/dblink and check_conn_params().
     404             :  */
     405             : static void
     406         122 : pgfdw_security_check(const char **keywords, const char **values, UserMapping *user, PGconn *conn)
     407             : {
     408             :     /* Superusers bypass the check */
     409         122 :     if (superuser_arg(user->userid))
     410         114 :         return;
     411             : 
     412             : #ifdef ENABLE_GSS
     413             :     /* Connected via GSSAPI with delegated credentials -- all good. */
     414             :     if (PQconnectionUsedGSSAPI(conn) && be_gssapi_get_delegation(MyProcPort))
     415             :         return;
     416             : #endif
     417             : 
     418             :     /* Ok if superuser set PW required false. */
     419           8 :     if (!UserMappingPasswordRequired(user))
     420           4 :         return;
     421             : 
     422             :     /* Connected via PW, with PW required true, and provided non-empty PW. */
     423           4 :     if (PQconnectionUsedPassword(conn))
     424             :     {
     425             :         /* ok if params contain a non-empty password */
     426           0 :         for (int i = 0; keywords[i] != NULL; i++)
     427             :         {
     428           0 :             if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
     429           0 :                 return;
     430             :         }
     431             :     }
     432             : 
     433           4 :     ereport(ERROR,
     434             :             (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
     435             :              errmsg("password or GSSAPI delegated credentials required"),
     436             :              errdetail("Non-superuser cannot connect if the server does not request a password or use GSSAPI with delegated credentials."),
     437             :              errhint("Target server's authentication method must be changed or password_required=false set in the user mapping attributes.")));
     438             : }
     439             : 
     440             : /*
     441             :  * Connect to remote server using specified server and user mapping properties.
     442             :  */
     443             : static PGconn *
     444         130 : connect_pg_server(ForeignServer *server, UserMapping *user)
     445             : {
     446         130 :     PGconn     *volatile conn = NULL;
     447             : 
     448             :     /*
     449             :      * Use PG_TRY block to ensure closing connection on error.
     450             :      */
     451         130 :     PG_TRY();
     452             :     {
     453             :         const char **keywords;
     454             :         const char **values;
     455         130 :         char       *appname = NULL;
     456             :         int         n;
     457             : 
     458             :         /*
     459             :          * Construct connection params from generic options of ForeignServer
     460             :          * and UserMapping.  (Some of them might not be libpq options, in
     461             :          * which case we'll just waste a few array slots.)  Add 4 extra slots
     462             :          * for application_name, fallback_application_name, client_encoding,
     463             :          * end marker.
     464             :          */
     465         130 :         n = list_length(server->options) + list_length(user->options) + 4;
     466         130 :         keywords = (const char **) palloc(n * sizeof(char *));
     467         130 :         values = (const char **) palloc(n * sizeof(char *));
     468             : 
     469         130 :         n = 0;
     470         260 :         n += ExtractConnectionOptions(server->options,
     471         130 :                                       keywords + n, values + n);
     472         260 :         n += ExtractConnectionOptions(user->options,
     473         130 :                                       keywords + n, values + n);
     474             : 
     475             :         /*
     476             :          * Use pgfdw_application_name as application_name if set.
     477             :          *
     478             :          * PQconnectdbParams() processes the parameter arrays from start to
     479             :          * end. If any key word is repeated, the last value is used. Therefore
     480             :          * note that pgfdw_application_name must be added to the arrays after
     481             :          * options of ForeignServer are, so that it can override
     482             :          * application_name set in ForeignServer.
     483             :          */
     484         130 :         if (pgfdw_application_name && *pgfdw_application_name != '\0')
     485             :         {
     486           2 :             keywords[n] = "application_name";
     487           2 :             values[n] = pgfdw_application_name;
     488           2 :             n++;
     489             :         }
     490             : 
     491             :         /*
     492             :          * Search the parameter arrays to find application_name setting, and
     493             :          * replace escape sequences in it with status information if found.
     494             :          * The arrays are searched backwards because the last value is used if
     495             :          * application_name is repeatedly set.
     496             :          */
     497         334 :         for (int i = n - 1; i >= 0; i--)
     498             :         {
     499         236 :             if (strcmp(keywords[i], "application_name") == 0 &&
     500          32 :                 *(values[i]) != '\0')
     501             :             {
     502             :                 /*
     503             :                  * Use this application_name setting if it's not empty string
     504             :                  * even after any escape sequences in it are replaced.
     505             :                  */
     506          32 :                 appname = process_pgfdw_appname(values[i]);
     507          32 :                 if (appname[0] != '\0')
     508             :                 {
     509          32 :                     values[i] = appname;
     510          32 :                     break;
     511             :                 }
     512             : 
     513             :                 /*
     514             :                  * This empty application_name is not used, so we set
     515             :                  * values[i] to NULL and keep searching the array to find the
     516             :                  * next one.
     517             :                  */
     518           0 :                 values[i] = NULL;
     519           0 :                 pfree(appname);
     520           0 :                 appname = NULL;
     521             :             }
     522             :         }
     523             : 
     524             :         /* Use "postgres_fdw" as fallback_application_name */
     525         130 :         keywords[n] = "fallback_application_name";
     526         130 :         values[n] = "postgres_fdw";
     527         130 :         n++;
     528             : 
     529             :         /* Set client_encoding so that libpq can convert encoding properly. */
     530         130 :         keywords[n] = "client_encoding";
     531         130 :         values[n] = GetDatabaseEncodingName();
     532         130 :         n++;
     533             : 
     534         130 :         keywords[n] = values[n] = NULL;
     535             : 
     536             :         /* verify the set of connection parameters */
     537         130 :         check_conn_params(keywords, values, user);
     538             : 
     539             :         /* first time, allocate or get the custom wait event */
     540         126 :         if (pgfdw_we_connect == 0)
     541           8 :             pgfdw_we_connect = WaitEventExtensionNew("PostgresFdwConnect");
     542             : 
     543             :         /* OK to make connection */
     544         126 :         conn = libpqsrv_connect_params(keywords, values,
     545             :                                        false,   /* expand_dbname */
     546             :                                        pgfdw_we_connect);
     547             : 
     548         126 :         if (!conn || PQstatus(conn) != CONNECTION_OK)
     549           4 :             ereport(ERROR,
     550             :                     (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
     551             :                      errmsg("could not connect to server \"%s\"",
     552             :                             server->servername),
     553             :                      errdetail_internal("%s", pchomp(PQerrorMessage(conn)))));
     554             : 
     555             :         /* Perform post-connection security checks */
     556         122 :         pgfdw_security_check(keywords, values, user, conn);
     557             : 
     558             :         /* Prepare new session for use */
     559         118 :         configure_remote_session(conn);
     560             : 
     561         118 :         if (appname != NULL)
     562          32 :             pfree(appname);
     563         118 :         pfree(keywords);
     564         118 :         pfree(values);
     565             :     }
     566          12 :     PG_CATCH();
     567             :     {
     568          12 :         libpqsrv_disconnect(conn);
     569          12 :         PG_RE_THROW();
     570             :     }
     571         118 :     PG_END_TRY();
     572             : 
     573         118 :     return conn;
     574             : }
     575             : 
     576             : /*
     577             :  * Close the entry's open connection, if any, and reset entry->conn to NULL.
     578             :  */
     579             : static void
     580         112 : disconnect_pg_server(ConnCacheEntry *entry)
     581             : {
     582         112 :     if (entry->conn != NULL)
     583             :     {
     584         112 :         libpqsrv_disconnect(entry->conn);
     585         112 :         entry->conn = NULL;
     586             :     }
     587         112 : }
     588             : 
     589             : /*
     590             :  * Return the value of the password_required option of this user mapping,
     591             :  * or true if the option is not set. The mapping has been pre-validated.
     592             :  */
     593             : static bool
     594          14 : UserMappingPasswordRequired(UserMapping *user)
     595             : {
     596             :     ListCell   *cell;
     597             : 
     598          20 :     foreach(cell, user->options)
     599             :     {
     600          12 :         DefElem    *def = (DefElem *) lfirst(cell);
     601             : 
     602          12 :         if (strcmp(def->defname, "password_required") == 0)
     603           6 :             return defGetBoolean(def);
     604             :     }
     605             : 
     606           8 :     return true;                /* option not set: password is required */
     607             : }
     608             : 
     609             : /*
     610             :  * For non-superusers, insist that the connstr specify a password or that the
     611             :  * user provided GSSAPI delegated credentials, unless the user mapping sets
     612             :  * password_required to false.  This prevents a password from being picked
     613             :  * up from .pgpass, a service file, the environment, etc.  We don't want the
     614             :  * postgres user's passwords, certificates, etc to be accessible to
     615             :  * non-superusers.  (See also dblink_connstr_check in contrib/dblink.)
     616             :  */
     617             : static void
     618         130 : check_conn_params(const char **keywords, const char **values, UserMapping *user)
     619             : {
     620             :     int         i;
     621             : 
     622             :     /* no check required if superuser */
     623         130 :     if (superuser_arg(user->userid))
     624         118 :         return;
     625             : 
     626             : #ifdef ENABLE_GSS
     627             :     /* ok if the user provided their own delegated credentials */
     628             :     if (be_gssapi_get_delegation(MyProcPort))
     629             :         return;
     630             : #endif
     631             : 
     632             :     /* ok if params contain a non-empty password */
     633          48 :     for (i = 0; keywords[i] != NULL; i++)
     634             :     {
     635          42 :         if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
     636           6 :             return;
     637             :     }
     638             : 
     639             :     /* ok if the superuser explicitly said so at user mapping creation time */
     640           6 :     if (!UserMappingPasswordRequired(user))
     641           2 :         return;
     642             : 
     643           4 :     ereport(ERROR,
     644             :             (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
     645             :              errmsg("password or GSSAPI delegated credentials required"),
     646             :              errdetail("Non-superusers must delegate GSSAPI credentials or provide a password in the user mapping.")));
     647             : }
     648             : 
     649             : /*
     650             :  * Issue SET commands to make sure the remote session is configured properly.
     651             :  *
     652             :  * We do this just once at connection, assuming nothing will change the
     653             :  * values later.  Since we'll never send volatile function calls to the
     654             :  * remote, there shouldn't be any way to break this assumption from our end.
     655             :  * It's possible to think of ways to break it at the remote end, eg making
     656             :  * a foreign table point to a view that includes a set_config call ---
     657             :  * but once you admit the possibility of a malicious view definition,
     658             :  * there are any number of ways to break things.
     659             :  */
     660             : static void
     661         118 : configure_remote_session(PGconn *conn)
     662             : {
     663         118 :     int         remoteversion = PQserverVersion(conn);
     664             : 
     665             :     /* Force the search path to contain only pg_catalog (see deparse.c) */
     666         118 :     do_sql_command(conn, "SET search_path = pg_catalog");
     667             : 
     668             :     /*
     669             :      * Set remote timezone; this is basically just cosmetic, since all
     670             :      * transmitted and returned timestamptzs should specify a zone explicitly
     671             :      * anyway.  However it makes the regression test outputs more predictable.
     672             :      *
     673             :      * We don't risk setting remote zone equal to ours, since the remote
     674             :      * server might use a different timezone database.  Instead, use GMT
     675             :      * (quoted, because very old servers are picky about case).  That's
     676             :      * guaranteed to work regardless of the remote's timezone database,
     677             :      * because pg_tzset() hard-wires it (at least in PG 9.2 and later).
     678             :      */
     679         118 :     do_sql_command(conn, "SET timezone = 'GMT'");
     680             : 
     681             :     /*
     682             :      * Set values needed to ensure unambiguous data output from remote.  (This
     683             :      * logic should match what pg_dump does.  See also set_transmission_modes
     684             :      * in postgres_fdw.c.)
     685             :      */
     686         118 :     do_sql_command(conn, "SET datestyle = ISO");
     687         118 :     if (remoteversion >= 80400) /* remote is 8.4 or later */
     688         118 :         do_sql_command(conn, "SET intervalstyle = postgres");
     689         118 :     if (remoteversion >= 90000) /* remote is 9.0 or later */
     690         118 :         do_sql_command(conn, "SET extra_float_digits = 3");
     691             :     else
     692           0 :         do_sql_command(conn, "SET extra_float_digits = 2");
     693         118 : }
     694             : 
     695             : /*
     696             :  * Issue a non-data-returning SQL command to remote and wait for its result
     697             :  */
     698             : void
     699        3422 : do_sql_command(PGconn *conn, const char *sql)
     700             : {
     701        3422 :     do_sql_command_begin(conn, sql);
     702        3422 :     do_sql_command_end(conn, sql, false);   /* throws ERROR on failure */
     703        3416 : }
     704             : 
     705             : static void
     706        3458 : do_sql_command_begin(PGconn *conn, const char *sql)
     707             : {
     708        3458 :     if (!PQsendQuery(conn, sql))    /* send only; do_sql_command_end gets the result */
     709           0 :         pgfdw_report_error(ERROR, NULL, conn, false, sql);  /* no PGresult yet */
     710        3458 : }
     711             : 
     712             : static void
     713        3458 : do_sql_command_end(PGconn *conn, const char *sql, bool consume_input)
     714             : {
     715             :     PGresult   *res;
     716             : 
     717             :     /*
     718             :      * If requested, consume whatever data is available from the socket. (Note
     719             :      * that if all data is available, this allows pgfdw_get_result to call
     720             :      * PQgetResult without forcing the overhead of WaitLatchOrSocket, which
     721             :      * would be large compared to the overhead of PQconsumeInput.)
     722             :      */
     723        3458 :     if (consume_input && !PQconsumeInput(conn))
     724           0 :         pgfdw_report_error(ERROR, NULL, conn, false, sql);
     725        3458 :     res = pgfdw_get_result(conn);
     726        3458 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)    /* anything else is an error */
     727           6 :         pgfdw_report_error(ERROR, res, conn, true, sql);    /* clears res too */
     728        3452 :     PQclear(res);
     729        3452 : }
     730             : 
     731             : /*
     732             :  * Start remote transaction or subtransaction, if needed.
     733             :  *
     734             :  * Note that we always use at least REPEATABLE READ in the remote session.
     735             :  * This is so that, if a query initiates multiple scans of the same or
     736             :  * different foreign tables, we will get snapshot-consistent results from
     737             :  * those scans.  A disadvantage is that we can't provide sane emulation of
     738             :  * READ COMMITTED behavior --- it would be nice if we had some other way to
     739             :  * control which remote queries share a snapshot.
     740             :  */
     741             : static void
     742        4178 : begin_remote_xact(ConnCacheEntry *entry)
     743             : {
     744        4178 :     int         curlevel = GetCurrentTransactionNestLevel();
     745             : 
     746             :     /* Start main transaction if we haven't yet */
     747        4178 :     if (entry->xact_depth <= 0)
     748             :     {
     749             :         const char *sql;
     750             : 
     751        1442 :         elog(DEBUG3, "starting remote transaction on connection %p",
     752             :              entry->conn);
     753             : 
     754        1442 :         if (IsolationIsSerializable())
     755           0 :             sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
     756             :         else
     757        1442 :             sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
     758        1442 :         entry->changing_xact_state = true;  /* stays set if the command throws */
     759        1442 :         do_sql_command(entry->conn, sql);
     760        1440 :         entry->xact_depth = 1;
     761        1440 :         entry->changing_xact_state = false;
     762             :     }
     763             : 
     764             :     /*
     765             :      * If we're in a subtransaction, stack up savepoints to match our level.
     766             :      * This ensures we can rollback just the desired effects when a
     767             :      * subtransaction aborts.
     768             :      */
     769        4204 :     while (entry->xact_depth < curlevel)
     770             :     {
     771             :         char        sql[64];
     772             : 
     773          30 :         snprintf(sql, sizeof(sql), "SAVEPOINT s%d", entry->xact_depth + 1);
     774          30 :         entry->changing_xact_state = true;  /* stays set if the command throws */
     775          30 :         do_sql_command(entry->conn, sql);
     776          28 :         entry->xact_depth++;
     777          28 :         entry->changing_xact_state = false;
     778             :     }
     779        4174 : }
     780             : 
     781             : /*
     782             :  * Release connection reference count created by calling GetConnection.
     783             :  */
     784             : void
     785        4068 : ReleaseConnection(PGconn *conn)
     786             : {
     787             :     /*
     788             :      * Currently, we don't actually track connection references because all
     789             :      * cleanup is managed on a transaction or subtransaction basis instead. So
     790             :      * there's nothing to do here; the conn argument is ignored.
     791             :      */
     792        4068 : }
     793             : 
     794             : /*
     795             :  * Assign a "unique" number for a cursor.
     796             :  *
     797             :  * These really only need to be unique per connection within a transaction.
     798             :  * For the moment we ignore the per-connection point and assign them across
     799             :  * all connections in the transaction, but we ask for the connection to be
     800             :  * supplied in case we want to refine that.
     801             :  *
     802             :  * Note that even if wraparound happens in a very long transaction, actual
     803             :  * collisions are highly improbable; just be sure to use %u not %d to print.
     804             :  */
     805             : unsigned int
     806        1018 : GetCursorNumber(PGconn *conn)
     807             : {
     808        1018 :     return ++cursor_number;     /* conn is unused; counter is incremented first */
     809             : }
     810             : 
     811             : /*
     812             :  * Assign a "unique" number for a prepared statement.
     813             :  *
     814             :  * This works much like GetCursorNumber, except that we never reset the counter
     815             :  * within a session.  That's because we can't be 100% sure we've gotten rid
     816             :  * of all prepared statements on all connections, and it's not really worth
     817             :  * increasing the risk of prepared-statement name collisions by resetting.
     818             :  */
     819             : unsigned int
     820         348 : GetPrepStmtNumber(PGconn *conn)
     821             : {
     822         348 :     return ++prep_stmt_number;  /* conn is unused; counter is incremented first */
     823             : }
     824             : 
     825             : /*
     826             :  * Submit a query and wait for the result.
     827             :  *
     828             :  * Since we don't use non-blocking mode, this can't process interrupts while
     829             :  * pushing the query text to the server.  That risk is relatively small, so we
     830             :  * ignore that for now.
     831             :  *
     832             :  * Caller is responsible for the error handling on the result.
     833             :  */
     834             : PGresult *
     835        7716 : pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state)
     836             : {
     837             :     /* First, process a pending asynchronous request, if any. */
     838        7716 :     if (state && state->pendingAreq)
     839           8 :         process_pending_request(state->pendingAreq);
     840             : 
     841        7716 :     if (!PQsendQuery(conn, query))
     842           0 :         return NULL;            /* send failed; caller must handle NULL */
     843        7716 :     return pgfdw_get_result(conn);
     844             : }
     845             : 
     846             : /*
     847             :  * Wrap libpqsrv_get_result_last(), adding the pgfdw_we_get_result wait event.
     848             :  *
     849             :  * Caller is responsible for the error handling on the result.
     850             :  */
     851             : PGresult *
     852       15598 : pgfdw_get_result(PGconn *conn)
     853             : {
     854       15598 :     return libpqsrv_get_result_last(conn, pgfdw_we_get_result);
     855             : }
     856             : 
     857             : /*
     858             :  * Report an error we got from the remote server.
     859             :  *
     860             :  * elevel: error level to use (typically ERROR, but might be less)
     861             :  * res: PGresult containing the error, or NULL if there is none
     862             :  * conn: connection we did the query on
     863             :  * clear: if true, PQclear the result (otherwise caller will handle it)
     864             :  * sql: NULL, or text of remote command we tried to execute
     865             :  *
     866             :  * Note: callers that choose not to throw ERROR for a remote error are
     867             :  * responsible for making sure that the associated ConnCacheEntry gets
     868             :  * marked with have_error = true.
     869             :  */
     870             : void
     871          32 : pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
     872             :                    bool clear, const char *sql)
     873             : {
     874             :     /* If requested, PGresult must be released before leaving this function. */
     875          32 :     PG_TRY();
     876             :     {
     877          32 :         char       *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
     878          32 :         char       *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
     879          32 :         char       *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
     880          32 :         char       *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
     881          32 :         char       *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
     882             :         int         sqlstate;
     883             : 
     884          32 :         if (diag_sqlstate)
     885          28 :             sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
     886             :                                      diag_sqlstate[1],
     887             :                                      diag_sqlstate[2],
     888             :                                      diag_sqlstate[3],
     889             :                                      diag_sqlstate[4]);
     890             :         else
     891           4 :             sqlstate = ERRCODE_CONNECTION_FAILURE;  /* no SQLSTATE was reported */
     892             : 
     893             :         /*
     894             :          * If we don't get a message from the PGresult, try the PGconn.  This
     895             :          * is needed because for connection-level failures, PQgetResult may
     896             :          * just return NULL, not a PGresult at all.
     897             :          */
     898          32 :         if (message_primary == NULL)
     899           4 :             message_primary = pchomp(PQerrorMessage(conn));
     900             : 
     901          32 :         ereport(elevel,
     902             :                 (errcode(sqlstate),
     903             :                  (message_primary != NULL && message_primary[0] != '\0') ?
     904             :                  errmsg_internal("%s", message_primary) :
     905             :                  errmsg("could not obtain message string for remote error"),
     906             :                  message_detail ? errdetail_internal("%s", message_detail) : 0,
     907             :                  message_hint ? errhint("%s", message_hint) : 0,
     908             :                  message_context ? errcontext("%s", message_context) : 0,
     909             :                  sql ? errcontext("remote SQL command: %s", sql) : 0));
     910             :     }
     911          32 :     PG_FINALLY();
     912             :     {
     913          32 :         if (clear)
     914          30 :             PQclear(res);
     915             :     }
     916          32 :     PG_END_TRY();
     917           0 : }
     918             : 
     919             : /*
     920             :  * pgfdw_xact_callback --- cleanup at main-transaction end.
     921             :  *
     922             :  * This runs just late enough that it must not enter user-defined code
     923             :  * locally.  (Entering such code on the remote side is fine.  Its remote
     924             :  * COMMIT TRANSACTION may run deferred triggers.)
     925             :  */
     926             : static void
     927        7556 : pgfdw_xact_callback(XactEvent event, void *arg)
     928             : {
     929             :     HASH_SEQ_STATUS scan;
     930             :     ConnCacheEntry *entry;
     931        7556 :     List       *pending_entries = NIL;
     932        7556 :     List       *cancel_requested = NIL;
     933             : 
     934             :     /* Quick exit if no connections were touched in this transaction. */
     935        7556 :     if (!xact_got_connection)
     936        6178 :         return;
     937             : 
     938             :     /*
     939             :      * Scan all connection cache entries to find open remote transactions, and
     940             :      * close them.
     941             :      */
     942        1378 :     hash_seq_init(&scan, ConnectionHash);
     943        7104 :     while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
     944             :     {
     945             :         PGresult   *res;
     946             : 
     947             :         /* Ignore cache entry if no open connection right now */
     948        5728 :         if (entry->conn == NULL)
     949        3240 :             continue;
     950             : 
     951             :         /* If it has an open remote transaction, try to close it */
     952        2488 :         if (entry->xact_depth > 0)
     953             :         {
     954        1442 :             elog(DEBUG3, "closing remote transaction on connection %p",
     955             :                  entry->conn);
     956             : 
     957        1442 :             switch (event)
     958             :             {
     959        1358 :                 case XACT_EVENT_PARALLEL_PRE_COMMIT:
     960             :                 case XACT_EVENT_PRE_COMMIT:
     961             : 
     962             :                     /*
     963             :                      * If abort cleanup previously failed for this connection,
     964             :                      * we can't issue any more commands against it.
     965             :                      */
     966        1358 :                     pgfdw_reject_incomplete_xact_state_change(entry);
     967             : 
     968             :                     /* Commit all remote transactions during pre-commit */
     969        1358 :                     entry->changing_xact_state = true;
     970        1358 :                     if (entry->parallel_commit)
     971             :                     {
     972          32 :                         do_sql_command_begin(entry->conn, "COMMIT TRANSACTION");
     973          32 :                         pending_entries = lappend(pending_entries, entry);
     974          32 :                         continue;   /* finished by pgfdw_finish_pre_commit_cleanup */
     975             :                     }
     976        1326 :                     do_sql_command(entry->conn, "COMMIT TRANSACTION");
     977        1326 :                     entry->changing_xact_state = false;
     978             : 
     979             :                     /*
     980             :                      * If there were any errors in subtransactions, and we
     981             :                      * made prepared statements, do a DEALLOCATE ALL to make
     982             :                      * sure we get rid of all prepared statements. This is
     983             :                      * annoying and not terribly bulletproof, but it's
     984             :                      * probably not worth trying harder.
     985             :                      *
     986             :                      * DEALLOCATE ALL only exists in 8.3 and later, so this
     987             :                      * constrains how old a server postgres_fdw can
     988             :                      * communicate with.  We intentionally ignore errors in
     989             :                      * the DEALLOCATE, so that we can hobble along to some
     990             :                      * extent with older servers (leaking prepared statements
     991             :                      * as we go; but we don't really support update operations
     992             :                      * pre-8.3 anyway).
     993             :                      */
     994        1326 :                     if (entry->have_prep_stmt && entry->have_error)
     995             :                     {
     996           0 :                         res = pgfdw_exec_query(entry->conn, "DEALLOCATE ALL",
     997             :                                                NULL);
     998           0 :                         PQclear(res);
     999             :                     }
    1000        1326 :                     entry->have_prep_stmt = false;
    1001        1326 :                     entry->have_error = false;
    1002        1326 :                     break;
    1003           2 :                 case XACT_EVENT_PRE_PREPARE:
    1004             : 
    1005             :                     /*
    1006             :                      * We disallow any remote transactions, since it's not
    1007             :                      * very reasonable to hold them open until the prepared
    1008             :                      * transaction is committed.  For the moment, throw error
    1009             :                      * unconditionally; later we might allow read-only cases.
    1010             :                      * Note that the error will cause us to come right back
    1011             :                      * here with event == XACT_EVENT_ABORT, so we'll clean up
    1012             :                      * the connection state at that point.
    1013             :                      */
    1014           2 :                     ereport(ERROR,
    1015             :                             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1016             :                              errmsg("cannot PREPARE a transaction that has operated on postgres_fdw foreign tables")));
    1017             :                     break;
    1018           0 :                 case XACT_EVENT_PARALLEL_COMMIT:
    1019             :                 case XACT_EVENT_COMMIT:
    1020             :                 case XACT_EVENT_PREPARE:
    1021             :                     /* Pre-commit should have closed the open transaction */
    1022           0 :                     elog(ERROR, "missed cleaning up connection during pre-commit");
    1023             :                     break;
    1024          82 :                 case XACT_EVENT_PARALLEL_ABORT:
    1025             :                 case XACT_EVENT_ABORT:
    1026             :                     /* Rollback all remote transactions during abort */
    1027          82 :                     if (entry->parallel_abort)
    1028             :                     {
    1029           8 :                         if (pgfdw_abort_cleanup_begin(entry, true,
    1030             :                                                       &pending_entries,
    1031             :                                                       &cancel_requested))
    1032           8 :                             continue;   /* finished by pgfdw_finish_abort_cleanup */
    1033             :                     }
    1034             :                     else
    1035          74 :                         pgfdw_abort_cleanup(entry, true);
    1036          74 :                     break;
    1037             :             }
    1038        1046 :         }
    1039             : 
    1040             :         /* Reset state to show we're out of a transaction */
    1041        2446 :         pgfdw_reset_xact_state(entry, true);
    1042             :     }
    1043             : 
    1044             :     /* If there are any pending connections, finish cleaning them up */
    1045        1376 :     if (pending_entries || cancel_requested)
    1046             :     {
    1047          30 :         if (event == XACT_EVENT_PARALLEL_PRE_COMMIT ||
    1048             :             event == XACT_EVENT_PRE_COMMIT)
    1049             :         {
    1050             :             Assert(cancel_requested == NIL);
    1051          26 :             pgfdw_finish_pre_commit_cleanup(pending_entries);
    1052             :         }
    1053             :         else
    1054             :         {
    1055             :             Assert(event == XACT_EVENT_PARALLEL_ABORT ||
    1056             :                    event == XACT_EVENT_ABORT);
    1057           4 :             pgfdw_finish_abort_cleanup(pending_entries, cancel_requested,
    1058             :                                        true);
    1059             :         }
    1060             :     }
    1061             : 
    1062             :     /*
    1063             :      * Regardless of the event type, we can now mark ourselves as out of the
    1064             :      * transaction.  (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
    1065             :      * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
    1066             :      */
    1067        1376 :     xact_got_connection = false;
    1068             : 
    1069             :     /* Also reset cursor numbering for next transaction */
    1070        1376 :     cursor_number = 0;
    1071             : }
    1072             : 
    1073             : /*
    1074             :  * pgfdw_subxact_callback --- cleanup at subtransaction end: on subxact
    1075             :  * pre-commit or abort, close remote subtransactions of the current level. */
    1076             : static void
    1077          76 : pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
    1078             :                        SubTransactionId parentSubid, void *arg)
    1079             : {
    1080             :     HASH_SEQ_STATUS scan;
    1081             :     ConnCacheEntry *entry;
    1082             :     int         curlevel;
    1083          76 :     List       *pending_entries = NIL;
    1084          76 :     List       *cancel_requested = NIL;
    1085             : 
    1086             :     /* Only sub-pre-commit and sub-abort need work; ignore all other events. */
    1087          76 :     if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
    1088             :           event == SUBXACT_EVENT_ABORT_SUB))
    1089          46 :         return;
    1090             : 
    1091             :     /* Quick exit if no connections were touched in this transaction. */
    1092          30 :     if (!xact_got_connection)
    1093           0 :         return;
    1094             : 
    1095             :     /*
    1096             :      * Scan all connection cache entries to find open remote subtransactions
    1097             :      * of the current level, and close them.
    1098             :      */
    1099          30 :     curlevel = GetCurrentTransactionNestLevel();
    1100          30 :     hash_seq_init(&scan, ConnectionHash);
    1101         204 :     while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
    1102             :     {
    1103             :         char        sql[100];
    1104             : 
    1105             :         /*
    1106             :          * Skip entries that have no connection, and those whose remote
    1107             :          * transaction nesting is shallower than the current level.
    1108             :          */
    1109         174 :         if (entry->conn == NULL || entry->xact_depth < curlevel)
    1110         158 :             continue;
    1111             :         /* A remote level deeper than ours should not still be open here. */
    1112          28 :         if (entry->xact_depth > curlevel)
    1113           0 :             elog(ERROR, "missed cleaning up remote subtransaction at level %d",
    1114             :                  entry->xact_depth);
    1115             : 
    1116          28 :         if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
    1117             :         {
    1118             :             /*
    1119             :              * If abort cleanup previously failed for this connection, we
    1120             :              * can't issue any more commands against it.
    1121             :              */
    1122          14 :             pgfdw_reject_incomplete_xact_state_change(entry);
    1123             : 
    1124             :             /* Release the remote savepoint of this level during pre-commit */
    1125          14 :             snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
    1126          14 :             entry->changing_xact_state = true;
    1127          14 :             if (entry->parallel_commit)
    1128             :             {
    1129           4 :                 do_sql_command_begin(entry->conn, sql);
    1130           4 :                 pending_entries = lappend(pending_entries, entry);
    1131           4 :                 continue;
    1132             :             }
    1133          10 :             do_sql_command(entry->conn, sql);
    1134          10 :             entry->changing_xact_state = false;
    1135             :         }
    1136             :         else
    1137             :         {
    1138             :             /* Roll back the remote subtransaction during abort */
    1139          14 :             if (entry->parallel_abort)
    1140             :             {
    1141           8 :                 if (pgfdw_abort_cleanup_begin(entry, false,
    1142             :                                               &pending_entries,
    1143             :                                               &cancel_requested))
    1144           8 :                     continue;
    1145             :             }
    1146             :             else
    1147           6 :                 pgfdw_abort_cleanup(entry, false);
    1148             :         }
    1149             : 
    1150             :         /* Done with this level of subtransaction: decrement xact_depth */
    1151          16 :         pgfdw_reset_xact_state(entry, false);
    1152             :     }
    1153             : 
    1154             :     /* Finish the entries that were skipped above with "continue", if any */
    1155          30 :     if (pending_entries || cancel_requested)
    1156             :     {
    1157           6 :         if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
    1158             :         {
    1159             :             Assert(cancel_requested == NIL);
    1160           2 :             pgfdw_finish_pre_subcommit_cleanup(pending_entries, curlevel);
    1161             :         }
    1162             :         else
    1163             :         {
    1164             :             Assert(event == SUBXACT_EVENT_ABORT_SUB);
    1165           4 :             pgfdw_finish_abort_cleanup(pending_entries, cancel_requested,
    1166             :                                        false);
    1167             :         }
    1168             :     }
    1169             : }
    1170             : 
    1171             : /*
    1172             :  * Connection invalidation callback function
    1173             :  *
    1174             :  * After a change to a pg_foreign_server or pg_user_mapping catalog entry,
    1175             :  * close connections depending on that entry immediately if the current
    1176             :  * transaction has not used those connections yet. Otherwise, mark those
    1177             :  * connections as invalid and then make pgfdw_xact_callback() close them at
    1178             :  * the end of the current transaction, since they cannot be closed in the
    1179             :  * midst of the transaction using them. Closed connections will be remade at
    1180             :  * the next opportunity if necessary.
    1181             :  *
    1182             :  * Although most cache invalidation callbacks blow away all the related stuff
    1183             :  * regardless of the given hashvalue, connections are expensive enough that
    1184             :  * it's worth trying to avoid that.
    1185             :  *
    1186             :  * NB: We could avoid unnecessary disconnection more strictly by examining
    1187             :  * individual option values, but it seems too much effort for the gain.
    1188             :  */
    1189             : static void
    1190         340 : pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
    1191             : {
    1192             :     HASH_SEQ_STATUS scan;
    1193             :     ConnCacheEntry *entry;
    1194             : 
    1195             :     Assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID);
    1196             : 
    1197             :     /* ConnectionHash must exist already, if we're registered */
    1198         340 :     hash_seq_init(&scan, ConnectionHash);
    1199        2292 :     while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
    1200             :     {
    1201             :         /* Ignore entries that have no open connection */
    1202        1952 :         if (entry->conn == NULL)
    1203        1588 :             continue;
    1204             : 
    1205             :         /* hashvalue == 0 means a cache reset, must clear all state */
    1206         364 :         if (hashvalue == 0 ||
    1207         258 :             (cacheid == FOREIGNSERVEROID &&
    1208         364 :              entry->server_hashvalue == hashvalue) ||
    1209         106 :             (cacheid == USERMAPPINGOID &&
    1210         106 :              entry->mapping_hashvalue == hashvalue))
    1211             :         {
    1212             :             /*
    1213             :              * Close the connection immediately if it's not used yet in this
    1214             :              * transaction (xact_depth == 0). Otherwise mark it as invalid so
    1215             :              * that pgfdw_xact_callback() can close it at the end of this
    1216             :              * transaction.
    1217             :              */
    1218          98 :             if (entry->xact_depth == 0)
    1219             :             {
    1220          92 :                 elog(DEBUG3, "discarding connection %p", entry->conn);
    1221          92 :                 disconnect_pg_server(entry);
    1222             :             }
    1223             :             else
    1224           6 :                 entry->invalidated = true;
    1225             :         }
    1226             :     }
    1227         340 : }
    1228             : 
    1229             : /*
    1230             :  * Raise an error if the given connection cache entry is marked as being
    1231             :  * in the middle of an xact state change.  This should be called at points
    1232             :  * where no such change is expected to be in progress; if one is found to be
    1233             :  * in progress, it means that we aborted in the middle of a previous state
    1234             :  * change and now don't know what the remote transaction state actually is.
    1235             :  * Such connections can't safely be further used.  Re-establishing the
    1236             :  * connection would change the snapshot and roll back any writes already
    1237             :  * performed, so that's not an option, either. Thus, we must abort.
    1238             :  */
    1239             : static void
    1240        5560 : pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
    1241             : {
    1242             :     ForeignServer *server;
    1243             : 
    1244             :     /* nothing to do for entries without a connection or not mid-change */
    1245        5560 :     if (entry->conn == NULL || !entry->changing_xact_state)
    1246        5560 :         return;
    1247             : 
    1248             :     /* close the unusable connection before reporting the error */
    1249           0 :     disconnect_pg_server(entry);
    1250             : 
    1251             :     /* find server name to be shown in the message below */
    1252           0 :     server = GetForeignServer(entry->serverid);
    1253             : 
    1254           0 :     ereport(ERROR,
    1255             :             (errcode(ERRCODE_CONNECTION_EXCEPTION),
    1256             :              errmsg("connection to server \"%s\" was lost",
    1257             :                     server->servername)));
    1258             : }
    1259             : 
    1260             : /*
    1261             :  * Reset state to show we're out of a (sub)transaction: if toplevel, zero
    1262             :  * xact_depth and discard a no-longer-wanted connection; else decrement it. */
    1263             : static void
    1264        2514 : pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel)
    1265             : {
    1266        2514 :     if (toplevel)
    1267             :     {
    1268             :         /* Reset state to show we're out of a transaction */
    1269        2486 :         entry->xact_depth = 0;
    1270             : 
    1271             :         /*
    1272             :          * Discard the connection, so as to recover, if it isn't in a good
    1273             :          * idle state, a transaction state change is still marked as in
    1274             :          * progress, it is marked as invalid, or the keep_connections option
    1275             :          * of its server is disabled. Next GetConnection will open a new one.
    1276             :          */
    1277        4970 :         if (PQstatus(entry->conn) != CONNECTION_OK ||
    1278        2484 :             PQtransactionStatus(entry->conn) != PQTRANS_IDLE ||
    1279        2484 :             entry->changing_xact_state ||
    1280        2484 :             entry->invalidated ||
    1281        2480 :             !entry->keep_connections)
    1282             :         {
    1283           8 :             elog(DEBUG3, "discarding connection %p", entry->conn);
    1284           8 :             disconnect_pg_server(entry);
    1285             :         }
    1286             :     }
    1287             :     else
    1288             :     {
    1289             :         /* Reset state to show we're out of a subtransaction */
    1290          28 :         entry->xact_depth--;
    1291             :     }
    1292        2514 : }
    1293             : 
    1294             : /*
    1295             :  * Cancel the currently-in-progress query (whose query text we do not have)
    1296             :  * and ignore the result.  Returns true if we successfully cancel the query
    1297             :  * and discard any pending result, and false if not.
    1298             :  *
    1299             :  * It's not a huge problem if we throw an ERROR here, but if we get into error
    1300             :  * recursion trouble, we'll end up slamming the connection shut, which will
    1301             :  * necessitate failing the entire toplevel transaction even if subtransactions
    1302             :  * were used.  Try to use WARNING where we can.
    1303             :  *
    1304             :  * XXX: if the query was one sent by fetch_more_data_begin(), we could get the
    1305             :  * query text from the pendingAreq saved in the per-connection state, then
    1306             :  * report the query using it.
    1307             :  */
    1308             : static bool
    1309           2 : pgfdw_cancel_query(PGconn *conn)
    1310             : {
    1311             :     TimestampTz endtime;
    1312             : 
    1313             :     /*
    1314             :      * If it takes too long to cancel the query and discard the result, assume
    1315             :      * the connection is dead.  The same deadline is used for both steps below.
    1316             :      */
    1317           2 :     endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
    1318             :                                           CONNECTION_CLEANUP_TIMEOUT);
    1319             : 
    1320           2 :     if (!pgfdw_cancel_query_begin(conn, endtime))
    1321           0 :         return false;
    1322           2 :     return pgfdw_cancel_query_end(conn, endtime, false);
    1323             : }
    1324             : 
    1325             : /*
    1326             :  * Submit a cancel request to the given connection, waiting only until
    1327             :  * the given time.
    1328             :  *
    1329             :  * We sleep interruptibly until we receive confirmation that the cancel
    1330             :  * request has been accepted, and if it is, return true; if the timeout
    1331             :  * lapses without that, or the request fails for whatever reason, report
    1332             :  * the error message from libpqsrv_cancel as a WARNING and return false.
    1333             :  */
    1334             : static bool
    1335           2 : pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime)
    1336             : {
    1337           2 :     const char *errormsg = libpqsrv_cancel(conn, endtime);
    1338             : 
    1339           2 :     if (errormsg != NULL)
    1340           0 :         ereport(WARNING,
    1341             :                 errcode(ERRCODE_CONNECTION_FAILURE),
    1342             :                 errmsg("could not send cancel request: %s", errormsg));
    1343             : 
    1344           2 :     return errormsg == NULL;
    1345             : }
    1346             : /* Second half of pgfdw_cancel_query: wait until endtime for the pending result and discard it; WARNING and false on failure. */
    1347             : static bool
    1348           2 : pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime, bool consume_input)
    1349             : {
    1350           2 :     PGresult   *result = NULL;
    1351             :     bool        timed_out;
    1352             : 
    1353             :     /*
    1354             :      * If requested, consume whatever data is available from the socket. (Note
    1355             :      * that if all data is available, this allows pgfdw_get_cleanup_result to
    1356             :      * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
    1357             :      * which would be large compared to the overhead of PQconsumeInput.)
    1358             :      */
    1359           2 :     if (consume_input && !PQconsumeInput(conn))
    1360             :     {
    1361           0 :         ereport(WARNING,
    1362             :                 (errcode(ERRCODE_CONNECTION_FAILURE),
    1363             :                  errmsg("could not get result of cancel request: %s",
    1364             :                         pchomp(PQerrorMessage(conn)))));
    1365           0 :         return false;
    1366             :     }
    1367             : 
    1368             :     /* Get and discard the result; a true return here means failure. */
    1369           2 :     if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out))
    1370             :     {
    1371           0 :         if (timed_out)
    1372           0 :             ereport(WARNING,
    1373             :                     (errmsg("could not get result of cancel request due to timeout")));
    1374             :         else
    1375           0 :             ereport(WARNING,
    1376             :                     (errcode(ERRCODE_CONNECTION_FAILURE),
    1377             :                      errmsg("could not get result of cancel request: %s",
    1378             :                             pchomp(PQerrorMessage(conn)))));
    1379             : 
    1380           0 :         return false;
    1381             :     }
    1382           2 :     PQclear(result);
    1383             : 
    1384           2 :     return true;
    1385             : }
    1386             : 
    1387             : /*
    1388             :  * Submit a query during (sub)abort cleanup and wait up to 30 seconds for the
    1389             :  * result.  If the query is executed without error, the return value is true.
    1390             :  * If the query completes but the remote server reports an error, the return
    1391             :  * value is true if and only if ignore_errors is set.  If the query can't be
    1392             :  * sent or times out, the return value is false.
    1393             :  *
    1394             :  * It's not a huge problem if we throw an ERROR here, but if we get into error
    1395             :  * recursion trouble, we'll end up slamming the connection shut, which will
    1396             :  * necessitate failing the entire toplevel transaction even if subtransactions
    1397             :  * were used.  Try to use WARNING where we can.
    1398             :  */
    1399             : static bool
    1400         106 : pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
    1401             : {
    1402             :     TimestampTz endtime;
    1403             : 
    1404             :     /*
    1405             :      * If it takes too long to execute a cleanup query, assume the connection
    1406             :      * is dead.  It's fairly likely that this is why we aborted in the first
    1407             :      * place (e.g. statement timeout, user cancel), so the timeout shouldn't
    1408             :      * be too long.  The deadline is computed before the query is sent.
    1409             :      */
    1410         106 :     endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
    1411             :                                           CONNECTION_CLEANUP_TIMEOUT);
    1412             : 
    1413         106 :     if (!pgfdw_exec_cleanup_query_begin(conn, query))
    1414           0 :         return false;
    1415         106 :     return pgfdw_exec_cleanup_query_end(conn, query, endtime,
    1416             :                                         false, ignore_errors);
    1417             : }
    1418             : /* Send a cleanup query without waiting for its result; if it can't be sent, report a WARNING and return false. */
    1419             : static bool
    1420         130 : pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query)
    1421             : {
    1422             :     Assert(query != NULL);
    1423             : 
    1424             :     /*
    1425             :      * Submit a query.  Since we don't use non-blocking mode, this also can
    1426             :      * block.  But its risk is relatively small, so we ignore that for now.
    1427             :      */
    1428         130 :     if (!PQsendQuery(conn, query))
    1429             :     {
    1430           0 :         pgfdw_report_error(WARNING, NULL, conn, false, query);
    1431           0 :         return false;
    1432             :     }
    1433             : 
    1434         130 :     return true;
    1435             : }
    1436             : /* Wait until endtime for the result of an already-sent cleanup query; see pgfdw_exec_cleanup_query for the return value. */
    1437             : static bool
    1438         130 : pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
    1439             :                              TimestampTz endtime, bool consume_input,
    1440             :                              bool ignore_errors)
    1441             : {
    1442         130 :     PGresult   *result = NULL;
    1443             :     bool        timed_out;
    1444             : 
    1445             :     Assert(query != NULL);
    1446             : 
    1447             :     /*
    1448             :      * If requested, consume whatever data is available from the socket. (Note
    1449             :      * that if all data is available, this allows pgfdw_get_cleanup_result to
    1450             :      * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
    1451             :      * which would be large compared to the overhead of PQconsumeInput.)
    1452             :      */
    1453         130 :     if (consume_input && !PQconsumeInput(conn))
    1454             :     {
    1455           0 :         pgfdw_report_error(WARNING, NULL, conn, false, query);
    1456           0 :         return false;
    1457             :     }
    1458             : 
    1459             :     /* Get the result of the query; a true return here means failure. */
    1460         130 :     if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out))
    1461             :     {
    1462           0 :         if (timed_out)
    1463           0 :             ereport(WARNING,
    1464             :                     (errmsg("could not get query result due to timeout"),
    1465             :                      errcontext("remote SQL command: %s", query)));
    1466             :         else
    1467           0 :             pgfdw_report_error(WARNING, NULL, conn, false, query);
    1468             : 
    1469           0 :         return false;
    1470             :     }
    1471             : 
    1472             :     /* Issue a warning if not successful; ignore_errors decides the result. */
    1473         130 :     if (PQresultStatus(result) != PGRES_COMMAND_OK)
    1474             :     {
    1475           0 :         pgfdw_report_error(WARNING, result, conn, true, query);
    1476           0 :         return ignore_errors;
    1477             :     }
    1478         130 :     PQclear(result);
    1479             : 
    1480         130 :     return true;
    1481             : }
    1482             : 
    1483             : /*
    1484             :  * Get, during abort cleanup, the result of a query that is in progress.  This
    1485             :  * might be a query that is being interrupted by transaction abort, or it might
    1486             :  * be a query that was initiated as part of transaction abort to get the remote
    1487             :  * side back to the appropriate state.
    1488             :  *
    1489             :  * endtime is the time at which we should give up and assume the remote
    1490             :  * side is dead.  Returns true if the timeout expired or connection trouble
    1491             :  * occurred, false otherwise.  Sets *result (to the last result received,
    1492             :  * possibly NULL) only when returning false; *timed_out is true only on timeout.
    1493             :  */
    1494             : static bool
    1495         132 : pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result,
    1496             :                          bool *timed_out)
    1497             : {
    1498         132 :     volatile bool failed = false;
    1499         132 :     PGresult   *volatile last_res = NULL;
    1500             : 
    1501         132 :     *timed_out = false;
    1502             : 
    1503             :     /* In what follows, do not leak any PGresults on an error. */
    1504         132 :     PG_TRY();
    1505             :     {
    1506             :         for (;;)
    1507         146 :         {
    1508             :             PGresult   *res;
    1509             : 
    1510         400 :             while (PQisBusy(conn))
    1511             :             {
    1512             :                 int         wc;
    1513         122 :                 TimestampTz now = GetCurrentTimestamp();
    1514             :                 long        cur_timeout;
    1515             : 
    1516             :                 /* If timeout has expired, give up, else get sleep time. */
    1517         122 :                 cur_timeout = TimestampDifferenceMilliseconds(now, endtime);
    1518         122 :                 if (cur_timeout <= 0)
    1519             :                 {
    1520           0 :                     *timed_out = true;
    1521           0 :                     failed = true;
    1522           0 :                     goto exit;
    1523             :                 }
    1524             : 
    1525             :                 /* first time, allocate or get the custom wait event */
    1526         122 :                 if (pgfdw_we_cleanup_result == 0)
    1527           2 :                     pgfdw_we_cleanup_result = WaitEventExtensionNew("PostgresFdwCleanupResult");
    1528             : 
    1529             :                 /* Sleep until latch set, socket readable, or timeout */
    1530         122 :                 wc = WaitLatchOrSocket(MyLatch,
    1531             :                                        WL_LATCH_SET | WL_SOCKET_READABLE |
    1532             :                                        WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
    1533             :                                        PQsocket(conn),
    1534             :                                        cur_timeout, pgfdw_we_cleanup_result);
    1535         122 :                 ResetLatch(MyLatch);
    1536             : 
    1537         122 :                 CHECK_FOR_INTERRUPTS();
    1538             : 
    1539             :                 /* Data available in socket? */
    1540         122 :                 if (wc & WL_SOCKET_READABLE)
    1541             :                 {
    1542         122 :                     if (!PQconsumeInput(conn))
    1543             :                     {
    1544             :                         /* connection trouble */
    1545           0 :                         failed = true;
    1546           0 :                         goto exit;
    1547             :                     }
    1548             :                 }
    1549             :             }
    1550             : 
    1551         278 :             res = PQgetResult(conn);
    1552         278 :             if (res == NULL)
    1553         132 :                 break;          /* query is complete */
    1554             :             /* keep only the most recent result, freeing the previous one */
    1555         146 :             PQclear(last_res);
    1556         146 :             last_res = res;
    1557             :         }
    1558         132 : exit:   ;
    1559             :     }
    1560           0 :     PG_CATCH();
    1561             :     {
    1562           0 :         PQclear(last_res);
    1563           0 :         PG_RE_THROW();
    1564             :     }
    1565         132 :     PG_END_TRY();
    1566             : 
    1567         132 :     if (failed)
    1568           0 :         PQclear(last_res);
    1569             :     else
    1570         132 :         *result = last_res;
    1571         132 :     return failed;
    1572             : }
    1573             : 
    1574             : /*
    1575             :  * Abort remote transaction or subtransaction.
    1576             :  *
    1577             :  * "toplevel" should be set to true if the toplevel (main) transaction is
    1578             :  * being rolled back, false otherwise.
    1579             :  *
    1580             :  * Set entry->changing_xact_state to false on success, true on failure.
    1581             :  */
    1582             : static void
    1583          80 : pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel)
    1584             : {
    1585             :     char        sql[100];
    1586             : 
    1587             :     /*
    1588             :      * Don't try to clean up the connection if we're already in error
    1589             :      * recursion trouble; treat it like an unsalvageable connection below.
    1590             :      */
    1591          80 :     if (in_error_recursion_trouble())
    1592           0 :         entry->changing_xact_state = true;
    1593             : 
    1594             :     /*
    1595             :      * If connection is already unsalvageable, don't touch it further.
    1596             :      */
    1597          80 :     if (entry->changing_xact_state)
    1598           2 :         return;
    1599             : 
    1600             :     /*
    1601             :      * Mark this connection as in the process of changing transaction state.
    1602             :      * Every early return below leaves the flag set, signalling failure.
    1603             :      */
    1604          78 :     entry->changing_xact_state = true;
    1605             : 
    1606             :     /* Assume we might have lost track of prepared statements */
    1607          78 :     entry->have_error = true;
    1608             : 
    1609             :     /*
    1610             :      * If a command has been submitted to the remote server by using an
    1611             :      * asynchronous execution function, the command might not have yet
    1612             :      * completed.  Check to see if a command is still being processed by the
    1613             :      * remote server, and if so, request cancellation of the command.
    1614             :      */
    1615          78 :     if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
    1616           2 :         !pgfdw_cancel_query(entry->conn))
    1617           0 :         return;                 /* Unable to cancel running query */
    1618             : 
    1619          78 :     CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
    1620          78 :     if (!pgfdw_exec_cleanup_query(entry->conn, sql, false))
    1621           0 :         return;                 /* Unable to abort remote (sub)transaction */
    1622             : 
    1623          78 :     if (toplevel)
    1624             :     {
    1625          72 :         if (entry->have_prep_stmt && entry->have_error &&
    1626          28 :             !pgfdw_exec_cleanup_query(entry->conn,
    1627             :                                       "DEALLOCATE ALL",
    1628             :                                       true))
    1629           0 :             return;             /* Trouble clearing prepared statements */
    1630             : 
    1631          72 :         entry->have_prep_stmt = false;
    1632          72 :         entry->have_error = false;
    1633             :     }
    1634             : 
    1635             :     /*
    1636             :      * If pendingAreq of the per-connection state is not NULL, it means that
    1637             :      * an asynchronous fetch begun by fetch_more_data_begin() was not done
    1638             :      * successfully and thus the per-connection state was not reset in
    1639             :      * fetch_more_data(); in that case reset the per-connection state here.
    1640             :      */
    1641          78 :     if (entry->state.pendingAreq)
    1642           0 :         memset(&entry->state, 0, sizeof(entry->state));
    1643             : 
    1644             :     /* Disarm changing_xact_state if it all worked */
    1645          78 :     entry->changing_xact_state = false;
    1646             : }
    1646             : 
    1647             : /*
    1648             :  * Like pgfdw_abort_cleanup, submit an abort command or cancel request, but
    1649             :  * don't wait for the result.
    1650             :  *
    1651             :  * Returns true if the abort command or cancel request is successfully issued,
    1652             :  * false otherwise.  If the abort command is successfully issued, the given
    1653             :  * connection cache entry is appended to *pending_entries.  Otherwise, if the
    1654             :  * cancel request is successfully issued, it is appended to *cancel_requested.
    1655             :  */
    1656             : static bool
    1657          16 : pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel,
    1658             :                           List **pending_entries, List **cancel_requested)
    1659             : {
    1660             :     /*
    1661             :      * Don't try to clean up the connection if we're already in error
    1662             :      * recursion trouble.
    1663             :      */
    1664          16 :     if (in_error_recursion_trouble())
    1665           0 :         entry->changing_xact_state = true;
    1666             : 
    1667             :     /*
    1668             :      * If connection is already unsalvageable, don't touch it further.
    1669             :      */
    1670          16 :     if (entry->changing_xact_state)
    1671           0 :         return false;
    1672             : 
    1673             :     /*
    1674             :      * Mark this connection as in the process of changing transaction state.
    1675             :      */
    1676          16 :     entry->changing_xact_state = true;
    1677             : 
    1678             :     /* Assume we might have lost track of prepared statements */
    1679          16 :     entry->have_error = true;
    1680             : 
    1681             :     /*
    1682             :      * If a command has been submitted to the remote server by using an
    1683             :      * asynchronous execution function, the command might not have yet
    1684             :      * completed.  Check to see if a command is still being processed by the
    1685             :      * remote server, and if so, request cancellation of the command.
    1686             :      */
    1687          16 :     if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
    1688             :     {
    1689             :         TimestampTz endtime;
    1690             : 
    1691           0 :         endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
    1692             :                                               CONNECTION_CLEANUP_TIMEOUT);
    1693           0 :         if (!pgfdw_cancel_query_begin(entry->conn, endtime))
    1694           0 :             return false;       /* Unable to cancel running query */
    1695           0 :         *cancel_requested = lappend(*cancel_requested, entry);
    1696             :     }
    1697             :     else
    1698             :     {
    1699             :         char        sql[100];
    1700             : 
    1701          16 :         CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
    1702          16 :         if (!pgfdw_exec_cleanup_query_begin(entry->conn, sql))
    1703           0 :             return false;       /* Unable to abort remote transaction */
    1704          16 :         *pending_entries = lappend(*pending_entries, entry);
    1705             :     }
    1706             : 
    1707          16 :     return true;
    1708             : }
    1709             : 
    1710             : /*
    1711             :  * Finish pre-commit cleanup of connections on each of which we've sent a
    1712             :  * COMMIT command to the remote server.
    1713             :  */
    1714             : static void
    1715          26 : pgfdw_finish_pre_commit_cleanup(List *pending_entries)
    1716             : {
    1717             :     ConnCacheEntry *entry;
    1718          26 :     List       *pending_deallocs = NIL;
    1719             :     ListCell   *lc;
    1720             : 
    1721             :     Assert(pending_entries);
    1722             : 
    1723             :     /*
    1724             :      * Get the result of the COMMIT command for each of the pending entries
    1725             :      */
    1726          58 :     foreach(lc, pending_entries)
    1727             :     {
    1728          32 :         entry = (ConnCacheEntry *) lfirst(lc);
    1729             : 
    1730             :         Assert(entry->changing_xact_state);
    1731             : 
    1732             :         /*
    1733             :          * We might already have received the result on the socket, so pass
    1734             :          * consume_input=true to try to consume it first
    1735             :          */
    1736          32 :         do_sql_command_end(entry->conn, "COMMIT TRANSACTION", true);
    1737          32 :         entry->changing_xact_state = false;
    1738             : 
    1739             :         /* Do a DEALLOCATE ALL in parallel if needed */
    1740          32 :         if (entry->have_prep_stmt && entry->have_error)
    1741             :         {
    1742             :             /* Ignore errors (see notes in pgfdw_xact_callback) */
    1743           4 :             if (PQsendQuery(entry->conn, "DEALLOCATE ALL"))
    1744             :             {
    1745           4 :                 pending_deallocs = lappend(pending_deallocs, entry);
    1746           4 :                 continue;
    1747             :             }
    1748             :         }
    1749          28 :         entry->have_prep_stmt = false;
    1750          28 :         entry->have_error = false;
    1751             : 
    1752          28 :         pgfdw_reset_xact_state(entry, true);
    1753             :     }
    1754             : 
    1755             :     /* No further work if no pending entries */
    1756          26 :     if (!pending_deallocs)
    1757          24 :         return;
    1758             : 
    1759             :     /*
    1760             :      * Get the result of the DEALLOCATE command for each of the pending
    1761             :      * entries
    1762             :      */
    1763           6 :     foreach(lc, pending_deallocs)
    1764             :     {
    1765             :         PGresult   *res;
    1766             : 
    1767           4 :         entry = (ConnCacheEntry *) lfirst(lc);
    1768             : 
    1769             :         /* Ignore errors (see notes in pgfdw_xact_callback) */
    1770           8 :         while ((res = PQgetResult(entry->conn)) != NULL)
    1771             :         {
    1772           4 :             PQclear(res);
    1773             :             /* Stop if the connection is lost (else we'll loop infinitely) */
    1774           4 :             if (PQstatus(entry->conn) == CONNECTION_BAD)
    1775           0 :                 break;
    1776             :         }
    1777           4 :         entry->have_prep_stmt = false;
    1778           4 :         entry->have_error = false;
    1779             : 
    1780           4 :         pgfdw_reset_xact_state(entry, true);
    1781             :     }
    1782             : }
    1783             : 
    1784             : /*
    1785             :  * Finish pre-subcommit cleanup of connections on each of which we've sent a
    1786             :  * RELEASE command to the remote server.
    1787             :  */
    1788             : static void
    1789           2 : pgfdw_finish_pre_subcommit_cleanup(List *pending_entries, int curlevel)
    1790             : {
    1791             :     ConnCacheEntry *entry;
    1792             :     char        sql[100];
    1793             :     ListCell   *lc;
    1794             : 
    1795             :     Assert(pending_entries);
    1796             : 
    1797             :     /*
    1798             :      * Get the result of the RELEASE command for each of the pending entries
    1799             :      */
    1800           2 :     snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
    1801           6 :     foreach(lc, pending_entries)
    1802             :     {
    1803           4 :         entry = (ConnCacheEntry *) lfirst(lc);
    1804             : 
    1805             :         Assert(entry->changing_xact_state);
    1806             : 
    1807             :         /*
    1808             :          * We might already have received the result on the socket, so pass
    1809             :          * consume_input=true to try to consume it first
    1810             :          */
    1811           4 :         do_sql_command_end(entry->conn, sql, true);
    1812           4 :         entry->changing_xact_state = false;
    1813             : 
    1814           4 :         pgfdw_reset_xact_state(entry, false);
    1815             :     }
    1816           2 : }
    1817             : 
    1818             : /*
    1819             :  * Finish abort cleanup of connections on each of which we've sent an abort
    1820             :  * command or cancel request to the remote server.
    1821             :  */
    1822             : static void
    1823           8 : pgfdw_finish_abort_cleanup(List *pending_entries, List *cancel_requested,
    1824             :                            bool toplevel)
    1825             : {
    1826           8 :     List       *pending_deallocs = NIL;
    1827             :     ListCell   *lc;
    1828             : 
    1829             :     /*
    1830             :      * For each of the pending cancel requests (if any), get and discard the
    1831             :      * result of the query, and submit an abort command to the remote server.
    1832             :      */
    1833           8 :     if (cancel_requested)
    1834             :     {
    1835           0 :         foreach(lc, cancel_requested)
    1836             :         {
    1837           0 :             ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
    1838             :             TimestampTz endtime;
    1839             :             char        sql[100];
    1840             : 
    1841             :             Assert(entry->changing_xact_state);
    1842             : 
    1843             :             /*
    1844             :              * Set end time.  You might think we should do this before issuing
    1845             :              * cancel request like in normal mode, but that is problematic,
    1846             :              * because if, for example, it took longer than 30 seconds to
    1847             :              * process the first few entries in the cancel_requested list, it
    1848             :              * would cause a timeout error when processing each of the
    1849             :              * remaining entries in the list, leading to slamming that entry's
    1850             :              * connection shut.
    1851             :              */
    1852           0 :             endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
    1853             :                                                   CONNECTION_CLEANUP_TIMEOUT);
    1854             : 
    1855           0 :             if (!pgfdw_cancel_query_end(entry->conn, endtime, true))
    1856             :             {
    1857             :                 /* Unable to cancel running query */
    1858           0 :                 pgfdw_reset_xact_state(entry, toplevel);
    1859           0 :                 continue;
    1860             :             }
    1861             : 
    1862             :             /* Send an abort command in parallel if needed */
    1863           0 :             CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
    1864           0 :             if (!pgfdw_exec_cleanup_query_begin(entry->conn, sql))
    1865             :             {
    1866             :                 /* Unable to abort remote (sub)transaction */
    1867           0 :                 pgfdw_reset_xact_state(entry, toplevel);
    1868             :             }
    1869             :             else
    1870           0 :                 pending_entries = lappend(pending_entries, entry);
    1871             :         }
    1872             :     }
    1873             : 
    1874             :     /* No further work if no pending entries */
    1875           8 :     if (!pending_entries)
    1876           0 :         return;
    1877             : 
    1878             :     /*
    1879             :      * Get the result of the abort command for each of the pending entries
    1880             :      */
    1881          24 :     foreach(lc, pending_entries)
    1882             :     {
    1883          16 :         ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
    1884             :         TimestampTz endtime;
    1885             :         char        sql[100];
    1886             : 
    1887             :         Assert(entry->changing_xact_state);
    1888             : 
    1889             :         /*
    1890             :          * Set end time.  We do this now, not before issuing the command like
    1891             :          * in normal mode, for the same reason as for the cancel_requested
    1892             :          * entries.
    1893             :          */
    1894          16 :         endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
    1895             :                                               CONNECTION_CLEANUP_TIMEOUT);
    1896             : 
    1897          16 :         CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
    1898          16 :         if (!pgfdw_exec_cleanup_query_end(entry->conn, sql, endtime,
    1899             :                                           true, false))
    1900             :         {
    1901             :             /* Unable to abort remote (sub)transaction */
    1902           0 :             pgfdw_reset_xact_state(entry, toplevel);
    1903           8 :             continue;
    1904             :         }
    1905             : 
    1906          16 :         if (toplevel)
    1907             :         {
    1908             :             /* Do a DEALLOCATE ALL in parallel if needed */
    1909           8 :             if (entry->have_prep_stmt && entry->have_error)
    1910             :             {
    1911           8 :                 if (!pgfdw_exec_cleanup_query_begin(entry->conn,
    1912             :                                                     "DEALLOCATE ALL"))
    1913             :                 {
    1914             :                     /* Trouble clearing prepared statements */
    1915           0 :                     pgfdw_reset_xact_state(entry, toplevel);
    1916             :                 }
    1917             :                 else
    1918           8 :                     pending_deallocs = lappend(pending_deallocs, entry);
    1919           8 :                 continue;
    1920             :             }
    1921           0 :             entry->have_prep_stmt = false;
    1922           0 :             entry->have_error = false;
    1923             :         }
    1924             : 
    1925             :         /* Reset the per-connection state if needed */
    1926           8 :         if (entry->state.pendingAreq)
    1927           0 :             memset(&entry->state, 0, sizeof(entry->state));
    1928             : 
    1929             :         /* We're done with this entry; unset the changing_xact_state flag */
    1930           8 :         entry->changing_xact_state = false;
    1931           8 :         pgfdw_reset_xact_state(entry, toplevel);
    1932             :     }
    1933             : 
    1934             :     /* No further work if no pending entries */
    1935           8 :     if (!pending_deallocs)
    1936           4 :         return;
    1937             :     Assert(toplevel);
    1938             : 
    1939             :     /*
    1940             :      * Get the result of the DEALLOCATE command for each of the pending
    1941             :      * entries
    1942             :      */
    1943          12 :     foreach(lc, pending_deallocs)
    1944             :     {
    1945           8 :         ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
    1946             :         TimestampTz endtime;
    1947             : 
    1948             :         Assert(entry->changing_xact_state);
    1949             :         Assert(entry->have_prep_stmt);
    1950             :         Assert(entry->have_error);
    1951             : 
    1952             :         /*
    1953             :          * Set end time.  We do this now, not before issuing the command like
    1954             :          * in normal mode, for the same reason as for the cancel_requested
    1955             :          * entries.
    1956             :          */
    1957           8 :         endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
    1958             :                                               CONNECTION_CLEANUP_TIMEOUT);
    1959             : 
    1960           8 :         if (!pgfdw_exec_cleanup_query_end(entry->conn, "DEALLOCATE ALL",
    1961             :                                           endtime, true, true))
    1962             :         {
    1963             :             /* Trouble clearing prepared statements */
    1964           0 :             pgfdw_reset_xact_state(entry, toplevel);
    1965           0 :             continue;
    1966             :         }
    1967           8 :         entry->have_prep_stmt = false;
    1968           8 :         entry->have_error = false;
    1969             : 
    1970             :         /* Reset the per-connection state if needed */
    1971           8 :         if (entry->state.pendingAreq)
    1972           0 :             memset(&entry->state, 0, sizeof(entry->state));
    1973             : 
    1974             :         /* We're done with this entry; unset the changing_xact_state flag */
    1975           8 :         entry->changing_xact_state = false;
    1976           8 :         pgfdw_reset_xact_state(entry, toplevel);
    1977             :     }
    1978             : }
    1979             : 
    1980             : /*
    1981             :  * List active foreign server connections.
    1982             :  *
    1983             :  * This function takes no input parameters and returns a setof record made
    1984             :  * of the following values:
    1985             :  * - server_name - server name of the active connection. If the foreign server
    1986             :  *   has been dropped but the connection is still active, the server name is
    1987             :  *   NULL in the output.
    1988             :  * - valid - true/false representing whether the connection is valid or not.
    1989             :  *   Note that the connections can get invalidated in pgfdw_inval_callback.
    1990             :  *
    1991             :  * No records are returned when there are no cached connections at all.
    1992             :  */
    1993             : Datum
    1994          22 : postgres_fdw_get_connections(PG_FUNCTION_ARGS)
    1995             : {
    1996             : #define POSTGRES_FDW_GET_CONNECTIONS_COLS   2
    1997          22 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    1998             :     HASH_SEQ_STATUS scan;
    1999             :     ConnCacheEntry *entry;
    2000             : 
    2001          22 :     InitMaterializedSRF(fcinfo, 0);
    2002             : 
    2003             :     /* If cache doesn't exist, we return no records */
    2004          22 :     if (!ConnectionHash)
    2005           0 :         PG_RETURN_VOID();
    2006             : 
    2007          22 :     hash_seq_init(&scan, ConnectionHash);
    2008         186 :     while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
    2009             :     {
    2010             :         ForeignServer *server;
    2011         164 :         Datum       values[POSTGRES_FDW_GET_CONNECTIONS_COLS] = {0};
    2012         164 :         bool        nulls[POSTGRES_FDW_GET_CONNECTIONS_COLS] = {0};
    2013             : 
    2014             :         /* We only look for open remote connections */
    2015         164 :         if (!entry->conn)
    2016         142 :             continue;
    2017             : 
    2018          22 :         server = GetForeignServerExtended(entry->serverid, FSV_MISSING_OK);
    2019             : 
    2020             :         /*
    2021             :          * The foreign server may have been dropped in the current explicit
    2022             :          * transaction. It is not possible to drop the server from another
    2023             :          * session while the connection associated with it is in use in the
    2024             :          * current transaction; if that is attempted, the drop query in the
    2025             :          * other session blocks until the current transaction finishes.
    2026             :          *
    2027             :          * Even though the server is dropped in the current transaction, the
    2028             :          * cache can still have an associated active connection entry; we
    2029             :          * call such connections dangling. Since we cannot fetch the server
    2030             :          * name from the system catalogs for dangling connections, we show a
    2031             :          * NULL value for the server name in the output.
    2032             :          *
    2033             :          * We could have done better by storing the server name in the cache
    2034             :          * entry instead of the server OID so that it could be used in the
    2035             :          * output. But the server name in each cache entry would require 64
    2036             :          * bytes of memory, which is a lot when there are many cached
    2037             :          * connections, and the use case (dropping the foreign server within
    2038             :          * the current explicit transaction) seems rare. So, we chose to show
    2039             :          * a NULL value for the server name in the output.
    2040             :          *
    2041             :          * Such dangling connections get closed either on next use or at the
    2042             :          * end of the current explicit transaction in pgfdw_xact_callback.
    2043             :          */
    2044          22 :         if (!server)
    2045             :         {
    2046             :             /*
    2047             :              * If the server has been dropped in the current explicit
    2048             :              * transaction, then this entry would have been invalidated in
    2049             :              * pgfdw_inval_callback at the end of drop server command. Note
    2050             :              * that this connection would not have been closed in
    2051             :              * pgfdw_inval_callback because it is still being used in the
    2052             :              * current explicit transaction. So, assert that here.
    2053             :              */
    2054             :             Assert(entry->conn && entry->xact_depth > 0 && entry->invalidated);
    2055             : 
    2056             :             /* Show null, if no server name was found */
    2057           2 :             nulls[0] = true;
    2058             :         }
    2059             :         else
    2060          20 :             values[0] = CStringGetTextDatum(server->servername);
    2061             : 
    2062          22 :         values[1] = BoolGetDatum(!entry->invalidated);
    2063             : 
    2064          22 :         tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
    2065             :     }
    2066             : 
    2067          22 :     PG_RETURN_VOID();
    2068             : }
    2069             : 
    2070             : /*
    2071             :  * Disconnect the specified cached connections.
    2072             :  *
    2073             :  * This function discards the open connections that are established by
    2074             :  * postgres_fdw from the local session to the foreign server with
    2075             :  * the given name. Note that there can be multiple connections to
    2076             :  * the given server using different user mappings. If the connections
    2077             :  * are used in the current local transaction, they are not disconnected
    2078             :  * and warning messages are reported. This function returns true
    2079             :  * if it disconnects at least one connection, otherwise false. If no
    2080             :  * foreign server with the given name is found, an error is reported.
    2081             :  */
    2082             : Datum
    2083           8 : postgres_fdw_disconnect(PG_FUNCTION_ARGS)
    2084             : {
    2085             :     ForeignServer *server;
    2086             :     char       *servername;
    2087             : 
    2088           8 :     servername = text_to_cstring(PG_GETARG_TEXT_PP(0));
    2089           8 :     server = GetForeignServerByName(servername, false);
    2090             : 
    2091           6 :     PG_RETURN_BOOL(disconnect_cached_connections(server->serverid));
    2092             : }
    2093             : 
    2094             : /*
    2095             :  * Disconnect all the cached connections.
    2096             :  *
    2097             :  * This function discards all the open connections that are established by
    2098             :  * postgres_fdw from the local session to the foreign servers.
    2099             :  * If the connections are used in the current local transaction, they are
    2100             :  * not disconnected and warning messages are reported. This function
    2101             :  * returns true if it disconnects at least one connection, otherwise false.
    2102             :  */
    2103             : Datum
    2104           8 : postgres_fdw_disconnect_all(PG_FUNCTION_ARGS)
    2105             : {
    2106           8 :     PG_RETURN_BOOL(disconnect_cached_connections(InvalidOid));
    2107             : }
    2108             : 
    2109             : /*
    2110             :  * Workhorse to disconnect cached connections.
    2111             :  *
    2112             :  * This function scans all the connection cache entries and disconnects
    2113             :  * the open connections whose foreign server OID matches with
    2114             :  * the specified one. If InvalidOid is specified, it disconnects all
    2115             :  * the cached connections.
    2116             :  *
    2117             :  * This function emits a warning for each connection that's used in
    2118             :  * the current transaction and doesn't close it. It returns true if
    2119             :  * it disconnects at least one connection, otherwise false.
    2120             :  *
    2121             :  * Note that this function disconnects even the connections that are
    2122             :  * established by other users in the same local session using different
    2123             :  * user mappings. This allows even a non-superuser to close the
    2124             :  * connections established by superusers in the same local session.
    2125             :  *
    2126             :  * XXX As of now we don't see any security risk in doing this. But should
    2127             :  * we set some restrictions on that, for example, prevent non-superusers
    2128             :  * from closing the connections established by superusers even
    2129             :  * in the same session?
    2130             :  */
    2131             : static bool
    2132          14 : disconnect_cached_connections(Oid serverid)
    2133             : {
    2134             :     HASH_SEQ_STATUS scan;
    2135             :     ConnCacheEntry *entry;
    2136          14 :     bool        all = !OidIsValid(serverid);
    2137          14 :     bool        result = false;
    2138             : 
    2139             :     /*
    2140             :      * Connection cache hashtable has not been initialized yet in this
    2141             :      * session, so return false.
    2142             :      */
    2143          14 :     if (!ConnectionHash)
    2144           0 :         return false;
    2145             : 
    2146          14 :     hash_seq_init(&scan, ConnectionHash);
    2147         114 :     while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
    2148             :     {
    2149             :         /* Ignore cache entry if no open connection right now. */
    2150         100 :         if (!entry->conn)
    2151          78 :             continue;
    2152             : 
    2153          22 :         if (all || entry->serverid == serverid)
    2154             :         {
    2155             :             /*
    2156             :              * Emit a warning because the connection to close is used in the
    2157             :              * current transaction and cannot be disconnected right now.
    2158             :              */
    2159          16 :             if (entry->xact_depth > 0)
    2160             :             {
    2161             :                 ForeignServer *server;
    2162             : 
    2163           6 :                 server = GetForeignServerExtended(entry->serverid,
    2164             :                                                   FSV_MISSING_OK);
    2165             : 
    2166           6 :                 if (!server)
    2167             :                 {
    2168             :                     /*
    2169             :                      * If the foreign server was dropped while its connection
    2170             :                      * was used in the current transaction, the connection
    2171             :                      * must have been marked as invalid by
    2172             :                      * pgfdw_inval_callback at the end of DROP SERVER command.
    2173             :                      */
    2174             :                     Assert(entry->invalidated);
    2175             : 
    2176           0 :                     ereport(WARNING,
    2177             :                             (errmsg("cannot close dropped server connection because it is still in use")));
    2178             :                 }
    2179             :                 else
    2180           6 :                     ereport(WARNING,
    2181             :                             (errmsg("cannot close connection for server \"%s\" because it is still in use",
    2182             :                                     server->servername)));
    2183             :             }
    2184             :             else
    2185             :             {
    2186          10 :                 elog(DEBUG3, "discarding connection %p", entry->conn);
    2187          10 :                 disconnect_pg_server(entry);
    2188          10 :                 result = true;
    2189             :             }
    2190             :         }
    2191             :     }
    2192             : 
    2193          14 :     return result;
    2194             : }

Generated by: LCOV version 1.14