LCOV - code coverage report
Current view: top level - contrib/postgres_fdw - connection.c (source / functions) Coverage Total Hit
Test: PostgreSQL 19beta1 Lines: 87.3 % 772 674
Test Date: 2026-06-27 00:16:44 Functions: 98.2 % 55 54
Legend: Lines:     hit not hit

            Line data    Source code
       1              : /*-------------------------------------------------------------------------
       2              :  *
       3              :  * connection.c
       4              :  *        Connection management functions for postgres_fdw
       5              :  *
       6              :  * Portions Copyright (c) 2012-2026, PostgreSQL Global Development Group
       7              :  *
       8              :  * IDENTIFICATION
       9              :  *        contrib/postgres_fdw/connection.c
      10              :  *
      11              :  *-------------------------------------------------------------------------
      12              :  */
      13              : #include "postgres.h"
      14              : 
      15              : #if HAVE_POLL_H
      16              : #include <poll.h>
      17              : #endif
      18              : 
      19              : #include "access/htup_details.h"
      20              : #include "access/xact.h"
      21              : #include "catalog/pg_user_mapping.h"
      22              : #include "commands/defrem.h"
      23              : #include "common/base64.h"
      24              : #include "funcapi.h"
      25              : #include "libpq/libpq-be.h"
      26              : #include "libpq/libpq-be-fe-helpers.h"
      27              : #include "mb/pg_wchar.h"
      28              : #include "miscadmin.h"
      29              : #include "pgstat.h"
      30              : #include "postgres_fdw.h"
      31              : #include "storage/latch.h"
      32              : #include "utils/builtins.h"
      33              : #include "utils/hsearch.h"
      34              : #include "utils/inval.h"
      35              : #include "utils/syscache.h"
      36              : #include "utils/tuplestore.h"
      37              : 
      38              : /*
      39              :  * Connection cache hash table entry
      40              :  *
      41              :  * The lookup key in this hash table is the user mapping OID. We use just one
      42              :  * connection per user mapping ID, which ensures that all the scans use the
      43              :  * same snapshot during a query.  Using the user mapping OID rather than
      44              :  * the foreign server OID + user OID avoids creating multiple connections when
      45              :  * the public user mapping applies to all user OIDs.
      46              :  *
      47              :  * The "conn" pointer can be NULL if we don't currently have a live connection.
      48              :  * When we do have a connection, xact_depth tracks the current depth of
      49              :  * transactions and subtransactions open on the remote side.  We need to issue
      50              :  * commands at the same nesting depth on the remote as we're executing at
      51              :  * ourselves, so that rolling back a subtransaction will kill the right
      52              :  * queries and not the wrong ones.
      53              :  */
      54              : typedef Oid ConnCacheKey;       /* user mapping OID; GetConnection() uses user->umid */
      55              : 
      56              : typedef struct ConnCacheEntry
      57              : {
      58              :     ConnCacheKey key;           /* hash key (must be first) */
      59              :     PGconn     *conn;           /* connection to foreign server, or NULL */
      60              :     /* Remaining fields are invalid when conn is NULL: */
      61              :     int         xact_depth;     /* 0 = no xact open, 1 = main xact open, 2 =
      62              :                                  * one level of subxact open, etc */
      63              :     bool        xact_read_only; /* xact r/o state; reset to false for each new connection */
      64              :     bool        have_prep_stmt; /* have we prepared any stmts in this xact?  (OR-ed with GetConnection's will_prep_stmt) */
      65              :     bool        have_error;     /* have any subxacts aborted in this xact? */
      66              :     bool        changing_xact_state;    /* xact state change in process */
      67              :     bool        parallel_commit;    /* do we commit (sub)xacts in parallel?  (server option, false if not set) */
      68              :     bool        parallel_abort; /* do we abort (sub)xacts in parallel?  (server option, false if not set) */
      69              :     bool        invalidated;    /* true if reconnect is pending; GetConnection() closes the connection once xact_depth is 0 */
      70              :     bool        keep_connections;   /* setting value of keep_connections
      71              :                                      * server option (true if not set) */
      72              :     Oid         serverid;       /* foreign server OID used to get server name */
      73              :     uint32      server_hashvalue;   /* hash value of foreign server OID */
      74              :     uint32      mapping_hashvalue;  /* hash value of user mapping OID */
      75              :     PgFdwConnState state;       /* extra per-connection state; zeroed for each new connection */
      76              : } ConnCacheEntry;
      77              : 
      78              : /*
      79              :  * Connection cache (initialized on first use, in GetConnection(); keyed by ConnCacheKey, entries are ConnCacheEntry)
      80              :  */
      81              : static HTAB *ConnectionHash = NULL;
      82              : 
      83              : /* for assigning cursor numbers and prepared statement numbers */
      84              : static unsigned int cursor_number = 0;
      85              : static unsigned int prep_stmt_number = 0;
      86              : 
      87              : /* tracks whether any work is needed in callback functions; set to true by every GetConnection() call */
      88              : static bool xact_got_connection = false;
      89              : 
      90              : /*
      91              :  * tracks the topmost read-only local transaction's nesting level determined
      92              :  * by GetTopReadOnlyTransactionNestLevel()
      93              :  */
      94              : static int  read_only_level = 0;
      95              : 
      96              : /* custom wait event values, retrieved from shared memory; 0 is treated as "not yet assigned" (GetConnection() tests pgfdw_we_get_result this way) */
      97              : static uint32 pgfdw_we_cleanup_result = 0;
      98              : static uint32 pgfdw_we_connect = 0;
      99              : static uint32 pgfdw_we_get_result = 0;
     100              : 
     101              : /*
     102              :  * Milliseconds to wait to cancel an in-progress query or execute a cleanup
     103              :  * query; if it takes longer than 30 seconds to do these, we assume the
     104              :  * connection is dead.
     105              :  */
     106              : #define CONNECTION_CLEANUP_TIMEOUT  30000   /* 30000 ms = 30 s */
     107              : 
     108              : /*
     109              :  * Milliseconds to wait before issuing another cancel request.  This covers
     110              :  * the race condition where the remote session ignored our cancel request
     111              :  * because it arrived while idle.
     112              :  */
     113              : #define RETRY_CANCEL_TIMEOUT    1000    /* 1000 ms = 1 s */
     114              : 
     115              : /* Macro for constructing abort command to be sent; "sql" must be a char array (not a pointer), because sizeof(sql) is used as the buffer size */
     116              : #define CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel) \
     117              :     do { \
     118              :         if (toplevel) /* abort the whole remote transaction */ \
     119              :             snprintf((sql), sizeof(sql), \
     120              :                      "ABORT TRANSACTION"); \
     121              :         else /* undo only the savepoint named after the current xact_depth */ \
     122              :             snprintf((sql), sizeof(sql), \
     123              :                      "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d", \
     124              :                      (entry)->xact_depth, (entry)->xact_depth); \
     125              :     } while(0)
     126              : 
     127              : /*
     128              :  * Extension version number, for supporting older extension versions' objects; passed as api_version to postgres_fdw_get_connections_internal()
     129              :  */
     130              : enum pgfdwVersion
     131              : {
     132              :     PGFDW_V1_1 = 0,             /* presumably the extension-version-1.1 SQL interface -- TODO confirm */
     133              :     PGFDW_V1_2,                 /* presumably the extension-version-1.2 SQL interface -- TODO confirm */
     134              : };
     135              : 
     136              : /*
     137              :  * SQL functions: PG_FUNCTION_INFO_V1 declarations for the SQL-callable entry points of this file
     138              :  */
     139            4 : PG_FUNCTION_INFO_V1(postgres_fdw_get_connections);
     140            5 : PG_FUNCTION_INFO_V1(postgres_fdw_get_connections_1_2);
     141            5 : PG_FUNCTION_INFO_V1(postgres_fdw_disconnect);
     142            5 : PG_FUNCTION_INFO_V1(postgres_fdw_disconnect_all);
     143           13 : PG_FUNCTION_INFO_V1(postgres_fdw_connection);
     144              : 
     145              : /* prototypes of private (file-static) functions; note construct_connection_params() has no forward declaration here and is defined directly further down */
     146              : static void make_new_connection(ConnCacheEntry *entry, UserMapping *user);
     147              : static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
     148              : static void disconnect_pg_server(ConnCacheEntry *entry);
     149              : static void check_conn_params(const char **keywords, const char **values, UserMapping *user);
     150              : static void configure_remote_session(PGconn *conn);
     151              : static void do_sql_command_begin(PGconn *conn, const char *sql);
     152              : static void do_sql_command_end(PGconn *conn, const char *sql,
     153              :                                bool consume_input);
     154              : static void begin_remote_xact(ConnCacheEntry *entry);
     155              : static void pgfdw_report_internal(int elevel, PGresult *res, PGconn *conn,
     156              :                                   const char *sql);
     157              : static void pgfdw_xact_callback(XactEvent event, void *arg);
     158              : static void pgfdw_subxact_callback(SubXactEvent event,
     159              :                                    SubTransactionId mySubid,
     160              :                                    SubTransactionId parentSubid,
     161              :                                    void *arg);
     162              : static void pgfdw_inval_callback(Datum arg, SysCacheIdentifier cacheid,
     163              :                                  uint32 hashvalue);
     164              : static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry);
     165              : static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel);
     166              : static bool pgfdw_cancel_query(PGconn *conn);
     167              : static bool pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime);
     168              : static bool pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime,
     169              :                                    TimestampTz retrycanceltime,
     170              :                                    bool consume_input);
     171              : static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
     172              :                                      bool ignore_errors);
     173              : static bool pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query);
     174              : static bool pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
     175              :                                          TimestampTz endtime,
     176              :                                          bool consume_input,
     177              :                                          bool ignore_errors);
     178              : static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
     179              :                                      TimestampTz retrycanceltime,
     180              :                                      PGresult **result, bool *timed_out);
     181              : static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel);
     182              : static bool pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel,
     183              :                                       List **pending_entries,
     184              :                                       List **cancel_requested);
     185              : static void pgfdw_finish_pre_commit_cleanup(List *pending_entries);
     186              : static void pgfdw_finish_pre_subcommit_cleanup(List *pending_entries,
     187              :                                                int curlevel);
     188              : static void pgfdw_finish_abort_cleanup(List *pending_entries,
     189              :                                        List *cancel_requested,
     190              :                                        bool toplevel);
     191              : static void pgfdw_security_check(const char **keywords, const char **values,
     192              :                                  UserMapping *user, PGconn *conn);
     193              : static bool UserMappingPasswordRequired(UserMapping *user);
     194              : static bool UseScramPassthrough(ForeignServer *server, UserMapping *user);
     195              : static bool disconnect_cached_connections(Oid serverid);
     196              : static void postgres_fdw_get_connections_internal(FunctionCallInfo fcinfo,
     197              :                                                   enum pgfdwVersion api_version);
     198              : static int  pgfdw_conn_check(PGconn *conn);
     199              : static bool pgfdw_conn_checkable(void);
     200              : static bool pgfdw_has_required_scram_options(const char **keywords, const char **values);
     201              : 
     202              : /*
     203              :  * Get a PGconn which can be used to execute queries on the remote PostgreSQL
     204              :  * server with the user's authorization.  A new connection is established
     205              :  * if we don't already have a suitable one, and a transaction is opened at
     206              :  * the right subtransaction nesting depth if we didn't do that already.
     207              :  * Connections are cached per user mapping: user->umid is the hash key.
     208              :  * will_prep_stmt must be true if caller intends to create any prepared
     209              :  * statements.  Since those don't go away automatically at transaction end
     210              :  * (not even on error), we need this flag to cue manual cleanup.
     211              :  * A connection failure while starting the remote transaction is retried once on a fresh connection, but only if no remote transaction was open.
     212              :  * If state is not NULL, *state receives the per-connection state associated
     213              :  * with the PGconn.
     214              :  */
     215              : PGconn *
     216         2296 : GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state)
     217              : {
     218              :     bool        found;
     219         2296 :     bool        retry = false;  /* set in PG_CATCH when we should reconnect and retry once */
     220              :     ConnCacheEntry *entry;
     221              :     ConnCacheKey key;
     222         2296 :     MemoryContext ccxt = CurrentMemoryContext; /* caller's context; the PG_CATCH block switches back to it before CopyErrorData() */
     223              : 
     224              :     /* First time through, initialize connection cache hashtable */
     225         2296 :     if (ConnectionHash == NULL)
     226              :     {
     227              :         HASHCTL     ctl;
     228              : 
     229           13 :         if (pgfdw_we_get_result == 0)   /* 0 = wait event not assigned yet */
     230           13 :             pgfdw_we_get_result =
     231           13 :                 WaitEventExtensionNew("PostgresFdwGetResult");
     232              : 
     233           13 :         ctl.keysize = sizeof(ConnCacheKey);
     234           13 :         ctl.entrysize = sizeof(ConnCacheEntry);
     235           13 :         ConnectionHash = hash_create("postgres_fdw connections", 8,
     236              :                                      &ctl,
     237              :                                      HASH_ELEM | HASH_BLOBS);
     238              : 
     239              :         /*
     240              :          * Register some callback functions that manage connection cleanup.
     241              :          * This should be done just once in each backend.
     242              :          */
     243           13 :         RegisterXactCallback(pgfdw_xact_callback, NULL);
     244           13 :         RegisterSubXactCallback(pgfdw_subxact_callback, NULL);
     245           13 :         CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
     246              :                                       pgfdw_inval_callback, (Datum) 0);
     247           13 :         CacheRegisterSyscacheCallback(USERMAPPINGOID,
     248              :                                       pgfdw_inval_callback, (Datum) 0);
     249              :     }
     250              : 
     251              :     /* Set flag that we did GetConnection during the current transaction */
     252         2296 :     xact_got_connection = true;
     253              : 
     254              :     /* Create hash key for the entry: the user mapping OID (ConnCacheKey is a plain Oid typedef, not a struct) */
     255         2296 :     key = user->umid;
     256              : 
     257              :     /*
     258              :      * Find or create cached entry for requested connection.
     259              :      */
     260         2296 :     entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
     261         2296 :     if (!found)
     262              :     {
     263              :         /*
     264              :          * We need only clear "conn" here; remaining fields will be filled
     265              :          * later when "conn" is set.
     266              :          */
     267           24 :         entry->conn = NULL;
     268              :     }
     269              : 
     270              :     /* Reject further use of connections which failed abort cleanup. */
     271         2296 :     pgfdw_reject_incomplete_xact_state_change(entry);
     272              : 
     273              :     /*
     274              :      * If the connection needs to be remade due to invalidation, disconnect as
     275              :      * soon as we're out of all transactions.
     276              :      */
     277         2294 :     if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
     278              :     {
     279            0 :         elog(DEBUG3, "closing connection %p for option changes to take effect",
     280              :              entry->conn);
     281            0 :         disconnect_pg_server(entry);
     282              :     }
     283              : 
     284              :     /*
     285              :      * If cache entry doesn't have a connection, we have to establish a new
     286              :      * connection.  (If connect_pg_server throws an error, the cache entry
     287              :      * will remain in a valid empty state, ie conn == NULL.)
     288              :      */
     289         2294 :     if (entry->conn == NULL)
     290           86 :         make_new_connection(entry, user);
     291              : 
     292              :     /*
     293              :      * We check the health of the cached connection here when using it.  In
     294              :      * cases where we're out of all transactions, if a broken connection is
     295              :      * detected, we try to reestablish a new connection later.
     296              :      */
     297         2285 :     PG_TRY();
     298              :     {
     299              :         /* Process a pending asynchronous request if any. */
     300         2285 :         if (entry->state.pendingAreq)
     301            0 :             process_pending_request(entry->state.pendingAreq);
     302              :         /* Start a new transaction or subtransaction if needed. */
     303         2285 :         begin_remote_xact(entry);
     304              :     }
     305            3 :     PG_CATCH();
     306              :     {
     307            3 :         MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
     308            3 :         ErrorData  *errdata = CopyErrorData();
     309              : 
     310              :         /*
     311              :          * Determine whether to try to reestablish the connection.
     312              :          *
     313              :          * After a broken connection is detected in libpq, any error other
     314              :          * than connection failure (e.g., out-of-memory) can be thrown
     315              :          * somewhere between return from libpq and the expected ereport() call
     316              :          * in pgfdw_report_error(). In this case, since PQstatus() indicates
     317              :          * CONNECTION_BAD, checking only PQstatus() causes the false detection
     318              :          * of connection failure. To avoid this, we also verify that the
     319              :          * error's sqlstate is ERRCODE_CONNECTION_FAILURE. Note that also
     320              :          * checking only the sqlstate can cause another false detection
     321              :          * because pgfdw_report_error() may report ERRCODE_CONNECTION_FAILURE
     322              :          * for any libpq-originated error condition.
     323              :          */
     324            3 :         if (errdata->sqlerrcode != ERRCODE_CONNECTION_FAILURE ||
     325            3 :             PQstatus(entry->conn) != CONNECTION_BAD ||
     326            3 :             entry->xact_depth > 0)  /* never retry while a remote xact is open */
     327              :         {
     328            1 :             MemoryContextSwitchTo(ecxt);    /* restore the context that was current in PG_CATCH before re-throwing */
     329            1 :             PG_RE_THROW();
     330              :         }
     331              : 
     332              :         /* Clean up the error state */
     333            2 :         FlushErrorState();
     334            2 :         FreeErrorData(errdata);
     335            2 :         errdata = NULL;
     336              : 
     337            2 :         retry = true;
     338              :     }
     339         2284 :     PG_END_TRY();
     340              : 
     341              :     /*
     342              :      * If a broken connection is detected, disconnect it, reestablish a new
     343              :      * connection and retry a new remote transaction. If connection failure is
     344              :      * reported again, we give up getting a connection.
     345              :      */
     346         2284 :     if (retry)
     347              :     {
     348              :         Assert(entry->xact_depth == 0);
     349              : 
     350            2 :         ereport(DEBUG3,
     351              :                 (errmsg_internal("could not start remote transaction on connection %p",
     352              :                                  entry->conn)),
     353              :                 errdetail_internal("%s", pchomp(PQerrorMessage(entry->conn))));
     354              : 
     355            2 :         elog(DEBUG3, "closing connection %p to reestablish a new one",
     356              :              entry->conn);
     357            2 :         disconnect_pg_server(entry);
     358              : 
     359            2 :         make_new_connection(entry, user);
     360              : 
     361            2 :         begin_remote_xact(entry);   /* not inside PG_TRY: a second failure propagates to the caller */
     362              :     }
     363              : 
     364              :     /* Remember if caller will prepare statements */
     365         2284 :     entry->have_prep_stmt |= will_prep_stmt;
     366              : 
     367              :     /* If caller needs access to the per-connection state, return it. */
     368         2284 :     if (state)
     369          782 :         *state = &entry->state;
     370              : 
     371         2284 :     return entry->conn;
     372              : }
     373              : 
     374              : /*
     375              :  * Reset all transient state fields in the cached connection entry and
     376              :  * establish new connection to the remote server.  entry->conn must be NULL on entry (asserted) and is set to the new PGconn on success.
     377              :  */
     378              : static void
     379           88 : make_new_connection(ConnCacheEntry *entry, UserMapping *user)
     380              : {
     381           88 :     ForeignServer *server = GetForeignServer(user->serverid);
     382              :     ListCell   *lc;
     383              : 
     384              :     Assert(entry->conn == NULL);
     385              : 
     386              :     /* Reset all transient state fields, to be sure all are clean */
     387           88 :     entry->xact_depth = 0;
     388           88 :     entry->xact_read_only = false;
     389           88 :     entry->have_prep_stmt = false;
     390           88 :     entry->have_error = false;
     391           88 :     entry->changing_xact_state = false;
     392           88 :     entry->invalidated = false;
     393           88 :     entry->serverid = server->serverid;
     394           88 :     entry->server_hashvalue =
     395           88 :         GetSysCacheHashValue1(FOREIGNSERVEROID,
     396              :                               ObjectIdGetDatum(server->serverid));
     397           88 :     entry->mapping_hashvalue =
     398           88 :         GetSysCacheHashValue1(USERMAPPINGOID,
     399              :                               ObjectIdGetDatum(user->umid));
     400           88 :     memset(&entry->state, 0, sizeof(entry->state));
     401              : 
     402              :     /*
     403              :      * Determine whether to keep the connection that we're about to make here
     404              :      * open even after the transaction using it ends, so that the subsequent
     405              :      * transactions can re-use it.
     406              :      *
     407              :      * By default, all the connections to any foreign servers are kept open.
     408              :      *
     409              :      * Also determine whether to commit/abort (sub)transactions opened on the
     410              :      * remote server in parallel at (sub)transaction end, which is disabled by
     411              :      * default.
     412              :      *
     413              :      * Note: it's enough to determine these only when making a new connection
     414              :      * because if these settings for it are changed, it will be closed and
     415              :      * re-made later.
     416              :      */
     417           88 :     entry->keep_connections = true;
     418           88 :     entry->parallel_commit = false;
     419           88 :     entry->parallel_abort = false;
     420          410 :     foreach(lc, server->options)    /* server options other than the three below are ignored here */
     421              :     {
     422          322 :         DefElem    *def = (DefElem *) lfirst(lc);
     423              : 
     424          322 :         if (strcmp(def->defname, "keep_connections") == 0)
     425           16 :             entry->keep_connections = defGetBoolean(def);
     426          306 :         else if (strcmp(def->defname, "parallel_commit") == 0)
     427            2 :             entry->parallel_commit = defGetBoolean(def);
     428          304 :         else if (strcmp(def->defname, "parallel_abort") == 0)
     429            2 :             entry->parallel_abort = defGetBoolean(def);
     430              :     }
     431              : 
     432              :     /* Now try to make the connection; if connect_pg_server() throws, entry->conn is still NULL */
     433           88 :     entry->conn = connect_pg_server(server, user);
     434              : 
     435           79 :     elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
     436              :          entry->conn, server->servername, user->umid, user->userid);
     437           79 : }
     438              : 
     439              : /*
     440              :  * Check that non-superuser has used password, delegated credentials, or SCRAM pass-through
     441              :  * to establish connection; otherwise, they are piggybacking on the
     442              :  * postgres server's user identity, and an ERROR is raised. See also dblink_security_check()
     443              :  * in contrib/dblink and check_conn_params.
     444              :  */
     445              : static void
     446           81 : pgfdw_security_check(const char **keywords, const char **values, UserMapping *user, PGconn *conn)
     447              : {
     448              :     /* Superusers bypass the check */
     449           81 :     if (superuser_arg(user->userid))
     450           73 :         return;
     451              : 
     452              : #ifdef ENABLE_GSS
     453              :     /* Connected via GSSAPI with delegated credentials- all good. */
     454              :     if (PQconnectionUsedGSSAPI(conn) && be_gssapi_get_delegation(MyProcPort))
     455              :         return;
     456              : #endif
     457              : 
     458              :     /* Ok if superuser set PW required false. */
     459            8 :     if (!UserMappingPasswordRequired(user))
     460            2 :         return;
     461              : 
     462              :     /* Connected via PW, with PW required true, and provided non-empty PW. */
     463            6 :     if (PQconnectionUsedPassword(conn))
     464              :     {
     465              :         /* ok if params contain a non-empty password (keywords[] is scanned up to its NULL end marker) */
     466           40 :         for (int i = 0; keywords[i] != NULL; i++)
     467              :         {
     468           36 :             if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')  /* assumes values[i] is non-NULL for every keyword -- TODO confirm */
     469            0 :                 return;
     470              :         }
     471              :     }
     472              : 
     473              :     /*
     474              :      * Ok if SCRAM pass-through is being used and all required SCRAM options
     475              :      * are set correctly. If pgfdw_has_required_scram_options returns true we
     476              :      * assume that UseScramPassthrough is also true since SCRAM options are
     477              :      * only set when UseScramPassthrough is enabled.
     478              :      */
     479            6 :     if (MyProcPort != NULL && MyProcPort->has_scram_keys && pgfdw_has_required_scram_options(keywords, values))
     480            4 :         return;
     481              : 
     482            2 :     ereport(ERROR,
     483              :             (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
     484              :              errmsg("password or GSSAPI delegated credentials required"),
     485              :              errdetail("Non-superuser cannot connect if the server does not request a password or use GSSAPI with delegated credentials."),
     486              :              errhint("Target server's authentication method must be changed or password_required=false set in the user mapping attributes.")));
     487              : }
     488              : 
     489              : /*
     490              :  * Construct connection params from generic options of ForeignServer and
     491              :  * UserMapping.  (Some of them might not be libpq options, in which case we'll
     492              :  * just waste a few array slots.)  The palloc'd, NULL-terminated parallel arrays are returned in *p_keywords and *p_values.
     493              :  */
     494              : static void
     495           97 : construct_connection_params(ForeignServer *server, UserMapping *user,
     496              :                             const char ***p_keywords, const char ***p_values,
     497              :                             char **p_appname)
     498              : {
     499              :     const char **keywords;
     500              :     const char **values;
     501           97 :     char       *appname = NULL;
     502              :     int         n;
     503              : 
     504              :     /*
     505              :      * Add 4 extra slots for application_name, fallback_application_name,
     506              :      * client_encoding, end marker, and 3 extra slots for the SCRAM
     507              :      * pass-through options (scram_client_key, scram_server_key, require_auth).
     508              :      */
     509           97 :     n = list_length(server->options) + list_length(user->options) + 4 + 3;
     510           97 :     keywords = (const char **) palloc(n * sizeof(char *));
     511           97 :     values = (const char **) palloc(n * sizeof(char *));
     512              : 
     513           97 :     n = 0;                      /* from here on, n is the next free slot */
     514          194 :     n += ExtractConnectionOptions(server->options,
     515           97 :                                   keywords + n, values + n);
     516          194 :     n += ExtractConnectionOptions(user->options,
     517           97 :                                   keywords + n, values + n);
     518              : 
     519              :     /*
     520              :      * Use pgfdw_application_name as application_name if set.
     521              :      *
     522              :      * PQconnectdbParams() processes the parameter arrays from start to end.
     523              :      * If any key word is repeated, the last value is used. Therefore note
     524              :      * that pgfdw_application_name must be added to the arrays after options
     525              :      * of ForeignServer are, so that it can override application_name set in
     526              :      * ForeignServer.
     527              :      */
     528           97 :     if (pgfdw_application_name && *pgfdw_application_name != '\0')
     529              :     {
     530            1 :         keywords[n] = "application_name";
     531            1 :         values[n] = pgfdw_application_name;
     532            1 :         n++;
     533              :     }
     534              : 
     535              :     /*
     536              :      * Search the parameter arrays to find application_name setting, and
     537              :      * replace escape sequences in it with status information if found.  The
     538              :      * arrays are searched backwards because the last value is used if
     539              :      * application_name is repeatedly set.
     540              :      */
     541          279 :     for (int i = n - 1; i >= 0; i--)
     542              :     {
     543          209 :         if (strcmp(keywords[i], "application_name") == 0 &&
     544           27 :             *(values[i]) != '\0')
     545              :         {
     546              :             /*
     547              :              * Use this application_name setting if it's not empty string even
     548              :              * after any escape sequences in it are replaced.
     549              :              */
     550           27 :             appname = process_pgfdw_appname(values[i]);
     551           27 :             if (appname[0] != '\0')
     552              :             {
     553           27 :                 values[i] = appname;
     554           27 :                 break;
     555              :             }
     556              : 
     557              :             /*
     558              :              * This empty application_name is not used, so we set values[i] to
     559              :              * NULL and keep searching the array to find the next one.
     560              :              */
     561            0 :             values[i] = NULL;
     562            0 :             pfree(appname);
     563            0 :             appname = NULL;
     564              :         }
     565              :     }
     566              : 
     567           97 :     *p_appname = appname;       /* NULL unless a non-empty application_name was found */
     568              : 
     569              :     /* Use "postgres_fdw" as fallback_application_name */
     570           97 :     keywords[n] = "fallback_application_name";
     571           97 :     values[n] = "postgres_fdw";
     572           97 :     n++;
     573              : 
     574              :     /* Set client_encoding so that libpq can convert encoding properly. */
     575           97 :     keywords[n] = "client_encoding";
     576           97 :     values[n] = GetDatabaseEncodingName();
     577           97 :     n++;
     578              : 
     579              :     /* Add required SCRAM pass-through connection options if it's enabled. */
     580           97 :     if (MyProcPort != NULL && MyProcPort->has_scram_keys && UseScramPassthrough(server, user))
     581              :     {
     582              :         int         len;
     583              :         int         encoded_len;
     584              : 
     585            6 :         keywords[n] = "scram_client_key";
     586            6 :         len = pg_b64_enc_len(sizeof(MyProcPort->scram_ClientKey));
     587              :         /* don't forget the zero-terminator */
     588            6 :         values[n] = palloc0(len + 1);
     589            6 :         encoded_len = pg_b64_encode(MyProcPort->scram_ClientKey,
     590              :                                     sizeof(MyProcPort->scram_ClientKey),
     591            6 :                                     (char *) values[n], len);
     592            6 :         if (encoded_len < 0)
     593            0 :             elog(ERROR, "could not encode SCRAM client key");
     594            6 :         n++;
     595              : 
     596            6 :         keywords[n] = "scram_server_key";
     597            6 :         len = pg_b64_enc_len(sizeof(MyProcPort->scram_ServerKey));
     598              :         /* don't forget the zero-terminator */
     599            6 :         values[n] = palloc0(len + 1);
     600            6 :         encoded_len = pg_b64_encode(MyProcPort->scram_ServerKey,
     601              :                                     sizeof(MyProcPort->scram_ServerKey),
     602            6 :                                     (char *) values[n], len);
     603            6 :         if (encoded_len < 0)
     604            0 :             elog(ERROR, "could not encode SCRAM server key");
     605            6 :         n++;
     606              : 
     607              :         /*
     608              :          * Require scram-sha-256 to ensure that no other auth method is used
     609              :          * when connecting with foreign server.
     610              :          */
     611            6 :         keywords[n] = "require_auth";
     612            6 :         values[n] = "scram-sha-256";
     613            6 :         n++;
     614              :     }
     615              : 
     616           97 :     keywords[n] = values[n] = NULL; /* end marker */
     617              : 
     618              :     /* Verify the set of connection parameters; this throws an error if they are not acceptable. */
     619           97 :     check_conn_params(keywords, values, user);
     620              : 
     621           94 :     *p_keywords = keywords;
     622           94 :     *p_values = values;
     623           94 : }
     624              : 
     625              : /*
     626              :  * Connect to remote server using specified server and user mapping properties.  Returns the new connection.
     627              :  */
     628              : static PGconn *
     629           88 : connect_pg_server(ForeignServer *server, UserMapping *user)
     630              : {
     631           88 :     PGconn     *volatile conn = NULL;
     632              : 
     633              :     /*
     634              :      * Use PG_TRY block to ensure closing connection on error; the error itself is re-thrown.
     635              :      */
     636           88 :     PG_TRY();
     637              :     {
     638              :         const char **keywords;
     639              :         const char **values;
     640              :         char       *appname;
     641              :         PGconn     *start_conn;
     642              : 
     643           88 :         construct_connection_params(server, user, &keywords, &values, &appname);
     644              : 
     645              :         /* first time, allocate or get the custom wait event */
     646           85 :         if (pgfdw_we_connect == 0)
     647           12 :             pgfdw_we_connect = WaitEventExtensionNew("PostgresFdwConnect");
     648              : 
     649              :         /* OK to make connection */
     650              :         start_conn =
     651           85 :             libpqsrv_connect_params_start(keywords, values,
     652              :                                            /* expand_dbname = */ false);
     653           85 :         PQsetNoticeReceiver(start_conn, libpqsrv_notice_receiver,
     654              :                             "received message via remote connection");
     655           85 :         libpqsrv_connect_complete(start_conn, pgfdw_we_connect);
     656           85 :         conn = start_conn;      /* now visible to PG_CATCH, which disconnects it */
     657              : 
     658           85 :         if (!conn || PQstatus(conn) != CONNECTION_OK)
     659            4 :             ereport(ERROR,
     660              :                     (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
     661              :                      errmsg("could not connect to server \"%s\"",
     662              :                             server->servername),
     663              :                      errdetail_internal("%s", pchomp(PQerrorMessage(conn)))));
     664              : 
     665              :         /* Perform post-connection security checks. */
     666           81 :         pgfdw_security_check(keywords, values, user, conn);
     667              : 
     668              :         /* Prepare new session for use */
     669           79 :         configure_remote_session(conn);
     670              : 
     671           79 :         if (appname != NULL)
     672           26 :             pfree(appname);
     673           79 :         pfree(keywords);
     674           79 :         pfree(values);          /* NOTE(review): frees the array only; the palloc'd SCRAM key strings are not freed here */
     675              :     }
     676            9 :     PG_CATCH();
     677              :     {
     678            9 :         libpqsrv_disconnect(conn);
     679            9 :         PG_RE_THROW();
     680              :     }
     681           79 :     PG_END_TRY();
     682              : 
     683           79 :     return conn;
     684              : }
     685              : 
     686              : /*
     687              :  * Disconnect any open connection for a connection cache entry, leaving entry->conn NULL.
     688              :  */
     689              : static void
     690           70 : disconnect_pg_server(ConnCacheEntry *entry)
     691              : {
     692           70 :     if (entry->conn != NULL)
     693              :     {
     694           70 :         libpqsrv_disconnect(entry->conn);
     695           70 :         entry->conn = NULL;
     696              :     }
     697           70 : }
     698              : 
     699              : /*
     700              :  * Check and return the value of password_required, if defined; otherwise,
     701              :  * return true, which is its default value.  The mapping has been
     702              :  * pre-validated.
     703              :  */
     704              : static bool
     705           18 : UserMappingPasswordRequired(UserMapping *user)
     706              : {
     707              :     ListCell   *cell;
     708              : 
     709           33 :     foreach(cell, user->options)
     710              :     {
     711           18 :         DefElem    *def = (DefElem *) lfirst(cell);
     712              : 
     713           18 :         if (strcmp(def->defname, "password_required") == 0)
     714            3 :             return defGetBoolean(def);
     715              :     }
     716              : 
     717           15 :     return true;                /* option not present in the user mapping */
     718              : }
     719              : 
     720              : /*
     721              :  * Return whether SCRAM pass-through is enabled (false if the option is set nowhere).
     722              :  *
     723              :  * If use_scram_passthrough is specified in both the foreign server
     724              :  * and the user mapping, the user mapping setting takes precedence.
     725              :  */
     726              : static bool
     727            7 : UseScramPassthrough(ForeignServer *server, UserMapping *user)
     728              : {
     729              :     ListCell   *cell;
     730              : 
     731           14 :     foreach(cell, user->options)    /* user mapping is checked first, so it wins */
     732              :     {
     733            8 :         DefElem    *def = (DefElem *) lfirst(cell);
     734              : 
     735            8 :         if (strcmp(def->defname, "use_scram_passthrough") == 0)
     736            1 :             return defGetBoolean(def);
     737              :     }
     738              : 
     739           24 :     foreach(cell, server->options)  /* fall back to the foreign server's setting */
     740              :     {
     741           24 :         DefElem    *def = (DefElem *) lfirst(cell);
     742              : 
     743           24 :         if (strcmp(def->defname, "use_scram_passthrough") == 0)
     744            6 :             return defGetBoolean(def);
     745              :     }
     746              : 
     747            0 :     return false;               /* not specified in either place */
     748              : }
     749              : 
     750              : /*
     751              :  * For non-superusers, insist that the connstr specify a password or that the
     752              :  * user provided their own GSSAPI delegated credentials.  This
     753              :  * prevents a password from being picked up from .pgpass, a service file, the
     754              :  * environment, etc.  We don't want the postgres user's passwords,
     755              :  * certificates, etc to be accessible to non-superusers.  (See also
     756              :  * dblink_connstr_check in contrib/dblink.)  Throws an error if none of the exemptions below applies.
     757              :  */
     758              : static void
     759           97 : check_conn_params(const char **keywords, const char **values, UserMapping *user)
     760              : {
     761              :     int         i;
     762              : 
     763              :     /* no check required if the user mapping's user is a superuser */
     764           97 :     if (superuser_arg(user->userid))
     765           84 :         return;
     766              : 
     767              : #ifdef ENABLE_GSS
     768              :     /* ok if the user provided their own delegated credentials */
     769              :     if (be_gssapi_get_delegation(MyProcPort))
     770              :         return;
     771              : #endif
     772              : 
     773              :     /* ok if params contain a non-empty password */
     774           91 :     for (i = 0; keywords[i] != NULL; i++)
     775              :     {
     776           81 :         if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
     777            3 :             return;
     778              :     }
     779              : 
     780              :     /* ok if the superuser explicitly said so at user mapping creation time */
     781           10 :     if (!UserMappingPasswordRequired(user))
     782            1 :         return;
     783              : 
     784              :     /*
     785              :      * Ok if SCRAM pass-through is being used and all required scram options
     786              :      * are set correctly. If pgfdw_has_required_scram_options returns true we
     787              :      * assume that UseScramPassthrough is also true since SCRAM options are
     788              :      * only set when UseScramPassthrough is enabled.
     789              :      */
     790            9 :     if (MyProcPort != NULL && MyProcPort->has_scram_keys && pgfdw_has_required_scram_options(keywords, values))
     791            6 :         return;
     792              : 
     793            3 :     ereport(ERROR,
     794              :             (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
     795              :              errmsg("password or GSSAPI delegated credentials required"),
     796              :              errdetail("Non-superusers must delegate GSSAPI credentials, provide a password, or enable SCRAM pass-through in user mapping.")));
     797              : }
     798              : 
     799              : /*
     800              :  * Issue SET commands to make sure remote session is configured properly.
     801              :  *
     802              :  * We do this just once at connection, assuming nothing will change the
     803              :  * values later.  Since we'll never send volatile function calls to the
     804              :  * remote, there shouldn't be any way to break this assumption from our end.
     805              :  * It's possible to think of ways to break it at the remote end, eg making
     806              :  * a foreign table point to a view that includes a set_config call ---
     807              :  * but once you admit the possibility of a malicious view definition,
     808              :  * there are any number of ways to break things.
     809              :  */
     810              : static void
     811           79 : configure_remote_session(PGconn *conn)
     812              : {
     813           79 :     int         remoteversion = PQserverVersion(conn);
     814              : 
     815              :     /* Force the search path to contain only pg_catalog (see deparse.c) */
     816           79 :     do_sql_command(conn, "SET search_path = pg_catalog");
     817              : 
     818              :     /*
     819              :      * Set remote timezone; this is basically just cosmetic, since all
     820              :      * transmitted and returned timestamptzs should specify a zone explicitly
     821              :      * anyway.  However it makes the regression test outputs more predictable.
     822              :      *
     823              :      * We don't risk setting remote zone equal to ours, since the remote
     824              :      * server might use a different timezone database.  Instead, use GMT
     825              :      * (quoted, because very old servers are picky about case).  That's
     826              :      * guaranteed to work regardless of the remote's timezone database,
     827              :      * because pg_tzset() hard-wires it (at least in PG 9.2 and later).
     828              :      */
     829           79 :     do_sql_command(conn, "SET timezone = 'GMT'");
     830              : 
     831              :     /*
     832              :      * Set values needed to ensure unambiguous data output from remote.  (This
     833              :      * logic should match what pg_dump does.  See also set_transmission_modes
     834              :      * in postgres_fdw.c.)  The settings issued depend on the remote server version.
     835              :      */
     836           79 :     do_sql_command(conn, "SET datestyle = ISO");
     837           79 :     if (remoteversion >= 80400) /* only sent to remotes reporting 8.4 or later */
     838           79 :         do_sql_command(conn, "SET intervalstyle = postgres");
     839           79 :     if (remoteversion >= 90000) /* 9.0 or later */
     840           79 :         do_sql_command(conn, "SET extra_float_digits = 3");
     841              :     else
     842            0 :         do_sql_command(conn, "SET extra_float_digits = 2"); /* remote older than 9.0 */
     843           79 : }
     844              : 
     845              : /*
     846              :  * Convenience subroutine to issue a non-data-returning SQL command to remote (send it, then check its result)
     847              :  */
     848              : void
     849         1925 : do_sql_command(PGconn *conn, const char *sql)
     850              : {
     851         1925 :     do_sql_command_begin(conn, sql);
     852         1925 :     do_sql_command_end(conn, sql, false);   /* false: skip the PQconsumeInput step */
     853         1921 : }
     854              : 
     855              : static void
     856         1943 : do_sql_command_begin(PGconn *conn, const char *sql) /* send sql; the result is collected by do_sql_command_end */
     857              : {
     858         1943 :     if (!PQsendQuery(conn, sql))
     859            0 :         pgfdw_report_error(NULL, conn, sql);    /* no PGresult exists yet, hence NULL */
     860         1943 : }
     861              : 
     862              : static void
     863         1943 : do_sql_command_end(PGconn *conn, const char *sql, bool consume_input)   /* sql is only used for error reports */
     864              : {
     865              :     PGresult   *res;
     866              : 
     867              :     /*
     868              :      * If requested, consume whatever data is available from the socket. (Note
     869              :      * that if all data is available, this allows pgfdw_get_result to call
     870              :      * PQgetResult without forcing the overhead of WaitLatchOrSocket, which
     871              :      * would be large compared to the overhead of PQconsumeInput.)
     872              :      */
     873         1943 :     if (consume_input && !PQconsumeInput(conn))
     874            0 :         pgfdw_report_error(NULL, conn, sql);
     875         1943 :     res = pgfdw_get_result(conn);
     876         1943 :     if (PQresultStatus(res) != PGRES_COMMAND_OK)    /* any other status is reported as an error */
     877            4 :         pgfdw_report_error(res, conn, sql);
     878         1939 :     PQclear(res);
     879         1939 : }
     880              : 
     881              : /*
     882              :  * Start remote transaction or subtransaction, if needed.
     883              :  *
     884              :  * Note that we always use at least REPEATABLE READ in the remote session.
     885              :  * This is so that, if a query initiates multiple scans of the same or
     886              :  * different foreign tables, we will get snapshot-consistent results from
     887              :  * those scans.  A disadvantage is that we can't provide sane emulation of
     888              :  * READ COMMITTED behavior --- it would be nice if we had some other way to
     889              :  * control which remote queries share a snapshot.
     890              :  *
     891              :  * Note also that we always start the remote transaction with the same
     892              :  * read/write and deferrable properties as the local transaction, and start
     893              :  * the remote subtransaction with the same read/write property as the local
     894              :  * subtransaction.
     895              :  */
     896              : static void
     897         2287 : begin_remote_xact(ConnCacheEntry *entry)
     898              : {
     899         2287 :     int         curlevel = GetCurrentTransactionNestLevel();
     900              : 
     901              :     /*
     902              :      * If the current local (sub)transaction is read-only, set the topmost
     903              :      * read-only local transaction's nesting level if we haven't yet.
     904              :      *
     905              :      * Note: once it's set, it's retained until the topmost read-only local
     906              :      * transaction is committed/aborted (see pgfdw_xact_callback and
     907              :      * pgfdw_subxact_callback).
     908              :      */
     909         2287 :     if (XactReadOnly)
     910              :     {
     911           10 :         if (read_only_level == 0)
     912            9 :             read_only_level = GetTopReadOnlyTransactionNestLevel();
     913              :         Assert(read_only_level > 0);
     914              :     }
     915              :     else
     916              :         Assert(read_only_level == 0);
     917              : 
     918              :     /*
     919              :      * Start main transaction if we haven't yet; otherwise, change the current
     920              :      * remote (sub)transaction's read/write mode if needed.
     921              :      */
     922         2287 :     if (entry->xact_depth <= 0)
     923              :     {
     924              :         /*
     925              :          * This is the case when we haven't yet started a main transaction.
     926              :          */
     927              :         StringInfoData sql;
     928          782 :         bool        ro = (read_only_level == 1);    /* read-only level is the one being started here (depth 1) */
     929              : 
     930          782 :         elog(DEBUG3, "starting remote transaction on connection %p",
     931              :              entry->conn);
     932              : 
     933          782 :         initStringInfo(&sql);
     934          782 :         appendStringInfoString(&sql, "START TRANSACTION ISOLATION LEVEL ");
     935          782 :         if (IsolationIsSerializable())
     936            3 :             appendStringInfoString(&sql, "SERIALIZABLE");
     937              :         else
     938          779 :             appendStringInfoString(&sql, "REPEATABLE READ");
     939          782 :         if (ro)
     940            3 :             appendStringInfoString(&sql, " READ ONLY");
     941          782 :         if (XactDeferrable)
     942            2 :             appendStringInfoString(&sql, " DEFERRABLE");
     943          782 :         entry->changing_xact_state = true;  /* stays set if the command below throws */
     944          782 :         do_sql_command(entry->conn, sql.data);
     945          780 :         entry->xact_depth = 1;
     946          780 :         if (ro)
     947              :         {
     948              :             Assert(!entry->xact_read_only);
     949            3 :             entry->xact_read_only = true;
     950              :         }
     951          780 :         entry->changing_xact_state = false;
     952              :     }
     953         1505 :     else if (!entry->xact_read_only)
     954              :     {
     955              :         /*
     956              :          * The remote (sub)transaction has been opened in read-write mode.
     957              :          */
     958              :         Assert(read_only_level == 0 ||
     959              :                entry->xact_depth <= read_only_level);
     960              : 
     961              :         /*
     962              :          * If its nesting depth matches read_only_level, it means that the
     963              :          * local read-write (sub)transaction that started it has changed to
     964              :          * read-only after that; in which case change it to read-only as well.
     965              :          * Otherwise, the local (sub)transaction is still read-write, so there
     966              :          * is no need to do anything.
     967              :          */
     968         1504 :         if (entry->xact_depth == read_only_level)
     969              :         {
     970            4 :             entry->changing_xact_state = true;
     971            4 :             do_sql_command(entry->conn, "SET transaction_read_only = on");
     972            4 :             entry->xact_read_only = true;
     973            4 :             entry->changing_xact_state = false;
     974              :         }
     975              :     }
     976              :     else
     977              :     {
     978              :         /*
     979              :          * The remote (sub)transaction has been opened in read-only mode.
     980              :          */
     981              :         Assert(read_only_level > 0 &&
     982              :                entry->xact_depth >= read_only_level);
     983              : 
     984              :         /*
     985              :          * The local read-only (sub)transaction that started it is guaranteed
     986              :          * to be still read-only (see check_transaction_read_only), so there
     987              :          * is no need to do anything.
     988              :          */
     989              :     }
     990              : 
     991              :     /*
     992              :      * If we're in a subtransaction, stack up savepoints to match our level.
     993              :      * This ensures we can rollback just the desired effects when a
     994              :      * subtransaction aborts.  Savepoints are named s<depth>, one per nesting level.
     995              :      */
     996         2306 :     while (entry->xact_depth < curlevel)
     997              :     {
     998              :         StringInfoData sql;
     999           22 :         bool        ro = (entry->xact_depth + 1 == read_only_level);    /* level about to be opened is the read-only one */
    1000              : 
    1001           22 :         initStringInfo(&sql);
    1002           22 :         appendStringInfo(&sql, "SAVEPOINT s%d", entry->xact_depth + 1);
    1003           22 :         if (ro)
    1004            2 :             appendStringInfoString(&sql, "; SET transaction_read_only = on");
    1005           22 :         entry->changing_xact_state = true;
    1006           22 :         do_sql_command(entry->conn, sql.data);
    1007           21 :         entry->xact_depth++;
    1008           21 :         if (ro)
    1009              :         {
    1010              :             Assert(!entry->xact_read_only);
    1011            2 :             entry->xact_read_only = true;
    1012              :         }
    1013           21 :         entry->changing_xact_state = false;
    1014              :     }
    1015         2284 : }
    1016              : 
    1017              : /*
    1018              :  * Release connection reference count created by calling GetConnection.  (Currently a no-op; conn is unused.)
    1019              :  */
    1020              : void
    1021         2209 : ReleaseConnection(PGconn *conn)
    1022              : {
    1023              :     /*
    1024              :      * Currently, we don't actually track connection references because all
    1025              :      * cleanup is managed on a transaction or subtransaction basis instead. So
    1026              :      * there's nothing to do here.
    1027              :      */
    1028         2209 : }
    1029              : 
    1030              : /*
    1031              :  * Assign a "unique" number for a cursor, by incrementing a shared counter.
    1032              :  *
    1033              :  * These really only need to be unique per connection within a transaction.
    1034              :  * For the moment we ignore the per-connection point and assign them across
    1035              :  * all connections in the transaction, but we ask for the connection to be
    1036              :  * supplied in case we want to refine that.
    1037              :  *
    1038              :  * Note that even if wraparound happens in a very long transaction, actual
    1039              :  * collisions are highly improbable; just be sure to use %u not %d to print.
    1040              :  */
    1041              : unsigned int
    1042          578 : GetCursorNumber(PGconn *conn)   /* conn is not used yet */
    1043              : {
    1044          578 :     return ++cursor_number;
    1045              : }
    1046              : 
    1047              : /*
    1048              :  * Assign a "unique" number for a prepared statement, by incrementing a shared counter.
    1049              :  *
    1050              :  * This works much like GetCursorNumber, except that we never reset the counter
    1051              :  * within a session.  That's because we can't be 100% sure we've gotten rid
    1052              :  * of all prepared statements on all connections, and it's not really worth
    1053              :  * increasing the risk of prepared-statement name collisions by resetting.
    1054              :  */
    1055              : unsigned int
    1056          188 : GetPrepStmtNumber(PGconn *conn) /* conn is not used yet */
    1057              : {
    1058          188 :     return ++prep_stmt_number;
    1059              : }
    1060              : 
    1061              : /*
    1062              :  * Submit a query and wait for the result.
    1063              :  *
    1064              :  * Since we don't use non-blocking mode, this can't process interrupts while
    1065              :  * pushing the query text to the server.  That risk is relatively small, so we
    1066              :  * ignore that for now.
    1067              :  *
    1068              :  * Caller is responsible for the error handling on the result.
    1069              :  */
    1070              : PGresult *
    1071         4188 : pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state)
    1072              : {
    1073              :     /* First, process a pending asynchronous request, if any (state may be NULL). */
    1074         4188 :     if (state && state->pendingAreq)
    1075            4 :         process_pending_request(state->pendingAreq);
    1076              : 
    1077         4188 :     if (!PQsendQuery(conn, query))
    1078            1 :         return NULL;             /* could not send the query; caller handles the NULL result */
    1079         4187 :     return pgfdw_get_result(conn);       /* wait for the result and hand it back unchecked */
    1080              : }
    1081              : 
    1082              : /*
    1083              :  * Wrap libpqsrv_get_result_last(), adding wait event.
    1084              :  *
    1085              :  * Caller is responsible for the error handling on the result.
    1086              :  */
    1087              : PGresult *
    1088         8453 : pgfdw_get_result(PGconn *conn)
    1089              : {
    1090         8453 :     return libpqsrv_get_result_last(conn, pgfdw_we_get_result);  /* second argument is the wait event */
    1091              : }
    1092              : 
    1093              : /*
    1094              :  * Report an error we got from the remote server.
    1095              :  *
    1096              :  * Callers should use pgfdw_report_error() to throw an error, or use
    1097              :  * pgfdw_report() for lesser message levels.  (We make this distinction
    1098              :  * so that pgfdw_report_error() can be marked noreturn.)
    1099              :  *
    1100              :  * res: PGresult containing the error (might be NULL)
    1101              :  * conn: connection we did the query on
    1102              :  * sql: NULL, or text of remote command we tried to execute
    1103              :  *
    1104              :  * If "res" is not NULL, it'll be PQclear'ed here (unless we throw error,
    1105              :  * in which case memory context cleanup will clear it eventually).
    1106              :  *
    1107              :  * Note: callers that choose not to throw ERROR for a remote error are
    1108              :  * responsible for making sure that the associated ConnCacheEntry gets
    1109              :  * marked with have_error = true.
    1110              :  */
    1111              : void
    1112           27 : pgfdw_report_error(PGresult *res, PGconn *conn, const char *sql)
    1113              : {
    1114           27 :     pgfdw_report_internal(ERROR, res, conn, sql);        /* reports at ERROR, so control is not expected to return */
    1115            0 :     pg_unreachable();            /* not reached; supports marking this function noreturn */
    1116              : }
    1117              : 
    1118              : void
    1119            2 : pgfdw_report(int elevel, PGresult *res, PGconn *conn, const char *sql)
    1120              : {                                /* report a remote error at a message level below ERROR */
    1121              :     Assert(elevel < ERROR);      /* use pgfdw_report_error for that */
    1122            2 :     pgfdw_report_internal(elevel, res, conn, sql);       /* also PQclear's res before returning */
    1123            2 : }
    1124              : 
    1125              : static void
    1126           29 : pgfdw_report_internal(int elevel, PGresult *res, PGconn *conn,
    1127              :                       const char *sql)
    1128              : {                                /* shared worker for pgfdw_report_error() and pgfdw_report() */
    1129           29 :     char       *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
    1130           29 :     char       *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
    1131           29 :     char       *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
    1132           29 :     char       *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
    1133           29 :     char       *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
    1134              :     int         sqlstate;        /* error code used for the local report */
    1135              : 
    1136           29 :     if (diag_sqlstate)           /* pack the five SQLSTATE characters into an error code */
    1137           22 :         sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
    1138              :                                  diag_sqlstate[1],
    1139              :                                  diag_sqlstate[2],
    1140              :                                  diag_sqlstate[3],
    1141              :                                  diag_sqlstate[4]);
    1142              :     else
    1143            7 :         sqlstate = ERRCODE_CONNECTION_FAILURE;   /* fallback when res supplies no SQLSTATE */
    1144              : 
    1145              :     /*
    1146              :      * If we don't get a message from the PGresult, try the PGconn.  This is
    1147              :      * needed because for connection-level failures, PQgetResult may just
    1148              :      * return NULL, not a PGresult at all.
    1149              :      */
    1150           29 :     if (message_primary == NULL)
    1151            7 :         message_primary = pchomp(PQerrorMessage(conn));
    1152              : 
    1153           29 :     ereport(elevel,
    1154              :             (errcode(sqlstate),
    1155              :              (message_primary != NULL && message_primary[0] != '\0') ?
    1156              :              errmsg_internal("%s", message_primary) :
    1157              :              errmsg("could not obtain message string for remote error"),
    1158              :              message_detail ? errdetail_internal("%s", message_detail) : 0,
    1159              :              message_hint ? errhint("%s", message_hint) : 0,
    1160              :              message_context ? errcontext("%s", message_context) : 0,
    1161              :              sql ? errcontext("remote SQL command: %s", sql) : 0));      /* sql may be NULL */
    1162            2 :     PQclear(res);                /* reached only when the report above did not throw */
    1163            2 : }
    1164              : 
    1165              : /*
    1166              :  * pgfdw_xact_callback --- cleanup at main-transaction end.
    1167              :  *
    1168              :  * This runs just late enough that it must not enter user-defined code
    1169              :  * locally.  (Entering such code on the remote side is fine.  Its remote
    1170              :  * COMMIT TRANSACTION may run deferred triggers.)
    1171              :  */
    1172              : static void
    1173         4263 : pgfdw_xact_callback(XactEvent event, void *arg)  /* arg is unused */
    1174              : {
    1175              :     HASH_SEQ_STATUS scan;
    1176              :     ConnCacheEntry *entry;
    1177         4263 :     List       *pending_entries = NIL;   /* entries to finish cleaning up after the scan */
    1178         4263 :     List       *cancel_requested = NIL;  /* filled by pgfdw_abort_cleanup_begin(); stays NIL on commit */
    1179              : 
    1180              :     /* Quick exit if no connections were touched in this transaction. */
    1181         4263 :     if (!xact_got_connection)
    1182         3512 :         return;
    1183              : 
    1184              :     /*
    1185              :      * Scan all connection cache entries to find open remote transactions, and
    1186              :      * close them.
    1187              :      */
    1188          751 :     hash_seq_init(&scan, ConnectionHash);
    1189         3960 :     while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
    1190              :     {
    1191              :         PGresult   *res;
    1192              : 
    1193              :         /* Ignore cache entry if no open connection right now */
    1194         3210 :         if (entry->conn == NULL)
    1195         1881 :             continue;
    1196              : 
    1197              :         /* If it has an open remote transaction, try to close it */
    1198         1329 :         if (entry->xact_depth > 0)
    1199              :         {
    1200          781 :             elog(DEBUG3, "closing remote transaction on connection %p",
    1201              :                  entry->conn);
    1202              : 
    1203          781 :             switch (event)
    1204              :             {
    1205          721 :                 case XACT_EVENT_PARALLEL_PRE_COMMIT:
    1206              :                 case XACT_EVENT_PRE_COMMIT:
    1207              : 
    1208              :                     /*
    1209              :                      * If abort cleanup previously failed for this connection,
    1210              :                      * we can't issue any more commands against it.
    1211              :                      */
    1212          721 :                     pgfdw_reject_incomplete_xact_state_change(entry);
    1213              : 
    1214              :                     /* Commit all remote transactions during pre-commit */
    1215          721 :                     entry->changing_xact_state = true;
    1216          721 :                     if (entry->parallel_commit)
    1217              :                     {
    1218           16 :                         do_sql_command_begin(entry->conn, "COMMIT TRANSACTION");
    1219           16 :                         pending_entries = lappend(pending_entries, entry);
    1220           16 :                         continue;        /* skip the reset below; this entry is finished after the scan */
    1221              :                     }
    1222          705 :                     do_sql_command(entry->conn, "COMMIT TRANSACTION");
    1223          705 :                     entry->changing_xact_state = false;
    1224              : 
    1225              :                     /*
    1226              :                      * If there were any errors in subtransactions, and we
    1227              :                      * made prepared statements, do a DEALLOCATE ALL to make
    1228              :                      * sure we get rid of all prepared statements. This is
    1229              :                      * annoying and not terribly bulletproof, but it's
    1230              :                      * probably not worth trying harder.
    1231              :                      *
    1232              :                      * DEALLOCATE ALL only exists in 8.3 and later, so this
    1233              :                      * constrains how old a server postgres_fdw can
    1234              :                      * communicate with.  We intentionally ignore errors in
    1235              :                      * the DEALLOCATE, so that we can hobble along to some
    1236              :                      * extent with older servers (leaking prepared statements
    1237              :                      * as we go; but we don't really support update operations
    1238              :                      * pre-8.3 anyway).
    1239              :                      */
    1240          705 :                     if (entry->have_prep_stmt && entry->have_error)
    1241              :                     {
    1242            0 :                         res = pgfdw_exec_query(entry->conn, "DEALLOCATE ALL",
    1243              :                                                NULL);
    1244            0 :                         PQclear(res);    /* result is discarded unchecked, as explained above */
    1245              :                     }
    1246          705 :                     entry->have_prep_stmt = false;
    1247          705 :                     entry->have_error = false;
    1248          705 :                     break;
    1249            1 :                 case XACT_EVENT_PRE_PREPARE:
    1250              : 
    1251              :                     /*
    1252              :                      * We disallow any remote transactions, since it's not
    1253              :                      * very reasonable to hold them open until the prepared
    1254              :                      * transaction is committed.  For the moment, throw error
    1255              :                      * unconditionally; later we might allow read-only cases.
    1256              :                      * Note that the error will cause us to come right back
    1257              :                      * here with event == XACT_EVENT_ABORT, so we'll clean up
    1258              :                      * the connection state at that point.
    1259              :                      */
    1260            1 :                     ereport(ERROR,
    1261              :                             (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
    1262              :                              errmsg("cannot PREPARE a transaction that has operated on postgres_fdw foreign tables")));
    1263              :                     break;
    1264            0 :                 case XACT_EVENT_PARALLEL_COMMIT:
    1265              :                 case XACT_EVENT_COMMIT:
    1266              :                 case XACT_EVENT_PREPARE:
    1267              :                     /* Pre-commit should have closed the open transaction */
    1268            0 :                     elog(ERROR, "missed cleaning up connection during pre-commit");
    1269              :                     break;
    1270           59 :                 case XACT_EVENT_PARALLEL_ABORT:
    1271              :                 case XACT_EVENT_ABORT:
    1272              :                     /* Rollback all remote transactions during abort */
    1273           59 :                     if (entry->parallel_abort)
    1274              :                     {
    1275            4 :                         if (pgfdw_abort_cleanup_begin(entry, true,
    1276              :                                                       &pending_entries,
    1277              :                                                       &cancel_requested))
    1278            4 :                             continue;    /* skip the reset below; this entry is finished after the scan */
    1279              :                     }
    1280              :                     else
    1281           55 :                         pgfdw_abort_cleanup(entry, true);
    1282           55 :                     break;
    1283              :             }
    1284              :         }
    1285              : 
    1286              :         /* Reset state to show we're out of a transaction */
    1287         1308 :         pgfdw_reset_xact_state(entry, true);
    1288              :     }
    1289              : 
    1290              :     /* If there are any pending connections, finish cleaning them up */
    1291          750 :     if (pending_entries || cancel_requested)
    1292              :     {
    1293           15 :         if (event == XACT_EVENT_PARALLEL_PRE_COMMIT ||
    1294              :             event == XACT_EVENT_PRE_COMMIT)
    1295              :         {
    1296              :             Assert(cancel_requested == NIL);
    1297           13 :             pgfdw_finish_pre_commit_cleanup(pending_entries);
    1298              :         }
    1299              :         else
    1300              :         {
    1301              :             Assert(event == XACT_EVENT_PARALLEL_ABORT ||
    1302              :                    event == XACT_EVENT_ABORT);
    1303            2 :             pgfdw_finish_abort_cleanup(pending_entries, cancel_requested,
    1304              :                                        true);
    1305              :         }
    1306              :     }
    1307              : 
    1308              :     /*
    1309              :      * Regardless of the event type, we can now mark ourselves as out of the
    1310              :      * transaction.  (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
    1311              :      * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
    1312              :      */
    1313          750 :     xact_got_connection = false;
    1314              : 
    1315              :     /* Also reset cursor numbering for next transaction */
    1316          750 :     cursor_number = 0;
    1317              : 
    1318              :     /* Likewise for read_only_level */
    1319          750 :     read_only_level = 0;
    1320              : }
    1321              : 
    1322              : /*
    1323              :  * pgfdw_subxact_callback --- cleanup at subtransaction end.
    1324              :  */
    1325              : static void
    1326           72 : pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
    1327              :                        SubTransactionId parentSubid, void *arg)  /* only event is examined here */
    1328              : {
    1329              :     HASH_SEQ_STATUS scan;
    1330              :     ConnCacheEntry *entry;
    1331              :     int         curlevel;        /* current transaction nesting level */
    1332           72 :     List       *pending_entries = NIL;   /* entries to finish cleaning up after the scan */
    1333           72 :     List       *cancel_requested = NIL;  /* filled by pgfdw_abort_cleanup_begin(); stays NIL on commit */
    1334              : 
    1335              :     /* Nothing to do at subxact start, nor after commit. */
    1336           72 :     if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
    1337              :           event == SUBXACT_EVENT_ABORT_SUB))
    1338           43 :         return;
    1339              : 
    1340              :     /* Quick exit if no connections were touched in this transaction. */
    1341           29 :     if (!xact_got_connection)
    1342            0 :         return;
    1343              : 
    1344              :     /*
    1345              :      * Scan all connection cache entries to find open remote subtransactions
    1346              :      * of the current level, and close them.
    1347              :      */
    1348           29 :     curlevel = GetCurrentTransactionNestLevel();
    1349           29 :     hash_seq_init(&scan, ConnectionHash);
    1350          242 :     while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
    1351              :     {
    1352              :         char        sql[100];    /* holds the "RELEASE SAVEPOINT s<level>" command */
    1353              : 
    1354              :         /*
    1355              :          * We only care about connections with open remote subtransactions of
    1356              :          * the current level.
    1357              :          */
    1358          213 :         if (entry->conn == NULL || entry->xact_depth < curlevel)
    1359          198 :             continue;
    1360              : 
    1361           21 :         if (entry->xact_depth > curlevel)        /* a deeper level should already have been closed */
    1362            0 :             elog(ERROR, "missed cleaning up remote subtransaction at level %d",
    1363              :                  entry->xact_depth);
    1364              : 
    1365           21 :         if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
    1366              :         {
    1367              :             /*
    1368              :              * If abort cleanup previously failed for this connection, we
    1369              :              * can't issue any more commands against it.
    1370              :              */
    1371            7 :             pgfdw_reject_incomplete_xact_state_change(entry);
    1372              : 
    1373              :             /* Commit all remote subtransactions during pre-commit */
    1374            7 :             snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
    1375            7 :             entry->changing_xact_state = true;
    1376            7 :             if (entry->parallel_commit)
    1377              :             {
    1378            2 :                 do_sql_command_begin(entry->conn, sql);
    1379            2 :                 pending_entries = lappend(pending_entries, entry);
    1380            2 :                 continue;        /* skip the reset below; this entry is finished after the scan */
    1381              :             }
    1382            5 :             do_sql_command(entry->conn, sql);
    1383            5 :             entry->changing_xact_state = false;
    1384              :         }
    1385              :         else
    1386              :         {
    1387              :             /* Rollback all remote subtransactions during abort */
    1388           14 :             if (entry->parallel_abort)
    1389              :             {
    1390            4 :                 if (pgfdw_abort_cleanup_begin(entry, false,
    1391              :                                               &pending_entries,
    1392              :                                               &cancel_requested))
    1393            4 :                     continue;    /* skip the reset below; this entry is finished after the scan */
    1394              :             }
    1395              :             else
    1396           10 :                 pgfdw_abort_cleanup(entry, false);
    1397              :         }
    1398              : 
    1399              :         /* OK, we're outta that level of subtransaction */
    1400           15 :         pgfdw_reset_xact_state(entry, false);
    1401              :     }
    1402              : 
    1403              :     /* If there are any pending connections, finish cleaning them up */
    1404           29 :     if (pending_entries || cancel_requested)
    1405              :     {
    1406            3 :         if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
    1407              :         {
    1408              :             Assert(cancel_requested == NIL);
    1409            1 :             pgfdw_finish_pre_subcommit_cleanup(pending_entries, curlevel);
    1410              :         }
    1411              :         else
    1412              :         {
    1413              :             Assert(event == SUBXACT_EVENT_ABORT_SUB);
    1414            2 :             pgfdw_finish_abort_cleanup(pending_entries, cancel_requested,
    1415              :                                        false);
    1416              :         }
    1417              :     }
    1418              : 
    1419              :     /* If in read_only_level, reset it */
    1420           29 :     if (curlevel == read_only_level)
    1421            3 :         read_only_level = 0;
    1422              : }
    1423              : 
    1424              : /*
    1425              :  * Connection invalidation callback function
    1426              :  *
    1427              :  * After a change to a pg_foreign_server or pg_user_mapping catalog entry,
    1428              :  * close connections depending on that entry immediately if current transaction
    1429              :  * has not used those connections yet. Otherwise, mark those connections as
    1430              :  * invalid and then make pgfdw_xact_callback() close them at the end of current
    1431              :  * transaction, since they cannot be closed in the midst of the transaction
    1432              :  * using them. Closed connections will be remade at the next opportunity if
    1433              :  * necessary.
    1434              :  *
    1435              :  * Although most cache invalidation callbacks blow away all the related stuff
    1436              :  * regardless of the given hashvalue, connections are expensive enough that
    1437              :  * it's worth trying to avoid that.
    1438              :  *
    1439              :  * NB: We could avoid unnecessary disconnection more strictly by examining
    1440              :  * individual option values, but it seems too much effort for the gain.
    1441              :  */
    1442              : static void
    1443          188 : pgfdw_inval_callback(Datum arg, SysCacheIdentifier cacheid, uint32 hashvalue)    /* arg is unused */
    1444              : {
    1445              :     HASH_SEQ_STATUS scan;
    1446              :     ConnCacheEntry *entry;
    1447              : 
    1448              :     Assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID);
    1449              : 
    1450              :     /* ConnectionHash must exist already, if we're registered */
    1451          188 :     hash_seq_init(&scan, ConnectionHash);
    1452         1222 :     while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
    1453              :     {
    1454              :         /* Ignore entries that have no open connection */
    1455         1034 :         if (entry->conn == NULL)
    1456          838 :             continue;
    1457              : 
    1458              :         /* hashvalue == 0 means a cache reset, must clear all state */
    1459          196 :         if (hashvalue == 0 ||
    1460          140 :             (cacheid == FOREIGNSERVEROID &&
    1461          196 :              entry->server_hashvalue == hashvalue) ||
    1462           56 :             (cacheid == USERMAPPINGOID &&
    1463           56 :              entry->mapping_hashvalue == hashvalue))
    1464              :         {
    1465              :             /*
    1466              :              * Close the connection immediately if it's not used yet in this
    1467              :              * transaction. Otherwise mark it as invalid so that
    1468              :              * pgfdw_xact_callback() can close it at the end of this
    1469              :              * transaction.
    1470              :              */
    1471           59 :             if (entry->xact_depth == 0)  /* depth 0: no remote transaction open on this entry */
    1472              :             {
    1473           56 :                 elog(DEBUG3, "discarding connection %p", entry->conn);
    1474           56 :                 disconnect_pg_server(entry);
    1475              :             }
    1476              :             else
    1477            3 :                 entry->invalidated = true;
    1478              :         }
    1479              :     }
    1480          188 : }
    1481              : 
    1482              : /*
    1483              :  * Raise an error if the given connection cache entry is marked as being
    1484              :  * in the middle of an xact state change.  This should be called at points
    1485              :  * where no such change is expected to be in progress; if one is found to be
    1486              :  * in progress, it means that we aborted in the middle of a previous state
    1487              :  * change and now don't know what the remote transaction state actually is.
    1488              :  * Such connections can't safely be further used.  Re-establishing the
    1489              :  * connection would change the snapshot and roll back any writes already
    1490              :  * performed, so that's not an option, either. Thus, we must abort.
    1491              :  *
    1492              :  * Note: there might be open cursors that use the connection, so even if the
    1493              :  * connection cache entry is marked as such, we will retain it until abort
    1494              :  * cleanup of the main transaction, to ensure such open cursors can safely
    1495              :  * refer to the PGconn for the connection.
    1496              :  */
    1497              : static void
    1498         3024 : pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
    1499              : {
    1500              :     ForeignServer *server;       /* looked up only on the error path */
    1501              : 
    1502              :     /* nothing to do for inactive entries and entries of sane state */
    1503         3024 :     if (entry->conn == NULL || !entry->changing_xact_state)
    1504         3022 :         return;
    1505              : 
    1506              :     /* find server name to be shown in the message below */
    1507            2 :     server = GetForeignServer(entry->serverid);
    1508              : 
    1509            2 :     ereport(ERROR,
    1510              :             (errcode(ERRCODE_CONNECTION_EXCEPTION),
    1511              :              errmsg("connection to server \"%s\" cannot be used due to abort cleanup failure",
    1512              :                     server->servername)));
    1513              : }
    1514              : 
    1515              : /*
    1516              :  * Reset state to show we're out of a (sub)transaction.
    1517              :  */
    1518              : static void
    1519         1349 : pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel)     /* toplevel: main transaction vs. one subtransaction level */
    1520              : {
    1521         1349 :     if (toplevel)
    1522              :     {
    1523              :         /* Reset state to show we're out of a transaction */
    1524         1328 :         entry->xact_depth = 0;
    1525              : 
    1526              :         /* Reset xact r/o state */
    1527         1328 :         entry->xact_read_only = false;
    1528              : 
    1529              :         /*
    1530              :          * Discard the connection if it isn't in a good idle state, if a
    1531              :          * transaction state change is still marked in progress, if it has
    1532              :          * been marked invalid, or if keep_connections is disabled for it.
    1533              :          * The next GetConnection will open a new connection.
    1534              :          */
    1535         2653 :         if (PQstatus(entry->conn) != CONNECTION_OK ||
    1536         1325 :             PQtransactionStatus(entry->conn) != PQTRANS_IDLE ||
    1537         1325 :             entry->changing_xact_state ||
    1538         1325 :             entry->invalidated ||
    1539         1323 :             !entry->keep_connections)
    1540              :         {
    1541            6 :             elog(DEBUG3, "discarding connection %p", entry->conn);
    1542            6 :             disconnect_pg_server(entry);
    1543              :         }
    1544              :     }
    1545              :     else
    1546              :     {
    1547              :         /* Reset state to show we're out of a subtransaction */
    1548           21 :         entry->xact_depth--;
    1549              : 
    1550              :         /* If in read_only_level, reset xact r/o state */
    1551           21 :         if (entry->xact_depth + 1 == read_only_level)    /* + 1 gives the level we just left */
    1552            4 :             entry->xact_read_only = false;
    1553              :     }
    1554         1349 : }
    1555              : 
    1556              : /*
    1557              :  * Cancel the currently-in-progress query (whose query text we do not have)
    1558              :  * and ignore the result.  Returns true if we successfully cancel the query
    1559              :  * and discard any pending result, and false if not.
    1560              :  *
    1561              :  * It's not a huge problem if we throw an ERROR here, but if we get into error
    1562              :  * recursion trouble, we'll end up slamming the connection shut, which will
    1563              :  * necessitate failing the entire toplevel transaction even if subtransactions
    1564              :  * were used.  Try to use WARNING where we can.
    1565              :  *
    1566              :  * XXX: if the query was one sent by fetch_more_data_begin(), we could get the
    1567              :  * query text from the pendingAreq saved in the per-connection state, then
    1568              :  * report the query using it.
    1569              :  */
    1570              : static bool
    1571            2 : pgfdw_cancel_query(PGconn *conn)
    1572              : {
    1573            2 :     TimestampTz now = GetCurrentTimestamp();
    1574              :     TimestampTz endtime;         /* overall deadline for cancel plus result discard */
    1575              :     TimestampTz retrycanceltime; /* when to re-issue the cancel request */
    1576              : 
    1577              :     /*
    1578              :      * If it takes too long to cancel the query and discard the result, assume
    1579              :      * the connection is dead.
    1580              :      */
    1581            2 :     endtime = TimestampTzPlusMilliseconds(now, CONNECTION_CLEANUP_TIMEOUT);
    1582              : 
    1583              :     /*
    1584              :      * Also, lose patience and re-issue the cancel request after a little bit.
    1585              :      * (This serves to close some race conditions.)
    1586              :      */
    1587            2 :     retrycanceltime = TimestampTzPlusMilliseconds(now, RETRY_CANCEL_TIMEOUT);
    1588              : 
    1589            2 :     if (!pgfdw_cancel_query_begin(conn, endtime))
    1590            0 :         return false;            /* the cancel request was not accepted */
    1591            2 :     return pgfdw_cancel_query_end(conn, endtime, retrycanceltime, false);        /* last argument is consume_input */
    1592              : }
    1593              : 
    1594              : /*
    1595              :  * Submit a cancel request to the given connection, waiting only until
    1596              :  * the given time.
    1597              :  *
    1598              :  * We sleep interruptibly until we receive confirmation that the cancel
    1599              :  * request has been accepted, and if it is, return true; if the timeout
    1600              :  * lapses without that, or the request fails for whatever reason, return
    1601              :  * false.
    1602              :  */
    1603              : static bool
    1604            2 : pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime)
    1605              : {
    1606            2 :     const char *errormsg = libpqsrv_cancel(conn, endtime);
    1607              : 
    1608            2 :     if (errormsg != NULL)
    1609            0 :         ereport(WARNING,
    1610              :                 errcode(ERRCODE_CONNECTION_FAILURE),
    1611              :                 errmsg("could not send cancel request: %s", errormsg));
    1612              : 
    1613            2 :     return errormsg == NULL;
    1614              : }
    1615              : 
    1616              : static bool
    1617            2 : pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime,
    1618              :                        TimestampTz retrycanceltime, bool consume_input)
    1619              : {
    1620              :     PGresult   *result;
    1621              :     bool        timed_out;
    1622              : 
    1623              :     /*
    1624              :      * If requested, consume whatever data is available from the socket. (Note
    1625              :      * that if all data is available, this allows pgfdw_get_cleanup_result to
    1626              :      * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
    1627              :      * which would be large compared to the overhead of PQconsumeInput.)
    1628              :      */
    1629            2 :     if (consume_input && !PQconsumeInput(conn))
    1630              :     {
    1631            0 :         ereport(WARNING,
    1632              :                 (errcode(ERRCODE_CONNECTION_FAILURE),
    1633              :                  errmsg("could not get result of cancel request: %s",
    1634              :                         pchomp(PQerrorMessage(conn)))));
    1635            0 :         return false;
    1636              :     }
    1637              : 
    1638              :     /* Get and discard the result of the query. */
    1639            2 :     if (pgfdw_get_cleanup_result(conn, endtime, retrycanceltime,
    1640              :                                  &result, &timed_out))
    1641              :     {
    1642            0 :         if (timed_out)
    1643            0 :             ereport(WARNING,
    1644              :                     (errmsg("could not get result of cancel request due to timeout")));
    1645              :         else
    1646            0 :             ereport(WARNING,
    1647              :                     (errcode(ERRCODE_CONNECTION_FAILURE),
    1648              :                      errmsg("could not get result of cancel request: %s",
    1649              :                             pchomp(PQerrorMessage(conn)))));
    1650              : 
    1651            0 :         return false;
    1652              :     }
    1653            2 :     PQclear(result);
    1654              : 
    1655            2 :     return true;
    1656              : }
    1657              : 
    1658              : /*
    1659              :  * Submit a query during (sub)abort cleanup and wait up to 30 seconds for the
    1660              :  * result.  If the query is executed without error, the return value is true.
    1661              :  * If the query is executed successfully but returns an error, the return
    1662              :  * value is true if and only if ignore_errors is set.  If the query can't be
    1663              :  * sent or times out, the return value is false.
    1664              :  *
    1665              :  * It's not a huge problem if we throw an ERROR here, but if we get into error
    1666              :  * recursion trouble, we'll end up slamming the connection shut, which will
    1667              :  * necessitate failing the entire toplevel transaction even if subtransactions
    1668              :  * were used.  Try to use WARNING where we can.
    1669              :  */
    1670              : static bool
    1671           87 : pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
    1672              : {
    1673              :     TimestampTz endtime;
    1674              : 
    1675              :     /*
    1676              :      * If it takes too long to execute a cleanup query, assume the connection
    1677              :      * is dead.  It's fairly likely that this is why we aborted in the first
    1678              :      * place (e.g. statement timeout, user cancel), so the timeout shouldn't
    1679              :      * be too long.
    1680              :      */
    1681           87 :     endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
    1682              :                                           CONNECTION_CLEANUP_TIMEOUT);
    1683              : 
    1684           87 :     if (!pgfdw_exec_cleanup_query_begin(conn, query))
    1685            0 :         return false;
    1686           87 :     return pgfdw_exec_cleanup_query_end(conn, query, endtime,
    1687              :                                         false, ignore_errors);
    1688              : }
    1689              : 
    1690              : static bool
    1691           99 : pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query)
    1692              : {
    1693              :     Assert(query != NULL);
    1694              : 
    1695              :     /*
    1696              :      * Submit a query.  Since we don't use non-blocking mode, this also can
    1697              :      * block.  But its risk is relatively small, so we ignore that for now.
    1698              :      */
    1699           99 :     if (!PQsendQuery(conn, query))
    1700              :     {
    1701            0 :         pgfdw_report(WARNING, NULL, conn, query);
    1702            0 :         return false;
    1703              :     }
    1704              : 
    1705           99 :     return true;
    1706              : }
    1707              : 
    1708              : static bool
    1709           99 : pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
    1710              :                              TimestampTz endtime, bool consume_input,
    1711              :                              bool ignore_errors)
    1712              : {
    1713              :     PGresult   *result;
    1714              :     bool        timed_out;
    1715              : 
    1716              :     Assert(query != NULL);
    1717              : 
    1718              :     /*
    1719              :      * If requested, consume whatever data is available from the socket. (Note
    1720              :      * that if all data is available, this allows pgfdw_get_cleanup_result to
    1721              :      * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
    1722              :      * which would be large compared to the overhead of PQconsumeInput.)
    1723              :      */
    1724           99 :     if (consume_input && !PQconsumeInput(conn))
    1725              :     {
    1726            0 :         pgfdw_report(WARNING, NULL, conn, query);
    1727            0 :         return false;
    1728              :     }
    1729              : 
    1730              :     /* Get the result of the query. */
    1731           99 :     if (pgfdw_get_cleanup_result(conn, endtime, endtime, &result, &timed_out))
    1732              :     {
    1733            2 :         if (timed_out)
    1734            0 :             ereport(WARNING,
    1735              :                     (errmsg("could not get query result due to timeout"),
    1736              :                      errcontext("remote SQL command: %s", query)));
    1737              :         else
    1738            2 :             pgfdw_report(WARNING, NULL, conn, query);
    1739              : 
    1740            2 :         return false;
    1741              :     }
    1742              : 
    1743              :     /* Issue a warning if not successful. */
    1744           97 :     if (PQresultStatus(result) != PGRES_COMMAND_OK)
    1745              :     {
    1746            0 :         pgfdw_report(WARNING, result, conn, query);
    1747            0 :         return ignore_errors;
    1748              :     }
    1749           97 :     PQclear(result);
    1750              : 
    1751           97 :     return true;
    1752              : }
    1753              : 
    1754              : /*
    1755              :  * Get, during abort cleanup, the result of a query that is in progress.
    1756              :  * This might be a query that is being interrupted by a cancel request or by
    1757              :  * transaction abort, or it might be a query that was initiated as part of
    1758              :  * transaction abort to get the remote side back to the appropriate state.
    1759              :  *
    1760              :  * endtime is the time at which we should give up and assume the remote side
    1761              :  * is dead.  retrycanceltime is the time at which we should issue a fresh
    1762              :  * cancel request (pass the same value as endtime if this is not wanted).
    1763              :  *
    1764              :  * Returns true if the timeout expired or connection trouble occurred,
    1765              :  * false otherwise.  Sets *result except in case of a true result.
    1766              :  * Sets *timed_out to true only when the timeout expired.
    1767              :  */
    1768              : static bool
    1769          101 : pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
    1770              :                          TimestampTz retrycanceltime,
    1771              :                          PGresult **result,
    1772              :                          bool *timed_out)
    1773              : {
    1774          101 :     bool        failed = false;
    1775          101 :     PGresult   *last_res = NULL;
    1776          101 :     int         canceldelta = RETRY_CANCEL_TIMEOUT * 2;
    1777              : 
    1778          101 :     *result = NULL;
    1779          101 :     *timed_out = false;
    1780              :     for (;;)
    1781          113 :     {
    1782              :         PGresult   *res;
    1783              : 
    1784          308 :         while (PQisBusy(conn))
    1785              :         {
    1786              :             int         wc;
    1787           96 :             TimestampTz now = GetCurrentTimestamp();
    1788              :             long        cur_timeout;
    1789              : 
    1790              :             /* If timeout has expired, give up. */
    1791           96 :             if (now >= endtime)
    1792              :             {
    1793            0 :                 *timed_out = true;
    1794            0 :                 failed = true;
    1795            0 :                 goto exit;
    1796              :             }
    1797              : 
    1798              :             /* If we need to re-issue the cancel request, do that. */
    1799           96 :             if (now >= retrycanceltime)
    1800              :             {
    1801              :                 /* We ignore failure to issue the repeated request. */
    1802            0 :                 (void) libpqsrv_cancel(conn, endtime);
    1803              : 
    1804              :                 /* Recompute "now" in case that took measurable time. */
    1805            0 :                 now = GetCurrentTimestamp();
    1806              : 
    1807              :                 /* Adjust re-cancel timeout in increasing steps. */
    1808            0 :                 retrycanceltime = TimestampTzPlusMilliseconds(now,
    1809              :                                                               canceldelta);
    1810            0 :                 canceldelta += canceldelta;
    1811              :             }
    1812              : 
    1813              :             /* If timeout has expired, give up, else get sleep time. */
    1814           96 :             cur_timeout = TimestampDifferenceMilliseconds(now,
    1815              :                                                           Min(endtime,
    1816              :                                                               retrycanceltime));
    1817           96 :             if (cur_timeout <= 0)
    1818              :             {
    1819            0 :                 *timed_out = true;
    1820            0 :                 failed = true;
    1821            0 :                 goto exit;
    1822              :             }
    1823              : 
    1824              :             /* first time, allocate or get the custom wait event */
    1825           96 :             if (pgfdw_we_cleanup_result == 0)
    1826            2 :                 pgfdw_we_cleanup_result = WaitEventExtensionNew("PostgresFdwCleanupResult");
    1827              : 
    1828              :             /* Sleep until there's something to do */
    1829           96 :             wc = WaitLatchOrSocket(MyLatch,
    1830              :                                    WL_LATCH_SET | WL_SOCKET_READABLE |
    1831              :                                    WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
    1832              :                                    PQsocket(conn),
    1833              :                                    cur_timeout, pgfdw_we_cleanup_result);
    1834           96 :             ResetLatch(MyLatch);
    1835              : 
    1836           96 :             CHECK_FOR_INTERRUPTS();
    1837              : 
    1838              :             /* Data available in socket? */
    1839           96 :             if (wc & WL_SOCKET_READABLE)
    1840              :             {
    1841           96 :                 if (!PQconsumeInput(conn))
    1842              :                 {
    1843              :                     /* connection trouble */
    1844            2 :                     failed = true;
    1845            2 :                     goto exit;
    1846              :                 }
    1847              :             }
    1848              :         }
    1849              : 
    1850          212 :         res = PQgetResult(conn);
    1851          212 :         if (res == NULL)
    1852           99 :             break;              /* query is complete */
    1853              : 
    1854          113 :         PQclear(last_res);
    1855          113 :         last_res = res;
    1856              :     }
    1857          101 : exit:
    1858          101 :     if (failed)
    1859            2 :         PQclear(last_res);
    1860              :     else
    1861           99 :         *result = last_res;
    1862          101 :     return failed;
    1863              : }
    1864              : 
    1865              : /*
    1866              :  * Abort remote transaction or subtransaction.
    1867              :  *
    1868              :  * "toplevel" should be set to true if toplevel (main) transaction is
    1869              :  * rollbacked, false otherwise.
    1870              :  *
    1871              :  * Set entry->changing_xact_state to false on success, true on failure.
    1872              :  */
    1873              : static void
    1874           65 : pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel)
    1875              : {
    1876              :     char        sql[100];
    1877              : 
    1878              :     /*
    1879              :      * Don't try to clean up the connection if we're already in error
    1880              :      * recursion trouble.
    1881              :      */
    1882           65 :     if (in_error_recursion_trouble())
    1883            0 :         entry->changing_xact_state = true;
    1884              : 
    1885              :     /*
    1886              :      * If connection is already unsalvageable, don't touch it further.
    1887              :      */
    1888           65 :     if (entry->changing_xact_state)
    1889            5 :         return;
    1890              : 
    1891              :     /*
    1892              :      * Mark this connection as in the process of changing transaction state.
    1893              :      */
    1894           62 :     entry->changing_xact_state = true;
    1895              : 
    1896              :     /* Assume we might have lost track of prepared statements */
    1897           62 :     entry->have_error = true;
    1898              : 
    1899              :     /*
    1900              :      * If a command has been submitted to the remote server by using an
    1901              :      * asynchronous execution function, the command might not have yet
    1902              :      * completed.  Check to see if a command is still being processed by the
    1903              :      * remote server, and if so, request cancellation of the command.
    1904              :      */
    1905           62 :     if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
    1906            2 :         !pgfdw_cancel_query(entry->conn))
    1907            0 :         return;                 /* Unable to cancel running query */
    1908              : 
    1909           62 :     CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
    1910           62 :     if (!pgfdw_exec_cleanup_query(entry->conn, sql, false))
    1911            2 :         return;                 /* Unable to abort remote (sub)transaction */
    1912              : 
    1913           60 :     if (toplevel)
    1914              :     {
    1915           52 :         if (entry->have_prep_stmt && entry->have_error &&
    1916           25 :             !pgfdw_exec_cleanup_query(entry->conn,
    1917              :                                       "DEALLOCATE ALL",
    1918              :                                       true))
    1919            0 :             return;             /* Trouble clearing prepared statements */
    1920              : 
    1921           52 :         entry->have_prep_stmt = false;
    1922           52 :         entry->have_error = false;
    1923              :     }
    1924              : 
    1925              :     /*
    1926              :      * If pendingAreq of the per-connection state is not NULL, it means that
    1927              :      * an asynchronous fetch begun by fetch_more_data_begin() was not done
    1928              :      * successfully and thus the per-connection state was not reset in
    1929              :      * fetch_more_data(); in that case reset the per-connection state here.
    1930              :      */
    1931           60 :     if (entry->state.pendingAreq)
    1932            1 :         memset(&entry->state, 0, sizeof(entry->state));
    1933              : 
    1934              :     /* Disarm changing_xact_state if it all worked */
    1935           60 :     entry->changing_xact_state = false;
    1936              : }
    1937              : 
    1938              : /*
    1939              :  * Like pgfdw_abort_cleanup, submit an abort command or cancel request, but
    1940              :  * don't wait for the result.
    1941              :  *
    1942              :  * Returns true if the abort command or cancel request is successfully issued,
    1943              :  * false otherwise.  If the abort command is successfully issued, the given
    1944              :  * connection cache entry is appended to *pending_entries.  Otherwise, if the
    1945              :  * cancel request is successfully issued, it is appended to *cancel_requested.
    1946              :  */
    1947              : static bool
    1948            8 : pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel,
    1949              :                           List **pending_entries, List **cancel_requested)
    1950              : {
    1951              :     /*
    1952              :      * Don't try to clean up the connection if we're already in error
    1953              :      * recursion trouble.
    1954              :      */
    1955            8 :     if (in_error_recursion_trouble())
    1956            0 :         entry->changing_xact_state = true;
    1957              : 
    1958              :     /*
    1959              :      * If connection is already unsalvageable, don't touch it further.
    1960              :      */
    1961            8 :     if (entry->changing_xact_state)
    1962            0 :         return false;
    1963              : 
    1964              :     /*
    1965              :      * Mark this connection as in the process of changing transaction state.
    1966              :      */
    1967            8 :     entry->changing_xact_state = true;
    1968              : 
    1969              :     /* Assume we might have lost track of prepared statements */
    1970            8 :     entry->have_error = true;
    1971              : 
    1972              :     /*
    1973              :      * If a command has been submitted to the remote server by using an
    1974              :      * asynchronous execution function, the command might not have yet
    1975              :      * completed.  Check to see if a command is still being processed by the
    1976              :      * remote server, and if so, request cancellation of the command.
    1977              :      */
    1978            8 :     if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
    1979              :     {
    1980              :         TimestampTz endtime;
    1981              : 
    1982            0 :         endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
    1983              :                                               CONNECTION_CLEANUP_TIMEOUT);
    1984            0 :         if (!pgfdw_cancel_query_begin(entry->conn, endtime))
    1985            0 :             return false;       /* Unable to cancel running query */
    1986            0 :         *cancel_requested = lappend(*cancel_requested, entry);
    1987              :     }
    1988              :     else
    1989              :     {
    1990              :         char        sql[100];
    1991              : 
    1992            8 :         CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
    1993            8 :         if (!pgfdw_exec_cleanup_query_begin(entry->conn, sql))
    1994            0 :             return false;       /* Unable to abort remote transaction */
    1995            8 :         *pending_entries = lappend(*pending_entries, entry);
    1996              :     }
    1997              : 
    1998            8 :     return true;
    1999              : }
    2000              : 
    2001              : /*
    2002              :  * Finish pre-commit cleanup of connections on each of which we've sent a
    2003              :  * COMMIT command to the remote server.
    2004              :  */
    2005              : static void
    2006           13 : pgfdw_finish_pre_commit_cleanup(List *pending_entries)
    2007              : {
    2008              :     ConnCacheEntry *entry;
    2009           13 :     List       *pending_deallocs = NIL;
    2010              :     ListCell   *lc;
    2011              : 
    2012              :     Assert(pending_entries);
    2013              : 
    2014              :     /*
    2015              :      * Get the result of the COMMIT command for each of the pending entries
    2016              :      */
    2017           29 :     foreach(lc, pending_entries)
    2018              :     {
    2019           16 :         entry = (ConnCacheEntry *) lfirst(lc);
    2020              : 
    2021              :         Assert(entry->changing_xact_state);
    2022              : 
    2023              :         /*
    2024              :          * We might already have received the result on the socket, so pass
    2025              :          * consume_input=true to try to consume it first
    2026              :          */
    2027           16 :         do_sql_command_end(entry->conn, "COMMIT TRANSACTION", true);
    2028           16 :         entry->changing_xact_state = false;
    2029              : 
    2030              :         /* Do a DEALLOCATE ALL in parallel if needed */
    2031           16 :         if (entry->have_prep_stmt && entry->have_error)
    2032              :         {
    2033              :             /* Ignore errors (see notes in pgfdw_xact_callback) */
    2034            2 :             if (PQsendQuery(entry->conn, "DEALLOCATE ALL"))
    2035              :             {
    2036            2 :                 pending_deallocs = lappend(pending_deallocs, entry);
    2037            2 :                 continue;
    2038              :             }
    2039              :         }
    2040           14 :         entry->have_prep_stmt = false;
    2041           14 :         entry->have_error = false;
    2042              : 
    2043           14 :         pgfdw_reset_xact_state(entry, true);
    2044              :     }
    2045              : 
    2046              :     /* No further work if no pending entries */
    2047           13 :     if (!pending_deallocs)
    2048           12 :         return;
    2049              : 
    2050              :     /*
    2051              :      * Get the result of the DEALLOCATE command for each of the pending
    2052              :      * entries
    2053              :      */
    2054            3 :     foreach(lc, pending_deallocs)
    2055              :     {
    2056              :         PGresult   *res;
    2057              : 
    2058            2 :         entry = (ConnCacheEntry *) lfirst(lc);
    2059              : 
    2060              :         /* Ignore errors (see notes in pgfdw_xact_callback) */
    2061            4 :         while ((res = PQgetResult(entry->conn)) != NULL)
    2062              :         {
    2063            2 :             PQclear(res);
    2064              :             /* Stop if the connection is lost (else we'll loop infinitely) */
    2065            2 :             if (PQstatus(entry->conn) == CONNECTION_BAD)
    2066            0 :                 break;
    2067              :         }
    2068            2 :         entry->have_prep_stmt = false;
    2069            2 :         entry->have_error = false;
    2070              : 
    2071            2 :         pgfdw_reset_xact_state(entry, true);
    2072              :     }
    2073              : }
    2074              : 
    2075              : /*
    2076              :  * Finish pre-subcommit cleanup of connections on each of which we've sent a
    2077              :  * RELEASE command to the remote server.
    2078              :  */
    2079              : static void
    2080            1 : pgfdw_finish_pre_subcommit_cleanup(List *pending_entries, int curlevel)
    2081              : {
    2082              :     ConnCacheEntry *entry;
    2083              :     char        sql[100];
    2084              :     ListCell   *lc;
    2085              : 
    2086              :     Assert(pending_entries);
    2087              : 
    2088              :     /*
    2089              :      * Get the result of the RELEASE command for each of the pending entries
    2090              :      */
    2091            1 :     snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
    2092            3 :     foreach(lc, pending_entries)
    2093              :     {
    2094            2 :         entry = (ConnCacheEntry *) lfirst(lc);
    2095              : 
    2096              :         Assert(entry->changing_xact_state);
    2097              : 
    2098              :         /*
    2099              :          * We might already have received the result on the socket, so pass
    2100              :          * consume_input=true to try to consume it first
    2101              :          */
    2102            2 :         do_sql_command_end(entry->conn, sql, true);
    2103            2 :         entry->changing_xact_state = false;
    2104              : 
    2105            2 :         pgfdw_reset_xact_state(entry, false);
    2106              :     }
    2107            1 : }
    2108              : 
    2109              : /*
    2110              :  * Finish abort cleanup of connections on each of which we've sent an abort
    2111              :  * command or cancel request to the remote server.
    2112              :  */
    2113              : static void
    2114            4 : pgfdw_finish_abort_cleanup(List *pending_entries, List *cancel_requested,
    2115              :                            bool toplevel)
    2116              : {
    2117            4 :     List       *pending_deallocs = NIL;
    2118              :     ListCell   *lc;
    2119              : 
    2120              :     /*
    2121              :      * For each of the pending cancel requests (if any), get and discard the
    2122              :      * result of the query, and submit an abort command to the remote server.
    2123              :      */
    2124            4 :     if (cancel_requested)
    2125              :     {
    2126            0 :         foreach(lc, cancel_requested)
    2127              :         {
    2128            0 :             ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
    2129            0 :             TimestampTz now = GetCurrentTimestamp();
    2130              :             TimestampTz endtime;
    2131              :             TimestampTz retrycanceltime;
    2132              :             char        sql[100];
    2133              : 
    2134              :             Assert(entry->changing_xact_state);
    2135              : 
    2136              :             /*
    2137              :              * Set end time.  You might think we should do this before issuing
    2138              :              * cancel request like in normal mode, but that is problematic,
    2139              :              * because if, for example, it took longer than 30 seconds to
    2140              :              * process the first few entries in the cancel_requested list, it
    2141              :              * would cause a timeout error when processing each of the
    2142              :              * remaining entries in the list, leading to slamming that entry's
    2143              :              * connection shut.
    2144              :              */
    2145            0 :             endtime = TimestampTzPlusMilliseconds(now,
    2146              :                                                   CONNECTION_CLEANUP_TIMEOUT);
    2147            0 :             retrycanceltime = TimestampTzPlusMilliseconds(now,
    2148              :                                                           RETRY_CANCEL_TIMEOUT);
    2149              : 
    2150            0 :             if (!pgfdw_cancel_query_end(entry->conn, endtime,
    2151              :                                         retrycanceltime, true))
    2152              :             {
    2153              :                 /* Unable to cancel running query */
    2154            0 :                 pgfdw_reset_xact_state(entry, toplevel);
    2155            0 :                 continue;
    2156              :             }
    2157              : 
    2158              :             /* Send an abort command in parallel if needed */
    2159            0 :             CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
    2160            0 :             if (!pgfdw_exec_cleanup_query_begin(entry->conn, sql))
    2161              :             {
    2162              :                 /* Unable to abort remote (sub)transaction */
    2163            0 :                 pgfdw_reset_xact_state(entry, toplevel);
    2164              :             }
    2165              :             else
    2166            0 :                 pending_entries = lappend(pending_entries, entry);
    2167              :         }
    2168              :     }
    2169              : 
    2170              :     /* No further work if no pending entries */
    2171            4 :     if (!pending_entries)
    2172            0 :         return;
    2173              : 
    2174              :     /*
    2175              :      * Get the result of the abort command for each of the pending entries
    2176              :      */
    2177           12 :     foreach(lc, pending_entries)
    2178              :     {
    2179            8 :         ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
    2180              :         TimestampTz endtime;
    2181              :         char        sql[100];
    2182              : 
    2183              :         Assert(entry->changing_xact_state);
    2184              : 
    2185              :         /*
    2186              :          * Set end time.  We do this now, not before issuing the command like
    2187              :          * in normal mode, for the same reason as for the cancel_requested
    2188              :          * entries.
    2189              :          */
    2190            8 :         endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
    2191              :                                               CONNECTION_CLEANUP_TIMEOUT);
    2192              : 
    2193            8 :         CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
    2194            8 :         if (!pgfdw_exec_cleanup_query_end(entry->conn, sql, endtime,
    2195              :                                           true, false))
    2196              :         {
    2197              :             /* Unable to abort remote (sub)transaction */
    2198            0 :             pgfdw_reset_xact_state(entry, toplevel);
    2199            4 :             continue;
    2200              :         }
    2201              : 
    2202            8 :         if (toplevel)
    2203              :         {
    2204              :             /* Do a DEALLOCATE ALL in parallel if needed */
    2205            4 :             if (entry->have_prep_stmt && entry->have_error)
    2206              :             {
    2207            4 :                 if (!pgfdw_exec_cleanup_query_begin(entry->conn,
    2208              :                                                     "DEALLOCATE ALL"))
    2209              :                 {
    2210              :                     /* Trouble clearing prepared statements */
    2211            0 :                     pgfdw_reset_xact_state(entry, toplevel);
    2212              :                 }
    2213              :                 else
    2214            4 :                     pending_deallocs = lappend(pending_deallocs, entry);
    2215            4 :                 continue;
    2216              :             }
    2217            0 :             entry->have_prep_stmt = false;
    2218            0 :             entry->have_error = false;
    2219              :         }
    2220              : 
    2221              :         /* Reset the per-connection state if needed */
    2222            4 :         if (entry->state.pendingAreq)
    2223            0 :             memset(&entry->state, 0, sizeof(entry->state));
    2224              : 
    2225              :         /* We're done with this entry; unset the changing_xact_state flag */
    2226            4 :         entry->changing_xact_state = false;
    2227            4 :         pgfdw_reset_xact_state(entry, toplevel);
    2228              :     }
    2229              : 
    2230              :     /* No further work if no pending DEALLOCATE commands */
    2231            4 :     if (!pending_deallocs)
    2232            2 :         return;
    2233              :     Assert(toplevel);
    2234              : 
    2235              :     /*
    2236              :      * Get the result of the DEALLOCATE command for each of the pending
    2237              :      * entries
    2238              :      */
    2239            6 :     foreach(lc, pending_deallocs)
    2240              :     {
    2241            4 :         ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
    2242              :         TimestampTz endtime;
    2243              : 
    2244              :         Assert(entry->changing_xact_state);
    2245              :         Assert(entry->have_prep_stmt);
    2246              :         Assert(entry->have_error);
    2247              : 
    2248              :         /*
    2249              :          * Set end time.  We do this now, not before issuing the command like
    2250              :          * in normal mode, for the same reason as for the cancel_requested
    2251              :          * entries.
    2252              :          */
    2253            4 :         endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
    2254              :                                               CONNECTION_CLEANUP_TIMEOUT);
    2255              : 
    2256            4 :         if (!pgfdw_exec_cleanup_query_end(entry->conn, "DEALLOCATE ALL",
    2257              :                                           endtime, true, true))
    2258              :         {
    2259              :             /* Trouble clearing prepared statements */
    2260            0 :             pgfdw_reset_xact_state(entry, toplevel);
    2261            0 :             continue;
    2262              :         }
    2263            4 :         entry->have_prep_stmt = false;
    2264            4 :         entry->have_error = false;
    2265              : 
    2266              :         /* Reset the per-connection state if needed */
    2267            4 :         if (entry->state.pendingAreq)
    2268            0 :             memset(&entry->state, 0, sizeof(entry->state));
    2269              : 
    2270              :         /* We're done with this entry; unset the changing_xact_state flag */
    2271            4 :         entry->changing_xact_state = false;
    2272            4 :         pgfdw_reset_xact_state(entry, toplevel);
    2273              :     }
    2274              : }
    2275              : 
    2276              : /* Number of output arguments (columns) for various API versions */
    2277              : #define POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_1  2
    2278              : #define POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_2  6
    2279              : #define POSTGRES_FDW_GET_CONNECTIONS_COLS   6   /* maximum of above */
    2280              : 
    2281              : /*
    2282              :  * Internal function used by postgres_fdw_get_connections variants.
    2283              :  *
    2284              :  * For API version 1.1, this function takes no input parameter and
    2285              :  * returns a set of records with the following values:
    2286              :  *
    2287              :  * - server_name - server name of active connection. In case the foreign server
    2288              :  *   is dropped but still the connection is active, then the server name will
    2289              :  *   be NULL in output.
    2290              :  * - valid - true/false representing whether the connection is valid or not.
    2291              :  *   Note that connections can become invalid in pgfdw_inval_callback.
    2292              :  *
    2293              :  * For API version 1.2 and later, this function takes an input parameter
    2294              :  * to check a connection status and returns the following
    2295              :  * additional values along with the two values from version 1.1:
    2296              :  *
    2297              :  * - user_name - the local user name of the active connection. In case the
    2298              :  *   user mapping is dropped but the connection is still active, then the
    2299              :  *   user name will be NULL in the output.
    2300              :  * - used_in_xact - true if the connection is used in the current transaction.
    2301              :  * - closed - true if the connection is closed.
    2302              :  * - remote_backend_pid - process ID of the remote backend, on the foreign
    2303              :  *   server, handling the connection.
    2304              :  *
    2305              :  * No records are returned when there are no cached connections at all.
    2306              :  */
    2307              : static void
    2308           13 : postgres_fdw_get_connections_internal(FunctionCallInfo fcinfo,
    2309              :                                       enum pgfdwVersion api_version)
    2310              : {
    2311           13 :     ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
    2312              :     HASH_SEQ_STATUS scan;
    2313              :     ConnCacheEntry *entry;
    2314              : 
    2315           13 :     InitMaterializedSRF(fcinfo, 0);
    2316              : 
    2317              :     /* If cache doesn't exist, we return no records */
    2318           13 :     if (!ConnectionHash)
    2319            0 :         return;
    2320              : 
    2321              :     /* Check we have the expected number of output arguments */
    2322           13 :     switch (rsinfo->setDesc->natts)
    2323              :     {
    2324            0 :         case POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_1:
    2325            0 :             if (api_version != PGFDW_V1_1)
    2326            0 :                 elog(ERROR, "incorrect number of output arguments");
    2327            0 :             break;
    2328           13 :         case POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_2:
    2329           13 :             if (api_version != PGFDW_V1_2)
    2330            0 :                 elog(ERROR, "incorrect number of output arguments");
    2331           13 :             break;
    2332            0 :         default:
    2333            0 :             elog(ERROR, "incorrect number of output arguments");
    2334              :     }
    2335              : 
    2336           13 :     hash_seq_init(&scan, ConnectionHash);
    2337          113 :     while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
    2338              :     {
    2339              :         ForeignServer *server;
    2340          100 :         Datum       values[POSTGRES_FDW_GET_CONNECTIONS_COLS] = {0};
    2341          100 :         bool        nulls[POSTGRES_FDW_GET_CONNECTIONS_COLS] = {0};
    2342          100 :         int         i = 0;
    2343              : 
    2344              :         /* We only look for open remote connections */
    2345          100 :         if (!entry->conn)
    2346           87 :             continue;
    2347              : 
    2348           13 :         server = GetForeignServerExtended(entry->serverid, FSV_MISSING_OK);
    2349              : 
    2350              :         /*
    2351              :          * The foreign server may have been dropped in current explicit
    2352              :          * transaction. It is not possible to drop the server from another
    2353              :          * session when the connection associated with it is in use in the
    2354              :          * current transaction; if attempted, the drop query in another session
    2355              :          * blocks until the current transaction finishes.
    2356              :          *
    2357              :          * Even though the server is dropped in the current transaction, the
    2358              :          * cache can still have associated active connection entry, say we
    2359              :          * call such connections dangling. Since we can not fetch the server
    2360              :          * name from system catalogs for dangling connections, instead we show
    2361              :          * NULL value for server name in output.
    2362              :          *
    2363              :          * We could have done better by storing the server name in the cache
    2364              :          * entry instead of server oid so that it could be used in the output.
    2365              :          * But the server name in each cache entry requires 64 bytes of
    2366              :          * memory, which is huge, when there are many cached connections and
    2367              :          * the use case i.e. dropping the foreign server within the explicit
    2368              :          * current transaction seems rare. So, we chose to show NULL value for
    2369              :          * server name in output.
    2370              :          *
    2371              :          * Such dangling connections get closed either in next use or at the
    2372              :          * end of current explicit transaction in pgfdw_xact_callback.
    2373              :          */
    2374           13 :         if (!server)
    2375              :         {
    2376              :             /*
    2377              :              * If the server has been dropped in the current explicit
    2378              :              * transaction, then this entry would have been invalidated in
    2379              :              * pgfdw_inval_callback at the end of drop server command. Note
    2380              :              * that this connection would not have been closed in
    2381              :              * pgfdw_inval_callback because it is still being used in the
    2382              :              * current explicit transaction. So, assert that here.
    2383              :              */
    2384              :             Assert(entry->conn && entry->xact_depth > 0 && entry->invalidated);
    2385              : 
    2386              :             /* Show null, if no server name was found */
    2387            1 :             nulls[i++] = true;
    2388              :         }
    2389              :         else
    2390           12 :             values[i++] = CStringGetTextDatum(server->servername);
    2391              : 
    2392           13 :         if (api_version >= PGFDW_V1_2)
    2393              :         {
    2394              :             HeapTuple   tp;
    2395              : 
    2396              :             /* Use the system cache to obtain the user mapping */
    2397           13 :             tp = SearchSysCache1(USERMAPPINGOID, ObjectIdGetDatum(entry->key));
    2398              : 
    2399              :             /*
    2400              :              * Just like in the foreign server case, user mappings can also be
    2401              :              * dropped in the current explicit transaction. Therefore, the
    2402              :              * similar check as in the server case is required.
    2403              :              */
    2404           13 :             if (!HeapTupleIsValid(tp))
    2405              :             {
    2406              :                 /*
    2407              :                  * If we reach here, this entry must have been invalidated in
    2408              :                  * pgfdw_inval_callback, same as in the server case.
    2409              :                  */
    2410              :                 Assert(entry->conn && entry->xact_depth > 0 &&
    2411              :                        entry->invalidated);
    2412              : 
    2413            1 :                 nulls[i++] = true;
    2414              :             }
    2415              :             else
    2416              :             {
    2417              :                 Oid         userid;
    2418              : 
    2419           12 :                 userid = ((Form_pg_user_mapping) GETSTRUCT(tp))->umuser;
    2420           12 :                 values[i++] = CStringGetTextDatum(MappingUserName(userid));
    2421           12 :                 ReleaseSysCache(tp);
    2422              :             }
    2423              :         }
    2424              : 
    2425           13 :         values[i++] = BoolGetDatum(!entry->invalidated);
    2426              : 
    2427           13 :         if (api_version >= PGFDW_V1_2)
    2428              :         {
    2429           13 :             bool        check_conn = PG_GETARG_BOOL(0);
    2430              : 
    2431              :             /* Is this connection used in the current transaction? */
    2432           13 :             values[i++] = BoolGetDatum(entry->xact_depth > 0);
    2433              : 
    2434              :             /*
    2435              :              * If a connection status check is requested and supported, return
    2436              :              * whether the connection is closed. Otherwise, return NULL.
    2437              :              */
    2438           13 :             if (check_conn && pgfdw_conn_checkable())
    2439            2 :                 values[i++] = BoolGetDatum(pgfdw_conn_check(entry->conn) != 0);
    2440              :             else
    2441           11 :                 nulls[i++] = true;
    2442              : 
    2443              :             /* Return process ID of remote backend */
    2444           13 :             values[i++] = Int32GetDatum(PQbackendPID(entry->conn));
    2445              :         }
    2446              : 
    2447           13 :         tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
    2448              :     }
    2449              : }
    2450              : 
    2451              : /*
    2452              :  * Values in connection strings must be enclosed in single quotes. Single
    2453              :  * quotes and backslashes must be escaped with backslash. NB: these rules are
    2454              :  * different from the rules for escaping a SQL literal.
    2455              :  */
    2456              : static void
    2457           65 : appendEscapedValue(StringInfo str, const char *val)
    2458              : {
    2459           65 :     appendStringInfoChar(str, '\'');
    2460          548 :     for (int i = 0; val[i] != '\0'; i++)
    2461              :     {
    2462          483 :         if (val[i] == '\\' || val[i] == '\'')
    2463            0 :             appendStringInfoChar(str, '\\');
    2464          483 :         appendStringInfoChar(str, val[i]);
    2465              :     }
    2466           65 :     appendStringInfoChar(str, '\'');
    2467           65 : }
    2468              : 
    2469              : Datum
    2470            9 : postgres_fdw_connection(PG_FUNCTION_ARGS)
    2471              : {
    2472            9 :     Oid         userid = PG_GETARG_OID(0);
    2473            9 :     Oid         serverid = PG_GETARG_OID(1);
    2474            9 :     ForeignServer *server = GetForeignServer(serverid);
    2475            9 :     UserMapping *user = GetUserMapping(userid, serverid);
    2476              :     StringInfoData str;
    2477              :     const char **keywords;
    2478              :     const char **values;
    2479              :     char       *appname;
    2480            9 :     char       *sep = "";
    2481              : 
    2482            9 :     construct_connection_params(server, user, &keywords, &values, &appname);
    2483              : 
    2484            9 :     initStringInfo(&str);
    2485           74 :     for (int i = 0; keywords[i] != NULL; i++)
    2486              :     {
    2487           65 :         if (values[i] == NULL)
    2488            0 :             continue;
    2489           65 :         appendStringInfo(&str, "%s%s = ", sep, keywords[i]);
    2490           65 :         appendEscapedValue(&str, values[i]);
    2491           65 :         sep = " ";
    2492              :     }
    2493              : 
    2494            9 :     if (appname != NULL)
    2495            1 :         pfree(appname);
    2496            9 :     pfree(keywords);
    2497            9 :     pfree(values);
    2498            9 :     PG_RETURN_TEXT_P(cstring_to_text(str.data));
    2499              : }
    2500              : 
    2501              : /*
    2502              :  * List active foreign server connections.
    2503              :  *
    2504              :  * The SQL API of this function has changed multiple times, and will likely
    2505              :  * do so again in future.  To support the case where a newer version of this
    2506              :  * loadable module is being used with an old SQL declaration of the function,
    2507              :  * we continue to support the older API versions.
    2508              :  */
    2509              : Datum
    2510           13 : postgres_fdw_get_connections_1_2(PG_FUNCTION_ARGS)
    2511              : {
    2512           13 :     postgres_fdw_get_connections_internal(fcinfo, PGFDW_V1_2);
    2513              : 
    2514           13 :     PG_RETURN_VOID();
    2515              : }
    2516              : 
    2517              : Datum
    2518            0 : postgres_fdw_get_connections(PG_FUNCTION_ARGS)
    2519              : {
    2520            0 :     postgres_fdw_get_connections_internal(fcinfo, PGFDW_V1_1);
    2521              : 
    2522            0 :     PG_RETURN_VOID();
    2523              : }
    2524              : 
    2525              : /*
    2526              :  * Disconnect the specified cached connections.
    2527              :  *
    2528              :  * This function discards the open connections that are established by
    2529              :  * postgres_fdw from the local session to the foreign server with
    2530              :  * the given name. Note that there can be multiple connections to
    2531              :  * the given server using different user mappings. If the connections
    2532              :  * are used in the current local transaction, they are not disconnected
    2533              :  * and warning messages are reported. This function returns true
    2534              :  * if it disconnects at least one connection, otherwise false. If no
    2535              :  * foreign server with the given name is found, an error is reported.
    2536              :  */
    2537              : Datum
    2538            4 : postgres_fdw_disconnect(PG_FUNCTION_ARGS)
    2539              : {
    2540              :     ForeignServer *server;
    2541              :     char       *servername;
    2542              : 
    2543            4 :     servername = text_to_cstring(PG_GETARG_TEXT_PP(0));
    2544            4 :     server = GetForeignServerByName(servername, false);
    2545              : 
    2546            3 :     PG_RETURN_BOOL(disconnect_cached_connections(server->serverid));
    2547              : }
    2548              : 
    2549              : /*
    2550              :  * Disconnect all the cached connections.
    2551              :  *
    2552              :  * This function discards all the open connections that are established by
    2553              :  * postgres_fdw from the local session to the foreign servers.
    2554              :  * If the connections are used in the current local transaction, they are
    2555              :  * not disconnected and warning messages are reported. This function
    2556              :  * returns true if it disconnects at least one connection, otherwise false.
    2557              :  */
    2558              : Datum
    2559            5 : postgres_fdw_disconnect_all(PG_FUNCTION_ARGS)
    2560              : {
    2561            5 :     PG_RETURN_BOOL(disconnect_cached_connections(InvalidOid));
    2562              : }
    2563              : 
    2564              : /*
    2565              :  * Workhorse to disconnect cached connections.
    2566              :  *
    2567              :  * This function scans all the connection cache entries and disconnects
    2568              :  * the open connections whose foreign server OID matches with
    2569              :  * the specified one. If InvalidOid is specified, it disconnects all
    2570              :  * the cached connections.
    2571              :  *
    2572              :  * This function emits a warning for each connection that's used in
    2573              :  * the current transaction and doesn't close it. It returns true if
    2574              :  * it disconnects at least one connection, otherwise false.
    2575              :  *
    2576              :  * Note that this function disconnects even the connections that are
    2577              :  * established by other users in the same local session using different
    2578              :  * user mappings. This allows even a non-superuser to close
    2579              :  * the connections established by superusers in the same local session.
    2580              :  *
    2581              :  * XXX As of now we don't see any security risk doing this. But we should
    2582              :  * set some restrictions on that, for example, prevent non-superuser
    2583              :  * from closing the connections established by superusers even
    2584              :  * in the same session?
    2585              :  */
    2586              : static bool
    2587            8 : disconnect_cached_connections(Oid serverid)
    2588              : {
    2589              :     HASH_SEQ_STATUS scan;
    2590              :     ConnCacheEntry *entry;
    2591            8 :     bool        all = !OidIsValid(serverid);
    2592            8 :     bool        result = false;
    2593              : 
    2594              :     /*
    2595              :      * Connection cache hashtable has not been initialized yet in this
    2596              :      * session, so return false.
    2597              :      */
    2598            8 :     if (!ConnectionHash)
    2599            0 :         return false;
    2600              : 
    2601            8 :     hash_seq_init(&scan, ConnectionHash);
    2602           67 :     while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
    2603              :     {
    2604              :         /* Ignore cache entry if no open connection right now. */
    2605           59 :         if (!entry->conn)
    2606           47 :             continue;
    2607              : 
    2608           12 :         if (all || entry->serverid == serverid)
    2609              :         {
    2610              :             /*
    2611              :              * Emit a warning because the connection to close is used in the
    2612              :              * current transaction and cannot be disconnected right now.
    2613              :              */
    2614            9 :             if (entry->xact_depth > 0)
    2615              :             {
    2616              :                 ForeignServer *server;
    2617              : 
    2618            3 :                 server = GetForeignServerExtended(entry->serverid,
    2619              :                                                   FSV_MISSING_OK);
    2620              : 
    2621            3 :                 if (!server)
    2622              :                 {
    2623              :                     /*
    2624              :                      * If the foreign server was dropped while its connection
    2625              :                      * was used in the current transaction, the connection
    2626              :                      * must have been marked as invalid by
    2627              :                      * pgfdw_inval_callback at the end of DROP SERVER command.
    2628              :                      */
    2629              :                     Assert(entry->invalidated);
    2630              : 
    2631            0 :                     ereport(WARNING,
    2632              :                             (errmsg("cannot close dropped server connection because it is still in use")));
    2633              :                 }
    2634              :                 else
    2635            3 :                     ereport(WARNING,
    2636              :                             (errmsg("cannot close connection for server \"%s\" because it is still in use",
    2637              :                                     server->servername)));
    2638              :             }
    2639              :             else
    2640              :             {
    2641            6 :                 elog(DEBUG3, "discarding connection %p", entry->conn);
    2642            6 :                 disconnect_pg_server(entry);
    2643            6 :                 result = true;
    2644              :             }
    2645              :         }
    2646              :     }
    2647              : 
    2648            8 :     return result;
    2649              : }
    2650              : 
    2651              : /*
    2652              :  * Check if the remote server closed the connection.
    2653              :  *
    2654              :  * Returns 1 if the connection is closed, -1 if an error occurred,
    2655              :  * and 0 if it's not closed or if the connection check is unavailable
    2656              :  * on this platform.
    2657              :  */
    2658              : static int
    2659            2 : pgfdw_conn_check(PGconn *conn)
    2660              : {
    2661            2 :     int         sock = PQsocket(conn);
    2662              : 
    2663            2 :     if (PQstatus(conn) != CONNECTION_OK || sock == -1)
    2664            0 :         return -1;
    2665              : 
    2666              : #if (defined(HAVE_POLL) && defined(POLLRDHUP))
    2667              :     {
    2668              :         struct pollfd input_fd;
    2669              :         int         result;
    2670              : 
    2671            2 :         input_fd.fd = sock;
    2672            2 :         input_fd.events = POLLRDHUP;
    2673            2 :         input_fd.revents = 0;
    2674              : 
    2675              :         do
    2676            2 :             result = poll(&input_fd, 1, 0);
    2677            2 :         while (result < 0 && errno == EINTR);
    2678              : 
    2679            2 :         if (result < 0)
    2680            0 :             return -1;
    2681              : 
    2682            2 :         return (input_fd.revents &
    2683            2 :                 (POLLRDHUP | POLLHUP | POLLERR | POLLNVAL)) ? 1 : 0;
    2684              :     }
    2685              : #else
    2686              :     return 0;
    2687              : #endif
    2688              : }
    2689              : 
    /*
     * Tell whether this build is able to check connection status.
     *
     * The answer is fixed at compile time: true when both HAVE_POLL and
     * POLLRDHUP are defined, false otherwise.
     */
    static bool
    pgfdw_conn_checkable(void)
    {
        bool        checkable;

    #if (defined(HAVE_POLL) && defined(POLLRDHUP))
        checkable = true;
    #else
        checkable = false;
    #endif

        return checkable;
    }
    2704              : 
    2705              : /*
    2706              :  * Ensure that require_auth and SCRAM keys are correctly set on values. SCRAM
    2707              :  * keys used to pass-through are coming from the initial connection from the
    2708              :  * client with the server.
    2709              :  *
    2710              :  * All required SCRAM options are set by postgres_fdw, so we just need to
    2711              :  * ensure that these options are not overwritten by the user.
    2712              :  */
    2713              : static bool
    2714           11 : pgfdw_has_required_scram_options(const char **keywords, const char **values)
    2715              : {
    2716           11 :     bool        has_scram_server_key = false;
    2717           11 :     bool        has_scram_client_key = false;
    2718           11 :     bool        has_require_auth = false;
    2719           11 :     bool        has_scram_keys = false;
    2720              : 
    2721              :     /*
    2722              :      * Continue iterating even if we found the keys that we need to validate
    2723              :      * to make sure that there is no other declaration of these keys that can
    2724              :      * overwrite the first.
    2725              :      */
    2726          107 :     for (int i = 0; keywords[i] != NULL; i++)
    2727              :     {
    2728           96 :         if (strcmp(keywords[i], "scram_client_key") == 0)
    2729              :         {
    2730           10 :             if (values[i] != NULL && values[i][0] != '\0')
    2731           10 :                 has_scram_client_key = true;
    2732              :             else
    2733            0 :                 has_scram_client_key = false;
    2734              :         }
    2735              : 
    2736           96 :         if (strcmp(keywords[i], "scram_server_key") == 0)
    2737              :         {
    2738           10 :             if (values[i] != NULL && values[i][0] != '\0')
    2739           10 :                 has_scram_server_key = true;
    2740              :             else
    2741            0 :                 has_scram_server_key = false;
    2742              :         }
    2743              : 
    2744           96 :         if (strcmp(keywords[i], "require_auth") == 0)
    2745              :         {
    2746           10 :             if (values[i] != NULL && strcmp(values[i], "scram-sha-256") == 0)
    2747           10 :                 has_require_auth = true;
    2748              :             else
    2749            0 :                 has_require_auth = false;
    2750              :         }
    2751              :     }
    2752              : 
    2753           11 :     has_scram_keys = has_scram_client_key && has_scram_server_key && MyProcPort != NULL && MyProcPort->has_scram_keys;
    2754              : 
    2755           11 :     return (has_scram_keys && has_require_auth);
    2756              : }
        

Generated by: LCOV version 2.0-1