Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * connection.c
4 : * Connection management functions for postgres_fdw
5 : *
6 : * Portions Copyright (c) 2012-2026, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * contrib/postgres_fdw/connection.c
10 : *
11 : *-------------------------------------------------------------------------
12 : */
13 : #include "postgres.h"
14 :
15 : #if HAVE_POLL_H
16 : #include <poll.h>
17 : #endif
18 :
19 : #include "access/htup_details.h"
20 : #include "access/xact.h"
21 : #include "catalog/pg_user_mapping.h"
22 : #include "commands/defrem.h"
23 : #include "common/base64.h"
24 : #include "funcapi.h"
25 : #include "libpq/libpq-be.h"
26 : #include "libpq/libpq-be-fe-helpers.h"
27 : #include "mb/pg_wchar.h"
28 : #include "miscadmin.h"
29 : #include "pgstat.h"
30 : #include "postgres_fdw.h"
31 : #include "storage/latch.h"
32 : #include "utils/builtins.h"
33 : #include "utils/hsearch.h"
34 : #include "utils/inval.h"
35 : #include "utils/syscache.h"
36 : #include "utils/tuplestore.h"
37 :
38 : /*
39 : * Connection cache hash table entry
40 : *
41 : * The lookup key in this hash table is the user mapping OID. We use just one
42 : * connection per user mapping OID, which ensures that all the scans use the
43 : * same snapshot during a query. Using the user mapping OID rather than
44 : * the foreign server OID + user OID avoids creating multiple connections when
45 : * the public user mapping applies to all user OIDs.
46 : *
47 : * The "conn" pointer can be NULL if we don't currently have a live connection.
48 : * When we do have a connection, xact_depth tracks the current depth of
49 : * transactions and subtransactions open on the remote side. We need to issue
50 : * commands at the same nesting depth on the remote as we're executing at
51 : * ourselves, so that rolling back a subtransaction will kill the right
52 : * queries and not the wrong ones.
53 : */
54 : typedef Oid ConnCacheKey;
55 :
56 : typedef struct ConnCacheEntry
57 : {
58 : ConnCacheKey key; /* hash key: user mapping OID (must be first) */
59 : PGconn *conn; /* connection to foreign server, or NULL */
60 : /* Remaining fields are invalid when conn is NULL; make_new_connection() (re)initializes all of them: */
61 : int xact_depth; /* 0 = no xact open, 1 = main xact open, 2 =
62 : * one level of subxact open, etc */
63 : bool have_prep_stmt; /* have we prepared any stmts in this xact? */
64 : bool have_error; /* have any subxacts aborted in this xact? */
65 : bool changing_xact_state; /* xact state change in process */
66 : bool parallel_commit; /* do we commit (sub)xacts in parallel? (parallel_commit server option) */
67 : bool parallel_abort; /* do we abort (sub)xacts in parallel? (parallel_abort server option) */
68 : bool invalidated; /* true if reconnect is pending; GetConnection acts on it once xact_depth is 0 */
69 : bool keep_connections; /* setting value of keep_connections
70 : * server option */
71 : Oid serverid; /* foreign server OID used to get server name */
72 : uint32 server_hashvalue; /* hash value of foreign server OID */
73 : uint32 mapping_hashvalue; /* hash value of user mapping OID */
74 : PgFdwConnState state; /* extra per-connection state */
75 : } ConnCacheEntry;
76 :
77 : /*
78 : * Connection cache, keyed by user mapping OID (created by the first GetConnection call)
79 : */
80 : static HTAB *ConnectionHash = NULL;
81 :
82 : /* for assigning cursor numbers and prepared statement numbers */
83 : static unsigned int cursor_number = 0;
84 : static unsigned int prep_stmt_number = 0;
85 :
86 : /* tracks whether any work is needed in callback functions; set by GetConnection */
87 : static bool xact_got_connection = false;
88 :
89 : /* custom wait event values, retrieved from shared memory; 0 means not yet obtained */
90 : static uint32 pgfdw_we_cleanup_result = 0;
91 : static uint32 pgfdw_we_connect = 0;
92 : static uint32 pgfdw_we_get_result = 0;
93 :
94 : /*
95 : * Milliseconds to wait to cancel an in-progress query or execute a cleanup
96 : * query; if either takes longer than this (30000 ms = 30 seconds), we assume
97 : * the connection is dead.
98 : */
99 : #define CONNECTION_CLEANUP_TIMEOUT 30000
100 :
101 : /*
102 : * Milliseconds to wait before issuing another cancel request (1000 ms = 1
103 : * second). This covers the race condition where the remote session ignored
104 : * our cancel request because it arrived while idle.
105 : */
106 : #define RETRY_CANCEL_TIMEOUT 1000
107 :
108 : /* Macro for constructing abort command to be sent. "sql" must be a char array (not a pointer) because sizeof(sql) is used as the buffer size. If toplevel, the command aborts the whole transaction; otherwise it rolls back to and releases savepoint s<N>, where N is (entry)->xact_depth. */
109 : #define CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel) \
110 : do { \
111 : if (toplevel) \
112 : snprintf((sql), sizeof(sql), \
113 : "ABORT TRANSACTION"); \
114 : else \
115 : snprintf((sql), sizeof(sql), \
116 : "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d", \
117 : (entry)->xact_depth, (entry)->xact_depth); \
118 : } while(0)
119 :
120 : /*
121 : * Extension version number, for supporting older extension versions' objects.
121 : * Passed as api_version to postgres_fdw_get_connections_internal().
122 : */
123 : enum pgfdwVersion
124 : {
125 : PGFDW_V1_1 = 0,
126 : PGFDW_V1_2,
127 : };
128 :
129 : /*
130 : * SQL-callable functions exported by this file
131 : */
132 4 : PG_FUNCTION_INFO_V1(postgres_fdw_get_connections);
133 5 : PG_FUNCTION_INFO_V1(postgres_fdw_get_connections_1_2);
134 5 : PG_FUNCTION_INFO_V1(postgres_fdw_disconnect);
135 5 : PG_FUNCTION_INFO_V1(postgres_fdw_disconnect_all);
136 9 : PG_FUNCTION_INFO_V1(postgres_fdw_connection);
137 :
138 : /* prototypes of private (file-local) functions; all are static */
139 : static void make_new_connection(ConnCacheEntry *entry, UserMapping *user);
140 : static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
141 : static void disconnect_pg_server(ConnCacheEntry *entry);
142 : static void check_conn_params(const char **keywords, const char **values, UserMapping *user);
143 : static void configure_remote_session(PGconn *conn);
144 : static void do_sql_command_begin(PGconn *conn, const char *sql);
145 : static void do_sql_command_end(PGconn *conn, const char *sql,
146 : bool consume_input);
147 : static void begin_remote_xact(ConnCacheEntry *entry);
148 : static void pgfdw_report_internal(int elevel, PGresult *res, PGconn *conn,
149 : const char *sql);
150 : static void pgfdw_xact_callback(XactEvent event, void *arg);
151 : static void pgfdw_subxact_callback(SubXactEvent event,
152 : SubTransactionId mySubid,
153 : SubTransactionId parentSubid,
154 : void *arg);
155 : static void pgfdw_inval_callback(Datum arg, SysCacheIdentifier cacheid,
156 : uint32 hashvalue);
157 : static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry);
158 : static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel);
159 : static bool pgfdw_cancel_query(PGconn *conn);
160 : static bool pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime);
161 : static bool pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime,
162 : TimestampTz retrycanceltime,
163 : bool consume_input);
164 : static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
165 : bool ignore_errors);
166 : static bool pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query);
167 : static bool pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
168 : TimestampTz endtime,
169 : bool consume_input,
170 : bool ignore_errors);
171 : static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
172 : TimestampTz retrycanceltime,
173 : PGresult **result, bool *timed_out);
174 : static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel);
175 : static bool pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel,
176 : List **pending_entries,
177 : List **cancel_requested);
178 : static void pgfdw_finish_pre_commit_cleanup(List *pending_entries);
179 : static void pgfdw_finish_pre_subcommit_cleanup(List *pending_entries,
180 : int curlevel);
181 : static void pgfdw_finish_abort_cleanup(List *pending_entries,
182 : List *cancel_requested,
183 : bool toplevel);
184 : static void pgfdw_security_check(const char **keywords, const char **values,
185 : UserMapping *user, PGconn *conn);
186 : static bool UserMappingPasswordRequired(UserMapping *user);
187 : static bool UseScramPassthrough(ForeignServer *server, UserMapping *user);
188 : static bool disconnect_cached_connections(Oid serverid);
189 : static void postgres_fdw_get_connections_internal(FunctionCallInfo fcinfo,
190 : enum pgfdwVersion api_version);
191 : static int pgfdw_conn_check(PGconn *conn);
192 : static bool pgfdw_conn_checkable(void);
193 : static bool pgfdw_has_required_scram_options(const char **keywords, const char **values);
194 :
195 : /*
196 : * Get a PGconn which can be used to execute queries on the remote PostgreSQL
197 : * server with the user's authorization. A new connection is established
198 : * if we don't already have a suitable one, and a transaction is opened at
199 : * the right subtransaction nesting depth if we didn't do that already.
200 : *
201 : * will_prep_stmt must be true if caller intends to create any prepared
202 : * statements. Since those don't go away automatically at transaction end
203 : * (not even on error), we need this flag to cue manual cleanup.
204 : *
205 : * If state is not NULL, *state receives the per-connection state associated
206 : * with the PGconn.
207 : */
208 : PGconn *
209 2247 : GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state)
210 : {
211 : bool found;
212 2247 : bool retry = false;
213 : ConnCacheEntry *entry;
214 : ConnCacheKey key;
215 2247 : MemoryContext ccxt = CurrentMemoryContext;
216 :
217 : /* First time through, initialize connection cache hashtable */
218 2247 : if (ConnectionHash == NULL)
219 : {
220 : HASHCTL ctl;
221 :
222 11 : if (pgfdw_we_get_result == 0)
223 11 : pgfdw_we_get_result =
224 11 : WaitEventExtensionNew("PostgresFdwGetResult");
225 :
226 11 : ctl.keysize = sizeof(ConnCacheKey);
227 11 : ctl.entrysize = sizeof(ConnCacheEntry);
228 11 : ConnectionHash = hash_create("postgres_fdw connections", 8,
229 : &ctl,
230 : HASH_ELEM | HASH_BLOBS);
231 :
232 : /*
233 : * Register some callback functions that manage connection cleanup.
234 : * This should be done just once in each backend.
235 : */
236 11 : RegisterXactCallback(pgfdw_xact_callback, NULL);
237 11 : RegisterSubXactCallback(pgfdw_subxact_callback, NULL);
238 11 : CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
239 : pgfdw_inval_callback, (Datum) 0);
240 11 : CacheRegisterSyscacheCallback(USERMAPPINGOID,
241 : pgfdw_inval_callback, (Datum) 0);
242 : }
243 :
244 : /* Set flag that we did GetConnection during the current transaction */
245 2247 : xact_got_connection = true;
246 :
247 : /* Create hash key for the entry. Assume no pad bytes in key struct */
248 2247 : key = user->umid;
249 :
250 : /*
251 : * Find or create cached entry for requested connection.
252 : */
253 2247 : entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
254 2247 : if (!found)
255 : {
256 : /*
257 : * We need only clear "conn" here; remaining fields will be filled
258 : * later when "conn" is set.
259 : */
260 22 : entry->conn = NULL;
261 : }
262 :
263 : /* Reject further use of connections which failed abort cleanup. */
264 2247 : pgfdw_reject_incomplete_xact_state_change(entry);
265 :
266 : /*
267 : * If the connection needs to be remade due to invalidation, disconnect as
268 : * soon as we're out of all transactions.
269 : */
270 2247 : if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
271 : {
272 0 : elog(DEBUG3, "closing connection %p for option changes to take effect",
273 : entry->conn);
274 0 : disconnect_pg_server(entry);
275 : }
276 :
277 : /*
278 : * If cache entry doesn't have a connection, we have to establish a new
279 : * connection. (If connect_pg_server throws an error, the cache entry
280 : * will remain in a valid empty state, ie conn == NULL.)
281 : */
282 2247 : if (entry->conn == NULL)
283 82 : make_new_connection(entry, user);
284 :
285 : /*
286 : * We check the health of the cached connection here when using it. In
287 : * cases where we're out of all transactions, if a broken connection is
288 : * detected, we try to reestablish a new connection later.
289 : */
290 2240 : PG_TRY();
291 : {
292 : /* Process a pending asynchronous request if any. */
293 2240 : if (entry->state.pendingAreq)
294 0 : process_pending_request(entry->state.pendingAreq);
295 : /* Start a new transaction or subtransaction if needed. */
296 2240 : begin_remote_xact(entry);
297 : }
298 2 : PG_CATCH();
299 : {
300 2 : MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
301 2 : ErrorData *errdata = CopyErrorData();
302 :
303 : /*
304 : * Determine whether to try to reestablish the connection.
305 : *
306 : * After a broken connection is detected in libpq, any error other
307 : * than connection failure (e.g., out-of-memory) can be thrown
308 : * somewhere between return from libpq and the expected ereport() call
309 : * in pgfdw_report_error(). In this case, since PQstatus() indicates
310 : * CONNECTION_BAD, checking only PQstatus() causes the false detection
311 : * of connection failure. To avoid this, we also verify that the
312 : * error's sqlstate is ERRCODE_CONNECTION_FAILURE. Note that also
313 : * checking only the sqlstate can cause another false detection
314 : * because pgfdw_report_error() may report ERRCODE_CONNECTION_FAILURE
315 : * for any libpq-originated error condition.
316 : */
317 2 : if (errdata->sqlerrcode != ERRCODE_CONNECTION_FAILURE ||
318 2 : PQstatus(entry->conn) != CONNECTION_BAD ||
319 2 : entry->xact_depth > 0)
320 : {
321 1 : MemoryContextSwitchTo(ecxt);
322 1 : PG_RE_THROW();
323 : }
324 :
325 : /* Clean up the error state */
326 1 : FlushErrorState();
327 1 : FreeErrorData(errdata);
328 1 : errdata = NULL;
329 :
330 1 : retry = true;
331 : }
332 2239 : PG_END_TRY();
333 :
334 : /*
335 : * If a broken connection is detected, disconnect it, reestablish a new
336 : * connection and retry a new remote transaction. If connection failure is
337 : * reported again, we give up getting a connection.
338 : */
339 2239 : if (retry)
340 : {
341 : Assert(entry->xact_depth == 0);
342 :
343 1 : ereport(DEBUG3,
344 : (errmsg_internal("could not start remote transaction on connection %p",
345 : entry->conn)),
346 : errdetail_internal("%s", pchomp(PQerrorMessage(entry->conn))));
347 :
348 1 : elog(DEBUG3, "closing connection %p to reestablish a new one",
349 : entry->conn);
350 1 : disconnect_pg_server(entry);
351 :
352 1 : make_new_connection(entry, user);
353 :
354 1 : begin_remote_xact(entry);
355 : }
356 :
357 : /* Remember if caller will prepare statements */
358 2239 : entry->have_prep_stmt |= will_prep_stmt;
359 :
360 : /* If caller needs access to the per-connection state, return it. */
361 2239 : if (state)
362 758 : *state = &entry->state;
363 :
364 2239 : return entry->conn;
365 : }
366 :
367 : /*
368 : * Reset all transient state fields in the cached connection entry and
369 : * establish new connection to the remote server.
370 : */
371 : static void
372 83 : make_new_connection(ConnCacheEntry *entry, UserMapping *user)
373 : {
374 83 : ForeignServer *server = GetForeignServer(user->serverid);
375 : ListCell *lc;
376 :
377 : Assert(entry->conn == NULL);
378 :
379 : /* Reset all transient state fields, to be sure all are clean */
380 83 : entry->xact_depth = 0;
381 83 : entry->have_prep_stmt = false;
382 83 : entry->have_error = false;
383 83 : entry->changing_xact_state = false;
384 83 : entry->invalidated = false;
385 83 : entry->serverid = server->serverid;
386 83 : entry->server_hashvalue =
387 83 : GetSysCacheHashValue1(FOREIGNSERVEROID,
388 : ObjectIdGetDatum(server->serverid));
389 83 : entry->mapping_hashvalue =
390 83 : GetSysCacheHashValue1(USERMAPPINGOID,
391 : ObjectIdGetDatum(user->umid));
392 83 : memset(&entry->state, 0, sizeof(entry->state));
393 :
394 : /*
395 : * Determine whether to keep the connection that we're about to make here
396 : * open even after the transaction using it ends, so that the subsequent
397 : * transactions can re-use it.
398 : *
399 : * By default, all the connections to any foreign servers are kept open.
400 : *
401 : * Also determine whether to commit/abort (sub)transactions opened on the
402 : * remote server in parallel at (sub)transaction end, which is disabled by
403 : * default.
404 : *
405 : * Note: it's enough to determine these only when making a new connection
406 : * because if these settings for it are changed, it will be closed and
407 : * re-made later.
408 : */
409 83 : entry->keep_connections = true;
410 83 : entry->parallel_commit = false;
411 83 : entry->parallel_abort = false;
412 380 : foreach(lc, server->options)
413 : {
414 297 : DefElem *def = (DefElem *) lfirst(lc);
415 :
416 297 : if (strcmp(def->defname, "keep_connections") == 0)
417 13 : entry->keep_connections = defGetBoolean(def);
418 284 : else if (strcmp(def->defname, "parallel_commit") == 0)
419 2 : entry->parallel_commit = defGetBoolean(def);
420 282 : else if (strcmp(def->defname, "parallel_abort") == 0)
421 2 : entry->parallel_abort = defGetBoolean(def);
422 : }
423 :
424 : /* Now try to make the connection */
425 83 : entry->conn = connect_pg_server(server, user);
426 :
427 76 : elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
428 : entry->conn, server->servername, user->umid, user->userid);
429 76 : }
430 :
431 : /*
432 : * Check that non-superuser has used password or delegated credentials
433 : * to establish connection; otherwise, he's piggybacking on the
434 : * postgres server's user identity. See also dblink_security_check()
435 : * in contrib/dblink and check_conn_params.
436 : */
437 : static void
438 78 : pgfdw_security_check(const char **keywords, const char **values, UserMapping *user, PGconn *conn)
439 : {
440 : /* Superusers bypass the check */
441 78 : if (superuser_arg(user->userid))
442 70 : return;
443 :
444 : #ifdef ENABLE_GSS
445 : /* Connected via GSSAPI with delegated credentials- all good. */
446 : if (PQconnectionUsedGSSAPI(conn) && be_gssapi_get_delegation(MyProcPort))
447 : return;
448 : #endif
449 :
450 : /* Ok if superuser set PW required false. */
451 8 : if (!UserMappingPasswordRequired(user))
452 2 : return;
453 :
454 : /* Connected via PW, with PW required true, and provided non-empty PW. */
455 6 : if (PQconnectionUsedPassword(conn))
456 : {
457 : /* ok if params contain a non-empty password */
458 40 : for (int i = 0; keywords[i] != NULL; i++)
459 : {
460 36 : if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
461 0 : return;
462 : }
463 : }
464 :
465 : /*
466 : * Ok if SCRAM pass-through is being used and all required SCRAM options
467 : * are set correctly. If pgfdw_has_required_scram_options returns true we
468 : * assume that UseScramPassthrough is also true since SCRAM options are
469 : * only set when UseScramPassthrough is enabled.
470 : */
471 6 : if (MyProcPort != NULL && MyProcPort->has_scram_keys && pgfdw_has_required_scram_options(keywords, values))
472 4 : return;
473 :
474 2 : ereport(ERROR,
475 : (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
476 : errmsg("password or GSSAPI delegated credentials required"),
477 : errdetail("Non-superuser cannot connect if the server does not request a password or use GSSAPI with delegated credentials."),
478 : errhint("Target server's authentication method must be changed or password_required=false set in the user mapping attributes.")));
479 : }
480 :
481 : /*
482 : * Construct connection params from generic options of ForeignServer and
483 : * UserMapping. (Some of them might not be libpq options, in which case we'll
484 : * just waste a few array slots.)
485 : */
486 : static void
487 89 : construct_connection_params(ForeignServer *server, UserMapping *user,
488 : const char ***p_keywords, const char ***p_values,
489 : char **p_appname)
490 : {
491 : const char **keywords;
492 : const char **values;
493 89 : char *appname = NULL;
494 : int n;
495 :
496 : /*
497 : * Add 4 extra slots for application_name, fallback_application_name,
498 : * client_encoding, end marker, and 3 extra slots for scram keys and
499 : * required scram pass-through options.
500 : */
501 89 : n = list_length(server->options) + list_length(user->options) + 4 + 3;
502 89 : keywords = (const char **) palloc(n * sizeof(char *));
503 89 : values = (const char **) palloc(n * sizeof(char *));
504 :
505 89 : n = 0;
506 178 : n += ExtractConnectionOptions(server->options,
507 89 : keywords + n, values + n);
508 178 : n += ExtractConnectionOptions(user->options,
509 89 : keywords + n, values + n);
510 :
511 : /*
512 : * Use pgfdw_application_name as application_name if set.
513 : *
514 : * PQconnectdbParams() processes the parameter arrays from start to end.
515 : * If any key word is repeated, the last value is used. Therefore note
516 : * that pgfdw_application_name must be added to the arrays after options
517 : * of ForeignServer are, so that it can override application_name set in
518 : * ForeignServer.
519 : */
520 89 : if (pgfdw_application_name && *pgfdw_application_name != '\0')
521 : {
522 1 : keywords[n] = "application_name";
523 1 : values[n] = pgfdw_application_name;
524 1 : n++;
525 : }
526 :
527 : /*
528 : * Search the parameter arrays to find application_name setting, and
529 : * replace escape sequences in it with status information if found. The
530 : * arrays are searched backwards because the last value is used if
531 : * application_name is repeatedly set.
532 : */
533 267 : for (int i = n - 1; i >= 0; i--)
534 : {
535 203 : if (strcmp(keywords[i], "application_name") == 0 &&
536 25 : *(values[i]) != '\0')
537 : {
538 : /*
539 : * Use this application_name setting if it's not empty string even
540 : * after any escape sequences in it are replaced.
541 : */
542 25 : appname = process_pgfdw_appname(values[i]);
543 25 : if (appname[0] != '\0')
544 : {
545 25 : values[i] = appname;
546 25 : break;
547 : }
548 :
549 : /*
550 : * This empty application_name is not used, so we set values[i] to
551 : * NULL and keep searching the array to find the next one.
552 : */
553 0 : values[i] = NULL;
554 0 : pfree(appname);
555 0 : appname = NULL;
556 : }
557 : }
558 :
559 89 : *p_appname = appname;
560 :
561 : /* Use "postgres_fdw" as fallback_application_name */
562 89 : keywords[n] = "fallback_application_name";
563 89 : values[n] = "postgres_fdw";
564 89 : n++;
565 :
566 : /* Set client_encoding so that libpq can convert encoding properly. */
567 89 : keywords[n] = "client_encoding";
568 89 : values[n] = GetDatabaseEncodingName();
569 89 : n++;
570 :
571 : /* Add required SCRAM pass-through connection options if it's enabled. */
572 89 : if (MyProcPort != NULL && MyProcPort->has_scram_keys && UseScramPassthrough(server, user))
573 : {
574 : int len;
575 : int encoded_len;
576 :
577 5 : keywords[n] = "scram_client_key";
578 5 : len = pg_b64_enc_len(sizeof(MyProcPort->scram_ClientKey));
579 : /* don't forget the zero-terminator */
580 5 : values[n] = palloc0(len + 1);
581 5 : encoded_len = pg_b64_encode(MyProcPort->scram_ClientKey,
582 : sizeof(MyProcPort->scram_ClientKey),
583 5 : (char *) values[n], len);
584 5 : if (encoded_len < 0)
585 0 : elog(ERROR, "could not encode SCRAM client key");
586 5 : n++;
587 :
588 5 : keywords[n] = "scram_server_key";
589 5 : len = pg_b64_enc_len(sizeof(MyProcPort->scram_ServerKey));
590 : /* don't forget the zero-terminator */
591 5 : values[n] = palloc0(len + 1);
592 5 : encoded_len = pg_b64_encode(MyProcPort->scram_ServerKey,
593 : sizeof(MyProcPort->scram_ServerKey),
594 5 : (char *) values[n], len);
595 5 : if (encoded_len < 0)
596 0 : elog(ERROR, "could not encode SCRAM server key");
597 5 : n++;
598 :
599 : /*
600 : * Require scram-sha-256 to ensure that no other auth method is used
601 : * when connecting with foreign server.
602 : */
603 5 : keywords[n] = "require_auth";
604 5 : values[n] = "scram-sha-256";
605 5 : n++;
606 : }
607 :
608 89 : keywords[n] = values[n] = NULL;
609 :
610 : /* Verify the set of connection parameters. */
611 89 : check_conn_params(keywords, values, user);
612 :
613 87 : *p_keywords = keywords;
614 87 : *p_values = values;
615 87 : }
616 :
617 : /*
618 : * Connect to remote server using specified server and user mapping properties.
619 : */
620 : static PGconn *
621 83 : connect_pg_server(ForeignServer *server, UserMapping *user)
622 : {
623 83 : PGconn *volatile conn = NULL;
624 :
625 : /*
626 : * Use PG_TRY block to ensure closing connection on error.
627 : */
628 83 : PG_TRY();
629 : {
630 : const char **keywords;
631 : const char **values;
632 : char *appname;
633 :
634 83 : construct_connection_params(server, user, &keywords, &values, &appname);
635 :
636 : /* first time, allocate or get the custom wait event */
637 81 : if (pgfdw_we_connect == 0)
638 11 : pgfdw_we_connect = WaitEventExtensionNew("PostgresFdwConnect");
639 :
640 : /* OK to make connection */
641 81 : conn = libpqsrv_connect_params(keywords, values,
642 : false, /* expand_dbname */
643 : pgfdw_we_connect);
644 :
645 81 : if (!conn || PQstatus(conn) != CONNECTION_OK)
646 3 : ereport(ERROR,
647 : (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
648 : errmsg("could not connect to server \"%s\"",
649 : server->servername),
650 : errdetail_internal("%s", pchomp(PQerrorMessage(conn)))));
651 :
652 78 : PQsetNoticeReceiver(conn, libpqsrv_notice_receiver,
653 : "received message via remote connection");
654 :
655 : /* Perform post-connection security checks. */
656 78 : pgfdw_security_check(keywords, values, user, conn);
657 :
658 : /* Prepare new session for use */
659 76 : configure_remote_session(conn);
660 :
661 76 : if (appname != NULL)
662 23 : pfree(appname);
663 76 : pfree(keywords);
664 76 : pfree(values);
665 : }
666 7 : PG_CATCH();
667 : {
668 7 : libpqsrv_disconnect(conn);
669 7 : PG_RE_THROW();
670 : }
671 76 : PG_END_TRY();
672 :
673 76 : return conn;
674 : }
675 :
676 : /*
677 : * Disconnect any open connection for a connection cache entry.
678 : */
679 : static void
680 66 : disconnect_pg_server(ConnCacheEntry *entry)
681 : {
682 66 : if (entry->conn != NULL)
683 : {
684 66 : libpqsrv_disconnect(entry->conn);
685 66 : entry->conn = NULL;
686 : }
687 66 : }
688 :
689 : /*
690 : * Check and return the value of password_required, if defined; otherwise,
691 : * return true, which is the default value of it. The mapping has been
692 : * pre-validated.
693 : */
694 : static bool
695 16 : UserMappingPasswordRequired(UserMapping *user)
696 : {
697 : ListCell *cell;
698 :
699 28 : foreach(cell, user->options)
700 : {
701 15 : DefElem *def = (DefElem *) lfirst(cell);
702 :
703 15 : if (strcmp(def->defname, "password_required") == 0)
704 3 : return defGetBoolean(def);
705 : }
706 :
707 13 : return true;
708 : }
709 :
710 : static bool
711 5 : UseScramPassthrough(ForeignServer *server, UserMapping *user)
712 : {
713 : ListCell *cell;
714 :
715 20 : foreach(cell, server->options)
716 : {
717 20 : DefElem *def = (DefElem *) lfirst(cell);
718 :
719 20 : if (strcmp(def->defname, "use_scram_passthrough") == 0)
720 5 : return defGetBoolean(def);
721 : }
722 :
723 0 : foreach(cell, user->options)
724 : {
725 0 : DefElem *def = (DefElem *) lfirst(cell);
726 :
727 0 : if (strcmp(def->defname, "use_scram_passthrough") == 0)
728 0 : return defGetBoolean(def);
729 : }
730 :
731 0 : return false;
732 : }
733 :
734 : /*
735 : * For non-superusers, insist that the connstr specify a password or that the
736 : * user provided their own GSSAPI delegated credentials. This
737 : * prevents a password from being picked up from .pgpass, a service file, the
738 : * environment, etc. We don't want the postgres user's passwords,
739 : * certificates, etc to be accessible to non-superusers. (See also
740 : * dblink_connstr_check in contrib/dblink.)
741 : */
742 : static void
743 89 : check_conn_params(const char **keywords, const char **values, UserMapping *user)
744 : {
745 : int i;
746 :
747 : /* no check required if superuser */
748 89 : if (superuser_arg(user->userid))
749 78 : return;
750 :
751 : #ifdef ENABLE_GSS
752 : /* ok if the user provided their own delegated credentials */
753 : if (be_gssapi_get_delegation(MyProcPort))
754 : return;
755 : #endif
756 :
757 : /* ok if params contain a non-empty password */
758 74 : for (i = 0; keywords[i] != NULL; i++)
759 : {
760 66 : if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
761 3 : return;
762 : }
763 :
764 : /* ok if the superuser explicitly said so at user mapping creation time */
765 8 : if (!UserMappingPasswordRequired(user))
766 1 : return;
767 :
768 : /*
769 : * Ok if SCRAM pass-through is being used and all required scram options
770 : * are set correctly. If pgfdw_has_required_scram_options returns true we
771 : * assume that UseScramPassthrough is also true since SCRAM options are
772 : * only set when UseScramPassthrough is enabled.
773 : */
774 7 : if (MyProcPort != NULL && MyProcPort->has_scram_keys && pgfdw_has_required_scram_options(keywords, values))
775 5 : return;
776 :
777 2 : ereport(ERROR,
778 : (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
779 : errmsg("password or GSSAPI delegated credentials required"),
780 : errdetail("Non-superusers must delegate GSSAPI credentials, provide a password, or enable SCRAM pass-through in user mapping.")));
781 : }
782 :
783 : /*
784 : * Issue SET commands to make sure remote session is configured properly.
785 : *
786 : * We do this just once at connection, assuming nothing will change the
787 : * values later. Since we'll never send volatile function calls to the
788 : * remote, there shouldn't be any way to break this assumption from our end.
789 : * It's possible to think of ways to break it at the remote end, eg making
790 : * a foreign table point to a view that includes a set_config call ---
791 : * but once you admit the possibility of a malicious view definition,
792 : * there are any number of ways to break things.
793 : */
794 : static void
795 76 : configure_remote_session(PGconn *conn)
796 : {
797 76 : int remoteversion = PQserverVersion(conn);
798 :
799 : /* Force the search path to contain only pg_catalog (see deparse.c) */
800 76 : do_sql_command(conn, "SET search_path = pg_catalog");
801 :
802 : /*
803 : * Set remote timezone; this is basically just cosmetic, since all
804 : * transmitted and returned timestamptzs should specify a zone explicitly
805 : * anyway. However it makes the regression test outputs more predictable.
806 : *
807 : * We don't risk setting remote zone equal to ours, since the remote
808 : * server might use a different timezone database. Instead, use GMT
809 : * (quoted, because very old servers are picky about case). That's
810 : * guaranteed to work regardless of the remote's timezone database,
811 : * because pg_tzset() hard-wires it (at least in PG 9.2 and later).
812 : */
813 76 : do_sql_command(conn, "SET timezone = 'GMT'");
814 :
815 : /*
816 : * Set values needed to ensure unambiguous data output from remote. (This
817 : * logic should match what pg_dump does. See also set_transmission_modes
818 : * in postgres_fdw.c.)
819 : */
820 76 : do_sql_command(conn, "SET datestyle = ISO");
821 76 : if (remoteversion >= 80400)
822 76 : do_sql_command(conn, "SET intervalstyle = postgres");
823 76 : if (remoteversion >= 90000)
824 76 : do_sql_command(conn, "SET extra_float_digits = 3");
825 : else
826 0 : do_sql_command(conn, "SET extra_float_digits = 2");
827 76 : }
828 :
829 : /*
830 : * Convenience subroutine to issue a non-data-returning SQL command to remote
831 : */
832 : void
833 1853 : do_sql_command(PGconn *conn, const char *sql)
834 : {
835 1853 : do_sql_command_begin(conn, sql);
836 1853 : do_sql_command_end(conn, sql, false);
837 1850 : }
838 :
839 : static void
840 1871 : do_sql_command_begin(PGconn *conn, const char *sql)
841 : {
842 1871 : if (!PQsendQuery(conn, sql))
843 0 : pgfdw_report_error(NULL, conn, sql);
844 1871 : }
845 :
846 : static void
847 1871 : do_sql_command_end(PGconn *conn, const char *sql, bool consume_input)
848 : {
849 : PGresult *res;
850 :
851 : /*
852 : * If requested, consume whatever data is available from the socket. (Note
853 : * that if all data is available, this allows pgfdw_get_result to call
854 : * PQgetResult without forcing the overhead of WaitLatchOrSocket, which
855 : * would be large compared to the overhead of PQconsumeInput.)
856 : */
857 1871 : if (consume_input && !PQconsumeInput(conn))
858 0 : pgfdw_report_error(NULL, conn, sql);
859 1871 : res = pgfdw_get_result(conn);
860 1871 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
861 3 : pgfdw_report_error(res, conn, sql);
862 1868 : PQclear(res);
863 1868 : }
864 :
865 : /*
866 : * Start remote transaction or subtransaction, if needed.
867 : *
868 : * Note that we always use at least REPEATABLE READ in the remote session.
869 : * This is so that, if a query initiates multiple scans of the same or
870 : * different foreign tables, we will get snapshot-consistent results from
871 : * those scans. A disadvantage is that we can't provide sane emulation of
872 : * READ COMMITTED behavior --- it would be nice if we had some other way to
873 : * control which remote queries share a snapshot.
874 : */
875 : static void
876 2241 : begin_remote_xact(ConnCacheEntry *entry)
877 : {
878 2241 : int curlevel = GetCurrentTransactionNestLevel();
879 :
880 : /* Start main transaction if we haven't yet */
881 2241 : if (entry->xact_depth <= 0)
882 : {
883 : const char *sql;
884 :
885 755 : elog(DEBUG3, "starting remote transaction on connection %p",
886 : entry->conn);
887 :
888 755 : if (IsolationIsSerializable())
889 0 : sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
890 : else
891 755 : sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
892 755 : entry->changing_xact_state = true;
893 755 : do_sql_command(entry->conn, sql);
894 754 : entry->xact_depth = 1;
895 754 : entry->changing_xact_state = false;
896 : }
897 :
898 : /*
899 : * If we're in a subtransaction, stack up savepoints to match our level.
900 : * This ensures we can rollback just the desired effects when a
901 : * subtransaction aborts.
902 : */
903 2254 : while (entry->xact_depth < curlevel)
904 : {
905 : char sql[64];
906 :
907 15 : snprintf(sql, sizeof(sql), "SAVEPOINT s%d", entry->xact_depth + 1);
908 15 : entry->changing_xact_state = true;
909 15 : do_sql_command(entry->conn, sql);
910 14 : entry->xact_depth++;
911 14 : entry->changing_xact_state = false;
912 : }
913 2239 : }
914 :
915 : /*
916 : * Release connection reference count created by calling GetConnection.
917 : */
918 : void
919 2174 : ReleaseConnection(PGconn *conn)
920 : {
921 : /*
922 : * Currently, we don't actually track connection references because all
923 : * cleanup is managed on a transaction or subtransaction basis instead. So
924 : * there's nothing to do here.
925 : */
926 2174 : }
927 :
928 : /*
929 : * Assign a "unique" number for a cursor.
930 : *
931 : * These really only need to be unique per connection within a transaction.
932 : * For the moment we ignore the per-connection point and assign them across
933 : * all connections in the transaction, but we ask for the connection to be
934 : * supplied in case we want to refine that.
935 : *
936 : * Note that even if wraparound happens in a very long transaction, actual
937 : * collisions are highly improbable; just be sure to use %u not %d to print.
938 : */
939 : unsigned int
940 551 : GetCursorNumber(PGconn *conn)
941 : {
942 551 : return ++cursor_number;
943 : }
944 :
945 : /*
946 : * Assign a "unique" number for a prepared statement.
947 : *
948 : * This works much like GetCursorNumber, except that we never reset the counter
949 : * within a session. That's because we can't be 100% sure we've gotten rid
950 : * of all prepared statements on all connections, and it's not really worth
951 : * increasing the risk of prepared-statement name collisions by resetting.
952 : */
953 : unsigned int
954 187 : GetPrepStmtNumber(PGconn *conn)
955 : {
956 187 : return ++prep_stmt_number;
957 : }
958 :
959 : /*
960 : * Submit a query and wait for the result.
961 : *
962 : * Since we don't use non-blocking mode, this can't process interrupts while
963 : * pushing the query text to the server. That risk is relatively small, so we
964 : * ignore that for now.
965 : *
966 : * Caller is responsible for the error handling on the result.
967 : */
968 : PGresult *
969 4113 : pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state)
970 : {
971 : /* First, process a pending asynchronous request, if any. */
972 4113 : if (state && state->pendingAreq)
973 4 : process_pending_request(state->pendingAreq);
974 :
975 4113 : if (!PQsendQuery(conn, query))
976 0 : return NULL;
977 4113 : return pgfdw_get_result(conn);
978 : }
979 :
980 : /*
981 : * Wrap libpqsrv_get_result_last(), adding wait event.
982 : *
983 : * Caller is responsible for the error handling on the result.
984 : */
985 : PGresult *
986 8284 : pgfdw_get_result(PGconn *conn)
987 : {
988 8284 : return libpqsrv_get_result_last(conn, pgfdw_we_get_result);
989 : }
990 :
991 : /*
992 : * Report an error we got from the remote server.
993 : *
994 : * Callers should use pgfdw_report_error() to throw an error, or use
995 : * pgfdw_report() for lesser message levels. (We make this distinction
996 : * so that pgfdw_report_error() can be marked noreturn.)
997 : *
998 : * res: PGresult containing the error (might be NULL)
999 : * conn: connection we did the query on
1000 : * sql: NULL, or text of remote command we tried to execute
1001 : *
1002 : * If "res" is not NULL, it'll be PQclear'ed here (unless we throw error,
1003 : * in which case memory context cleanup will clear it eventually).
1004 : *
1005 : * Note: callers that choose not to throw ERROR for a remote error are
1006 : * responsible for making sure that the associated ConnCacheEntry gets
1007 : * marked with have_error = true.
1008 : */
1009 : void
1010 16 : pgfdw_report_error(PGresult *res, PGconn *conn, const char *sql)
1011 : {
1012 16 : pgfdw_report_internal(ERROR, res, conn, sql);
1013 0 : pg_unreachable();
1014 : }
1015 :
1016 : void
1017 0 : pgfdw_report(int elevel, PGresult *res, PGconn *conn, const char *sql)
1018 : {
1019 : Assert(elevel < ERROR); /* use pgfdw_report_error for that */
1020 0 : pgfdw_report_internal(elevel, res, conn, sql);
1021 0 : }
1022 :
1023 : static void
1024 16 : pgfdw_report_internal(int elevel, PGresult *res, PGconn *conn,
1025 : const char *sql)
1026 : {
1027 16 : char *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
1028 16 : char *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
1029 16 : char *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
1030 16 : char *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
1031 16 : char *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
1032 : int sqlstate;
1033 :
1034 16 : if (diag_sqlstate)
1035 14 : sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
1036 : diag_sqlstate[1],
1037 : diag_sqlstate[2],
1038 : diag_sqlstate[3],
1039 : diag_sqlstate[4]);
1040 : else
1041 2 : sqlstate = ERRCODE_CONNECTION_FAILURE;
1042 :
1043 : /*
1044 : * If we don't get a message from the PGresult, try the PGconn. This is
1045 : * needed because for connection-level failures, PQgetResult may just
1046 : * return NULL, not a PGresult at all.
1047 : */
1048 16 : if (message_primary == NULL)
1049 2 : message_primary = pchomp(PQerrorMessage(conn));
1050 :
1051 16 : ereport(elevel,
1052 : (errcode(sqlstate),
1053 : (message_primary != NULL && message_primary[0] != '\0') ?
1054 : errmsg_internal("%s", message_primary) :
1055 : errmsg("could not obtain message string for remote error"),
1056 : message_detail ? errdetail_internal("%s", message_detail) : 0,
1057 : message_hint ? errhint("%s", message_hint) : 0,
1058 : message_context ? errcontext("%s", message_context) : 0,
1059 : sql ? errcontext("remote SQL command: %s", sql) : 0));
1060 0 : PQclear(res);
1061 0 : }
1062 :
1063 : /*
1064 : * pgfdw_xact_callback --- cleanup at main-transaction end.
1065 : *
1066 : * This runs just late enough that it must not enter user-defined code
1067 : * locally. (Entering such code on the remote side is fine. Its remote
1068 : * COMMIT TRANSACTION may run deferred triggers.)
1069 : */
1070 : static void
1071 4083 : pgfdw_xact_callback(XactEvent event, void *arg)
1072 : {
1073 : HASH_SEQ_STATUS scan;
1074 : ConnCacheEntry *entry;
1075 4083 : List *pending_entries = NIL;
1076 4083 : List *cancel_requested = NIL;
1077 :
1078 : /* Quick exit if no connections were touched in this transaction. */
1079 4083 : if (!xact_got_connection)
1080 3359 : return;
1081 :
1082 : /*
1083 : * Scan all connection cache entries to find open remote transactions, and
1084 : * close them.
1085 : */
1086 724 : hash_seq_init(&scan, ConnectionHash);
1087 3725 : while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1088 : {
1089 : PGresult *res;
1090 :
1091 : /* Ignore cache entry if no open connection right now */
1092 3002 : if (entry->conn == NULL)
1093 1709 : continue;
1094 :
1095 : /* If it has an open remote transaction, try to close it */
1096 1293 : if (entry->xact_depth > 0)
1097 : {
1098 755 : elog(DEBUG3, "closing remote transaction on connection %p",
1099 : entry->conn);
1100 :
1101 755 : switch (event)
1102 : {
1103 702 : case XACT_EVENT_PARALLEL_PRE_COMMIT:
1104 : case XACT_EVENT_PRE_COMMIT:
1105 :
1106 : /*
1107 : * If abort cleanup previously failed for this connection,
1108 : * we can't issue any more commands against it.
1109 : */
1110 702 : pgfdw_reject_incomplete_xact_state_change(entry);
1111 :
1112 : /* Commit all remote transactions during pre-commit */
1113 702 : entry->changing_xact_state = true;
1114 702 : if (entry->parallel_commit)
1115 : {
1116 16 : do_sql_command_begin(entry->conn, "COMMIT TRANSACTION");
1117 16 : pending_entries = lappend(pending_entries, entry);
1118 16 : continue;
1119 : }
1120 686 : do_sql_command(entry->conn, "COMMIT TRANSACTION");
1121 686 : entry->changing_xact_state = false;
1122 :
1123 : /*
1124 : * If there were any errors in subtransactions, and we
1125 : * made prepared statements, do a DEALLOCATE ALL to make
1126 : * sure we get rid of all prepared statements. This is
1127 : * annoying and not terribly bulletproof, but it's
1128 : * probably not worth trying harder.
1129 : *
1130 : * DEALLOCATE ALL only exists in 8.3 and later, so this
1131 : * constrains how old a server postgres_fdw can
1132 : * communicate with. We intentionally ignore errors in
1133 : * the DEALLOCATE, so that we can hobble along to some
1134 : * extent with older servers (leaking prepared statements
1135 : * as we go; but we don't really support update operations
1136 : * pre-8.3 anyway).
1137 : */
1138 686 : if (entry->have_prep_stmt && entry->have_error)
1139 : {
1140 0 : res = pgfdw_exec_query(entry->conn, "DEALLOCATE ALL",
1141 : NULL);
1142 0 : PQclear(res);
1143 : }
1144 686 : entry->have_prep_stmt = false;
1145 686 : entry->have_error = false;
1146 686 : break;
1147 1 : case XACT_EVENT_PRE_PREPARE:
1148 :
1149 : /*
1150 : * We disallow any remote transactions, since it's not
1151 : * very reasonable to hold them open until the prepared
1152 : * transaction is committed. For the moment, throw error
1153 : * unconditionally; later we might allow read-only cases.
1154 : * Note that the error will cause us to come right back
1155 : * here with event == XACT_EVENT_ABORT, so we'll clean up
1156 : * the connection state at that point.
1157 : */
1158 1 : ereport(ERROR,
1159 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1160 : errmsg("cannot PREPARE a transaction that has operated on postgres_fdw foreign tables")));
1161 : break;
1162 0 : case XACT_EVENT_PARALLEL_COMMIT:
1163 : case XACT_EVENT_COMMIT:
1164 : case XACT_EVENT_PREPARE:
1165 : /* Pre-commit should have closed the open transaction */
1166 0 : elog(ERROR, "missed cleaning up connection during pre-commit");
1167 : break;
1168 52 : case XACT_EVENT_PARALLEL_ABORT:
1169 : case XACT_EVENT_ABORT:
1170 : /* Rollback all remote transactions during abort */
1171 52 : if (entry->parallel_abort)
1172 : {
1173 4 : if (pgfdw_abort_cleanup_begin(entry, true,
1174 : &pending_entries,
1175 : &cancel_requested))
1176 4 : continue;
1177 : }
1178 : else
1179 48 : pgfdw_abort_cleanup(entry, true);
1180 48 : break;
1181 : }
1182 : }
1183 :
1184 : /* Reset state to show we're out of a transaction */
1185 1272 : pgfdw_reset_xact_state(entry, true);
1186 : }
1187 :
1188 : /* If there are any pending connections, finish cleaning them up */
1189 723 : if (pending_entries || cancel_requested)
1190 : {
1191 15 : if (event == XACT_EVENT_PARALLEL_PRE_COMMIT ||
1192 : event == XACT_EVENT_PRE_COMMIT)
1193 : {
1194 : Assert(cancel_requested == NIL);
1195 13 : pgfdw_finish_pre_commit_cleanup(pending_entries);
1196 : }
1197 : else
1198 : {
1199 : Assert(event == XACT_EVENT_PARALLEL_ABORT ||
1200 : event == XACT_EVENT_ABORT);
1201 2 : pgfdw_finish_abort_cleanup(pending_entries, cancel_requested,
1202 : true);
1203 : }
1204 : }
1205 :
1206 : /*
1207 : * Regardless of the event type, we can now mark ourselves as out of the
1208 : * transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
1209 : * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
1210 : */
1211 723 : xact_got_connection = false;
1212 :
1213 : /* Also reset cursor numbering for next transaction */
1214 723 : cursor_number = 0;
1215 : }
1216 :
1217 : /*
1218 : * pgfdw_subxact_callback --- cleanup at subtransaction end.
1219 : */
1220 : static void
1221 38 : pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
1222 : SubTransactionId parentSubid, void *arg)
1223 : {
1224 : HASH_SEQ_STATUS scan;
1225 : ConnCacheEntry *entry;
1226 : int curlevel;
1227 38 : List *pending_entries = NIL;
1228 38 : List *cancel_requested = NIL;
1229 :
1230 : /* Nothing to do at subxact start, nor after commit. */
1231 38 : if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
1232 : event == SUBXACT_EVENT_ABORT_SUB))
1233 23 : return;
1234 :
1235 : /* Quick exit if no connections were touched in this transaction. */
1236 15 : if (!xact_got_connection)
1237 0 : return;
1238 :
1239 : /*
1240 : * Scan all connection cache entries to find open remote subtransactions
1241 : * of the current level, and close them.
1242 : */
1243 15 : curlevel = GetCurrentTransactionNestLevel();
1244 15 : hash_seq_init(&scan, ConnectionHash);
1245 102 : while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1246 : {
1247 : char sql[100];
1248 :
1249 : /*
1250 : * We only care about connections with open remote subtransactions of
1251 : * the current level.
1252 : */
1253 87 : if (entry->conn == NULL || entry->xact_depth < curlevel)
1254 79 : continue;
1255 :
1256 14 : if (entry->xact_depth > curlevel)
1257 0 : elog(ERROR, "missed cleaning up remote subtransaction at level %d",
1258 : entry->xact_depth);
1259 :
1260 14 : if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
1261 : {
1262 : /*
1263 : * If abort cleanup previously failed for this connection, we
1264 : * can't issue any more commands against it.
1265 : */
1266 7 : pgfdw_reject_incomplete_xact_state_change(entry);
1267 :
1268 : /* Commit all remote subtransactions during pre-commit */
1269 7 : snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
1270 7 : entry->changing_xact_state = true;
1271 7 : if (entry->parallel_commit)
1272 : {
1273 2 : do_sql_command_begin(entry->conn, sql);
1274 2 : pending_entries = lappend(pending_entries, entry);
1275 2 : continue;
1276 : }
1277 5 : do_sql_command(entry->conn, sql);
1278 5 : entry->changing_xact_state = false;
1279 : }
1280 : else
1281 : {
1282 : /* Rollback all remote subtransactions during abort */
1283 7 : if (entry->parallel_abort)
1284 : {
1285 4 : if (pgfdw_abort_cleanup_begin(entry, false,
1286 : &pending_entries,
1287 : &cancel_requested))
1288 4 : continue;
1289 : }
1290 : else
1291 3 : pgfdw_abort_cleanup(entry, false);
1292 : }
1293 :
1294 : /* OK, we're outta that level of subtransaction */
1295 8 : pgfdw_reset_xact_state(entry, false);
1296 : }
1297 :
1298 : /* If there are any pending connections, finish cleaning them up */
1299 15 : if (pending_entries || cancel_requested)
1300 : {
1301 3 : if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
1302 : {
1303 : Assert(cancel_requested == NIL);
1304 1 : pgfdw_finish_pre_subcommit_cleanup(pending_entries, curlevel);
1305 : }
1306 : else
1307 : {
1308 : Assert(event == SUBXACT_EVENT_ABORT_SUB);
1309 2 : pgfdw_finish_abort_cleanup(pending_entries, cancel_requested,
1310 : false);
1311 : }
1312 : }
1313 : }
1314 :
1315 : /*
1316 : * Connection invalidation callback function
1317 : *
1318 : * After a change to a pg_foreign_server or pg_user_mapping catalog entry,
1319 : * close connections depending on that entry immediately if current transaction
1320 : * has not used those connections yet. Otherwise, mark those connections as
1321 : * invalid and then make pgfdw_xact_callback() close them at the end of current
1322 : * transaction, since they cannot be closed in the midst of the transaction
1323 : * using them. Closed connections will be remade at the next opportunity if
1324 : * necessary.
1325 : *
1326 : * Although most cache invalidation callbacks blow away all the related stuff
1327 : * regardless of the given hashvalue, connections are expensive enough that
1328 : * it's worth trying to avoid that.
1329 : *
1330 : * NB: We could avoid unnecessary disconnection more strictly by examining
1331 : * individual option values, but it seems too much effort for the gain.
1332 : */
1333 : static void
1334 188 : pgfdw_inval_callback(Datum arg, SysCacheIdentifier cacheid, uint32 hashvalue)
1335 : {
1336 : HASH_SEQ_STATUS scan;
1337 : ConnCacheEntry *entry;
1338 :
1339 : Assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID);
1340 :
1341 : /* ConnectionHash must exist already, if we're registered */
1342 188 : hash_seq_init(&scan, ConnectionHash);
1343 1222 : while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1344 : {
1345 : /* Ignore invalid entries */
1346 1034 : if (entry->conn == NULL)
1347 839 : continue;
1348 :
1349 : /* hashvalue == 0 means a cache reset, must clear all state */
1350 195 : if (hashvalue == 0 ||
1351 139 : (cacheid == FOREIGNSERVEROID &&
1352 195 : entry->server_hashvalue == hashvalue) ||
1353 56 : (cacheid == USERMAPPINGOID &&
1354 56 : entry->mapping_hashvalue == hashvalue))
1355 : {
1356 : /*
1357 : * Close the connection immediately if it's not used yet in this
1358 : * transaction. Otherwise mark it as invalid so that
1359 : * pgfdw_xact_callback() can close it at the end of this
1360 : * transaction.
1361 : */
1362 58 : if (entry->xact_depth == 0)
1363 : {
1364 55 : elog(DEBUG3, "discarding connection %p", entry->conn);
1365 55 : disconnect_pg_server(entry);
1366 : }
1367 : else
1368 3 : entry->invalidated = true;
1369 : }
1370 : }
1371 188 : }
1372 :
1373 : /*
1374 : * Raise an error if the given connection cache entry is marked as being
1375 : * in the middle of an xact state change. This should be called at which no
1376 : * such change is expected to be in progress; if one is found to be in
1377 : * progress, it means that we aborted in the middle of a previous state change
1378 : * and now don't know what the remote transaction state actually is.
1379 : * Such connections can't safely be further used. Re-establishing the
1380 : * connection would change the snapshot and roll back any writes already
1381 : * performed, so that's not an option, either. Thus, we must abort.
1382 : */
1383 : static void
1384 2956 : pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
1385 : {
1386 : ForeignServer *server;
1387 :
1388 : /* nothing to do for inactive entries and entries of sane state */
1389 2956 : if (entry->conn == NULL || !entry->changing_xact_state)
1390 2956 : return;
1391 :
1392 : /* make sure this entry is inactive */
1393 0 : disconnect_pg_server(entry);
1394 :
1395 : /* find server name to be shown in the message below */
1396 0 : server = GetForeignServer(entry->serverid);
1397 :
1398 0 : ereport(ERROR,
1399 : (errcode(ERRCODE_CONNECTION_EXCEPTION),
1400 : errmsg("connection to server \"%s\" was lost",
1401 : server->servername)));
1402 : }
1403 :
1404 : /*
1405 : * Reset state to show we're out of a (sub)transaction.
1406 : */
1407 : static void
1408 1306 : pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel)
1409 : {
1410 1306 : if (toplevel)
1411 : {
1412 : /* Reset state to show we're out of a transaction */
1413 1292 : entry->xact_depth = 0;
1414 :
1415 : /*
1416 : * If the connection isn't in a good idle state, it is marked as
1417 : * invalid or keep_connections option of its server is disabled, then
1418 : * discard it to recover. Next GetConnection will open a new
1419 : * connection.
1420 : */
1421 2583 : if (PQstatus(entry->conn) != CONNECTION_OK ||
1422 1291 : PQtransactionStatus(entry->conn) != PQTRANS_IDLE ||
1423 1291 : entry->changing_xact_state ||
1424 1291 : entry->invalidated ||
1425 1289 : !entry->keep_connections)
1426 : {
1427 4 : elog(DEBUG3, "discarding connection %p", entry->conn);
1428 4 : disconnect_pg_server(entry);
1429 : }
1430 : }
1431 : else
1432 : {
1433 : /* Reset state to show we're out of a subtransaction */
1434 14 : entry->xact_depth--;
1435 : }
1436 1306 : }
1437 :
1438 : /*
1439 : * Cancel the currently-in-progress query (whose query text we do not have)
1440 : * and ignore the result. Returns true if we successfully cancel the query
1441 : * and discard any pending result, and false if not.
1442 : *
1443 : * It's not a huge problem if we throw an ERROR here, but if we get into error
1444 : * recursion trouble, we'll end up slamming the connection shut, which will
1445 : * necessitate failing the entire toplevel transaction even if subtransactions
1446 : * were used. Try to use WARNING where we can.
1447 : *
1448 : * XXX: if the query was one sent by fetch_more_data_begin(), we could get the
1449 : * query text from the pendingAreq saved in the per-connection state, then
1450 : * report the query using it.
1451 : */
1452 : static bool
1453 2 : pgfdw_cancel_query(PGconn *conn)
1454 : {
1455 2 : TimestampTz now = GetCurrentTimestamp();
1456 : TimestampTz endtime;
1457 : TimestampTz retrycanceltime;
1458 :
1459 : /*
1460 : * If it takes too long to cancel the query and discard the result, assume
1461 : * the connection is dead.
1462 : */
1463 2 : endtime = TimestampTzPlusMilliseconds(now, CONNECTION_CLEANUP_TIMEOUT);
1464 :
1465 : /*
1466 : * Also, lose patience and re-issue the cancel request after a little bit.
1467 : * (This serves to close some race conditions.)
1468 : */
1469 2 : retrycanceltime = TimestampTzPlusMilliseconds(now, RETRY_CANCEL_TIMEOUT);
1470 :
1471 2 : if (!pgfdw_cancel_query_begin(conn, endtime))
1472 0 : return false;
1473 2 : return pgfdw_cancel_query_end(conn, endtime, retrycanceltime, false);
1474 : }
1475 :
1476 : /*
1477 : * Submit a cancel request to the given connection, waiting only until
1478 : * the given time.
1479 : *
1480 : * We sleep interruptibly until we receive confirmation that the cancel
1481 : * request has been accepted, and if it is, return true; if the timeout
1482 : * lapses without that, or the request fails for whatever reason, return
1483 : * false.
1484 : */
1485 : static bool
1486 2 : pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime)
1487 : {
1488 2 : const char *errormsg = libpqsrv_cancel(conn, endtime);
1489 :
1490 2 : if (errormsg != NULL)
1491 0 : ereport(WARNING,
1492 : errcode(ERRCODE_CONNECTION_FAILURE),
1493 : errmsg("could not send cancel request: %s", errormsg));
1494 :
1495 2 : return errormsg == NULL;
1496 : }
1497 :
1498 : static bool
1499 2 : pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime,
1500 : TimestampTz retrycanceltime, bool consume_input)
1501 : {
1502 : PGresult *result;
1503 : bool timed_out;
1504 :
1505 : /*
1506 : * If requested, consume whatever data is available from the socket. (Note
1507 : * that if all data is available, this allows pgfdw_get_cleanup_result to
1508 : * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
1509 : * which would be large compared to the overhead of PQconsumeInput.)
1510 : */
1511 2 : if (consume_input && !PQconsumeInput(conn))
1512 : {
1513 0 : ereport(WARNING,
1514 : (errcode(ERRCODE_CONNECTION_FAILURE),
1515 : errmsg("could not get result of cancel request: %s",
1516 : pchomp(PQerrorMessage(conn)))));
1517 0 : return false;
1518 : }
1519 :
1520 : /* Get and discard the result of the query. */
1521 2 : if (pgfdw_get_cleanup_result(conn, endtime, retrycanceltime,
1522 : &result, &timed_out))
1523 : {
1524 0 : if (timed_out)
1525 0 : ereport(WARNING,
1526 : (errmsg("could not get result of cancel request due to timeout")));
1527 : else
1528 0 : ereport(WARNING,
1529 : (errcode(ERRCODE_CONNECTION_FAILURE),
1530 : errmsg("could not get result of cancel request: %s",
1531 : pchomp(PQerrorMessage(conn)))));
1532 :
1533 0 : return false;
1534 : }
1535 2 : PQclear(result);
1536 :
1537 2 : return true;
1538 : }
1539 :
1540 : /*
1541 : * Submit a query during (sub)abort cleanup and wait up to 30 seconds for the
1542 : * result. If the query is executed without error, the return value is true.
1543 : * If the query is executed successfully but returns an error, the return
1544 : * value is true if and only if ignore_errors is set. If the query can't be
1545 : * sent or times out, the return value is false.
1546 : *
1547 : * It's not a huge problem if we throw an ERROR here, but if we get into error
1548 : * recursion trouble, we'll end up slamming the connection shut, which will
1549 : * necessitate failing the entire toplevel transaction even if subtransactions
1550 : * were used. Try to use WARNING where we can.
1551 : */
1552 : static bool
1553 75 : pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
1554 : {
1555 : TimestampTz endtime;
1556 :
1557 : /*
1558 : * If it takes too long to execute a cleanup query, assume the connection
1559 : * is dead. It's fairly likely that this is why we aborted in the first
1560 : * place (e.g. statement timeout, user cancel), so the timeout shouldn't
1561 : * be too long.
1562 : */
1563 75 : endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
1564 : CONNECTION_CLEANUP_TIMEOUT);
1565 :
1566 75 : if (!pgfdw_exec_cleanup_query_begin(conn, query))
1567 0 : return false;
1568 75 : return pgfdw_exec_cleanup_query_end(conn, query, endtime,
1569 : false, ignore_errors);
1570 : }
1571 :
1572 : static bool
1573 87 : pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query)
1574 : {
1575 : Assert(query != NULL);
1576 :
1577 : /*
1578 : * Submit a query. Since we don't use non-blocking mode, this also can
1579 : * block. But its risk is relatively small, so we ignore that for now.
1580 : */
1581 87 : if (!PQsendQuery(conn, query))
1582 : {
1583 0 : pgfdw_report(WARNING, NULL, conn, query);
1584 0 : return false;
1585 : }
1586 :
1587 87 : return true;
1588 : }
1589 :
1590 : static bool
1591 87 : pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
1592 : TimestampTz endtime, bool consume_input,
1593 : bool ignore_errors)
1594 : {
1595 : PGresult *result;
1596 : bool timed_out;
1597 :
1598 : Assert(query != NULL);
1599 :
1600 : /*
1601 : * If requested, consume whatever data is available from the socket. (Note
1602 : * that if all data is available, this allows pgfdw_get_cleanup_result to
1603 : * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
1604 : * which would be large compared to the overhead of PQconsumeInput.)
1605 : */
1606 87 : if (consume_input && !PQconsumeInput(conn))
1607 : {
1608 0 : pgfdw_report(WARNING, NULL, conn, query);
1609 0 : return false;
1610 : }
1611 :
1612 : /* Get the result of the query. */
1613 87 : if (pgfdw_get_cleanup_result(conn, endtime, endtime, &result, &timed_out))
1614 : {
1615 0 : if (timed_out)
1616 0 : ereport(WARNING,
1617 : (errmsg("could not get query result due to timeout"),
1618 : errcontext("remote SQL command: %s", query)));
1619 : else
1620 0 : pgfdw_report(WARNING, NULL, conn, query);
1621 :
1622 0 : return false;
1623 : }
1624 :
1625 : /* Issue a warning if not successful. */
1626 87 : if (PQresultStatus(result) != PGRES_COMMAND_OK)
1627 : {
1628 0 : pgfdw_report(WARNING, result, conn, query);
1629 0 : return ignore_errors;
1630 : }
1631 87 : PQclear(result);
1632 :
1633 87 : return true;
1634 : }
1635 :
1636 : /*
1637 : * Get, during abort cleanup, the result of a query that is in progress.
1638 : * This might be a query that is being interrupted by a cancel request or by
1639 : * transaction abort, or it might be a query that was initiated as part of
1640 : * transaction abort to get the remote side back to the appropriate state.
1641 : *
1642 : * endtime is the time at which we should give up and assume the remote side
1643 : * is dead. retrycanceltime is the time at which we should issue a fresh
1644 : * cancel request (pass the same value as endtime if this is not wanted).
1645 : *
1646 : * Returns true if the timeout expired or connection trouble occurred,
1647 : * false otherwise. Sets *result except in case of a true result.
1648 : * Sets *timed_out to true only when the timeout expired.
1649 : */
1650 : static bool
1651 89 : pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
1652 : TimestampTz retrycanceltime,
1653 : PGresult **result,
1654 : bool *timed_out)
1655 : {
1656 89 : bool failed = false;
1657 89 : PGresult *last_res = NULL;
1658 89 : int canceldelta = RETRY_CANCEL_TIMEOUT * 2;
1659 :
1660 89 : *result = NULL;
1661 89 : *timed_out = false;
1662 : for (;;)
1663 96 : {
1664 : PGresult *res;
1665 :
1666 264 : while (PQisBusy(conn))
1667 : {
1668 : int wc;
1669 79 : TimestampTz now = GetCurrentTimestamp();
1670 : long cur_timeout;
1671 :
1672 : /* If timeout has expired, give up. */
1673 79 : if (now >= endtime)
1674 : {
1675 0 : *timed_out = true;
1676 0 : failed = true;
1677 0 : goto exit;
1678 : }
1679 :
1680 : /* If we need to re-issue the cancel request, do that. */
1681 79 : if (now >= retrycanceltime)
1682 : {
1683 : /* We ignore failure to issue the repeated request. */
1684 0 : (void) libpqsrv_cancel(conn, endtime);
1685 :
1686 : /* Recompute "now" in case that took measurable time. */
1687 0 : now = GetCurrentTimestamp();
1688 :
1689 : /* Adjust re-cancel timeout in increasing steps. */
1690 0 : retrycanceltime = TimestampTzPlusMilliseconds(now,
1691 : canceldelta);
1692 0 : canceldelta += canceldelta;
1693 : }
1694 :
1695 : /* If timeout has expired, give up, else get sleep time. */
1696 79 : cur_timeout = TimestampDifferenceMilliseconds(now,
1697 : Min(endtime,
1698 : retrycanceltime));
1699 79 : if (cur_timeout <= 0)
1700 : {
1701 0 : *timed_out = true;
1702 0 : failed = true;
1703 0 : goto exit;
1704 : }
1705 :
1706 : /* first time, allocate or get the custom wait event */
1707 79 : if (pgfdw_we_cleanup_result == 0)
1708 2 : pgfdw_we_cleanup_result = WaitEventExtensionNew("PostgresFdwCleanupResult");
1709 :
1710 : /* Sleep until there's something to do */
1711 79 : wc = WaitLatchOrSocket(MyLatch,
1712 : WL_LATCH_SET | WL_SOCKET_READABLE |
1713 : WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
1714 : PQsocket(conn),
1715 : cur_timeout, pgfdw_we_cleanup_result);
1716 79 : ResetLatch(MyLatch);
1717 :
1718 79 : CHECK_FOR_INTERRUPTS();
1719 :
1720 : /* Data available in socket? */
1721 79 : if (wc & WL_SOCKET_READABLE)
1722 : {
1723 79 : if (!PQconsumeInput(conn))
1724 : {
1725 : /* connection trouble */
1726 0 : failed = true;
1727 0 : goto exit;
1728 : }
1729 : }
1730 : }
1731 :
1732 185 : res = PQgetResult(conn);
1733 185 : if (res == NULL)
1734 89 : break; /* query is complete */
1735 :
1736 96 : PQclear(last_res);
1737 96 : last_res = res;
1738 : }
1739 89 : exit:
1740 89 : if (failed)
1741 0 : PQclear(last_res);
1742 : else
1743 89 : *result = last_res;
1744 89 : return failed;
1745 : }
1746 :
1747 : /*
1748 : * Abort remote transaction or subtransaction.
1749 : *
1750 : * "toplevel" should be set to true if the toplevel (main) transaction is
1751 : * being rolled back, false otherwise.
1752 : *
1753 : * Set entry->changing_xact_state to false on success, true on failure.
1754 : */
1755 : static void
1756 51 : pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel)
1757 : {
1758 : char sql[100];
1759 :
1760 : /*
1761 : * Don't try to clean up the connection if we're already in error
1762 : * recursion trouble.
1763 : */
1764 51 : if (in_error_recursion_trouble())
1765 0 : entry->changing_xact_state = true;
1766 :
1767 : /*
1768 : * If connection is already unsalvageable, don't touch it further.
1769 : */
1770 51 : if (entry->changing_xact_state)
1771 1 : return;
1772 :
1773 : /*
1774 : * Mark this connection as in the process of changing transaction state.
1775 : */
1776 50 : entry->changing_xact_state = true;
1777 :
1778 : /* Assume we might have lost track of prepared statements */
1779 50 : entry->have_error = true;
1780 :
1781 : /*
1782 : * If a command has been submitted to the remote server by using an
1783 : * asynchronous execution function, the command might not have yet
1784 : * completed. Check to see if a command is still being processed by the
1785 : * remote server, and if so, request cancellation of the command.
1786 : */
1787 50 : if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
1788 2 : !pgfdw_cancel_query(entry->conn))
1789 0 : return; /* Unable to cancel running query */
1790 :
1791 50 : CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
1792 50 : if (!pgfdw_exec_cleanup_query(entry->conn, sql, false))
1793 0 : return; /* Unable to abort remote (sub)transaction */
1794 :
1795 50 : if (toplevel)
1796 : {
1797 47 : if (entry->have_prep_stmt && entry->have_error &&
1798 25 : !pgfdw_exec_cleanup_query(entry->conn,
1799 : "DEALLOCATE ALL",
1800 : true))
1801 0 : return; /* Trouble clearing prepared statements */
1802 :
1803 47 : entry->have_prep_stmt = false;
1804 47 : entry->have_error = false;
1805 : }
1806 :
1807 : /*
1808 : * If pendingAreq of the per-connection state is not NULL, it means that
1809 : * an asynchronous fetch begun by fetch_more_data_begin() was not done
1810 : * successfully and thus the per-connection state was not reset in
1811 : * fetch_more_data(); in that case reset the per-connection state here.
1812 : */
1813 50 : if (entry->state.pendingAreq)
1814 1 : memset(&entry->state, 0, sizeof(entry->state));
1815 :
1816 : /* Disarm changing_xact_state if it all worked */
1817 50 : entry->changing_xact_state = false;
1818 : }
1819 :
1820 : /*
1821 : * Like pgfdw_abort_cleanup, submit an abort command or cancel request, but
1822 : * don't wait for the result.
1823 : *
1824 : * Returns true if the abort command or cancel request is successfully issued,
1825 : * false otherwise. If the abort command is successfully issued, the given
1826 : * connection cache entry is appended to *pending_entries. Otherwise, if the
1827 : * cancel request is successfully issued, it is appended to *cancel_requested.
1828 : */
1829 : static bool
1830 8 : pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel,
1831 : List **pending_entries, List **cancel_requested)
1832 : {
1833 : /*
1834 : * Don't try to clean up the connection if we're already in error
1835 : * recursion trouble.
1836 : */
1837 8 : if (in_error_recursion_trouble())
1838 0 : entry->changing_xact_state = true;
1839 :
1840 : /*
1841 : * If connection is already unsalvageable, don't touch it further.
1842 : */
1843 8 : if (entry->changing_xact_state)
1844 0 : return false;
1845 :
1846 : /*
1847 : * Mark this connection as in the process of changing transaction state.
1848 : */
1849 8 : entry->changing_xact_state = true;
1850 :
1851 : /* Assume we might have lost track of prepared statements */
1852 8 : entry->have_error = true;
1853 :
1854 : /*
1855 : * If a command has been submitted to the remote server by using an
1856 : * asynchronous execution function, the command might not have yet
1857 : * completed. Check to see if a command is still being processed by the
1858 : * remote server, and if so, request cancellation of the command.
1859 : */
1860 8 : if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
1861 : {
1862 : TimestampTz endtime;
1863 :
1864 0 : endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
1865 : CONNECTION_CLEANUP_TIMEOUT);
1866 0 : if (!pgfdw_cancel_query_begin(entry->conn, endtime))
1867 0 : return false; /* Unable to cancel running query */
1868 0 : *cancel_requested = lappend(*cancel_requested, entry);
1869 : }
1870 : else
1871 : {
1872 : char sql[100];
1873 :
1874 8 : CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
1875 8 : if (!pgfdw_exec_cleanup_query_begin(entry->conn, sql))
1876 0 : return false; /* Unable to abort remote transaction */
1877 8 : *pending_entries = lappend(*pending_entries, entry);
1878 : }
1879 :
1880 8 : return true;
1881 : }
1882 :
1883 : /*
1884 : * Finish pre-commit cleanup of connections on each of which we've sent a
1885 : * COMMIT command to the remote server.
1886 : */
1887 : static void
1888 13 : pgfdw_finish_pre_commit_cleanup(List *pending_entries)
1889 : {
1890 : ConnCacheEntry *entry;
1891 13 : List *pending_deallocs = NIL;
1892 : ListCell *lc;
1893 :
1894 : Assert(pending_entries);
1895 :
1896 : /*
1897 : * Get the result of the COMMIT command for each of the pending entries
1898 : */
1899 29 : foreach(lc, pending_entries)
1900 : {
1901 16 : entry = (ConnCacheEntry *) lfirst(lc);
1902 :
1903 : Assert(entry->changing_xact_state);
1904 :
1905 : /*
1906 : * We might already have received the result on the socket, so pass
1907 : * consume_input=true to try to consume it first
1908 : */
1909 16 : do_sql_command_end(entry->conn, "COMMIT TRANSACTION", true);
1910 16 : entry->changing_xact_state = false;
1911 :
1912 : /* Do a DEALLOCATE ALL in parallel if needed */
1913 16 : if (entry->have_prep_stmt && entry->have_error)
1914 : {
1915 : /* Ignore errors (see notes in pgfdw_xact_callback) */
1916 2 : if (PQsendQuery(entry->conn, "DEALLOCATE ALL"))
1917 : {
1918 2 : pending_deallocs = lappend(pending_deallocs, entry);
1919 2 : continue;
1920 : }
1921 : }
1922 14 : entry->have_prep_stmt = false;
1923 14 : entry->have_error = false;
1924 :
1925 14 : pgfdw_reset_xact_state(entry, true);
1926 : }
1927 :
1928 : /* No further work if no pending entries */
1929 13 : if (!pending_deallocs)
1930 12 : return;
1931 :
1932 : /*
1933 : * Get the result of the DEALLOCATE command for each of the pending
1934 : * entries
1935 : */
1936 3 : foreach(lc, pending_deallocs)
1937 : {
1938 : PGresult *res;
1939 :
1940 2 : entry = (ConnCacheEntry *) lfirst(lc);
1941 :
1942 : /* Ignore errors (see notes in pgfdw_xact_callback) */
1943 4 : while ((res = PQgetResult(entry->conn)) != NULL)
1944 : {
1945 2 : PQclear(res);
1946 : /* Stop if the connection is lost (else we'll loop infinitely) */
1947 2 : if (PQstatus(entry->conn) == CONNECTION_BAD)
1948 0 : break;
1949 : }
1950 2 : entry->have_prep_stmt = false;
1951 2 : entry->have_error = false;
1952 :
1953 2 : pgfdw_reset_xact_state(entry, true);
1954 : }
1955 : }
1956 :
1957 : /*
1958 : * Finish pre-subcommit cleanup of connections on each of which we've sent a
1959 : * RELEASE command to the remote server.
1960 : */
1961 : static void
1962 1 : pgfdw_finish_pre_subcommit_cleanup(List *pending_entries, int curlevel)
1963 : {
1964 : ConnCacheEntry *entry;
1965 : char sql[100];
1966 : ListCell *lc;
1967 :
1968 : Assert(pending_entries);
1969 :
1970 : /*
1971 : * Get the result of the RELEASE command for each of the pending entries
1972 : */
1973 1 : snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
1974 3 : foreach(lc, pending_entries)
1975 : {
1976 2 : entry = (ConnCacheEntry *) lfirst(lc);
1977 :
1978 : Assert(entry->changing_xact_state);
1979 :
1980 : /*
1981 : * We might already have received the result on the socket, so pass
1982 : * consume_input=true to try to consume it first
1983 : */
1984 2 : do_sql_command_end(entry->conn, sql, true);
1985 2 : entry->changing_xact_state = false;
1986 :
1987 2 : pgfdw_reset_xact_state(entry, false);
1988 : }
1989 1 : }
1990 :
1991 : /*
1992 : * Finish abort cleanup of connections on each of which we've sent an abort
1993 : * command or cancel request to the remote server.
1994 : */
1995 : static void
1996 4 : pgfdw_finish_abort_cleanup(List *pending_entries, List *cancel_requested,
1997 : bool toplevel)
1998 : {
1999 4 : List *pending_deallocs = NIL;
2000 : ListCell *lc;
2001 :
2002 : /*
2003 : * For each of the pending cancel requests (if any), get and discard the
2004 : * result of the query, and submit an abort command to the remote server.
2005 : */
2006 4 : if (cancel_requested)
2007 : {
2008 0 : foreach(lc, cancel_requested)
2009 : {
2010 0 : ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
2011 0 : TimestampTz now = GetCurrentTimestamp();
2012 : TimestampTz endtime;
2013 : TimestampTz retrycanceltime;
2014 : char sql[100];
2015 :
2016 : Assert(entry->changing_xact_state);
2017 :
2018 : /*
2019 : * Set end time. You might think we should do this before issuing
2020 : * cancel request like in normal mode, but that is problematic,
2021 : * because if, for example, it took longer than 30 seconds to
2022 : * process the first few entries in the cancel_requested list, it
2023 : * would cause a timeout error when processing each of the
2024 : * remaining entries in the list, leading to slamming that entry's
2025 : * connection shut.
2026 : */
2027 0 : endtime = TimestampTzPlusMilliseconds(now,
2028 : CONNECTION_CLEANUP_TIMEOUT);
2029 0 : retrycanceltime = TimestampTzPlusMilliseconds(now,
2030 : RETRY_CANCEL_TIMEOUT);
2031 :
2032 0 : if (!pgfdw_cancel_query_end(entry->conn, endtime,
2033 : retrycanceltime, true))
2034 : {
2035 : /* Unable to cancel running query */
2036 0 : pgfdw_reset_xact_state(entry, toplevel);
2037 0 : continue;
2038 : }
2039 :
2040 : /* Send an abort command in parallel if needed */
2041 0 : CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
2042 0 : if (!pgfdw_exec_cleanup_query_begin(entry->conn, sql))
2043 : {
2044 : /* Unable to abort remote (sub)transaction */
2045 0 : pgfdw_reset_xact_state(entry, toplevel);
2046 : }
2047 : else
2048 0 : pending_entries = lappend(pending_entries, entry);
2049 : }
2050 : }
2051 :
2052 : /* No further work if no pending entries */
2053 4 : if (!pending_entries)
2054 0 : return;
2055 :
2056 : /*
2057 : * Get the result of the abort command for each of the pending entries
2058 : */
2059 12 : foreach(lc, pending_entries)
2060 : {
2061 8 : ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
2062 : TimestampTz endtime;
2063 : char sql[100];
2064 :
2065 : Assert(entry->changing_xact_state);
2066 :
2067 : /*
2068 : * Set end time. We do this now, not before issuing the command like
2069 : * in normal mode, for the same reason as for the cancel_requested
2070 : * entries.
2071 : */
2072 8 : endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
2073 : CONNECTION_CLEANUP_TIMEOUT);
2074 :
2075 8 : CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
2076 8 : if (!pgfdw_exec_cleanup_query_end(entry->conn, sql, endtime,
2077 : true, false))
2078 : {
2079 : /* Unable to abort remote (sub)transaction */
2080 0 : pgfdw_reset_xact_state(entry, toplevel);
2081 4 : continue;
2082 : }
2083 :
2084 8 : if (toplevel)
2085 : {
2086 : /* Do a DEALLOCATE ALL in parallel if needed */
2087 4 : if (entry->have_prep_stmt && entry->have_error)
2088 : {
2089 4 : if (!pgfdw_exec_cleanup_query_begin(entry->conn,
2090 : "DEALLOCATE ALL"))
2091 : {
2092 : /* Trouble clearing prepared statements */
2093 0 : pgfdw_reset_xact_state(entry, toplevel);
2094 : }
2095 : else
2096 4 : pending_deallocs = lappend(pending_deallocs, entry);
2097 4 : continue;
2098 : }
2099 0 : entry->have_prep_stmt = false;
2100 0 : entry->have_error = false;
2101 : }
2102 :
2103 : /* Reset the per-connection state if needed */
2104 4 : if (entry->state.pendingAreq)
2105 0 : memset(&entry->state, 0, sizeof(entry->state));
2106 :
2107 : /* We're done with this entry; unset the changing_xact_state flag */
2108 4 : entry->changing_xact_state = false;
2109 4 : pgfdw_reset_xact_state(entry, toplevel);
2110 : }
2111 :
2112 : /* No further work if no pending entries */
2113 4 : if (!pending_deallocs)
2114 2 : return;
2115 : Assert(toplevel);
2116 :
2117 : /*
2118 : * Get the result of the DEALLOCATE command for each of the pending
2119 : * entries
2120 : */
2121 6 : foreach(lc, pending_deallocs)
2122 : {
2123 4 : ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
2124 : TimestampTz endtime;
2125 :
2126 : Assert(entry->changing_xact_state);
2127 : Assert(entry->have_prep_stmt);
2128 : Assert(entry->have_error);
2129 :
2130 : /*
2131 : * Set end time. We do this now, not before issuing the command like
2132 : * in normal mode, for the same reason as for the cancel_requested
2133 : * entries.
2134 : */
2135 4 : endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
2136 : CONNECTION_CLEANUP_TIMEOUT);
2137 :
2138 4 : if (!pgfdw_exec_cleanup_query_end(entry->conn, "DEALLOCATE ALL",
2139 : endtime, true, true))
2140 : {
2141 : /* Trouble clearing prepared statements */
2142 0 : pgfdw_reset_xact_state(entry, toplevel);
2143 0 : continue;
2144 : }
2145 4 : entry->have_prep_stmt = false;
2146 4 : entry->have_error = false;
2147 :
2148 : /* Reset the per-connection state if needed */
2149 4 : if (entry->state.pendingAreq)
2150 0 : memset(&entry->state, 0, sizeof(entry->state));
2151 :
2152 : /* We're done with this entry; unset the changing_xact_state flag */
2153 4 : entry->changing_xact_state = false;
2154 4 : pgfdw_reset_xact_state(entry, toplevel);
2155 : }
2156 : }
2157 :
/* Number of output arguments (columns) for various API versions */
#define POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_1	2
#define POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_2	6
/* Maximum of the above; currently that is the V1_2 column count. */
#define POSTGRES_FDW_GET_CONNECTIONS_COLS	POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_2
2162 :
2163 : /*
2164 : * Internal function used by postgres_fdw_get_connections variants.
2165 : *
2166 : * For API version 1.1, this function takes no input parameter and
2167 : * returns a set of records with the following values:
2168 : *
2169 : * - server_name - server name of active connection. In case the foreign server
2170 : * is dropped but still the connection is active, then the server name will
2171 : * be NULL in output.
2172 : * - valid - true/false representing whether the connection is valid or not.
2173 : * Note that connections can become invalid in pgfdw_inval_callback.
2174 : *
2175 : * For API version 1.2 and later, this function takes an input parameter
2176 : * to check a connection status and returns the following
2177 : * additional values along with the four values from version 1.1:
2178 : *
2179 : * - user_name - the local user name of the active connection. In case the
2180 : * user mapping is dropped but the connection is still active, then the
2181 : * user name will be NULL in the output.
2182 : * - used_in_xact - true if the connection is used in the current transaction.
2183 : * - closed - true if the connection is closed.
2184 : * - remote_backend_pid - process ID of the remote backend, on the foreign
2185 : * server, handling the connection.
2186 : *
2187 : * No records are returned when there are no cached connections at all.
2188 : */
2189 : static void
2190 13 : postgres_fdw_get_connections_internal(FunctionCallInfo fcinfo,
2191 : enum pgfdwVersion api_version)
2192 : {
2193 13 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
2194 : HASH_SEQ_STATUS scan;
2195 : ConnCacheEntry *entry;
2196 :
2197 13 : InitMaterializedSRF(fcinfo, 0);
2198 :
2199 : /* If cache doesn't exist, we return no records */
2200 13 : if (!ConnectionHash)
2201 0 : return;
2202 :
2203 : /* Check we have the expected number of output arguments */
2204 13 : switch (rsinfo->setDesc->natts)
2205 : {
2206 0 : case POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_1:
2207 0 : if (api_version != PGFDW_V1_1)
2208 0 : elog(ERROR, "incorrect number of output arguments");
2209 0 : break;
2210 13 : case POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_2:
2211 13 : if (api_version != PGFDW_V1_2)
2212 0 : elog(ERROR, "incorrect number of output arguments");
2213 13 : break;
2214 0 : default:
2215 0 : elog(ERROR, "incorrect number of output arguments");
2216 : }
2217 :
2218 13 : hash_seq_init(&scan, ConnectionHash);
2219 113 : while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
2220 : {
2221 : ForeignServer *server;
2222 100 : Datum values[POSTGRES_FDW_GET_CONNECTIONS_COLS] = {0};
2223 100 : bool nulls[POSTGRES_FDW_GET_CONNECTIONS_COLS] = {0};
2224 100 : int i = 0;
2225 :
2226 : /* We only look for open remote connections */
2227 100 : if (!entry->conn)
2228 87 : continue;
2229 :
2230 13 : server = GetForeignServerExtended(entry->serverid, FSV_MISSING_OK);
2231 :
2232 : /*
2233 : * The foreign server may have been dropped in current explicit
2234 : * transaction. It is not possible to drop the server from another
2235 : * session when the connection associated with it is in use in the
2236 : * current transaction, if tried so, the drop query in another session
2237 : * blocks until the current transaction finishes.
2238 : *
2239 : * Even though the server is dropped in the current transaction, the
2240 : * cache can still have associated active connection entry, say we
2241 : * call such connections dangling. Since we can not fetch the server
2242 : * name from system catalogs for dangling connections, instead we show
2243 : * NULL value for server name in output.
2244 : *
2245 : * We could have done better by storing the server name in the cache
2246 : * entry instead of server oid so that it could be used in the output.
2247 : * But the server name in each cache entry requires 64 bytes of
2248 : * memory, which is huge, when there are many cached connections and
2249 : * the use case i.e. dropping the foreign server within the explicit
2250 : * current transaction seems rare. So, we chose to show NULL value for
2251 : * server name in output.
2252 : *
2253 : * Such dangling connections get closed either in next use or at the
2254 : * end of current explicit transaction in pgfdw_xact_callback.
2255 : */
2256 13 : if (!server)
2257 : {
2258 : /*
2259 : * If the server has been dropped in the current explicit
2260 : * transaction, then this entry would have been invalidated in
2261 : * pgfdw_inval_callback at the end of drop server command. Note
2262 : * that this connection would not have been closed in
2263 : * pgfdw_inval_callback because it is still being used in the
2264 : * current explicit transaction. So, assert that here.
2265 : */
2266 : Assert(entry->conn && entry->xact_depth > 0 && entry->invalidated);
2267 :
2268 : /* Show null, if no server name was found */
2269 1 : nulls[i++] = true;
2270 : }
2271 : else
2272 12 : values[i++] = CStringGetTextDatum(server->servername);
2273 :
2274 13 : if (api_version >= PGFDW_V1_2)
2275 : {
2276 : HeapTuple tp;
2277 :
2278 : /* Use the system cache to obtain the user mapping */
2279 13 : tp = SearchSysCache1(USERMAPPINGOID, ObjectIdGetDatum(entry->key));
2280 :
2281 : /*
2282 : * Just like in the foreign server case, user mappings can also be
2283 : * dropped in the current explicit transaction. Therefore, the
2284 : * similar check as in the server case is required.
2285 : */
2286 13 : if (!HeapTupleIsValid(tp))
2287 : {
2288 : /*
2289 : * If we reach here, this entry must have been invalidated in
2290 : * pgfdw_inval_callback, same as in the server case.
2291 : */
2292 : Assert(entry->conn && entry->xact_depth > 0 &&
2293 : entry->invalidated);
2294 :
2295 1 : nulls[i++] = true;
2296 : }
2297 : else
2298 : {
2299 : Oid userid;
2300 :
2301 12 : userid = ((Form_pg_user_mapping) GETSTRUCT(tp))->umuser;
2302 12 : values[i++] = CStringGetTextDatum(MappingUserName(userid));
2303 12 : ReleaseSysCache(tp);
2304 : }
2305 : }
2306 :
2307 13 : values[i++] = BoolGetDatum(!entry->invalidated);
2308 :
2309 13 : if (api_version >= PGFDW_V1_2)
2310 : {
2311 13 : bool check_conn = PG_GETARG_BOOL(0);
2312 :
2313 : /* Is this connection used in the current transaction? */
2314 13 : values[i++] = BoolGetDatum(entry->xact_depth > 0);
2315 :
2316 : /*
2317 : * If a connection status check is requested and supported, return
2318 : * whether the connection is closed. Otherwise, return NULL.
2319 : */
2320 13 : if (check_conn && pgfdw_conn_checkable())
2321 2 : values[i++] = BoolGetDatum(pgfdw_conn_check(entry->conn) != 0);
2322 : else
2323 11 : nulls[i++] = true;
2324 :
2325 : /* Return process ID of remote backend */
2326 13 : values[i++] = Int32GetDatum(PQbackendPID(entry->conn));
2327 : }
2328 :
2329 13 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
2330 : }
2331 : }
2332 :
2333 : /*
2334 : * Values in connection strings must be enclosed in single quotes. Single
2335 : * quotes and backslashes must be escaped with backslash. NB: these rules are
2336 : * different from the rules for escaping a SQL literal.
2337 : */
2338 : static void
2339 70 : appendEscapedValue(StringInfo str, const char *val)
2340 : {
2341 70 : appendStringInfoChar(str, '\'');
2342 508 : for (int i = 0; val[i] != '\0'; i++)
2343 : {
2344 438 : if (val[i] == '\\' || val[i] == '\'')
2345 0 : appendStringInfoChar(str, '\\');
2346 438 : appendStringInfoChar(str, val[i]);
2347 : }
2348 70 : appendStringInfoChar(str, '\'');
2349 70 : }
2350 :
2351 : Datum
2352 6 : postgres_fdw_connection(PG_FUNCTION_ARGS)
2353 : {
2354 6 : Oid userid = PG_GETARG_OID(0);
2355 6 : Oid serverid = PG_GETARG_OID(1);
2356 6 : ForeignServer *server = GetForeignServer(serverid);
2357 6 : UserMapping *user = GetUserMapping(userid, serverid);
2358 : StringInfoData str;
2359 : const char **keywords;
2360 : const char **values;
2361 : char *appname;
2362 6 : char *sep = "";
2363 :
2364 6 : construct_connection_params(server, user, &keywords, &values, &appname);
2365 :
2366 6 : initStringInfo(&str);
2367 76 : for (int i = 0; keywords[i] != NULL; i++)
2368 : {
2369 70 : if (values[i] == NULL)
2370 0 : continue;
2371 70 : appendStringInfo(&str, "%s%s = ", sep, keywords[i]);
2372 70 : appendEscapedValue(&str, values[i]);
2373 70 : sep = " ";
2374 : }
2375 :
2376 6 : if (appname != NULL)
2377 2 : pfree(appname);
2378 6 : pfree(keywords);
2379 6 : pfree(values);
2380 6 : PG_RETURN_TEXT_P(cstring_to_text(str.data));
2381 : }
2382 :
2383 : /*
2384 : * List active foreign server connections.
2385 : *
2386 : * The SQL API of this function has changed multiple times, and will likely
2387 : * do so again in future. To support the case where a newer version of this
2388 : * loadable module is being used with an old SQL declaration of the function,
2389 : * we continue to support the older API versions.
2390 : */
2391 : Datum
2392 13 : postgres_fdw_get_connections_1_2(PG_FUNCTION_ARGS)
2393 : {
2394 13 : postgres_fdw_get_connections_internal(fcinfo, PGFDW_V1_2);
2395 :
2396 13 : PG_RETURN_VOID();
2397 : }
2398 :
2399 : Datum
2400 0 : postgres_fdw_get_connections(PG_FUNCTION_ARGS)
2401 : {
2402 0 : postgres_fdw_get_connections_internal(fcinfo, PGFDW_V1_1);
2403 :
2404 0 : PG_RETURN_VOID();
2405 : }
2406 :
2407 : /*
2408 : * Disconnect the specified cached connections.
2409 : *
2410 : * This function discards the open connections that are established by
2411 : * postgres_fdw from the local session to the foreign server with
2412 : * the given name. Note that there can be multiple connections to
2413 : * the given server using different user mappings. If the connections
2414 : * are used in the current local transaction, they are not disconnected
2415 : * and warning messages are reported. This function returns true
2416 : * if it disconnects at least one connection, otherwise false. If no
2417 : * foreign server with the given name is found, an error is reported.
2418 : */
2419 : Datum
2420 4 : postgres_fdw_disconnect(PG_FUNCTION_ARGS)
2421 : {
2422 : ForeignServer *server;
2423 : char *servername;
2424 :
2425 4 : servername = text_to_cstring(PG_GETARG_TEXT_PP(0));
2426 4 : server = GetForeignServerByName(servername, false);
2427 :
2428 3 : PG_RETURN_BOOL(disconnect_cached_connections(server->serverid));
2429 : }
2430 :
2431 : /*
2432 : * Disconnect all the cached connections.
2433 : *
2434 : * This function discards all the open connections that are established by
2435 : * postgres_fdw from the local session to the foreign servers.
2436 : * If the connections are used in the current local transaction, they are
2437 : * not disconnected and warning messages are reported. This function
2438 : * returns true if it disconnects at least one connection, otherwise false.
2439 : */
2440 : Datum
2441 5 : postgres_fdw_disconnect_all(PG_FUNCTION_ARGS)
2442 : {
2443 5 : PG_RETURN_BOOL(disconnect_cached_connections(InvalidOid));
2444 : }
2445 :
2446 : /*
2447 : * Workhorse to disconnect cached connections.
2448 : *
2449 : * This function scans all the connection cache entries and disconnects
2450 : * the open connections whose foreign server OID matches with
2451 : * the specified one. If InvalidOid is specified, it disconnects all
2452 : * the cached connections.
2453 : *
2454 : * This function emits a warning for each connection that's used in
2455 : * the current transaction and doesn't close it. It returns true if
2456 : * it disconnects at least one connection, otherwise false.
2457 : *
2458 : * Note that this function disconnects even the connections that are
2459 : * established by other users in the same local session using different
2460 : * user mappings. This leads even non-superuser to be able to close
2461 : * the connections established by superusers in the same local session.
2462 : *
2463 : * XXX As of now we don't see any security risk doing this. But we should
2464 : * set some restrictions on that, for example, prevent non-superuser
2465 : * from closing the connections established by superusers even
2466 : * in the same session?
2467 : */
2468 : static bool
2469 8 : disconnect_cached_connections(Oid serverid)
2470 : {
2471 : HASH_SEQ_STATUS scan;
2472 : ConnCacheEntry *entry;
2473 8 : bool all = !OidIsValid(serverid);
2474 8 : bool result = false;
2475 :
2476 : /*
2477 : * Connection cache hashtable has not been initialized yet in this
2478 : * session, so return false.
2479 : */
2480 8 : if (!ConnectionHash)
2481 0 : return false;
2482 :
2483 8 : hash_seq_init(&scan, ConnectionHash);
2484 67 : while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
2485 : {
2486 : /* Ignore cache entry if no open connection right now. */
2487 59 : if (!entry->conn)
2488 47 : continue;
2489 :
2490 12 : if (all || entry->serverid == serverid)
2491 : {
2492 : /*
2493 : * Emit a warning because the connection to close is used in the
2494 : * current transaction and cannot be disconnected right now.
2495 : */
2496 9 : if (entry->xact_depth > 0)
2497 : {
2498 : ForeignServer *server;
2499 :
2500 3 : server = GetForeignServerExtended(entry->serverid,
2501 : FSV_MISSING_OK);
2502 :
2503 3 : if (!server)
2504 : {
2505 : /*
2506 : * If the foreign server was dropped while its connection
2507 : * was used in the current transaction, the connection
2508 : * must have been marked as invalid by
2509 : * pgfdw_inval_callback at the end of DROP SERVER command.
2510 : */
2511 : Assert(entry->invalidated);
2512 :
2513 0 : ereport(WARNING,
2514 : (errmsg("cannot close dropped server connection because it is still in use")));
2515 : }
2516 : else
2517 3 : ereport(WARNING,
2518 : (errmsg("cannot close connection for server \"%s\" because it is still in use",
2519 : server->servername)));
2520 : }
2521 : else
2522 : {
2523 6 : elog(DEBUG3, "discarding connection %p", entry->conn);
2524 6 : disconnect_pg_server(entry);
2525 6 : result = true;
2526 : }
2527 : }
2528 : }
2529 :
2530 8 : return result;
2531 : }
2532 :
2533 : /*
2534 : * Check if the remote server closed the connection.
2535 : *
2536 : * Returns 1 if the connection is closed, -1 if an error occurred,
2537 : * and 0 if it's not closed or if the connection check is unavailable
2538 : * on this platform.
2539 : */
2540 : static int
2541 2 : pgfdw_conn_check(PGconn *conn)
2542 : {
2543 2 : int sock = PQsocket(conn);
2544 :
2545 2 : if (PQstatus(conn) != CONNECTION_OK || sock == -1)
2546 0 : return -1;
2547 :
2548 : #if (defined(HAVE_POLL) && defined(POLLRDHUP))
2549 : {
2550 : struct pollfd input_fd;
2551 : int result;
2552 :
2553 2 : input_fd.fd = sock;
2554 2 : input_fd.events = POLLRDHUP;
2555 2 : input_fd.revents = 0;
2556 :
2557 : do
2558 2 : result = poll(&input_fd, 1, 0);
2559 2 : while (result < 0 && errno == EINTR);
2560 :
2561 2 : if (result < 0)
2562 0 : return -1;
2563 :
2564 2 : return (input_fd.revents &
2565 2 : (POLLRDHUP | POLLHUP | POLLERR | POLLNVAL)) ? 1 : 0;
2566 : }
2567 : #else
2568 : return 0;
2569 : #endif
2570 : }
2571 :
/*
 * Report whether connection status checking is available on this platform.
 *
 * Returns true when both HAVE_POLL and POLLRDHUP are defined at compile
 * time, which is the same condition under which pgfdw_conn_check() does a
 * real check; returns false otherwise.
 */
static bool
pgfdw_conn_checkable(void)
{
	bool		checkable;

#if (defined(HAVE_POLL) && defined(POLLRDHUP))
	checkable = true;
#else
	checkable = false;
#endif

	return checkable;
}
2586 :
2587 : /*
2588 : * Ensure that require_auth and SCRAM keys are correctly set on values. SCRAM
2589 : * keys used to pass-through are coming from the initial connection from the
2590 : * client with the server.
2591 : *
2592 : * All required SCRAM options are set by postgres_fdw, so we just need to
2593 : * ensure that these options are not overwritten by the user.
2594 : */
2595 : static bool
2596 9 : pgfdw_has_required_scram_options(const char **keywords, const char **values)
2597 : {
2598 9 : bool has_scram_server_key = false;
2599 9 : bool has_scram_client_key = false;
2600 9 : bool has_require_auth = false;
2601 9 : bool has_scram_keys = false;
2602 :
2603 : /*
2604 : * Continue iterating even if we found the keys that we need to validate
2605 : * to make sure that there is no other declaration of these keys that can
2606 : * overwrite the first.
2607 : */
2608 90 : for (int i = 0; keywords[i] != NULL; i++)
2609 : {
2610 81 : if (strcmp(keywords[i], "scram_client_key") == 0)
2611 : {
2612 9 : if (values[i] != NULL && values[i][0] != '\0')
2613 9 : has_scram_client_key = true;
2614 : else
2615 0 : has_scram_client_key = false;
2616 : }
2617 :
2618 81 : if (strcmp(keywords[i], "scram_server_key") == 0)
2619 : {
2620 9 : if (values[i] != NULL && values[i][0] != '\0')
2621 9 : has_scram_server_key = true;
2622 : else
2623 0 : has_scram_server_key = false;
2624 : }
2625 :
2626 81 : if (strcmp(keywords[i], "require_auth") == 0)
2627 : {
2628 9 : if (values[i] != NULL && strcmp(values[i], "scram-sha-256") == 0)
2629 9 : has_require_auth = true;
2630 : else
2631 0 : has_require_auth = false;
2632 : }
2633 : }
2634 :
2635 9 : has_scram_keys = has_scram_client_key && has_scram_server_key && MyProcPort != NULL && MyProcPort->has_scram_keys;
2636 :
2637 9 : return (has_scram_keys && has_require_auth);
2638 : }
|