Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * connection.c
4 : * Connection management functions for postgres_fdw
5 : *
6 : * Portions Copyright (c) 2012-2025, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * contrib/postgres_fdw/connection.c
10 : *
11 : *-------------------------------------------------------------------------
12 : */
13 : #include "postgres.h"
14 :
15 : #if HAVE_POLL_H
16 : #include <poll.h>
17 : #endif
18 :
19 : #include "access/xact.h"
20 : #include "catalog/pg_user_mapping.h"
21 : #include "commands/defrem.h"
22 : #include "common/base64.h"
23 : #include "funcapi.h"
24 : #include "libpq/libpq-be.h"
25 : #include "libpq/libpq-be-fe-helpers.h"
26 : #include "mb/pg_wchar.h"
27 : #include "miscadmin.h"
28 : #include "pgstat.h"
29 : #include "postgres_fdw.h"
30 : #include "storage/latch.h"
31 : #include "utils/builtins.h"
32 : #include "utils/hsearch.h"
33 : #include "utils/inval.h"
34 : #include "utils/syscache.h"
35 :
36 : /*
37 : * Connection cache hash table entry
38 : *
39 : * The lookup key in this hash table is the user mapping OID. We use just one
40 : * connection per user mapping ID, which ensures that all the scans use the
41 : * same snapshot during a query. Using the user mapping OID rather than
42 : * the foreign server OID + user OID avoids creating multiple connections when
43 : * the public user mapping applies to all user OIDs.
44 : *
45 : * The "conn" pointer can be NULL if we don't currently have a live connection.
46 : * When we do have a connection, xact_depth tracks the current depth of
47 : * transactions and subtransactions open on the remote side. We need to issue
48 : * commands at the same nesting depth on the remote as we're executing at
49 : * ourselves, so that rolling back a subtransaction will kill the right
50 : * queries and not the wrong ones.
51 : */
52 : typedef Oid ConnCacheKey;
53 :
54 : typedef struct ConnCacheEntry
55 : {
56 : ConnCacheKey key; /* hash key (must be first) */
57 : PGconn *conn; /* connection to foreign server, or NULL */
58 : /* Remaining fields are invalid when conn is NULL: */
59 : int xact_depth; /* 0 = no xact open, 1 = main xact open, 2 =
60 : * one level of subxact open, etc */
61 : bool have_prep_stmt; /* have we prepared any stmts in this xact? */
62 : bool have_error; /* have any subxacts aborted in this xact? */
63 : bool changing_xact_state; /* xact state change in process */
64 : bool parallel_commit; /* do we commit (sub)xacts in parallel? */
65 : bool parallel_abort; /* do we abort (sub)xacts in parallel? */
66 : bool invalidated; /* true if reconnect is pending */
67 : bool keep_connections; /* setting value of keep_connections
68 : * server option */
69 : Oid serverid; /* foreign server OID used to get server name */
70 : uint32 server_hashvalue; /* hash value of foreign server OID */
71 : uint32 mapping_hashvalue; /* hash value of user mapping OID */
72 : PgFdwConnState state; /* extra per-connection state */
73 : } ConnCacheEntry;
74 :
75 : /*
76 : * Connection cache (initialized on first use)
77 : */
78 : static HTAB *ConnectionHash = NULL;
79 :
80 : /* for assigning cursor numbers and prepared statement numbers */
81 : static unsigned int cursor_number = 0;
82 : static unsigned int prep_stmt_number = 0;
83 :
84 : /* tracks whether any work is needed in callback functions */
85 : static bool xact_got_connection = false;
86 :
87 : /* custom wait event values, retrieved from shared memory */
88 : static uint32 pgfdw_we_cleanup_result = 0;
89 : static uint32 pgfdw_we_connect = 0;
90 : static uint32 pgfdw_we_get_result = 0;
91 :
/*
 * How long (in milliseconds) we are willing to wait for an in-progress query
 * to be cancelled, or for a cleanup query to finish.  If either takes more
 * than 30 seconds, the connection is assumed to be dead.
 */
#define CONNECTION_CLEANUP_TIMEOUT	30000

/*
 * How long (in milliseconds) to wait before sending another cancel request.
 * Resending covers the race in which the remote session ignored our cancel
 * request because it arrived while the session was idle.
 */
#define RETRY_CANCEL_TIMEOUT	1000
105 :
/*
 * Build the abort command to be sent to the remote server.
 *
 * "sql" must be a char array (not a pointer), because its capacity is taken
 * with sizeof().  For a top-level abort the whole remote transaction is
 * aborted; otherwise we roll back to, and then release, the savepoint named
 * after the entry's current xact_depth.
 */
#define CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel) \
	do { \
		if (toplevel) \
			snprintf((sql), sizeof(sql), \
					 "ABORT TRANSACTION"); \
		else \
			snprintf((sql), sizeof(sql), \
					 "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d", \
					 (entry)->xact_depth, (entry)->xact_depth); \
	} while(0)
117 :
/*
 * Extension version number, used so that objects created by older versions
 * of the extension keep working.
 */
enum pgfdwVersion
{
	PGFDW_V1_1 = 0,
	PGFDW_V1_2 = 1,
};
126 :
/*
 * SQL-callable functions exported by this file
 */
PG_FUNCTION_INFO_V1(postgres_fdw_get_connections);
PG_FUNCTION_INFO_V1(postgres_fdw_get_connections_1_2);
PG_FUNCTION_INFO_V1(postgres_fdw_disconnect);
PG_FUNCTION_INFO_V1(postgres_fdw_disconnect_all);
134 :
135 : /* prototypes of private functions */
136 : static void make_new_connection(ConnCacheEntry *entry, UserMapping *user);
137 : static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
138 : static void disconnect_pg_server(ConnCacheEntry *entry);
139 : static void check_conn_params(const char **keywords, const char **values, UserMapping *user);
140 : static void configure_remote_session(PGconn *conn);
141 : static void do_sql_command_begin(PGconn *conn, const char *sql);
142 : static void do_sql_command_end(PGconn *conn, const char *sql,
143 : bool consume_input);
144 : static void begin_remote_xact(ConnCacheEntry *entry);
145 : static void pgfdw_xact_callback(XactEvent event, void *arg);
146 : static void pgfdw_subxact_callback(SubXactEvent event,
147 : SubTransactionId mySubid,
148 : SubTransactionId parentSubid,
149 : void *arg);
150 : static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue);
151 : static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry);
152 : static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel);
153 : static bool pgfdw_cancel_query(PGconn *conn);
154 : static bool pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime);
155 : static bool pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime,
156 : TimestampTz retrycanceltime,
157 : bool consume_input);
158 : static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
159 : bool ignore_errors);
160 : static bool pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query);
161 : static bool pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
162 : TimestampTz endtime,
163 : bool consume_input,
164 : bool ignore_errors);
165 : static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
166 : TimestampTz retrycanceltime,
167 : PGresult **result, bool *timed_out);
168 : static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel);
169 : static bool pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel,
170 : List **pending_entries,
171 : List **cancel_requested);
172 : static void pgfdw_finish_pre_commit_cleanup(List *pending_entries);
173 : static void pgfdw_finish_pre_subcommit_cleanup(List *pending_entries,
174 : int curlevel);
175 : static void pgfdw_finish_abort_cleanup(List *pending_entries,
176 : List *cancel_requested,
177 : bool toplevel);
178 : static void pgfdw_security_check(const char **keywords, const char **values,
179 : UserMapping *user, PGconn *conn);
180 : static bool UserMappingPasswordRequired(UserMapping *user);
181 : static bool UseScramPassthrough(ForeignServer *server, UserMapping *user);
182 : static bool disconnect_cached_connections(Oid serverid);
183 : static void postgres_fdw_get_connections_internal(FunctionCallInfo fcinfo,
184 : enum pgfdwVersion api_version);
185 : static int pgfdw_conn_check(PGconn *conn);
186 : static bool pgfdw_conn_checkable(void);
187 : static bool pgfdw_has_required_scram_options(const char **keywords, const char **values);
188 :
189 : /*
190 : * Get a PGconn which can be used to execute queries on the remote PostgreSQL
191 : * server with the user's authorization. A new connection is established
192 : * if we don't already have a suitable one, and a transaction is opened at
193 : * the right subtransaction nesting depth if we didn't do that already.
194 : *
195 : * will_prep_stmt must be true if caller intends to create any prepared
196 : * statements. Since those don't go away automatically at transaction end
197 : * (not even on error), we need this flag to cue manual cleanup.
198 : *
199 : * If state is not NULL, *state receives the per-connection state associated
200 : * with the PGconn.
201 : */
202 : PGconn *
203 4334 : GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state)
204 : {
205 : bool found;
206 4334 : bool retry = false;
207 : ConnCacheEntry *entry;
208 : ConnCacheKey key;
209 4334 : MemoryContext ccxt = CurrentMemoryContext;
210 :
211 : /* First time through, initialize connection cache hashtable */
212 4334 : if (ConnectionHash == NULL)
213 : {
214 : HASHCTL ctl;
215 :
216 20 : if (pgfdw_we_get_result == 0)
217 20 : pgfdw_we_get_result =
218 20 : WaitEventExtensionNew("PostgresFdwGetResult");
219 :
220 20 : ctl.keysize = sizeof(ConnCacheKey);
221 20 : ctl.entrysize = sizeof(ConnCacheEntry);
222 20 : ConnectionHash = hash_create("postgres_fdw connections", 8,
223 : &ctl,
224 : HASH_ELEM | HASH_BLOBS);
225 :
226 : /*
227 : * Register some callback functions that manage connection cleanup.
228 : * This should be done just once in each backend.
229 : */
230 20 : RegisterXactCallback(pgfdw_xact_callback, NULL);
231 20 : RegisterSubXactCallback(pgfdw_subxact_callback, NULL);
232 20 : CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
233 : pgfdw_inval_callback, (Datum) 0);
234 20 : CacheRegisterSyscacheCallback(USERMAPPINGOID,
235 : pgfdw_inval_callback, (Datum) 0);
236 : }
237 :
238 : /* Set flag that we did GetConnection during the current transaction */
239 4334 : xact_got_connection = true;
240 :
241 : /* Create hash key for the entry. Assume no pad bytes in key struct */
242 4334 : key = user->umid;
243 :
244 : /*
245 : * Find or create cached entry for requested connection.
246 : */
247 4334 : entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
248 4334 : if (!found)
249 : {
250 : /*
251 : * We need only clear "conn" here; remaining fields will be filled
252 : * later when "conn" is set.
253 : */
254 36 : entry->conn = NULL;
255 : }
256 :
257 : /* Reject further use of connections which failed abort cleanup. */
258 4334 : pgfdw_reject_incomplete_xact_state_change(entry);
259 :
260 : /*
261 : * If the connection needs to be remade due to invalidation, disconnect as
262 : * soon as we're out of all transactions.
263 : */
264 4334 : if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
265 : {
266 0 : elog(DEBUG3, "closing connection %p for option changes to take effect",
267 : entry->conn);
268 0 : disconnect_pg_server(entry);
269 : }
270 :
271 : /*
272 : * If cache entry doesn't have a connection, we have to establish a new
273 : * connection. (If connect_pg_server throws an error, the cache entry
274 : * will remain in a valid empty state, ie conn == NULL.)
275 : */
276 4334 : if (entry->conn == NULL)
277 142 : make_new_connection(entry, user);
278 :
279 : /*
280 : * We check the health of the cached connection here when using it. In
281 : * cases where we're out of all transactions, if a broken connection is
282 : * detected, we try to reestablish a new connection later.
283 : */
284 4320 : PG_TRY();
285 : {
286 : /* Process a pending asynchronous request if any. */
287 4320 : if (entry->state.pendingAreq)
288 0 : process_pending_request(entry->state.pendingAreq);
289 : /* Start a new transaction or subtransaction if needed. */
290 4320 : begin_remote_xact(entry);
291 : }
292 4 : PG_CATCH();
293 : {
294 4 : MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
295 4 : ErrorData *errdata = CopyErrorData();
296 :
297 : /*
298 : * Determine whether to try to reestablish the connection.
299 : *
300 : * After a broken connection is detected in libpq, any error other
301 : * than connection failure (e.g., out-of-memory) can be thrown
302 : * somewhere between return from libpq and the expected ereport() call
303 : * in pgfdw_report_error(). In this case, since PQstatus() indicates
304 : * CONNECTION_BAD, checking only PQstatus() causes the false detection
305 : * of connection failure. To avoid this, we also verify that the
306 : * error's sqlstate is ERRCODE_CONNECTION_FAILURE. Note that also
307 : * checking only the sqlstate can cause another false detection
308 : * because pgfdw_report_error() may report ERRCODE_CONNECTION_FAILURE
309 : * for any libpq-originated error condition.
310 : */
311 4 : if (errdata->sqlerrcode != ERRCODE_CONNECTION_FAILURE ||
312 4 : PQstatus(entry->conn) != CONNECTION_BAD ||
313 4 : entry->xact_depth > 0)
314 : {
315 2 : MemoryContextSwitchTo(ecxt);
316 2 : PG_RE_THROW();
317 : }
318 :
319 : /* Clean up the error state */
320 2 : FlushErrorState();
321 2 : FreeErrorData(errdata);
322 2 : errdata = NULL;
323 :
324 2 : retry = true;
325 : }
326 4318 : PG_END_TRY();
327 :
328 : /*
329 : * If a broken connection is detected, disconnect it, reestablish a new
330 : * connection and retry a new remote transaction. If connection failure is
331 : * reported again, we give up getting a connection.
332 : */
333 4318 : if (retry)
334 : {
335 : Assert(entry->xact_depth == 0);
336 :
337 2 : ereport(DEBUG3,
338 : (errmsg_internal("could not start remote transaction on connection %p",
339 : entry->conn)),
340 : errdetail_internal("%s", pchomp(PQerrorMessage(entry->conn))));
341 :
342 2 : elog(DEBUG3, "closing connection %p to reestablish a new one",
343 : entry->conn);
344 2 : disconnect_pg_server(entry);
345 :
346 2 : make_new_connection(entry, user);
347 :
348 2 : begin_remote_xact(entry);
349 : }
350 :
351 : /* Remember if caller will prepare statements */
352 4318 : entry->have_prep_stmt |= will_prep_stmt;
353 :
354 : /* If caller needs access to the per-connection state, return it. */
355 4318 : if (state)
356 1456 : *state = &entry->state;
357 :
358 4318 : return entry->conn;
359 : }
360 :
361 : /*
362 : * Reset all transient state fields in the cached connection entry and
363 : * establish new connection to the remote server.
364 : */
365 : static void
366 144 : make_new_connection(ConnCacheEntry *entry, UserMapping *user)
367 : {
368 144 : ForeignServer *server = GetForeignServer(user->serverid);
369 : ListCell *lc;
370 :
371 : Assert(entry->conn == NULL);
372 :
373 : /* Reset all transient state fields, to be sure all are clean */
374 144 : entry->xact_depth = 0;
375 144 : entry->have_prep_stmt = false;
376 144 : entry->have_error = false;
377 144 : entry->changing_xact_state = false;
378 144 : entry->invalidated = false;
379 144 : entry->serverid = server->serverid;
380 144 : entry->server_hashvalue =
381 144 : GetSysCacheHashValue1(FOREIGNSERVEROID,
382 : ObjectIdGetDatum(server->serverid));
383 144 : entry->mapping_hashvalue =
384 144 : GetSysCacheHashValue1(USERMAPPINGOID,
385 : ObjectIdGetDatum(user->umid));
386 144 : memset(&entry->state, 0, sizeof(entry->state));
387 :
388 : /*
389 : * Determine whether to keep the connection that we're about to make here
390 : * open even after the transaction using it ends, so that the subsequent
391 : * transactions can re-use it.
392 : *
393 : * By default, all the connections to any foreign servers are kept open.
394 : *
395 : * Also determine whether to commit/abort (sub)transactions opened on the
396 : * remote server in parallel at (sub)transaction end, which is disabled by
397 : * default.
398 : *
399 : * Note: it's enough to determine these only when making a new connection
400 : * because if these settings for it are changed, it will be closed and
401 : * re-made later.
402 : */
403 144 : entry->keep_connections = true;
404 144 : entry->parallel_commit = false;
405 144 : entry->parallel_abort = false;
406 640 : foreach(lc, server->options)
407 : {
408 496 : DefElem *def = (DefElem *) lfirst(lc);
409 :
410 496 : if (strcmp(def->defname, "keep_connections") == 0)
411 16 : entry->keep_connections = defGetBoolean(def);
412 480 : else if (strcmp(def->defname, "parallel_commit") == 0)
413 4 : entry->parallel_commit = defGetBoolean(def);
414 476 : else if (strcmp(def->defname, "parallel_abort") == 0)
415 4 : entry->parallel_abort = defGetBoolean(def);
416 : }
417 :
418 : /* Now try to make the connection */
419 144 : entry->conn = connect_pg_server(server, user);
420 :
421 130 : elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
422 : entry->conn, server->servername, user->umid, user->userid);
423 130 : }
424 :
425 : /*
426 : * Check that non-superuser has used password or delegated credentials
427 : * to establish connection; otherwise, he's piggybacking on the
428 : * postgres server's user identity. See also dblink_security_check()
429 : * in contrib/dblink and check_conn_params.
430 : */
431 : static void
432 134 : pgfdw_security_check(const char **keywords, const char **values, UserMapping *user, PGconn *conn)
433 : {
434 : /* Superusers bypass the check */
435 134 : if (superuser_arg(user->userid))
436 118 : return;
437 :
438 : #ifdef ENABLE_GSS
439 : /* Connected via GSSAPI with delegated credentials- all good. */
440 : if (PQconnectionUsedGSSAPI(conn) && be_gssapi_get_delegation(MyProcPort))
441 : return;
442 : #endif
443 :
444 : /* Ok if superuser set PW required false. */
445 16 : if (!UserMappingPasswordRequired(user))
446 4 : return;
447 :
448 : /* Connected via PW, with PW required true, and provided non-empty PW. */
449 12 : if (PQconnectionUsedPassword(conn))
450 : {
451 : /* ok if params contain a non-empty password */
452 80 : for (int i = 0; keywords[i] != NULL; i++)
453 : {
454 72 : if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
455 0 : return;
456 : }
457 : }
458 :
459 : /*
460 : * Ok if SCRAM pass-through is being used and all required SCRAM options
461 : * are set correctly. If pgfdw_has_required_scram_options returns true we
462 : * assume that UseScramPassthrough is also true since SCRAM options are
463 : * only set when UseScramPassthrough is enabled.
464 : */
465 12 : if (MyProcPort->has_scram_keys && pgfdw_has_required_scram_options(keywords, values))
466 8 : return;
467 :
468 4 : ereport(ERROR,
469 : (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
470 : errmsg("password or GSSAPI delegated credentials required"),
471 : errdetail("Non-superuser cannot connect if the server does not request a password or use GSSAPI with delegated credentials."),
472 : errhint("Target server's authentication method must be changed or password_required=false set in the user mapping attributes.")));
473 : }
474 :
475 : /*
476 : * Connect to remote server using specified server and user mapping properties.
477 : */
478 : static PGconn *
479 144 : connect_pg_server(ForeignServer *server, UserMapping *user)
480 : {
481 144 : PGconn *volatile conn = NULL;
482 :
483 : /*
484 : * Use PG_TRY block to ensure closing connection on error.
485 : */
486 144 : PG_TRY();
487 : {
488 : const char **keywords;
489 : const char **values;
490 144 : char *appname = NULL;
491 : int n;
492 :
493 : /*
494 : * Construct connection params from generic options of ForeignServer
495 : * and UserMapping. (Some of them might not be libpq options, in
496 : * which case we'll just waste a few array slots.) Add 4 extra slots
497 : * for application_name, fallback_application_name, client_encoding,
498 : * end marker, and 3 extra slots for scram keys and required scram
499 : * pass-through options.
500 : */
501 144 : n = list_length(server->options) + list_length(user->options) + 4 + 3;
502 144 : keywords = (const char **) palloc(n * sizeof(char *));
503 144 : values = (const char **) palloc(n * sizeof(char *));
504 :
505 144 : n = 0;
506 288 : n += ExtractConnectionOptions(server->options,
507 144 : keywords + n, values + n);
508 288 : n += ExtractConnectionOptions(user->options,
509 144 : keywords + n, values + n);
510 :
511 : /*
512 : * Use pgfdw_application_name as application_name if set.
513 : *
514 : * PQconnectdbParams() processes the parameter arrays from start to
515 : * end. If any key word is repeated, the last value is used. Therefore
516 : * note that pgfdw_application_name must be added to the arrays after
517 : * options of ForeignServer are, so that it can override
518 : * application_name set in ForeignServer.
519 : */
520 144 : if (pgfdw_application_name && *pgfdw_application_name != '\0')
521 : {
522 2 : keywords[n] = "application_name";
523 2 : values[n] = pgfdw_application_name;
524 2 : n++;
525 : }
526 :
527 : /*
528 : * Search the parameter arrays to find application_name setting, and
529 : * replace escape sequences in it with status information if found.
530 : * The arrays are searched backwards because the last value is used if
531 : * application_name is repeatedly set.
532 : */
533 388 : for (int i = n - 1; i >= 0; i--)
534 : {
535 280 : if (strcmp(keywords[i], "application_name") == 0 &&
536 36 : *(values[i]) != '\0')
537 : {
538 : /*
539 : * Use this application_name setting if it's not empty string
540 : * even after any escape sequences in it are replaced.
541 : */
542 36 : appname = process_pgfdw_appname(values[i]);
543 36 : if (appname[0] != '\0')
544 : {
545 36 : values[i] = appname;
546 36 : break;
547 : }
548 :
549 : /*
550 : * This empty application_name is not used, so we set
551 : * values[i] to NULL and keep searching the array to find the
552 : * next one.
553 : */
554 0 : values[i] = NULL;
555 0 : pfree(appname);
556 0 : appname = NULL;
557 : }
558 : }
559 :
560 : /* Use "postgres_fdw" as fallback_application_name */
561 144 : keywords[n] = "fallback_application_name";
562 144 : values[n] = "postgres_fdw";
563 144 : n++;
564 :
565 : /* Set client_encoding so that libpq can convert encoding properly. */
566 144 : keywords[n] = "client_encoding";
567 144 : values[n] = GetDatabaseEncodingName();
568 144 : n++;
569 :
570 : /* Add required SCRAM pass-through connection options if it's enabled. */
571 144 : if (MyProcPort->has_scram_keys && UseScramPassthrough(server, user))
572 : {
573 : int len;
574 : int encoded_len;
575 :
576 10 : keywords[n] = "scram_client_key";
577 10 : len = pg_b64_enc_len(sizeof(MyProcPort->scram_ClientKey));
578 : /* don't forget the zero-terminator */
579 10 : values[n] = palloc0(len + 1);
580 20 : encoded_len = pg_b64_encode((const char *) MyProcPort->scram_ClientKey,
581 : sizeof(MyProcPort->scram_ClientKey),
582 10 : (char *) values[n], len);
583 10 : if (encoded_len < 0)
584 0 : elog(ERROR, "could not encode SCRAM client key");
585 10 : n++;
586 :
587 10 : keywords[n] = "scram_server_key";
588 10 : len = pg_b64_enc_len(sizeof(MyProcPort->scram_ServerKey));
589 : /* don't forget the zero-terminator */
590 10 : values[n] = palloc0(len + 1);
591 20 : encoded_len = pg_b64_encode((const char *) MyProcPort->scram_ServerKey,
592 : sizeof(MyProcPort->scram_ServerKey),
593 10 : (char *) values[n], len);
594 10 : if (encoded_len < 0)
595 0 : elog(ERROR, "could not encode SCRAM server key");
596 10 : n++;
597 :
598 : /*
599 : * Require scram-sha-256 to ensure that no other auth method is
600 : * used when connecting with foreign server.
601 : */
602 10 : keywords[n] = "require_auth";
603 10 : values[n] = "scram-sha-256";
604 10 : n++;
605 : }
606 :
607 144 : keywords[n] = values[n] = NULL;
608 :
609 : /* Verify the set of connection parameters. */
610 144 : check_conn_params(keywords, values, user);
611 :
612 : /* first time, allocate or get the custom wait event */
613 140 : if (pgfdw_we_connect == 0)
614 20 : pgfdw_we_connect = WaitEventExtensionNew("PostgresFdwConnect");
615 :
616 : /* OK to make connection */
617 140 : conn = libpqsrv_connect_params(keywords, values,
618 : false, /* expand_dbname */
619 : pgfdw_we_connect);
620 :
621 140 : if (!conn || PQstatus(conn) != CONNECTION_OK)
622 6 : ereport(ERROR,
623 : (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
624 : errmsg("could not connect to server \"%s\"",
625 : server->servername),
626 : errdetail_internal("%s", pchomp(PQerrorMessage(conn)))));
627 :
628 : /* Perform post-connection security checks. */
629 134 : pgfdw_security_check(keywords, values, user, conn);
630 :
631 : /* Prepare new session for use */
632 130 : configure_remote_session(conn);
633 :
634 130 : if (appname != NULL)
635 36 : pfree(appname);
636 130 : pfree(keywords);
637 130 : pfree(values);
638 : }
639 14 : PG_CATCH();
640 : {
641 14 : libpqsrv_disconnect(conn);
642 14 : PG_RE_THROW();
643 : }
644 130 : PG_END_TRY();
645 :
646 130 : return conn;
647 : }
648 :
649 : /*
650 : * Disconnect any open connection for a connection cache entry.
651 : */
652 : static void
653 112 : disconnect_pg_server(ConnCacheEntry *entry)
654 : {
655 112 : if (entry->conn != NULL)
656 : {
657 112 : libpqsrv_disconnect(entry->conn);
658 112 : entry->conn = NULL;
659 : }
660 112 : }
661 :
662 : /*
663 : * Return true if the password_required is defined and false for this user
664 : * mapping, otherwise false. The mapping has been pre-validated.
665 : */
666 : static bool
667 32 : UserMappingPasswordRequired(UserMapping *user)
668 : {
669 : ListCell *cell;
670 :
671 56 : foreach(cell, user->options)
672 : {
673 30 : DefElem *def = (DefElem *) lfirst(cell);
674 :
675 30 : if (strcmp(def->defname, "password_required") == 0)
676 6 : return defGetBoolean(def);
677 : }
678 :
679 26 : return true;
680 : }
681 :
682 : static bool
683 10 : UseScramPassthrough(ForeignServer *server, UserMapping *user)
684 : {
685 : ListCell *cell;
686 :
687 40 : foreach(cell, server->options)
688 : {
689 40 : DefElem *def = (DefElem *) lfirst(cell);
690 :
691 40 : if (strcmp(def->defname, "use_scram_passthrough") == 0)
692 10 : return defGetBoolean(def);
693 : }
694 :
695 0 : foreach(cell, user->options)
696 : {
697 0 : DefElem *def = (DefElem *) lfirst(cell);
698 :
699 0 : if (strcmp(def->defname, "use_scram_passthrough") == 0)
700 0 : return defGetBoolean(def);
701 : }
702 :
703 0 : return false;
704 : }
705 :
706 : /*
707 : * For non-superusers, insist that the connstr specify a password or that the
708 : * user provided their own GSSAPI delegated credentials. This
709 : * prevents a password from being picked up from .pgpass, a service file, the
710 : * environment, etc. We don't want the postgres user's passwords,
711 : * certificates, etc to be accessible to non-superusers. (See also
712 : * dblink_connstr_check in contrib/dblink.)
713 : */
714 : static void
715 144 : check_conn_params(const char **keywords, const char **values, UserMapping *user)
716 : {
717 : int i;
718 :
719 : /* no check required if superuser */
720 144 : if (superuser_arg(user->userid))
721 122 : return;
722 :
723 : #ifdef ENABLE_GSS
724 : /* ok if the user provided their own delegated credentials */
725 : if (be_gssapi_get_delegation(MyProcPort))
726 : return;
727 : #endif
728 :
729 : /* ok if params contain a non-empty password */
730 148 : for (i = 0; keywords[i] != NULL; i++)
731 : {
732 132 : if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
733 6 : return;
734 : }
735 :
736 : /* ok if the superuser explicitly said so at user mapping creation time */
737 16 : if (!UserMappingPasswordRequired(user))
738 2 : return;
739 :
740 : /*
741 : * Ok if SCRAM pass-through is being used and all required scram options
742 : * are set correctly. If pgfdw_has_required_scram_options returns true we
743 : * assume that UseScramPassthrough is also true since SCRAM options are
744 : * only set when UseScramPassthrough is enabled.
745 : */
746 14 : if (MyProcPort->has_scram_keys && pgfdw_has_required_scram_options(keywords, values))
747 10 : return;
748 :
749 4 : ereport(ERROR,
750 : (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
751 : errmsg("password or GSSAPI delegated credentials required"),
752 : errdetail("Non-superusers must delegate GSSAPI credentials, provide a password, or enable SCRAM pass-through in user mapping.")));
753 : }
754 :
755 : /*
756 : * Issue SET commands to make sure remote session is configured properly.
757 : *
758 : * We do this just once at connection, assuming nothing will change the
759 : * values later. Since we'll never send volatile function calls to the
760 : * remote, there shouldn't be any way to break this assumption from our end.
761 : * It's possible to think of ways to break it at the remote end, eg making
762 : * a foreign table point to a view that includes a set_config call ---
763 : * but once you admit the possibility of a malicious view definition,
764 : * there are any number of ways to break things.
765 : */
766 : static void
767 130 : configure_remote_session(PGconn *conn)
768 : {
769 130 : int remoteversion = PQserverVersion(conn);
770 :
771 : /* Force the search path to contain only pg_catalog (see deparse.c) */
772 130 : do_sql_command(conn, "SET search_path = pg_catalog");
773 :
774 : /*
775 : * Set remote timezone; this is basically just cosmetic, since all
776 : * transmitted and returned timestamptzs should specify a zone explicitly
777 : * anyway. However it makes the regression test outputs more predictable.
778 : *
779 : * We don't risk setting remote zone equal to ours, since the remote
780 : * server might use a different timezone database. Instead, use GMT
781 : * (quoted, because very old servers are picky about case). That's
782 : * guaranteed to work regardless of the remote's timezone database,
783 : * because pg_tzset() hard-wires it (at least in PG 9.2 and later).
784 : */
785 130 : do_sql_command(conn, "SET timezone = 'GMT'");
786 :
787 : /*
788 : * Set values needed to ensure unambiguous data output from remote. (This
789 : * logic should match what pg_dump does. See also set_transmission_modes
790 : * in postgres_fdw.c.)
791 : */
792 130 : do_sql_command(conn, "SET datestyle = ISO");
793 130 : if (remoteversion >= 80400)
794 130 : do_sql_command(conn, "SET intervalstyle = postgres");
795 130 : if (remoteversion >= 90000)
796 130 : do_sql_command(conn, "SET extra_float_digits = 3");
797 : else
798 0 : do_sql_command(conn, "SET extra_float_digits = 2");
799 130 : }
800 :
801 : /*
802 : * Convenience subroutine to issue a non-data-returning SQL command to remote
803 : */
804 : void
805 3528 : do_sql_command(PGconn *conn, const char *sql)
806 : {
807 3528 : do_sql_command_begin(conn, sql);
808 3528 : do_sql_command_end(conn, sql, false);
809 3522 : }
810 :
811 : static void
812 3564 : do_sql_command_begin(PGconn *conn, const char *sql)
813 : {
814 3564 : if (!PQsendQuery(conn, sql))
815 0 : pgfdw_report_error(ERROR, NULL, conn, false, sql);
816 3564 : }
817 :
818 : static void
819 3564 : do_sql_command_end(PGconn *conn, const char *sql, bool consume_input)
820 : {
821 : PGresult *res;
822 :
823 : /*
824 : * If requested, consume whatever data is available from the socket. (Note
825 : * that if all data is available, this allows pgfdw_get_result to call
826 : * PQgetResult without forcing the overhead of WaitLatchOrSocket, which
827 : * would be large compared to the overhead of PQconsumeInput.)
828 : */
829 3564 : if (consume_input && !PQconsumeInput(conn))
830 0 : pgfdw_report_error(ERROR, NULL, conn, false, sql);
831 3564 : res = pgfdw_get_result(conn);
832 3564 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
833 6 : pgfdw_report_error(ERROR, res, conn, true, sql);
834 3558 : PQclear(res);
835 3558 : }
836 :
837 : /*
838 : * Start remote transaction or subtransaction, if needed.
839 : *
840 : * Note that we always use at least REPEATABLE READ in the remote session.
841 : * This is so that, if a query initiates multiple scans of the same or
842 : * different foreign tables, we will get snapshot-consistent results from
843 : * those scans. A disadvantage is that we can't provide sane emulation of
844 : * READ COMMITTED behavior --- it would be nice if we had some other way to
845 : * control which remote queries share a snapshot.
846 : */
847 : static void
848 4322 : begin_remote_xact(ConnCacheEntry *entry)
849 : {
850 4322 : int curlevel = GetCurrentTransactionNestLevel();
851 :
852 : /* Start main transaction if we haven't yet */
853 4322 : if (entry->xact_depth <= 0)
854 : {
855 : const char *sql;
856 :
857 1468 : elog(DEBUG3, "starting remote transaction on connection %p",
858 : entry->conn);
859 :
860 1468 : if (IsolationIsSerializable())
861 0 : sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
862 : else
863 1468 : sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
864 1468 : entry->changing_xact_state = true;
865 1468 : do_sql_command(entry->conn, sql);
866 1466 : entry->xact_depth = 1;
867 1466 : entry->changing_xact_state = false;
868 : }
869 :
870 : /*
871 : * If we're in a subtransaction, stack up savepoints to match our level.
872 : * This ensures we can rollback just the desired effects when a
873 : * subtransaction aborts.
874 : */
875 4348 : while (entry->xact_depth < curlevel)
876 : {
877 : char sql[64];
878 :
879 30 : snprintf(sql, sizeof(sql), "SAVEPOINT s%d", entry->xact_depth + 1);
880 30 : entry->changing_xact_state = true;
881 30 : do_sql_command(entry->conn, sql);
882 28 : entry->xact_depth++;
883 28 : entry->changing_xact_state = false;
884 : }
885 4318 : }
886 :
887 : /*
888 : * Release connection reference count created by calling GetConnection.
889 : */
890 : void
891 4212 : ReleaseConnection(PGconn *conn)
892 : {
893 : /*
894 : * Currently, we don't actually track connection references because all
895 : * cleanup is managed on a transaction or subtransaction basis instead. So
896 : * there's nothing to do here.
897 : */
898 4212 : }
899 :
900 : /*
901 : * Assign a "unique" number for a cursor.
902 : *
903 : * These really only need to be unique per connection within a transaction.
904 : * For the moment we ignore the per-connection point and assign them across
905 : * all connections in the transaction, but we ask for the connection to be
906 : * supplied in case we want to refine that.
907 : *
908 : * Note that even if wraparound happens in a very long transaction, actual
909 : * collisions are highly improbable; just be sure to use %u not %d to print.
910 : */
911 : unsigned int
912 1050 : GetCursorNumber(PGconn *conn)
913 : {
914 1050 : return ++cursor_number;
915 : }
916 :
917 : /*
918 : * Assign a "unique" number for a prepared statement.
919 : *
920 : * This works much like GetCursorNumber, except that we never reset the counter
921 : * within a session. That's because we can't be 100% sure we've gotten rid
922 : * of all prepared statements on all connections, and it's not really worth
923 : * increasing the risk of prepared-statement name collisions by resetting.
924 : */
925 : unsigned int
926 356 : GetPrepStmtNumber(PGconn *conn)
927 : {
928 356 : return ++prep_stmt_number;
929 : }
930 :
931 : /*
932 : * Submit a query and wait for the result.
933 : *
934 : * Since we don't use non-blocking mode, this can't process interrupts while
935 : * pushing the query text to the server. That risk is relatively small, so we
936 : * ignore that for now.
937 : *
938 : * Caller is responsible for the error handling on the result.
939 : */
940 : PGresult *
941 7968 : pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state)
942 : {
943 : /* First, process a pending asynchronous request, if any. */
944 7968 : if (state && state->pendingAreq)
945 8 : process_pending_request(state->pendingAreq);
946 :
947 7968 : if (!PQsendQuery(conn, query))
948 0 : return NULL;
949 7968 : return pgfdw_get_result(conn);
950 : }
951 :
952 : /*
953 : * Wrap libpqsrv_get_result_last(), adding wait event.
954 : *
955 : * Caller is responsible for the error handling on the result.
956 : */
957 : PGresult *
958 16074 : pgfdw_get_result(PGconn *conn)
959 : {
960 16074 : return libpqsrv_get_result_last(conn, pgfdw_we_get_result);
961 : }
962 :
963 : /*
964 : * Report an error we got from the remote server.
965 : *
966 : * elevel: error level to use (typically ERROR, but might be less)
967 : * res: PGresult containing the error
968 : * conn: connection we did the query on
969 : * clear: if true, PQclear the result (otherwise caller will handle it)
970 : * sql: NULL, or text of remote command we tried to execute
971 : *
972 : * Note: callers that choose not to throw ERROR for a remote error are
973 : * responsible for making sure that the associated ConnCacheEntry gets
974 : * marked with have_error = true.
975 : */
976 : void
977 32 : pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
978 : bool clear, const char *sql)
979 : {
980 : /* If requested, PGresult must be released before leaving this function. */
981 32 : PG_TRY();
982 : {
983 32 : char *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
984 32 : char *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
985 32 : char *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
986 32 : char *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
987 32 : char *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
988 : int sqlstate;
989 :
990 32 : if (diag_sqlstate)
991 28 : sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
992 : diag_sqlstate[1],
993 : diag_sqlstate[2],
994 : diag_sqlstate[3],
995 : diag_sqlstate[4]);
996 : else
997 4 : sqlstate = ERRCODE_CONNECTION_FAILURE;
998 :
999 : /*
1000 : * If we don't get a message from the PGresult, try the PGconn. This
1001 : * is needed because for connection-level failures, PQgetResult may
1002 : * just return NULL, not a PGresult at all.
1003 : */
1004 32 : if (message_primary == NULL)
1005 4 : message_primary = pchomp(PQerrorMessage(conn));
1006 :
1007 32 : ereport(elevel,
1008 : (errcode(sqlstate),
1009 : (message_primary != NULL && message_primary[0] != '\0') ?
1010 : errmsg_internal("%s", message_primary) :
1011 : errmsg("could not obtain message string for remote error"),
1012 : message_detail ? errdetail_internal("%s", message_detail) : 0,
1013 : message_hint ? errhint("%s", message_hint) : 0,
1014 : message_context ? errcontext("%s", message_context) : 0,
1015 : sql ? errcontext("remote SQL command: %s", sql) : 0));
1016 : }
1017 32 : PG_FINALLY();
1018 : {
1019 32 : if (clear)
1020 30 : PQclear(res);
1021 : }
1022 32 : PG_END_TRY();
1023 0 : }
1024 :
1025 : /*
1026 : * pgfdw_xact_callback --- cleanup at main-transaction end.
1027 : *
1028 : * This runs just late enough that it must not enter user-defined code
1029 : * locally. (Entering such code on the remote side is fine. Its remote
1030 : * COMMIT TRANSACTION may run deferred triggers.)
1031 : */
1032 : static void
1033 7744 : pgfdw_xact_callback(XactEvent event, void *arg)
1034 : {
1035 : HASH_SEQ_STATUS scan;
1036 : ConnCacheEntry *entry;
1037 7744 : List *pending_entries = NIL;
1038 7744 : List *cancel_requested = NIL;
1039 :
1040 : /* Quick exit if no connections were touched in this transaction. */
1041 7744 : if (!xact_got_connection)
1042 6338 : return;
1043 :
1044 : /*
1045 : * Scan all connection cache entries to find open remote transactions, and
1046 : * close them.
1047 : */
1048 1406 : hash_seq_init(&scan, ConnectionHash);
1049 7216 : while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1050 : {
1051 : PGresult *res;
1052 :
1053 : /* Ignore cache entry if no open connection right now */
1054 5812 : if (entry->conn == NULL)
1055 3290 : continue;
1056 :
1057 : /* If it has an open remote transaction, try to close it */
1058 2522 : if (entry->xact_depth > 0)
1059 : {
1060 1468 : elog(DEBUG3, "closing remote transaction on connection %p",
1061 : entry->conn);
1062 :
1063 1468 : switch (event)
1064 : {
1065 1378 : case XACT_EVENT_PARALLEL_PRE_COMMIT:
1066 : case XACT_EVENT_PRE_COMMIT:
1067 :
1068 : /*
1069 : * If abort cleanup previously failed for this connection,
1070 : * we can't issue any more commands against it.
1071 : */
1072 1378 : pgfdw_reject_incomplete_xact_state_change(entry);
1073 :
1074 : /* Commit all remote transactions during pre-commit */
1075 1378 : entry->changing_xact_state = true;
1076 1378 : if (entry->parallel_commit)
1077 : {
1078 32 : do_sql_command_begin(entry->conn, "COMMIT TRANSACTION");
1079 32 : pending_entries = lappend(pending_entries, entry);
1080 32 : continue;
1081 : }
1082 1346 : do_sql_command(entry->conn, "COMMIT TRANSACTION");
1083 1346 : entry->changing_xact_state = false;
1084 :
1085 : /*
1086 : * If there were any errors in subtransactions, and we
1087 : * made prepared statements, do a DEALLOCATE ALL to make
1088 : * sure we get rid of all prepared statements. This is
1089 : * annoying and not terribly bulletproof, but it's
1090 : * probably not worth trying harder.
1091 : *
1092 : * DEALLOCATE ALL only exists in 8.3 and later, so this
1093 : * constrains how old a server postgres_fdw can
1094 : * communicate with. We intentionally ignore errors in
1095 : * the DEALLOCATE, so that we can hobble along to some
1096 : * extent with older servers (leaking prepared statements
1097 : * as we go; but we don't really support update operations
1098 : * pre-8.3 anyway).
1099 : */
1100 1346 : if (entry->have_prep_stmt && entry->have_error)
1101 : {
1102 0 : res = pgfdw_exec_query(entry->conn, "DEALLOCATE ALL",
1103 : NULL);
1104 0 : PQclear(res);
1105 : }
1106 1346 : entry->have_prep_stmt = false;
1107 1346 : entry->have_error = false;
1108 1346 : break;
1109 2 : case XACT_EVENT_PRE_PREPARE:
1110 :
1111 : /*
1112 : * We disallow any remote transactions, since it's not
1113 : * very reasonable to hold them open until the prepared
1114 : * transaction is committed. For the moment, throw error
1115 : * unconditionally; later we might allow read-only cases.
1116 : * Note that the error will cause us to come right back
1117 : * here with event == XACT_EVENT_ABORT, so we'll clean up
1118 : * the connection state at that point.
1119 : */
1120 2 : ereport(ERROR,
1121 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1122 : errmsg("cannot PREPARE a transaction that has operated on postgres_fdw foreign tables")));
1123 : break;
1124 0 : case XACT_EVENT_PARALLEL_COMMIT:
1125 : case XACT_EVENT_COMMIT:
1126 : case XACT_EVENT_PREPARE:
1127 : /* Pre-commit should have closed the open transaction */
1128 0 : elog(ERROR, "missed cleaning up connection during pre-commit");
1129 : break;
1130 88 : case XACT_EVENT_PARALLEL_ABORT:
1131 : case XACT_EVENT_ABORT:
1132 : /* Rollback all remote transactions during abort */
1133 88 : if (entry->parallel_abort)
1134 : {
1135 8 : if (pgfdw_abort_cleanup_begin(entry, true,
1136 : &pending_entries,
1137 : &cancel_requested))
1138 8 : continue;
1139 : }
1140 : else
1141 80 : pgfdw_abort_cleanup(entry, true);
1142 80 : break;
1143 : }
1144 1054 : }
1145 :
1146 : /* Reset state to show we're out of a transaction */
1147 2480 : pgfdw_reset_xact_state(entry, true);
1148 : }
1149 :
1150 : /* If there are any pending connections, finish cleaning them up */
1151 1404 : if (pending_entries || cancel_requested)
1152 : {
1153 30 : if (event == XACT_EVENT_PARALLEL_PRE_COMMIT ||
1154 : event == XACT_EVENT_PRE_COMMIT)
1155 : {
1156 : Assert(cancel_requested == NIL);
1157 26 : pgfdw_finish_pre_commit_cleanup(pending_entries);
1158 : }
1159 : else
1160 : {
1161 : Assert(event == XACT_EVENT_PARALLEL_ABORT ||
1162 : event == XACT_EVENT_ABORT);
1163 4 : pgfdw_finish_abort_cleanup(pending_entries, cancel_requested,
1164 : true);
1165 : }
1166 : }
1167 :
1168 : /*
1169 : * Regardless of the event type, we can now mark ourselves as out of the
1170 : * transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
1171 : * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
1172 : */
1173 1404 : xact_got_connection = false;
1174 :
1175 : /* Also reset cursor numbering for next transaction */
1176 1404 : cursor_number = 0;
1177 : }
1178 :
1179 : /*
1180 : * pgfdw_subxact_callback --- cleanup at subtransaction end.
1181 : */
1182 : static void
1183 76 : pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
1184 : SubTransactionId parentSubid, void *arg)
1185 : {
1186 : HASH_SEQ_STATUS scan;
1187 : ConnCacheEntry *entry;
1188 : int curlevel;
1189 76 : List *pending_entries = NIL;
1190 76 : List *cancel_requested = NIL;
1191 :
1192 : /* Nothing to do at subxact start, nor after commit. */
1193 76 : if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
1194 : event == SUBXACT_EVENT_ABORT_SUB))
1195 46 : return;
1196 :
1197 : /* Quick exit if no connections were touched in this transaction. */
1198 30 : if (!xact_got_connection)
1199 0 : return;
1200 :
1201 : /*
1202 : * Scan all connection cache entries to find open remote subtransactions
1203 : * of the current level, and close them.
1204 : */
1205 30 : curlevel = GetCurrentTransactionNestLevel();
1206 30 : hash_seq_init(&scan, ConnectionHash);
1207 204 : while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1208 : {
1209 : char sql[100];
1210 :
1211 : /*
1212 : * We only care about connections with open remote subtransactions of
1213 : * the current level.
1214 : */
1215 174 : if (entry->conn == NULL || entry->xact_depth < curlevel)
1216 158 : continue;
1217 :
1218 28 : if (entry->xact_depth > curlevel)
1219 0 : elog(ERROR, "missed cleaning up remote subtransaction at level %d",
1220 : entry->xact_depth);
1221 :
1222 28 : if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
1223 : {
1224 : /*
1225 : * If abort cleanup previously failed for this connection, we
1226 : * can't issue any more commands against it.
1227 : */
1228 14 : pgfdw_reject_incomplete_xact_state_change(entry);
1229 :
1230 : /* Commit all remote subtransactions during pre-commit */
1231 14 : snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
1232 14 : entry->changing_xact_state = true;
1233 14 : if (entry->parallel_commit)
1234 : {
1235 4 : do_sql_command_begin(entry->conn, sql);
1236 4 : pending_entries = lappend(pending_entries, entry);
1237 4 : continue;
1238 : }
1239 10 : do_sql_command(entry->conn, sql);
1240 10 : entry->changing_xact_state = false;
1241 : }
1242 : else
1243 : {
1244 : /* Rollback all remote subtransactions during abort */
1245 14 : if (entry->parallel_abort)
1246 : {
1247 8 : if (pgfdw_abort_cleanup_begin(entry, false,
1248 : &pending_entries,
1249 : &cancel_requested))
1250 8 : continue;
1251 : }
1252 : else
1253 6 : pgfdw_abort_cleanup(entry, false);
1254 : }
1255 :
1256 : /* OK, we're outta that level of subtransaction */
1257 16 : pgfdw_reset_xact_state(entry, false);
1258 : }
1259 :
1260 : /* If there are any pending connections, finish cleaning them up */
1261 30 : if (pending_entries || cancel_requested)
1262 : {
1263 6 : if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
1264 : {
1265 : Assert(cancel_requested == NIL);
1266 2 : pgfdw_finish_pre_subcommit_cleanup(pending_entries, curlevel);
1267 : }
1268 : else
1269 : {
1270 : Assert(event == SUBXACT_EVENT_ABORT_SUB);
1271 4 : pgfdw_finish_abort_cleanup(pending_entries, cancel_requested,
1272 : false);
1273 : }
1274 : }
1275 : }
1276 :
1277 : /*
1278 : * Connection invalidation callback function
1279 : *
1280 : * After a change to a pg_foreign_server or pg_user_mapping catalog entry,
1281 : * close connections depending on that entry immediately if current transaction
1282 : * has not used those connections yet. Otherwise, mark those connections as
1283 : * invalid and then make pgfdw_xact_callback() close them at the end of current
1284 : * transaction, since they cannot be closed in the midst of the transaction
1285 : * using them. Closed connections will be remade at the next opportunity if
1286 : * necessary.
1287 : *
1288 : * Although most cache invalidation callbacks blow away all the related stuff
1289 : * regardless of the given hashvalue, connections are expensive enough that
1290 : * it's worth trying to avoid that.
1291 : *
1292 : * NB: We could avoid unnecessary disconnection more strictly by examining
1293 : * individual option values, but it seems too much effort for the gain.
1294 : */
1295 : static void
1296 344 : pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
1297 : {
1298 : HASH_SEQ_STATUS scan;
1299 : ConnCacheEntry *entry;
1300 :
1301 : Assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID);
1302 :
1303 : /* ConnectionHash must exist already, if we're registered */
1304 344 : hash_seq_init(&scan, ConnectionHash);
1305 2332 : while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1306 : {
1307 : /* Ignore invalid entries */
1308 1988 : if (entry->conn == NULL)
1309 1624 : continue;
1310 :
1311 : /* hashvalue == 0 means a cache reset, must clear all state */
1312 364 : if (hashvalue == 0 ||
1313 258 : (cacheid == FOREIGNSERVEROID &&
1314 364 : entry->server_hashvalue == hashvalue) ||
1315 106 : (cacheid == USERMAPPINGOID &&
1316 106 : entry->mapping_hashvalue == hashvalue))
1317 : {
1318 : /*
1319 : * Close the connection immediately if it's not used yet in this
1320 : * transaction. Otherwise mark it as invalid so that
1321 : * pgfdw_xact_callback() can close it at the end of this
1322 : * transaction.
1323 : */
1324 98 : if (entry->xact_depth == 0)
1325 : {
1326 92 : elog(DEBUG3, "discarding connection %p", entry->conn);
1327 92 : disconnect_pg_server(entry);
1328 : }
1329 : else
1330 6 : entry->invalidated = true;
1331 : }
1332 : }
1333 344 : }
1334 :
1335 : /*
1336 : * Raise an error if the given connection cache entry is marked as being
1337 : * in the middle of an xact state change. This should be called at which no
1338 : * such change is expected to be in progress; if one is found to be in
1339 : * progress, it means that we aborted in the middle of a previous state change
1340 : * and now don't know what the remote transaction state actually is.
1341 : * Such connections can't safely be further used. Re-establishing the
1342 : * connection would change the snapshot and roll back any writes already
1343 : * performed, so that's not an option, either. Thus, we must abort.
1344 : */
1345 : static void
1346 5726 : pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
1347 : {
1348 : ForeignServer *server;
1349 :
1350 : /* nothing to do for inactive entries and entries of sane state */
1351 5726 : if (entry->conn == NULL || !entry->changing_xact_state)
1352 5726 : return;
1353 :
1354 : /* make sure this entry is inactive */
1355 0 : disconnect_pg_server(entry);
1356 :
1357 : /* find server name to be shown in the message below */
1358 0 : server = GetForeignServer(entry->serverid);
1359 :
1360 0 : ereport(ERROR,
1361 : (errcode(ERRCODE_CONNECTION_EXCEPTION),
1362 : errmsg("connection to server \"%s\" was lost",
1363 : server->servername)));
1364 : }
1365 :
1366 : /*
1367 : * Reset state to show we're out of a (sub)transaction.
1368 : */
1369 : static void
1370 2548 : pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel)
1371 : {
1372 2548 : if (toplevel)
1373 : {
1374 : /* Reset state to show we're out of a transaction */
1375 2520 : entry->xact_depth = 0;
1376 :
1377 : /*
1378 : * If the connection isn't in a good idle state, it is marked as
1379 : * invalid or keep_connections option of its server is disabled, then
1380 : * discard it to recover. Next GetConnection will open a new
1381 : * connection.
1382 : */
1383 5038 : if (PQstatus(entry->conn) != CONNECTION_OK ||
1384 2518 : PQtransactionStatus(entry->conn) != PQTRANS_IDLE ||
1385 2518 : entry->changing_xact_state ||
1386 2518 : entry->invalidated ||
1387 2514 : !entry->keep_connections)
1388 : {
1389 8 : elog(DEBUG3, "discarding connection %p", entry->conn);
1390 8 : disconnect_pg_server(entry);
1391 : }
1392 : }
1393 : else
1394 : {
1395 : /* Reset state to show we're out of a subtransaction */
1396 28 : entry->xact_depth--;
1397 : }
1398 2548 : }
1399 :
1400 : /*
1401 : * Cancel the currently-in-progress query (whose query text we do not have)
1402 : * and ignore the result. Returns true if we successfully cancel the query
1403 : * and discard any pending result, and false if not.
1404 : *
1405 : * It's not a huge problem if we throw an ERROR here, but if we get into error
1406 : * recursion trouble, we'll end up slamming the connection shut, which will
1407 : * necessitate failing the entire toplevel transaction even if subtransactions
1408 : * were used. Try to use WARNING where we can.
1409 : *
1410 : * XXX: if the query was one sent by fetch_more_data_begin(), we could get the
1411 : * query text from the pendingAreq saved in the per-connection state, then
1412 : * report the query using it.
1413 : */
1414 : static bool
1415 4 : pgfdw_cancel_query(PGconn *conn)
1416 : {
1417 4 : TimestampTz now = GetCurrentTimestamp();
1418 : TimestampTz endtime;
1419 : TimestampTz retrycanceltime;
1420 :
1421 : /*
1422 : * If it takes too long to cancel the query and discard the result, assume
1423 : * the connection is dead.
1424 : */
1425 4 : endtime = TimestampTzPlusMilliseconds(now, CONNECTION_CLEANUP_TIMEOUT);
1426 :
1427 : /*
1428 : * Also, lose patience and re-issue the cancel request after a little bit.
1429 : * (This serves to close some race conditions.)
1430 : */
1431 4 : retrycanceltime = TimestampTzPlusMilliseconds(now, RETRY_CANCEL_TIMEOUT);
1432 :
1433 4 : if (!pgfdw_cancel_query_begin(conn, endtime))
1434 0 : return false;
1435 4 : return pgfdw_cancel_query_end(conn, endtime, retrycanceltime, false);
1436 : }
1437 :
1438 : /*
1439 : * Submit a cancel request to the given connection, waiting only until
1440 : * the given time.
1441 : *
1442 : * We sleep interruptibly until we receive confirmation that the cancel
1443 : * request has been accepted, and if it is, return true; if the timeout
1444 : * lapses without that, or the request fails for whatever reason, return
1445 : * false.
1446 : */
1447 : static bool
1448 4 : pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime)
1449 : {
1450 4 : const char *errormsg = libpqsrv_cancel(conn, endtime);
1451 :
1452 4 : if (errormsg != NULL)
1453 0 : ereport(WARNING,
1454 : errcode(ERRCODE_CONNECTION_FAILURE),
1455 : errmsg("could not send cancel request: %s", errormsg));
1456 :
1457 4 : return errormsg == NULL;
1458 : }
1459 :
1460 : static bool
1461 4 : pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime,
1462 : TimestampTz retrycanceltime, bool consume_input)
1463 : {
1464 : PGresult *result;
1465 : bool timed_out;
1466 :
1467 : /*
1468 : * If requested, consume whatever data is available from the socket. (Note
1469 : * that if all data is available, this allows pgfdw_get_cleanup_result to
1470 : * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
1471 : * which would be large compared to the overhead of PQconsumeInput.)
1472 : */
1473 4 : if (consume_input && !PQconsumeInput(conn))
1474 : {
1475 0 : ereport(WARNING,
1476 : (errcode(ERRCODE_CONNECTION_FAILURE),
1477 : errmsg("could not get result of cancel request: %s",
1478 : pchomp(PQerrorMessage(conn)))));
1479 0 : return false;
1480 : }
1481 :
1482 : /* Get and discard the result of the query. */
1483 4 : if (pgfdw_get_cleanup_result(conn, endtime, retrycanceltime,
1484 : &result, &timed_out))
1485 : {
1486 0 : if (timed_out)
1487 0 : ereport(WARNING,
1488 : (errmsg("could not get result of cancel request due to timeout")));
1489 : else
1490 0 : ereport(WARNING,
1491 : (errcode(ERRCODE_CONNECTION_FAILURE),
1492 : errmsg("could not get result of cancel request: %s",
1493 : pchomp(PQerrorMessage(conn)))));
1494 :
1495 0 : return false;
1496 : }
1497 4 : PQclear(result);
1498 :
1499 4 : return true;
1500 : }
1501 :
1502 : /*
1503 : * Submit a query during (sub)abort cleanup and wait up to 30 seconds for the
1504 : * result. If the query is executed without error, the return value is true.
1505 : * If the query is executed successfully but returns an error, the return
1506 : * value is true if and only if ignore_errors is set. If the query can't be
1507 : * sent or times out, the return value is false.
1508 : *
1509 : * It's not a huge problem if we throw an ERROR here, but if we get into error
1510 : * recursion trouble, we'll end up slamming the connection shut, which will
1511 : * necessitate failing the entire toplevel transaction even if subtransactions
1512 : * were used. Try to use WARNING where we can.
1513 : */
1514 : static bool
1515 118 : pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
1516 : {
1517 : TimestampTz endtime;
1518 :
1519 : /*
1520 : * If it takes too long to execute a cleanup query, assume the connection
1521 : * is dead. It's fairly likely that this is why we aborted in the first
1522 : * place (e.g. statement timeout, user cancel), so the timeout shouldn't
1523 : * be too long.
1524 : */
1525 118 : endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
1526 : CONNECTION_CLEANUP_TIMEOUT);
1527 :
1528 118 : if (!pgfdw_exec_cleanup_query_begin(conn, query))
1529 0 : return false;
1530 118 : return pgfdw_exec_cleanup_query_end(conn, query, endtime,
1531 : false, ignore_errors);
1532 : }
1533 :
1534 : static bool
1535 142 : pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query)
1536 : {
1537 : Assert(query != NULL);
1538 :
1539 : /*
1540 : * Submit a query. Since we don't use non-blocking mode, this also can
1541 : * block. But its risk is relatively small, so we ignore that for now.
1542 : */
1543 142 : if (!PQsendQuery(conn, query))
1544 : {
1545 0 : pgfdw_report_error(WARNING, NULL, conn, false, query);
1546 0 : return false;
1547 : }
1548 :
1549 142 : return true;
1550 : }
1551 :
1552 : static bool
1553 142 : pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
1554 : TimestampTz endtime, bool consume_input,
1555 : bool ignore_errors)
1556 : {
1557 : PGresult *result;
1558 : bool timed_out;
1559 :
1560 : Assert(query != NULL);
1561 :
1562 : /*
1563 : * If requested, consume whatever data is available from the socket. (Note
1564 : * that if all data is available, this allows pgfdw_get_cleanup_result to
1565 : * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
1566 : * which would be large compared to the overhead of PQconsumeInput.)
1567 : */
1568 142 : if (consume_input && !PQconsumeInput(conn))
1569 : {
1570 0 : pgfdw_report_error(WARNING, NULL, conn, false, query);
1571 0 : return false;
1572 : }
1573 :
1574 : /* Get the result of the query. */
1575 142 : if (pgfdw_get_cleanup_result(conn, endtime, endtime, &result, &timed_out))
1576 : {
1577 0 : if (timed_out)
1578 0 : ereport(WARNING,
1579 : (errmsg("could not get query result due to timeout"),
1580 : errcontext("remote SQL command: %s", query)));
1581 : else
1582 0 : pgfdw_report_error(WARNING, NULL, conn, false, query);
1583 :
1584 0 : return false;
1585 : }
1586 :
1587 : /* Issue a warning if not successful. */
1588 142 : if (PQresultStatus(result) != PGRES_COMMAND_OK)
1589 : {
1590 0 : pgfdw_report_error(WARNING, result, conn, true, query);
1591 0 : return ignore_errors;
1592 : }
1593 142 : PQclear(result);
1594 :
1595 142 : return true;
1596 : }
1597 :
1598 : /*
1599 : * Get, during abort cleanup, the result of a query that is in progress.
1600 : * This might be a query that is being interrupted by a cancel request or by
1601 : * transaction abort, or it might be a query that was initiated as part of
1602 : * transaction abort to get the remote side back to the appropriate state.
1603 : *
1604 : * endtime is the time at which we should give up and assume the remote side
1605 : * is dead. retrycanceltime is the time at which we should issue a fresh
1606 : * cancel request (pass the same value as endtime if this is not wanted).
1607 : *
1608 : * Returns true if the timeout expired or connection trouble occurred,
1609 : * false otherwise. Sets *result except in case of a true result.
1610 : * Sets *timed_out to true only when the timeout expired.
1611 : */
1612 : static bool
1613 146 : pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
1614 : TimestampTz retrycanceltime,
1615 : PGresult **result,
1616 : bool *timed_out)
1617 : {
1618 146 : volatile bool failed = false;
1619 146 : PGresult *volatile last_res = NULL;
1620 :
1621 146 : *result = NULL;
1622 146 : *timed_out = false;
1623 :
1624 : /* In what follows, do not leak any PGresults on an error. */
1625 146 : PG_TRY();
1626 : {
1627 146 : int canceldelta = RETRY_CANCEL_TIMEOUT * 2;
1628 :
1629 : for (;;)
1630 160 : {
1631 : PGresult *res;
1632 :
1633 444 : while (PQisBusy(conn))
1634 : {
1635 : int wc;
1636 138 : TimestampTz now = GetCurrentTimestamp();
1637 : long cur_timeout;
1638 :
1639 : /* If timeout has expired, give up. */
1640 138 : if (now >= endtime)
1641 : {
1642 0 : *timed_out = true;
1643 0 : failed = true;
1644 0 : goto exit;
1645 : }
1646 :
1647 : /* If we need to re-issue the cancel request, do that. */
1648 138 : if (now >= retrycanceltime)
1649 : {
1650 : /* We ignore failure to issue the repeated request. */
1651 0 : (void) libpqsrv_cancel(conn, endtime);
1652 :
1653 : /* Recompute "now" in case that took measurable time. */
1654 0 : now = GetCurrentTimestamp();
1655 :
1656 : /* Adjust re-cancel timeout in increasing steps. */
1657 0 : retrycanceltime = TimestampTzPlusMilliseconds(now,
1658 : canceldelta);
1659 0 : canceldelta += canceldelta;
1660 : }
1661 :
1662 : /* If timeout has expired, give up, else get sleep time. */
1663 138 : cur_timeout = TimestampDifferenceMilliseconds(now,
1664 : Min(endtime,
1665 : retrycanceltime));
1666 138 : if (cur_timeout <= 0)
1667 : {
1668 0 : *timed_out = true;
1669 0 : failed = true;
1670 0 : goto exit;
1671 : }
1672 :
1673 : /* first time, allocate or get the custom wait event */
1674 138 : if (pgfdw_we_cleanup_result == 0)
1675 4 : pgfdw_we_cleanup_result = WaitEventExtensionNew("PostgresFdwCleanupResult");
1676 :
1677 : /* Sleep until there's something to do */
1678 138 : wc = WaitLatchOrSocket(MyLatch,
1679 : WL_LATCH_SET | WL_SOCKET_READABLE |
1680 : WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
1681 : PQsocket(conn),
1682 : cur_timeout, pgfdw_we_cleanup_result);
1683 138 : ResetLatch(MyLatch);
1684 :
1685 138 : CHECK_FOR_INTERRUPTS();
1686 :
1687 : /* Data available in socket? */
1688 138 : if (wc & WL_SOCKET_READABLE)
1689 : {
1690 138 : if (!PQconsumeInput(conn))
1691 : {
1692 : /* connection trouble */
1693 0 : failed = true;
1694 0 : goto exit;
1695 : }
1696 : }
1697 : }
1698 :
1699 306 : res = PQgetResult(conn);
1700 306 : if (res == NULL)
1701 146 : break; /* query is complete */
1702 :
1703 160 : PQclear(last_res);
1704 160 : last_res = res;
1705 : }
1706 146 : exit: ;
1707 : }
1708 0 : PG_CATCH();
1709 : {
1710 0 : PQclear(last_res);
1711 0 : PG_RE_THROW();
1712 : }
1713 146 : PG_END_TRY();
1714 :
1715 146 : if (failed)
1716 0 : PQclear(last_res);
1717 : else
1718 146 : *result = last_res;
1719 146 : return failed;
1720 : }
1721 :
1722 : /*
1723 : * Abort remote transaction or subtransaction.
1724 : *
1725 : * "toplevel" should be set to true if toplevel (main) transaction is
1726 : * rollbacked, false otherwise.
1727 : *
1728 : * Set entry->changing_xact_state to false on success, true on failure.
1729 : */
1730 : static void
1731 86 : pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel)
1732 : {
1733 : char sql[100];
1734 :
1735 : /*
1736 : * Don't try to clean up the connection if we're already in error
1737 : * recursion trouble.
1738 : */
1739 86 : if (in_error_recursion_trouble())
1740 0 : entry->changing_xact_state = true;
1741 :
1742 : /*
1743 : * If connection is already unsalvageable, don't touch it further.
1744 : */
1745 86 : if (entry->changing_xact_state)
1746 2 : return;
1747 :
1748 : /*
1749 : * Mark this connection as in the process of changing transaction state.
1750 : */
1751 84 : entry->changing_xact_state = true;
1752 :
1753 : /* Assume we might have lost track of prepared statements */
1754 84 : entry->have_error = true;
1755 :
1756 : /*
1757 : * If a command has been submitted to the remote server by using an
1758 : * asynchronous execution function, the command might not have yet
1759 : * completed. Check to see if a command is still being processed by the
1760 : * remote server, and if so, request cancellation of the command.
1761 : */
1762 84 : if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
1763 4 : !pgfdw_cancel_query(entry->conn))
1764 0 : return; /* Unable to cancel running query */
1765 :
1766 84 : CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
1767 84 : if (!pgfdw_exec_cleanup_query(entry->conn, sql, false))
1768 0 : return; /* Unable to abort remote (sub)transaction */
1769 :
1770 84 : if (toplevel)
1771 : {
1772 78 : if (entry->have_prep_stmt && entry->have_error &&
1773 34 : !pgfdw_exec_cleanup_query(entry->conn,
1774 : "DEALLOCATE ALL",
1775 : true))
1776 0 : return; /* Trouble clearing prepared statements */
1777 :
1778 78 : entry->have_prep_stmt = false;
1779 78 : entry->have_error = false;
1780 : }
1781 :
1782 : /*
1783 : * If pendingAreq of the per-connection state is not NULL, it means that
1784 : * an asynchronous fetch begun by fetch_more_data_begin() was not done
1785 : * successfully and thus the per-connection state was not reset in
1786 : * fetch_more_data(); in that case reset the per-connection state here.
1787 : */
1788 84 : if (entry->state.pendingAreq)
1789 2 : memset(&entry->state, 0, sizeof(entry->state));
1790 :
1791 : /* Disarm changing_xact_state if it all worked */
1792 84 : entry->changing_xact_state = false;
1793 : }
1794 :
1795 : /*
1796 : * Like pgfdw_abort_cleanup, submit an abort command or cancel request, but
1797 : * don't wait for the result.
1798 : *
1799 : * Returns true if the abort command or cancel request is successfully issued,
1800 : * false otherwise. If the abort command is successfully issued, the given
1801 : * connection cache entry is appended to *pending_entries. Otherwise, if the
1802 : * cancel request is successfully issued, it is appended to *cancel_requested.
1803 : */
1804 : static bool
1805 16 : pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel,
1806 : List **pending_entries, List **cancel_requested)
1807 : {
1808 : /*
1809 : * Don't try to clean up the connection if we're already in error
1810 : * recursion trouble.
1811 : */
1812 16 : if (in_error_recursion_trouble())
1813 0 : entry->changing_xact_state = true;
1814 :
1815 : /*
1816 : * If connection is already unsalvageable, don't touch it further.
1817 : */
1818 16 : if (entry->changing_xact_state)
1819 0 : return false;
1820 :
1821 : /*
1822 : * Mark this connection as in the process of changing transaction state.
1823 : */
1824 16 : entry->changing_xact_state = true;
1825 :
1826 : /* Assume we might have lost track of prepared statements */
1827 16 : entry->have_error = true;
1828 :
1829 : /*
1830 : * If a command has been submitted to the remote server by using an
1831 : * asynchronous execution function, the command might not have yet
1832 : * completed. Check to see if a command is still being processed by the
1833 : * remote server, and if so, request cancellation of the command.
1834 : */
1835 16 : if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
1836 : {
1837 : TimestampTz endtime;
1838 :
1839 0 : endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
1840 : CONNECTION_CLEANUP_TIMEOUT);
1841 0 : if (!pgfdw_cancel_query_begin(entry->conn, endtime))
1842 0 : return false; /* Unable to cancel running query */
1843 0 : *cancel_requested = lappend(*cancel_requested, entry);
1844 : }
1845 : else
1846 : {
1847 : char sql[100];
1848 :
1849 16 : CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
1850 16 : if (!pgfdw_exec_cleanup_query_begin(entry->conn, sql))
1851 0 : return false; /* Unable to abort remote transaction */
1852 16 : *pending_entries = lappend(*pending_entries, entry);
1853 : }
1854 :
1855 16 : return true;
1856 : }
1857 :
1858 : /*
1859 : * Finish pre-commit cleanup of connections on each of which we've sent a
1860 : * COMMIT command to the remote server.
1861 : */
1862 : static void
1863 26 : pgfdw_finish_pre_commit_cleanup(List *pending_entries)
1864 : {
1865 : ConnCacheEntry *entry;
1866 26 : List *pending_deallocs = NIL;
1867 : ListCell *lc;
1868 :
1869 : Assert(pending_entries);
1870 :
1871 : /*
1872 : * Get the result of the COMMIT command for each of the pending entries
1873 : */
1874 58 : foreach(lc, pending_entries)
1875 : {
1876 32 : entry = (ConnCacheEntry *) lfirst(lc);
1877 :
1878 : Assert(entry->changing_xact_state);
1879 :
1880 : /*
1881 : * We might already have received the result on the socket, so pass
1882 : * consume_input=true to try to consume it first
1883 : */
1884 32 : do_sql_command_end(entry->conn, "COMMIT TRANSACTION", true);
1885 32 : entry->changing_xact_state = false;
1886 :
1887 : /* Do a DEALLOCATE ALL in parallel if needed */
1888 32 : if (entry->have_prep_stmt && entry->have_error)
1889 : {
1890 : /* Ignore errors (see notes in pgfdw_xact_callback) */
1891 4 : if (PQsendQuery(entry->conn, "DEALLOCATE ALL"))
1892 : {
1893 4 : pending_deallocs = lappend(pending_deallocs, entry);
1894 4 : continue;
1895 : }
1896 : }
1897 28 : entry->have_prep_stmt = false;
1898 28 : entry->have_error = false;
1899 :
1900 28 : pgfdw_reset_xact_state(entry, true);
1901 : }
1902 :
1903 : /* No further work if no pending entries */
1904 26 : if (!pending_deallocs)
1905 24 : return;
1906 :
1907 : /*
1908 : * Get the result of the DEALLOCATE command for each of the pending
1909 : * entries
1910 : */
1911 6 : foreach(lc, pending_deallocs)
1912 : {
1913 : PGresult *res;
1914 :
1915 4 : entry = (ConnCacheEntry *) lfirst(lc);
1916 :
1917 : /* Ignore errors (see notes in pgfdw_xact_callback) */
1918 8 : while ((res = PQgetResult(entry->conn)) != NULL)
1919 : {
1920 4 : PQclear(res);
1921 : /* Stop if the connection is lost (else we'll loop infinitely) */
1922 4 : if (PQstatus(entry->conn) == CONNECTION_BAD)
1923 0 : break;
1924 : }
1925 4 : entry->have_prep_stmt = false;
1926 4 : entry->have_error = false;
1927 :
1928 4 : pgfdw_reset_xact_state(entry, true);
1929 : }
1930 : }
1931 :
1932 : /*
1933 : * Finish pre-subcommit cleanup of connections on each of which we've sent a
1934 : * RELEASE command to the remote server.
1935 : */
1936 : static void
1937 2 : pgfdw_finish_pre_subcommit_cleanup(List *pending_entries, int curlevel)
1938 : {
1939 : ConnCacheEntry *entry;
1940 : char sql[100];
1941 : ListCell *lc;
1942 :
1943 : Assert(pending_entries);
1944 :
1945 : /*
1946 : * Get the result of the RELEASE command for each of the pending entries
1947 : */
1948 2 : snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
1949 6 : foreach(lc, pending_entries)
1950 : {
1951 4 : entry = (ConnCacheEntry *) lfirst(lc);
1952 :
1953 : Assert(entry->changing_xact_state);
1954 :
1955 : /*
1956 : * We might already have received the result on the socket, so pass
1957 : * consume_input=true to try to consume it first
1958 : */
1959 4 : do_sql_command_end(entry->conn, sql, true);
1960 4 : entry->changing_xact_state = false;
1961 :
1962 4 : pgfdw_reset_xact_state(entry, false);
1963 : }
1964 2 : }
1965 :
1966 : /*
1967 : * Finish abort cleanup of connections on each of which we've sent an abort
1968 : * command or cancel request to the remote server.
1969 : */
1970 : static void
1971 8 : pgfdw_finish_abort_cleanup(List *pending_entries, List *cancel_requested,
1972 : bool toplevel)
1973 : {
1974 8 : List *pending_deallocs = NIL;
1975 : ListCell *lc;
1976 :
1977 : /*
1978 : * For each of the pending cancel requests (if any), get and discard the
1979 : * result of the query, and submit an abort command to the remote server.
1980 : */
1981 8 : if (cancel_requested)
1982 : {
1983 0 : foreach(lc, cancel_requested)
1984 : {
1985 0 : ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
1986 0 : TimestampTz now = GetCurrentTimestamp();
1987 : TimestampTz endtime;
1988 : TimestampTz retrycanceltime;
1989 : char sql[100];
1990 :
1991 : Assert(entry->changing_xact_state);
1992 :
1993 : /*
1994 : * Set end time. You might think we should do this before issuing
1995 : * cancel request like in normal mode, but that is problematic,
1996 : * because if, for example, it took longer than 30 seconds to
1997 : * process the first few entries in the cancel_requested list, it
1998 : * would cause a timeout error when processing each of the
1999 : * remaining entries in the list, leading to slamming that entry's
2000 : * connection shut.
2001 : */
2002 0 : endtime = TimestampTzPlusMilliseconds(now,
2003 : CONNECTION_CLEANUP_TIMEOUT);
2004 0 : retrycanceltime = TimestampTzPlusMilliseconds(now,
2005 : RETRY_CANCEL_TIMEOUT);
2006 :
2007 0 : if (!pgfdw_cancel_query_end(entry->conn, endtime,
2008 : retrycanceltime, true))
2009 : {
2010 : /* Unable to cancel running query */
2011 0 : pgfdw_reset_xact_state(entry, toplevel);
2012 0 : continue;
2013 : }
2014 :
2015 : /* Send an abort command in parallel if needed */
2016 0 : CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
2017 0 : if (!pgfdw_exec_cleanup_query_begin(entry->conn, sql))
2018 : {
2019 : /* Unable to abort remote (sub)transaction */
2020 0 : pgfdw_reset_xact_state(entry, toplevel);
2021 : }
2022 : else
2023 0 : pending_entries = lappend(pending_entries, entry);
2024 : }
2025 : }
2026 :
2027 : /* No further work if no pending entries */
2028 8 : if (!pending_entries)
2029 0 : return;
2030 :
2031 : /*
2032 : * Get the result of the abort command for each of the pending entries
2033 : */
2034 24 : foreach(lc, pending_entries)
2035 : {
2036 16 : ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
2037 : TimestampTz endtime;
2038 : char sql[100];
2039 :
2040 : Assert(entry->changing_xact_state);
2041 :
2042 : /*
2043 : * Set end time. We do this now, not before issuing the command like
2044 : * in normal mode, for the same reason as for the cancel_requested
2045 : * entries.
2046 : */
2047 16 : endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
2048 : CONNECTION_CLEANUP_TIMEOUT);
2049 :
2050 16 : CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
2051 16 : if (!pgfdw_exec_cleanup_query_end(entry->conn, sql, endtime,
2052 : true, false))
2053 : {
2054 : /* Unable to abort remote (sub)transaction */
2055 0 : pgfdw_reset_xact_state(entry, toplevel);
2056 8 : continue;
2057 : }
2058 :
2059 16 : if (toplevel)
2060 : {
2061 : /* Do a DEALLOCATE ALL in parallel if needed */
2062 8 : if (entry->have_prep_stmt && entry->have_error)
2063 : {
2064 8 : if (!pgfdw_exec_cleanup_query_begin(entry->conn,
2065 : "DEALLOCATE ALL"))
2066 : {
2067 : /* Trouble clearing prepared statements */
2068 0 : pgfdw_reset_xact_state(entry, toplevel);
2069 : }
2070 : else
2071 8 : pending_deallocs = lappend(pending_deallocs, entry);
2072 8 : continue;
2073 : }
2074 0 : entry->have_prep_stmt = false;
2075 0 : entry->have_error = false;
2076 : }
2077 :
2078 : /* Reset the per-connection state if needed */
2079 8 : if (entry->state.pendingAreq)
2080 0 : memset(&entry->state, 0, sizeof(entry->state));
2081 :
2082 : /* We're done with this entry; unset the changing_xact_state flag */
2083 8 : entry->changing_xact_state = false;
2084 8 : pgfdw_reset_xact_state(entry, toplevel);
2085 : }
2086 :
2087 : /* No further work if no pending entries */
2088 8 : if (!pending_deallocs)
2089 4 : return;
2090 : Assert(toplevel);
2091 :
2092 : /*
2093 : * Get the result of the DEALLOCATE command for each of the pending
2094 : * entries
2095 : */
2096 12 : foreach(lc, pending_deallocs)
2097 : {
2098 8 : ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
2099 : TimestampTz endtime;
2100 :
2101 : Assert(entry->changing_xact_state);
2102 : Assert(entry->have_prep_stmt);
2103 : Assert(entry->have_error);
2104 :
2105 : /*
2106 : * Set end time. We do this now, not before issuing the command like
2107 : * in normal mode, for the same reason as for the cancel_requested
2108 : * entries.
2109 : */
2110 8 : endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
2111 : CONNECTION_CLEANUP_TIMEOUT);
2112 :
2113 8 : if (!pgfdw_exec_cleanup_query_end(entry->conn, "DEALLOCATE ALL",
2114 : endtime, true, true))
2115 : {
2116 : /* Trouble clearing prepared statements */
2117 0 : pgfdw_reset_xact_state(entry, toplevel);
2118 0 : continue;
2119 : }
2120 8 : entry->have_prep_stmt = false;
2121 8 : entry->have_error = false;
2122 :
2123 : /* Reset the per-connection state if needed */
2124 8 : if (entry->state.pendingAreq)
2125 0 : memset(&entry->state, 0, sizeof(entry->state));
2126 :
2127 : /* We're done with this entry; unset the changing_xact_state flag */
2128 8 : entry->changing_xact_state = false;
2129 8 : pgfdw_reset_xact_state(entry, toplevel);
2130 : }
2131 : }
2132 :
/*
 * Number of output arguments (columns) produced by each supported API
 * version of postgres_fdw_get_connections.
 */
#define POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_1	2
#define POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_2	6

/* Largest column count among the versions above; sizes the output arrays. */
#define POSTGRES_FDW_GET_CONNECTIONS_COLS	POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_2
2137 :
2138 : /*
2139 : * Internal function used by postgres_fdw_get_connections variants.
2140 : *
2141 : * For API version 1.1, this function takes no input parameter and
2142 : * returns a set of records with the following values:
2143 : *
2144 : * - server_name - server name of active connection. In case the foreign server
2145 : * is dropped but still the connection is active, then the server name will
2146 : * be NULL in output.
2147 : * - valid - true/false representing whether the connection is valid or not.
2148 : * Note that connections can become invalid in pgfdw_inval_callback.
2149 : *
2150 : * For API version 1.2 and later, this function takes an input parameter
2151 : * to check a connection status and returns the following
2152 : * additional values along with the four values from version 1.1:
2153 : *
2154 : * - user_name - the local user name of the active connection. In case the
2155 : * user mapping is dropped but the connection is still active, then the
2156 : * user name will be NULL in the output.
2157 : * - used_in_xact - true if the connection is used in the current transaction.
2158 : * - closed - true if the connection is closed.
2159 : * - remote_backend_pid - process ID of the remote backend, on the foreign
2160 : * server, handling the connection.
2161 : *
2162 : * No records are returned when there are no cached connections at all.
2163 : */
2164 : static void
2165 26 : postgres_fdw_get_connections_internal(FunctionCallInfo fcinfo,
2166 : enum pgfdwVersion api_version)
2167 : {
2168 26 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
2169 : HASH_SEQ_STATUS scan;
2170 : ConnCacheEntry *entry;
2171 :
2172 26 : InitMaterializedSRF(fcinfo, 0);
2173 :
2174 : /* If cache doesn't exist, we return no records */
2175 26 : if (!ConnectionHash)
2176 0 : return;
2177 :
2178 : /* Check we have the expected number of output arguments */
2179 26 : switch (rsinfo->setDesc->natts)
2180 : {
2181 0 : case POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_1:
2182 0 : if (api_version != PGFDW_V1_1)
2183 0 : elog(ERROR, "incorrect number of output arguments");
2184 0 : break;
2185 26 : case POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_2:
2186 26 : if (api_version != PGFDW_V1_2)
2187 0 : elog(ERROR, "incorrect number of output arguments");
2188 26 : break;
2189 0 : default:
2190 0 : elog(ERROR, "incorrect number of output arguments");
2191 : }
2192 :
2193 26 : hash_seq_init(&scan, ConnectionHash);
2194 226 : while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
2195 : {
2196 : ForeignServer *server;
2197 200 : Datum values[POSTGRES_FDW_GET_CONNECTIONS_COLS] = {0};
2198 200 : bool nulls[POSTGRES_FDW_GET_CONNECTIONS_COLS] = {0};
2199 200 : int i = 0;
2200 :
2201 : /* We only look for open remote connections */
2202 200 : if (!entry->conn)
2203 174 : continue;
2204 :
2205 26 : server = GetForeignServerExtended(entry->serverid, FSV_MISSING_OK);
2206 :
2207 : /*
2208 : * The foreign server may have been dropped in current explicit
2209 : * transaction. It is not possible to drop the server from another
2210 : * session when the connection associated with it is in use in the
2211 : * current transaction, if tried so, the drop query in another session
2212 : * blocks until the current transaction finishes.
2213 : *
2214 : * Even though the server is dropped in the current transaction, the
2215 : * cache can still have associated active connection entry, say we
2216 : * call such connections dangling. Since we can not fetch the server
2217 : * name from system catalogs for dangling connections, instead we show
2218 : * NULL value for server name in output.
2219 : *
2220 : * We could have done better by storing the server name in the cache
2221 : * entry instead of server oid so that it could be used in the output.
2222 : * But the server name in each cache entry requires 64 bytes of
2223 : * memory, which is huge, when there are many cached connections and
2224 : * the use case i.e. dropping the foreign server within the explicit
2225 : * current transaction seems rare. So, we chose to show NULL value for
2226 : * server name in output.
2227 : *
2228 : * Such dangling connections get closed either in next use or at the
2229 : * end of current explicit transaction in pgfdw_xact_callback.
2230 : */
2231 26 : if (!server)
2232 : {
2233 : /*
2234 : * If the server has been dropped in the current explicit
2235 : * transaction, then this entry would have been invalidated in
2236 : * pgfdw_inval_callback at the end of drop server command. Note
2237 : * that this connection would not have been closed in
2238 : * pgfdw_inval_callback because it is still being used in the
2239 : * current explicit transaction. So, assert that here.
2240 : */
2241 : Assert(entry->conn && entry->xact_depth > 0 && entry->invalidated);
2242 :
2243 : /* Show null, if no server name was found */
2244 2 : nulls[i++] = true;
2245 : }
2246 : else
2247 24 : values[i++] = CStringGetTextDatum(server->servername);
2248 :
2249 26 : if (api_version >= PGFDW_V1_2)
2250 : {
2251 : HeapTuple tp;
2252 :
2253 : /* Use the system cache to obtain the user mapping */
2254 26 : tp = SearchSysCache1(USERMAPPINGOID, ObjectIdGetDatum(entry->key));
2255 :
2256 : /*
2257 : * Just like in the foreign server case, user mappings can also be
2258 : * dropped in the current explicit transaction. Therefore, the
2259 : * similar check as in the server case is required.
2260 : */
2261 26 : if (!HeapTupleIsValid(tp))
2262 : {
2263 : /*
2264 : * If we reach here, this entry must have been invalidated in
2265 : * pgfdw_inval_callback, same as in the server case.
2266 : */
2267 : Assert(entry->conn && entry->xact_depth > 0 &&
2268 : entry->invalidated);
2269 :
2270 2 : nulls[i++] = true;
2271 : }
2272 : else
2273 : {
2274 : Oid userid;
2275 :
2276 24 : userid = ((Form_pg_user_mapping) GETSTRUCT(tp))->umuser;
2277 24 : values[i++] = CStringGetTextDatum(MappingUserName(userid));
2278 24 : ReleaseSysCache(tp);
2279 : }
2280 : }
2281 :
2282 26 : values[i++] = BoolGetDatum(!entry->invalidated);
2283 :
2284 26 : if (api_version >= PGFDW_V1_2)
2285 : {
2286 26 : bool check_conn = PG_GETARG_BOOL(0);
2287 :
2288 : /* Is this connection used in the current transaction? */
2289 26 : values[i++] = BoolGetDatum(entry->xact_depth > 0);
2290 :
2291 : /*
2292 : * If a connection status check is requested and supported, return
2293 : * whether the connection is closed. Otherwise, return NULL.
2294 : */
2295 26 : if (check_conn && pgfdw_conn_checkable())
2296 4 : values[i++] = BoolGetDatum(pgfdw_conn_check(entry->conn) != 0);
2297 : else
2298 22 : nulls[i++] = true;
2299 :
2300 : /* Return process ID of remote backend */
2301 26 : values[i++] = Int32GetDatum(PQbackendPID(entry->conn));
2302 : }
2303 :
2304 26 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
2305 : }
2306 : }
2307 :
2308 : /*
2309 : * List active foreign server connections.
2310 : *
2311 : * The SQL API of this function has changed multiple times, and will likely
2312 : * do so again in future. To support the case where a newer version of this
2313 : * loadable module is being used with an old SQL declaration of the function,
2314 : * we continue to support the older API versions.
2315 : */
2316 : Datum
2317 26 : postgres_fdw_get_connections_1_2(PG_FUNCTION_ARGS)
2318 : {
2319 26 : postgres_fdw_get_connections_internal(fcinfo, PGFDW_V1_2);
2320 :
2321 26 : PG_RETURN_VOID();
2322 : }
2323 :
2324 : Datum
2325 0 : postgres_fdw_get_connections(PG_FUNCTION_ARGS)
2326 : {
2327 0 : postgres_fdw_get_connections_internal(fcinfo, PGFDW_V1_1);
2328 :
2329 0 : PG_RETURN_VOID();
2330 : }
2331 :
2332 : /*
2333 : * Disconnect the specified cached connections.
2334 : *
2335 : * This function discards the open connections that are established by
2336 : * postgres_fdw from the local session to the foreign server with
2337 : * the given name. Note that there can be multiple connections to
2338 : * the given server using different user mappings. If the connections
2339 : * are used in the current local transaction, they are not disconnected
2340 : * and warning messages are reported. This function returns true
2341 : * if it disconnects at least one connection, otherwise false. If no
2342 : * foreign server with the given name is found, an error is reported.
2343 : */
2344 : Datum
2345 8 : postgres_fdw_disconnect(PG_FUNCTION_ARGS)
2346 : {
2347 : ForeignServer *server;
2348 : char *servername;
2349 :
2350 8 : servername = text_to_cstring(PG_GETARG_TEXT_PP(0));
2351 8 : server = GetForeignServerByName(servername, false);
2352 :
2353 6 : PG_RETURN_BOOL(disconnect_cached_connections(server->serverid));
2354 : }
2355 :
2356 : /*
2357 : * Disconnect all the cached connections.
2358 : *
2359 : * This function discards all the open connections that are established by
2360 : * postgres_fdw from the local session to the foreign servers.
2361 : * If the connections are used in the current local transaction, they are
2362 : * not disconnected and warning messages are reported. This function
2363 : * returns true if it disconnects at least one connection, otherwise false.
2364 : */
2365 : Datum
2366 10 : postgres_fdw_disconnect_all(PG_FUNCTION_ARGS)
2367 : {
2368 10 : PG_RETURN_BOOL(disconnect_cached_connections(InvalidOid));
2369 : }
2370 :
2371 : /*
2372 : * Workhorse to disconnect cached connections.
2373 : *
2374 : * This function scans all the connection cache entries and disconnects
2375 : * the open connections whose foreign server OID matches with
2376 : * the specified one. If InvalidOid is specified, it disconnects all
2377 : * the cached connections.
2378 : *
2379 : * This function emits a warning for each connection that's used in
2380 : * the current transaction and doesn't close it. It returns true if
2381 : * it disconnects at least one connection, otherwise false.
2382 : *
2383 : * Note that this function disconnects even the connections that are
2384 : * established by other users in the same local session using different
2385 : * user mappings. This leads even non-superuser to be able to close
2386 : * the connections established by superusers in the same local session.
2387 : *
2388 : * XXX As of now we don't see any security risk doing this. But we should
2389 : * set some restrictions on that, for example, prevent non-superuser
2390 : * from closing the connections established by superusers even
2391 : * in the same session?
2392 : */
2393 : static bool
2394 16 : disconnect_cached_connections(Oid serverid)
2395 : {
2396 : HASH_SEQ_STATUS scan;
2397 : ConnCacheEntry *entry;
2398 16 : bool all = !OidIsValid(serverid);
2399 16 : bool result = false;
2400 :
2401 : /*
2402 : * Connection cache hashtable has not been initialized yet in this
2403 : * session, so return false.
2404 : */
2405 16 : if (!ConnectionHash)
2406 0 : return false;
2407 :
2408 16 : hash_seq_init(&scan, ConnectionHash);
2409 134 : while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
2410 : {
2411 : /* Ignore cache entry if no open connection right now. */
2412 118 : if (!entry->conn)
2413 96 : continue;
2414 :
2415 22 : if (all || entry->serverid == serverid)
2416 : {
2417 : /*
2418 : * Emit a warning because the connection to close is used in the
2419 : * current transaction and cannot be disconnected right now.
2420 : */
2421 16 : if (entry->xact_depth > 0)
2422 : {
2423 : ForeignServer *server;
2424 :
2425 6 : server = GetForeignServerExtended(entry->serverid,
2426 : FSV_MISSING_OK);
2427 :
2428 6 : if (!server)
2429 : {
2430 : /*
2431 : * If the foreign server was dropped while its connection
2432 : * was used in the current transaction, the connection
2433 : * must have been marked as invalid by
2434 : * pgfdw_inval_callback at the end of DROP SERVER command.
2435 : */
2436 : Assert(entry->invalidated);
2437 :
2438 0 : ereport(WARNING,
2439 : (errmsg("cannot close dropped server connection because it is still in use")));
2440 : }
2441 : else
2442 6 : ereport(WARNING,
2443 : (errmsg("cannot close connection for server \"%s\" because it is still in use",
2444 : server->servername)));
2445 : }
2446 : else
2447 : {
2448 10 : elog(DEBUG3, "discarding connection %p", entry->conn);
2449 10 : disconnect_pg_server(entry);
2450 10 : result = true;
2451 : }
2452 : }
2453 : }
2454 :
2455 16 : return result;
2456 : }
2457 :
2458 : /*
2459 : * Check if the remote server closed the connection.
2460 : *
2461 : * Returns 1 if the connection is closed, -1 if an error occurred,
2462 : * and 0 if it's not closed or if the connection check is unavailable
2463 : * on this platform.
2464 : */
2465 : static int
2466 4 : pgfdw_conn_check(PGconn *conn)
2467 : {
2468 4 : int sock = PQsocket(conn);
2469 :
2470 4 : if (PQstatus(conn) != CONNECTION_OK || sock == -1)
2471 0 : return -1;
2472 :
2473 : #if (defined(HAVE_POLL) && defined(POLLRDHUP))
2474 : {
2475 : struct pollfd input_fd;
2476 : int result;
2477 :
2478 4 : input_fd.fd = sock;
2479 4 : input_fd.events = POLLRDHUP;
2480 4 : input_fd.revents = 0;
2481 :
2482 : do
2483 4 : result = poll(&input_fd, 1, 0);
2484 4 : while (result < 0 && errno == EINTR);
2485 :
2486 4 : if (result < 0)
2487 0 : return -1;
2488 :
2489 4 : return (input_fd.revents &
2490 4 : (POLLRDHUP | POLLHUP | POLLERR | POLLNVAL)) ? 1 : 0;
2491 : }
2492 : #else
2493 : return 0;
2494 : #endif
2495 : }
2496 :
/*
 * pgfdw_conn_checkable
 *		Report whether connection status checking is available on this
 *		platform.
 *
 * Returns true when both HAVE_POLL and POLLRDHUP are defined at compile
 * time, false otherwise.
 */
static bool
pgfdw_conn_checkable(void)
{
	bool		checkable = false;

#if (defined(HAVE_POLL) && defined(POLLRDHUP))
	checkable = true;
#endif

	return checkable;
}
2511 :
2512 : /*
2513 : * Ensure that require_auth and SCRAM keys are correctly set on values. SCRAM
2514 : * keys used to pass-through are coming from the initial connection from the
2515 : * client with the server.
2516 : *
2517 : * All required SCRAM options are set by postgres_fdw, so we just need to
2518 : * ensure that these options are not overwritten by the user.
2519 : */
2520 : static bool
2521 18 : pgfdw_has_required_scram_options(const char **keywords, const char **values)
2522 : {
2523 18 : bool has_scram_server_key = false;
2524 18 : bool has_scram_client_key = false;
2525 18 : bool has_require_auth = false;
2526 18 : bool has_scram_keys = false;
2527 :
2528 : /*
2529 : * Continue iterating even if we found the keys that we need to validate
2530 : * to make sure that there is no other declaration of these keys that can
2531 : * overwrite the first.
2532 : */
2533 180 : for (int i = 0; keywords[i] != NULL; i++)
2534 : {
2535 162 : if (strcmp(keywords[i], "scram_client_key") == 0)
2536 : {
2537 18 : if (values[i] != NULL && values[i][0] != '\0')
2538 18 : has_scram_client_key = true;
2539 : else
2540 0 : has_scram_client_key = false;
2541 : }
2542 :
2543 162 : if (strcmp(keywords[i], "scram_server_key") == 0)
2544 : {
2545 18 : if (values[i] != NULL && values[i][0] != '\0')
2546 18 : has_scram_server_key = true;
2547 : else
2548 0 : has_scram_server_key = false;
2549 : }
2550 :
2551 162 : if (strcmp(keywords[i], "require_auth") == 0)
2552 : {
2553 18 : if (values[i] != NULL && strcmp(values[i], "scram-sha-256") == 0)
2554 18 : has_require_auth = true;
2555 : else
2556 0 : has_require_auth = false;
2557 : }
2558 : }
2559 :
2560 18 : has_scram_keys = has_scram_client_key && has_scram_server_key && MyProcPort->has_scram_keys;
2561 :
2562 18 : return (has_scram_keys && has_require_auth);
2563 : }
|