Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * connection.c
4 : * Connection management functions for postgres_fdw
5 : *
6 : * Portions Copyright (c) 2012-2024, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * contrib/postgres_fdw/connection.c
10 : *
11 : *-------------------------------------------------------------------------
12 : */
13 : #include "postgres.h"
14 :
15 : #if HAVE_POLL_H
16 : #include <poll.h>
17 : #endif
18 :
19 : #include "access/xact.h"
20 : #include "catalog/pg_user_mapping.h"
21 : #include "commands/defrem.h"
22 : #include "funcapi.h"
23 : #include "libpq/libpq-be.h"
24 : #include "libpq/libpq-be-fe-helpers.h"
25 : #include "mb/pg_wchar.h"
26 : #include "miscadmin.h"
27 : #include "pgstat.h"
28 : #include "postgres_fdw.h"
29 : #include "storage/latch.h"
30 : #include "utils/builtins.h"
31 : #include "utils/hsearch.h"
32 : #include "utils/inval.h"
33 : #include "utils/syscache.h"
34 :
35 : /*
36 : * Connection cache hash table entry
37 : *
38 : * The lookup key in this hash table is the user mapping OID. We use just one
39 : * connection per user mapping ID, which ensures that all the scans use the
40 : * same snapshot during a query. Using the user mapping OID rather than
41 : * the foreign server OID + user OID avoids creating multiple connections when
42 : * the public user mapping applies to all user OIDs.
43 : *
44 : * The "conn" pointer can be NULL if we don't currently have a live connection.
45 : * When we do have a connection, xact_depth tracks the current depth of
46 : * transactions and subtransactions open on the remote side. We need to issue
47 : * commands at the same nesting depth on the remote as we're executing at
48 : * ourselves, so that rolling back a subtransaction will kill the right
49 : * queries and not the wrong ones.
50 : */
51 : typedef Oid ConnCacheKey;
52 :
53 : typedef struct ConnCacheEntry
54 : {
55 : ConnCacheKey key; /* hash key (must be first) */
56 : PGconn *conn; /* connection to foreign server, or NULL */
57 : /* Remaining fields are invalid when conn is NULL: */
58 : int xact_depth; /* 0 = no xact open, 1 = main xact open, 2 =
59 : * one level of subxact open, etc */
60 : bool have_prep_stmt; /* have we prepared any stmts in this xact? */
61 : bool have_error; /* have any subxacts aborted in this xact? */
62 : bool changing_xact_state; /* xact state change in process */
63 : bool parallel_commit; /* do we commit (sub)xacts in parallel? */
64 : bool parallel_abort; /* do we abort (sub)xacts in parallel? */
65 : bool invalidated; /* true if reconnect is pending */
66 : bool keep_connections; /* setting value of keep_connections
67 : * server option */
68 : Oid serverid; /* foreign server OID used to get server name */
69 : uint32 server_hashvalue; /* hash value of foreign server OID */
70 : uint32 mapping_hashvalue; /* hash value of user mapping OID */
71 : PgFdwConnState state; /* extra per-connection state */
72 : } ConnCacheEntry;
73 :
74 : /*
75 : * Connection cache (initialized on first use)
76 : */
77 : static HTAB *ConnectionHash = NULL;
78 :
79 : /* for assigning cursor numbers and prepared statement numbers */
80 : static unsigned int cursor_number = 0;
81 : static unsigned int prep_stmt_number = 0;
82 :
83 : /* tracks whether any work is needed in callback functions */
84 : static bool xact_got_connection = false;
85 :
86 : /* custom wait event values, retrieved from shared memory */
87 : static uint32 pgfdw_we_cleanup_result = 0;
88 : static uint32 pgfdw_we_connect = 0;
89 : static uint32 pgfdw_we_get_result = 0;
90 :
91 : /*
92 : * Milliseconds to wait to cancel an in-progress query or execute a cleanup
93 : * query; if it takes longer than 30 seconds to do these, we assume the
94 : * connection is dead.
95 : */
#define CONNECTION_CLEANUP_TIMEOUT	30000

/*
 * Write into "sql" (which must be a char array, since sizeof is applied to
 * it) the command that aborts the remote work at the entry's current depth:
 * a plain ABORT TRANSACTION at top level, otherwise a rollback to and release
 * of the savepoint named after (entry)->xact_depth.
 */
#define CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel) \
	do { \
		if (!(toplevel)) \
			snprintf((sql), sizeof(sql), \
					 "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d", \
					 (entry)->xact_depth, (entry)->xact_depth); \
		else \
			snprintf((sql), sizeof(sql), \
					 "ABORT TRANSACTION"); \
	} while(0)
109 :
110 : /*
111 : * Extension version number, for supporting older extension versions' objects
112 : */
enum pgfdwVersion
{
	PGFDW_V1_1 = 0,				/* first value, numbered from zero */
	PGFDW_V1_2 = 1,				/* follows PGFDW_V1_1 */
};
118 :
119 : /*
120 : * SQL functions
121 : */
/* connection listing functions */
PG_FUNCTION_INFO_V1(postgres_fdw_get_connections);
PG_FUNCTION_INFO_V1(postgres_fdw_get_connections_1_2);

/* disconnection functions */
PG_FUNCTION_INFO_V1(postgres_fdw_disconnect);
PG_FUNCTION_INFO_V1(postgres_fdw_disconnect_all);
126 :
127 : /* prototypes of private functions */
128 : static void make_new_connection(ConnCacheEntry *entry, UserMapping *user);
129 : static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
130 : static void disconnect_pg_server(ConnCacheEntry *entry);
131 : static void check_conn_params(const char **keywords, const char **values, UserMapping *user);
132 : static void configure_remote_session(PGconn *conn);
133 : static void do_sql_command_begin(PGconn *conn, const char *sql);
134 : static void do_sql_command_end(PGconn *conn, const char *sql,
135 : bool consume_input);
136 : static void begin_remote_xact(ConnCacheEntry *entry);
137 : static void pgfdw_xact_callback(XactEvent event, void *arg);
138 : static void pgfdw_subxact_callback(SubXactEvent event,
139 : SubTransactionId mySubid,
140 : SubTransactionId parentSubid,
141 : void *arg);
142 : static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue);
143 : static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry);
144 : static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel);
145 : static bool pgfdw_cancel_query(PGconn *conn);
146 : static bool pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime);
147 : static bool pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime,
148 : bool consume_input);
149 : static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
150 : bool ignore_errors);
151 : static bool pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query);
152 : static bool pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
153 : TimestampTz endtime,
154 : bool consume_input,
155 : bool ignore_errors);
156 : static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
157 : PGresult **result, bool *timed_out);
158 : static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel);
159 : static bool pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel,
160 : List **pending_entries,
161 : List **cancel_requested);
162 : static void pgfdw_finish_pre_commit_cleanup(List *pending_entries);
163 : static void pgfdw_finish_pre_subcommit_cleanup(List *pending_entries,
164 : int curlevel);
165 : static void pgfdw_finish_abort_cleanup(List *pending_entries,
166 : List *cancel_requested,
167 : bool toplevel);
168 : static void pgfdw_security_check(const char **keywords, const char **values,
169 : UserMapping *user, PGconn *conn);
170 : static bool UserMappingPasswordRequired(UserMapping *user);
171 : static bool disconnect_cached_connections(Oid serverid);
172 : static void postgres_fdw_get_connections_internal(FunctionCallInfo fcinfo,
173 : enum pgfdwVersion api_version);
174 : static int pgfdw_conn_check(PGconn *conn);
175 : static bool pgfdw_conn_checkable(void);
176 :
177 : /*
178 : * Get a PGconn which can be used to execute queries on the remote PostgreSQL
179 : * server with the user's authorization. A new connection is established
180 : * if we don't already have a suitable one, and a transaction is opened at
181 : * the right subtransaction nesting depth if we didn't do that already.
182 : *
183 : * will_prep_stmt must be true if caller intends to create any prepared
184 : * statements. Since those don't go away automatically at transaction end
185 : * (not even on error), we need this flag to cue manual cleanup.
186 : *
187 : * If state is not NULL, *state receives the per-connection state associated
188 : * with the PGconn.
189 : */
190 : PGconn *
191 4192 : GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state)
192 : {
193 : bool found;
194 4192 : bool retry = false;
195 : ConnCacheEntry *entry;
196 : ConnCacheKey key;
197 4192 : MemoryContext ccxt = CurrentMemoryContext;
198 :
199 : /* First time through, initialize connection cache hashtable */
200 4192 : if (ConnectionHash == NULL)
201 : {
202 : HASHCTL ctl;
203 :
204 10 : if (pgfdw_we_get_result == 0)
205 10 : pgfdw_we_get_result =
206 10 : WaitEventExtensionNew("PostgresFdwGetResult");
207 :
208 10 : ctl.keysize = sizeof(ConnCacheKey);
209 10 : ctl.entrysize = sizeof(ConnCacheEntry);
210 10 : ConnectionHash = hash_create("postgres_fdw connections", 8,
211 : &ctl,
212 : HASH_ELEM | HASH_BLOBS);
213 :
214 : /*
215 : * Register some callback functions that manage connection cleanup.
216 : * This should be done just once in each backend.
217 : */
218 10 : RegisterXactCallback(pgfdw_xact_callback, NULL);
219 10 : RegisterSubXactCallback(pgfdw_subxact_callback, NULL);
220 10 : CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
221 : pgfdw_inval_callback, (Datum) 0);
222 10 : CacheRegisterSyscacheCallback(USERMAPPINGOID,
223 : pgfdw_inval_callback, (Datum) 0);
224 : }
225 :
226 : /* Set flag that we did GetConnection during the current transaction */
227 4192 : xact_got_connection = true;
228 :
229 : /* Create hash key for the entry. Assume no pad bytes in key struct */
230 4192 : key = user->umid;
231 :
232 : /*
233 : * Find or create cached entry for requested connection.
234 : */
235 4192 : entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
236 4192 : if (!found)
237 : {
238 : /*
239 : * We need only clear "conn" here; remaining fields will be filled
240 : * later when "conn" is set.
241 : */
242 26 : entry->conn = NULL;
243 : }
244 :
245 : /* Reject further use of connections which failed abort cleanup. */
246 4192 : pgfdw_reject_incomplete_xact_state_change(entry);
247 :
248 : /*
249 : * If the connection needs to be remade due to invalidation, disconnect as
250 : * soon as we're out of all transactions.
251 : */
252 4192 : if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
253 : {
254 0 : elog(DEBUG3, "closing connection %p for option changes to take effect",
255 : entry->conn);
256 0 : disconnect_pg_server(entry);
257 : }
258 :
259 : /*
260 : * If cache entry doesn't have a connection, we have to establish a new
261 : * connection. (If connect_pg_server throws an error, the cache entry
262 : * will remain in a valid empty state, ie conn == NULL.)
263 : */
264 4192 : if (entry->conn == NULL)
265 132 : make_new_connection(entry, user);
266 :
267 : /*
268 : * We check the health of the cached connection here when using it. In
269 : * cases where we're out of all transactions, if a broken connection is
270 : * detected, we try to reestablish a new connection later.
271 : */
272 4180 : PG_TRY();
273 : {
274 : /* Process a pending asynchronous request if any. */
275 4180 : if (entry->state.pendingAreq)
276 0 : process_pending_request(entry->state.pendingAreq);
277 : /* Start a new transaction or subtransaction if needed. */
278 4180 : begin_remote_xact(entry);
279 : }
280 4 : PG_CATCH();
281 : {
282 4 : MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
283 4 : ErrorData *errdata = CopyErrorData();
284 :
285 : /*
286 : * Determine whether to try to reestablish the connection.
287 : *
288 : * After a broken connection is detected in libpq, any error other
289 : * than connection failure (e.g., out-of-memory) can be thrown
290 : * somewhere between return from libpq and the expected ereport() call
291 : * in pgfdw_report_error(). In this case, since PQstatus() indicates
292 : * CONNECTION_BAD, checking only PQstatus() causes the false detection
293 : * of connection failure. To avoid this, we also verify that the
294 : * error's sqlstate is ERRCODE_CONNECTION_FAILURE. Note that also
295 : * checking only the sqlstate can cause another false detection
296 : * because pgfdw_report_error() may report ERRCODE_CONNECTION_FAILURE
297 : * for any libpq-originated error condition.
298 : */
299 4 : if (errdata->sqlerrcode != ERRCODE_CONNECTION_FAILURE ||
300 4 : PQstatus(entry->conn) != CONNECTION_BAD ||
301 4 : entry->xact_depth > 0)
302 : {
303 2 : MemoryContextSwitchTo(ecxt);
304 2 : PG_RE_THROW();
305 : }
306 :
307 : /* Clean up the error state */
308 2 : FlushErrorState();
309 2 : FreeErrorData(errdata);
310 2 : errdata = NULL;
311 :
312 2 : retry = true;
313 : }
314 4178 : PG_END_TRY();
315 :
316 : /*
317 : * If a broken connection is detected, disconnect it, reestablish a new
318 : * connection and retry a new remote transaction. If connection failure is
319 : * reported again, we give up getting a connection.
320 : */
321 4178 : if (retry)
322 : {
323 : Assert(entry->xact_depth == 0);
324 :
325 2 : ereport(DEBUG3,
326 : (errmsg_internal("could not start remote transaction on connection %p",
327 : entry->conn)),
328 : errdetail_internal("%s", pchomp(PQerrorMessage(entry->conn))));
329 :
330 2 : elog(DEBUG3, "closing connection %p to reestablish a new one",
331 : entry->conn);
332 2 : disconnect_pg_server(entry);
333 :
334 2 : make_new_connection(entry, user);
335 :
336 2 : begin_remote_xact(entry);
337 : }
338 :
339 : /* Remember if caller will prepare statements */
340 4178 : entry->have_prep_stmt |= will_prep_stmt;
341 :
342 : /* If caller needs access to the per-connection state, return it. */
343 4178 : if (state)
344 1432 : *state = &entry->state;
345 :
346 4178 : return entry->conn;
347 : }
348 :
349 : /*
350 : * Reset all transient state fields in the cached connection entry and
351 : * establish new connection to the remote server.
352 : */
353 : static void
354 134 : make_new_connection(ConnCacheEntry *entry, UserMapping *user)
355 : {
356 134 : ForeignServer *server = GetForeignServer(user->serverid);
357 : ListCell *lc;
358 :
359 : Assert(entry->conn == NULL);
360 :
361 : /* Reset all transient state fields, to be sure all are clean */
362 134 : entry->xact_depth = 0;
363 134 : entry->have_prep_stmt = false;
364 134 : entry->have_error = false;
365 134 : entry->changing_xact_state = false;
366 134 : entry->invalidated = false;
367 134 : entry->serverid = server->serverid;
368 134 : entry->server_hashvalue =
369 134 : GetSysCacheHashValue1(FOREIGNSERVEROID,
370 : ObjectIdGetDatum(server->serverid));
371 134 : entry->mapping_hashvalue =
372 134 : GetSysCacheHashValue1(USERMAPPINGOID,
373 : ObjectIdGetDatum(user->umid));
374 134 : memset(&entry->state, 0, sizeof(entry->state));
375 :
376 : /*
377 : * Determine whether to keep the connection that we're about to make here
378 : * open even after the transaction using it ends, so that the subsequent
379 : * transactions can re-use it.
380 : *
381 : * By default, all the connections to any foreign servers are kept open.
382 : *
383 : * Also determine whether to commit/abort (sub)transactions opened on the
384 : * remote server in parallel at (sub)transaction end, which is disabled by
385 : * default.
386 : *
387 : * Note: it's enough to determine these only when making a new connection
388 : * because if these settings for it are changed, it will be closed and
389 : * re-made later.
390 : */
391 134 : entry->keep_connections = true;
392 134 : entry->parallel_commit = false;
393 134 : entry->parallel_abort = false;
394 590 : foreach(lc, server->options)
395 : {
396 456 : DefElem *def = (DefElem *) lfirst(lc);
397 :
398 456 : if (strcmp(def->defname, "keep_connections") == 0)
399 16 : entry->keep_connections = defGetBoolean(def);
400 440 : else if (strcmp(def->defname, "parallel_commit") == 0)
401 4 : entry->parallel_commit = defGetBoolean(def);
402 436 : else if (strcmp(def->defname, "parallel_abort") == 0)
403 4 : entry->parallel_abort = defGetBoolean(def);
404 : }
405 :
406 : /* Now try to make the connection */
407 134 : entry->conn = connect_pg_server(server, user);
408 :
409 122 : elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
410 : entry->conn, server->servername, user->umid, user->userid);
411 122 : }
412 :
413 : /*
414 : * Check that non-superuser has used password or delegated credentials
415 : * to establish connection; otherwise, he's piggybacking on the
416 : * postgres server's user identity. See also dblink_security_check()
417 : * in contrib/dblink and check_conn_params.
418 : */
419 : static void
420 126 : pgfdw_security_check(const char **keywords, const char **values, UserMapping *user, PGconn *conn)
421 : {
422 : /* Superusers bypass the check */
423 126 : if (superuser_arg(user->userid))
424 118 : return;
425 :
426 : #ifdef ENABLE_GSS
427 : /* Connected via GSSAPI with delegated credentials- all good. */
428 : if (PQconnectionUsedGSSAPI(conn) && be_gssapi_get_delegation(MyProcPort))
429 : return;
430 : #endif
431 :
432 : /* Ok if superuser set PW required false. */
433 8 : if (!UserMappingPasswordRequired(user))
434 4 : return;
435 :
436 : /* Connected via PW, with PW required true, and provided non-empty PW. */
437 4 : if (PQconnectionUsedPassword(conn))
438 : {
439 : /* ok if params contain a non-empty password */
440 0 : for (int i = 0; keywords[i] != NULL; i++)
441 : {
442 0 : if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
443 0 : return;
444 : }
445 : }
446 :
447 4 : ereport(ERROR,
448 : (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
449 : errmsg("password or GSSAPI delegated credentials required"),
450 : errdetail("Non-superuser cannot connect if the server does not request a password or use GSSAPI with delegated credentials."),
451 : errhint("Target server's authentication method must be changed or password_required=false set in the user mapping attributes.")));
452 : }
453 :
454 : /*
455 : * Connect to remote server using specified server and user mapping properties.
456 : */
457 : static PGconn *
458 134 : connect_pg_server(ForeignServer *server, UserMapping *user)
459 : {
460 134 : PGconn *volatile conn = NULL;
461 :
462 : /*
463 : * Use PG_TRY block to ensure closing connection on error.
464 : */
465 134 : PG_TRY();
466 : {
467 : const char **keywords;
468 : const char **values;
469 134 : char *appname = NULL;
470 : int n;
471 :
472 : /*
473 : * Construct connection params from generic options of ForeignServer
474 : * and UserMapping. (Some of them might not be libpq options, in
475 : * which case we'll just waste a few array slots.) Add 4 extra slots
476 : * for application_name, fallback_application_name, client_encoding,
477 : * end marker.
478 : */
479 134 : n = list_length(server->options) + list_length(user->options) + 4;
480 134 : keywords = (const char **) palloc(n * sizeof(char *));
481 134 : values = (const char **) palloc(n * sizeof(char *));
482 :
483 134 : n = 0;
484 268 : n += ExtractConnectionOptions(server->options,
485 134 : keywords + n, values + n);
486 268 : n += ExtractConnectionOptions(user->options,
487 134 : keywords + n, values + n);
488 :
489 : /*
490 : * Use pgfdw_application_name as application_name if set.
491 : *
492 : * PQconnectdbParams() processes the parameter arrays from start to
493 : * end. If any key word is repeated, the last value is used. Therefore
494 : * note that pgfdw_application_name must be added to the arrays after
495 : * options of ForeignServer are, so that it can override
496 : * application_name set in ForeignServer.
497 : */
498 134 : if (pgfdw_application_name && *pgfdw_application_name != '\0')
499 : {
500 2 : keywords[n] = "application_name";
501 2 : values[n] = pgfdw_application_name;
502 2 : n++;
503 : }
504 :
505 : /*
506 : * Search the parameter arrays to find application_name setting, and
507 : * replace escape sequences in it with status information if found.
508 : * The arrays are searched backwards because the last value is used if
509 : * application_name is repeatedly set.
510 : */
511 338 : for (int i = n - 1; i >= 0; i--)
512 : {
513 240 : if (strcmp(keywords[i], "application_name") == 0 &&
514 36 : *(values[i]) != '\0')
515 : {
516 : /*
517 : * Use this application_name setting if it's not empty string
518 : * even after any escape sequences in it are replaced.
519 : */
520 36 : appname = process_pgfdw_appname(values[i]);
521 36 : if (appname[0] != '\0')
522 : {
523 36 : values[i] = appname;
524 36 : break;
525 : }
526 :
527 : /*
528 : * This empty application_name is not used, so we set
529 : * values[i] to NULL and keep searching the array to find the
530 : * next one.
531 : */
532 0 : values[i] = NULL;
533 0 : pfree(appname);
534 0 : appname = NULL;
535 : }
536 : }
537 :
538 : /* Use "postgres_fdw" as fallback_application_name */
539 134 : keywords[n] = "fallback_application_name";
540 134 : values[n] = "postgres_fdw";
541 134 : n++;
542 :
543 : /* Set client_encoding so that libpq can convert encoding properly. */
544 134 : keywords[n] = "client_encoding";
545 134 : values[n] = GetDatabaseEncodingName();
546 134 : n++;
547 :
548 134 : keywords[n] = values[n] = NULL;
549 :
550 : /* verify the set of connection parameters */
551 134 : check_conn_params(keywords, values, user);
552 :
553 : /* first time, allocate or get the custom wait event */
554 130 : if (pgfdw_we_connect == 0)
555 10 : pgfdw_we_connect = WaitEventExtensionNew("PostgresFdwConnect");
556 :
557 : /* OK to make connection */
558 130 : conn = libpqsrv_connect_params(keywords, values,
559 : false, /* expand_dbname */
560 : pgfdw_we_connect);
561 :
562 130 : if (!conn || PQstatus(conn) != CONNECTION_OK)
563 4 : ereport(ERROR,
564 : (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
565 : errmsg("could not connect to server \"%s\"",
566 : server->servername),
567 : errdetail_internal("%s", pchomp(PQerrorMessage(conn)))));
568 :
569 : /* Perform post-connection security checks */
570 126 : pgfdw_security_check(keywords, values, user, conn);
571 :
572 : /* Prepare new session for use */
573 122 : configure_remote_session(conn);
574 :
575 122 : if (appname != NULL)
576 36 : pfree(appname);
577 122 : pfree(keywords);
578 122 : pfree(values);
579 : }
580 12 : PG_CATCH();
581 : {
582 12 : libpqsrv_disconnect(conn);
583 12 : PG_RE_THROW();
584 : }
585 122 : PG_END_TRY();
586 :
587 122 : return conn;
588 : }
589 :
590 : /*
591 : * Disconnect any open connection for a connection cache entry.
592 : */
593 : static void
594 112 : disconnect_pg_server(ConnCacheEntry *entry)
595 : {
596 112 : if (entry->conn != NULL)
597 : {
598 112 : libpqsrv_disconnect(entry->conn);
599 112 : entry->conn = NULL;
600 : }
601 112 : }
602 :
603 : /*
604 : * Return the value of the password_required option if it is defined for this
605 : * user mapping, otherwise true. The mapping has been pre-validated.
606 : */
607 : static bool
608 14 : UserMappingPasswordRequired(UserMapping *user)
609 : {
610 : ListCell *cell;
611 :
612 20 : foreach(cell, user->options)
613 : {
614 12 : DefElem *def = (DefElem *) lfirst(cell);
615 :
616 12 : if (strcmp(def->defname, "password_required") == 0)
617 6 : return defGetBoolean(def);
618 : }
619 :
620 8 : return true;
621 : }
622 :
623 : /*
624 : * For non-superusers, insist that the connstr specify a password or that the
625 : * user provided their own GSSAPI delegated credentials. This
626 : * prevents a password from being picked up from .pgpass, a service file, the
627 : * environment, etc. We don't want the postgres user's passwords,
628 : * certificates, etc to be accessible to non-superusers. (See also
629 : * dblink_connstr_check in contrib/dblink.)
630 : */
631 : static void
632 134 : check_conn_params(const char **keywords, const char **values, UserMapping *user)
633 : {
634 : int i;
635 :
636 : /* no check required if superuser */
637 134 : if (superuser_arg(user->userid))
638 122 : return;
639 :
640 : #ifdef ENABLE_GSS
641 : /* ok if the user provided their own delegated credentials */
642 : if (be_gssapi_get_delegation(MyProcPort))
643 : return;
644 : #endif
645 :
646 : /* ok if params contain a non-empty password */
647 48 : for (i = 0; keywords[i] != NULL; i++)
648 : {
649 42 : if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
650 6 : return;
651 : }
652 :
653 : /* ok if the superuser explicitly said so at user mapping creation time */
654 6 : if (!UserMappingPasswordRequired(user))
655 2 : return;
656 :
657 4 : ereport(ERROR,
658 : (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
659 : errmsg("password or GSSAPI delegated credentials required"),
660 : errdetail("Non-superusers must delegate GSSAPI credentials or provide a password in the user mapping.")));
661 : }
662 :
663 : /*
664 : * Issue SET commands to make sure remote session is configured properly.
665 : *
666 : * We do this just once at connection, assuming nothing will change the
667 : * values later. Since we'll never send volatile function calls to the
668 : * remote, there shouldn't be any way to break this assumption from our end.
669 : * It's possible to think of ways to break it at the remote end, eg making
670 : * a foreign table point to a view that includes a set_config call ---
671 : * but once you admit the possibility of a malicious view definition,
672 : * there are any number of ways to break things.
673 : */
674 : static void
675 122 : configure_remote_session(PGconn *conn)
676 : {
677 122 : int remoteversion = PQserverVersion(conn);
678 :
679 : /* Force the search path to contain only pg_catalog (see deparse.c) */
680 122 : do_sql_command(conn, "SET search_path = pg_catalog");
681 :
682 : /*
683 : * Set remote timezone; this is basically just cosmetic, since all
684 : * transmitted and returned timestamptzs should specify a zone explicitly
685 : * anyway. However it makes the regression test outputs more predictable.
686 : *
687 : * We don't risk setting remote zone equal to ours, since the remote
688 : * server might use a different timezone database. Instead, use GMT
689 : * (quoted, because very old servers are picky about case). That's
690 : * guaranteed to work regardless of the remote's timezone database,
691 : * because pg_tzset() hard-wires it (at least in PG 9.2 and later).
692 : */
693 122 : do_sql_command(conn, "SET timezone = 'GMT'");
694 :
695 : /*
696 : * Set values needed to ensure unambiguous data output from remote. (This
697 : * logic should match what pg_dump does. See also set_transmission_modes
698 : * in postgres_fdw.c.)
699 : */
700 122 : do_sql_command(conn, "SET datestyle = ISO");
701 122 : if (remoteversion >= 80400)
702 122 : do_sql_command(conn, "SET intervalstyle = postgres");
703 122 : if (remoteversion >= 90000)
704 122 : do_sql_command(conn, "SET extra_float_digits = 3");
705 : else
706 0 : do_sql_command(conn, "SET extra_float_digits = 2");
707 122 : }
708 :
709 : /*
710 : * Convenience subroutine to issue a non-data-returning SQL command to remote
711 : */
712 : void
713 3458 : do_sql_command(PGconn *conn, const char *sql)
714 : {
715 3458 : do_sql_command_begin(conn, sql);
716 3458 : do_sql_command_end(conn, sql, false);
717 3452 : }
718 :
719 : static void
720 3494 : do_sql_command_begin(PGconn *conn, const char *sql)
721 : {
722 3494 : if (!PQsendQuery(conn, sql))
723 0 : pgfdw_report_error(ERROR, NULL, conn, false, sql);
724 3494 : }
725 :
726 : static void
727 3494 : do_sql_command_end(PGconn *conn, const char *sql, bool consume_input)
728 : {
729 : PGresult *res;
730 :
731 : /*
732 : * If requested, consume whatever data is available from the socket. (Note
733 : * that if all data is available, this allows pgfdw_get_result to call
734 : * PQgetResult without forcing the overhead of WaitLatchOrSocket, which
735 : * would be large compared to the overhead of PQconsumeInput.)
736 : */
737 3494 : if (consume_input && !PQconsumeInput(conn))
738 0 : pgfdw_report_error(ERROR, NULL, conn, false, sql);
739 3494 : res = pgfdw_get_result(conn);
740 3494 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
741 6 : pgfdw_report_error(ERROR, res, conn, true, sql);
742 3488 : PQclear(res);
743 3488 : }
744 :
745 : /*
746 : * Start remote transaction or subtransaction, if needed.
747 : *
748 : * Note that we always use at least REPEATABLE READ in the remote session.
749 : * This is so that, if a query initiates multiple scans of the same or
750 : * different foreign tables, we will get snapshot-consistent results from
751 : * those scans. A disadvantage is that we can't provide sane emulation of
752 : * READ COMMITTED behavior --- it would be nice if we had some other way to
753 : * control which remote queries share a snapshot.
754 : */
755 : static void
756 4182 : begin_remote_xact(ConnCacheEntry *entry)
757 : {
758 4182 : int curlevel = GetCurrentTransactionNestLevel();
759 :
760 : /* Start main transaction if we haven't yet */
761 4182 : if (entry->xact_depth <= 0)
762 : {
763 : const char *sql;
764 :
765 1450 : elog(DEBUG3, "starting remote transaction on connection %p",
766 : entry->conn);
767 :
768 1450 : if (IsolationIsSerializable())
769 0 : sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
770 : else
771 1450 : sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
772 1450 : entry->changing_xact_state = true;
773 1450 : do_sql_command(entry->conn, sql);
774 1448 : entry->xact_depth = 1;
775 1448 : entry->changing_xact_state = false;
776 : }
777 :
778 : /*
779 : * If we're in a subtransaction, stack up savepoints to match our level.
780 : * This ensures we can rollback just the desired effects when a
781 : * subtransaction aborts.
782 : */
783 4208 : while (entry->xact_depth < curlevel)
784 : {
785 : char sql[64];
786 :
787 30 : snprintf(sql, sizeof(sql), "SAVEPOINT s%d", entry->xact_depth + 1);
788 30 : entry->changing_xact_state = true;
789 30 : do_sql_command(entry->conn, sql);
790 28 : entry->xact_depth++;
791 28 : entry->changing_xact_state = false;
792 : }
793 4178 : }
794 :
795 : /*
796 : * Release connection reference count created by calling GetConnection.
797 : */
798 : void
799 4072 : ReleaseConnection(PGconn *conn)
800 : {
801 : /*
802 : * Currently, we don't actually track connection references because all
803 : * cleanup is managed on a transaction or subtransaction basis instead. So
804 : * there's nothing to do here.
805 : */
806 4072 : }
807 :
808 : /*
809 : * Assign a "unique" number for a cursor.
810 : *
811 : * These really only need to be unique per connection within a transaction.
812 : * For the moment we ignore the per-connection point and assign them across
813 : * all connections in the transaction, but we ask for the connection to be
814 : * supplied in case we want to refine that.
815 : *
816 : * Note that even if wraparound happens in a very long transaction, actual
817 : * collisions are highly improbable; just be sure to use %u not %d to print.
818 : */
819 : unsigned int
820 1032 : GetCursorNumber(PGconn *conn)
821 : {
822 1032 : return ++cursor_number;
823 : }
824 :
825 : /*
826 : * Assign a "unique" number for a prepared statement.
827 : *
828 : * This works much like GetCursorNumber, except that we never reset the counter
829 : * within a session. That's because we can't be 100% sure we've gotten rid
830 : * of all prepared statements on all connections, and it's not really worth
831 : * increasing the risk of prepared-statement name collisions by resetting.
832 : */
833 : unsigned int
834 350 : GetPrepStmtNumber(PGconn *conn)
835 : {
836 350 : return ++prep_stmt_number;
837 : }
838 :
839 : /*
840 : * Submit a query and wait for the result.
841 : *
842 : * Since we don't use non-blocking mode, this can't process interrupts while
843 : * pushing the query text to the server. That risk is relatively small, so we
844 : * ignore that for now.
845 : *
846 : * Caller is responsible for the error handling on the result.
847 : */
848 : PGresult *
849 7820 : pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state)
850 : {
851 : /* First, process a pending asynchronous request, if any. */
852 7820 : if (state && state->pendingAreq)
853 8 : process_pending_request(state->pendingAreq);
854 :
855 7820 : if (!PQsendQuery(conn, query))
856 0 : return NULL;
857 7820 : return pgfdw_get_result(conn);
858 : }
859 :
860 : /*
861 : * Wrap libpqsrv_get_result_last(), adding wait event.
862 : *
863 : * Caller is responsible for the error handling on the result.
864 : */
865 : PGresult *
866 15790 : pgfdw_get_result(PGconn *conn)
867 : {
868 15790 : return libpqsrv_get_result_last(conn, pgfdw_we_get_result);
869 : }
870 :
871 : /*
872 : * Report an error we got from the remote server.
873 : *
874 : * elevel: error level to use (typically ERROR, but might be less)
875 : * res: PGresult containing the error
876 : * conn: connection we did the query on
877 : * clear: if true, PQclear the result (otherwise caller will handle it)
878 : * sql: NULL, or text of remote command we tried to execute
879 : *
880 : * Note: callers that choose not to throw ERROR for a remote error are
881 : * responsible for making sure that the associated ConnCacheEntry gets
882 : * marked with have_error = true.
883 : */
884 : void
885 32 : pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
886 : bool clear, const char *sql)
887 : {
888 : /* If requested, PGresult must be released before leaving this function. */
889 32 : PG_TRY();
890 : {
891 32 : char *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
892 32 : char *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
893 32 : char *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
894 32 : char *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
895 32 : char *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
896 : int sqlstate;
897 :
898 32 : if (diag_sqlstate)
899 28 : sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
900 : diag_sqlstate[1],
901 : diag_sqlstate[2],
902 : diag_sqlstate[3],
903 : diag_sqlstate[4]);
904 : else
905 4 : sqlstate = ERRCODE_CONNECTION_FAILURE;
906 :
907 : /*
908 : * If we don't get a message from the PGresult, try the PGconn. This
909 : * is needed because for connection-level failures, PQgetResult may
910 : * just return NULL, not a PGresult at all.
911 : */
912 32 : if (message_primary == NULL)
913 4 : message_primary = pchomp(PQerrorMessage(conn));
914 :
915 32 : ereport(elevel,
916 : (errcode(sqlstate),
917 : (message_primary != NULL && message_primary[0] != '\0') ?
918 : errmsg_internal("%s", message_primary) :
919 : errmsg("could not obtain message string for remote error"),
920 : message_detail ? errdetail_internal("%s", message_detail) : 0,
921 : message_hint ? errhint("%s", message_hint) : 0,
922 : message_context ? errcontext("%s", message_context) : 0,
923 : sql ? errcontext("remote SQL command: %s", sql) : 0));
924 : }
925 32 : PG_FINALLY();
926 : {
927 32 : if (clear)
928 30 : PQclear(res);
929 : }
930 32 : PG_END_TRY();
931 0 : }
932 :
933 : /*
934 : * pgfdw_xact_callback --- cleanup at main-transaction end.
935 : *
936 : * This runs just late enough that it must not enter user-defined code
937 : * locally. (Entering such code on the remote side is fine. Its remote
938 : * COMMIT TRANSACTION may run deferred triggers.)
939 : */
940 : static void
941 7704 : pgfdw_xact_callback(XactEvent event, void *arg)
942 : {
943 : HASH_SEQ_STATUS scan;
944 : ConnCacheEntry *entry;
945 7704 : List *pending_entries = NIL;
946 7704 : List *cancel_requested = NIL;
947 :
948 : /* Quick exit if no connections were touched in this transaction. */
949 7704 : if (!xact_got_connection)
950 6318 : return;
951 :
952 : /*
953 : * Scan all connection cache entries to find open remote transactions, and
954 : * close them.
955 : */
956 1386 : hash_seq_init(&scan, ConnectionHash);
957 7146 : while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
958 : {
959 : PGresult *res;
960 :
961 : /* Ignore cache entry if no open connection right now */
962 5762 : if (entry->conn == NULL)
963 3268 : continue;
964 :
965 : /* If it has an open remote transaction, try to close it */
966 2494 : if (entry->xact_depth > 0)
967 : {
968 1450 : elog(DEBUG3, "closing remote transaction on connection %p",
969 : entry->conn);
970 :
971 1450 : switch (event)
972 : {
973 1366 : case XACT_EVENT_PARALLEL_PRE_COMMIT:
974 : case XACT_EVENT_PRE_COMMIT:
975 :
976 : /*
977 : * If abort cleanup previously failed for this connection,
978 : * we can't issue any more commands against it.
979 : */
980 1366 : pgfdw_reject_incomplete_xact_state_change(entry);
981 :
982 : /* Commit all remote transactions during pre-commit */
983 1366 : entry->changing_xact_state = true;
984 1366 : if (entry->parallel_commit)
985 : {
986 32 : do_sql_command_begin(entry->conn, "COMMIT TRANSACTION");
987 32 : pending_entries = lappend(pending_entries, entry);
988 32 : continue;
989 : }
990 1334 : do_sql_command(entry->conn, "COMMIT TRANSACTION");
991 1334 : entry->changing_xact_state = false;
992 :
993 : /*
994 : * If there were any errors in subtransactions, and we
995 : * made prepared statements, do a DEALLOCATE ALL to make
996 : * sure we get rid of all prepared statements. This is
997 : * annoying and not terribly bulletproof, but it's
998 : * probably not worth trying harder.
999 : *
1000 : * DEALLOCATE ALL only exists in 8.3 and later, so this
1001 : * constrains how old a server postgres_fdw can
1002 : * communicate with. We intentionally ignore errors in
1003 : * the DEALLOCATE, so that we can hobble along to some
1004 : * extent with older servers (leaking prepared statements
1005 : * as we go; but we don't really support update operations
1006 : * pre-8.3 anyway).
1007 : */
1008 1334 : if (entry->have_prep_stmt && entry->have_error)
1009 : {
1010 0 : res = pgfdw_exec_query(entry->conn, "DEALLOCATE ALL",
1011 : NULL);
1012 0 : PQclear(res);
1013 : }
1014 1334 : entry->have_prep_stmt = false;
1015 1334 : entry->have_error = false;
1016 1334 : break;
1017 2 : case XACT_EVENT_PRE_PREPARE:
1018 :
1019 : /*
1020 : * We disallow any remote transactions, since it's not
1021 : * very reasonable to hold them open until the prepared
1022 : * transaction is committed. For the moment, throw error
1023 : * unconditionally; later we might allow read-only cases.
1024 : * Note that the error will cause us to come right back
1025 : * here with event == XACT_EVENT_ABORT, so we'll clean up
1026 : * the connection state at that point.
1027 : */
1028 2 : ereport(ERROR,
1029 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1030 : errmsg("cannot PREPARE a transaction that has operated on postgres_fdw foreign tables")));
1031 : break;
1032 0 : case XACT_EVENT_PARALLEL_COMMIT:
1033 : case XACT_EVENT_COMMIT:
1034 : case XACT_EVENT_PREPARE:
1035 : /* Pre-commit should have closed the open transaction */
1036 0 : elog(ERROR, "missed cleaning up connection during pre-commit");
1037 : break;
1038 82 : case XACT_EVENT_PARALLEL_ABORT:
1039 : case XACT_EVENT_ABORT:
1040 : /* Rollback all remote transactions during abort */
1041 82 : if (entry->parallel_abort)
1042 : {
1043 8 : if (pgfdw_abort_cleanup_begin(entry, true,
1044 : &pending_entries,
1045 : &cancel_requested))
1046 8 : continue;
1047 : }
1048 : else
1049 74 : pgfdw_abort_cleanup(entry, true);
1050 74 : break;
1051 : }
1052 1044 : }
1053 :
1054 : /* Reset state to show we're out of a transaction */
1055 2452 : pgfdw_reset_xact_state(entry, true);
1056 : }
1057 :
1058 : /* If there are any pending connections, finish cleaning them up */
1059 1384 : if (pending_entries || cancel_requested)
1060 : {
1061 30 : if (event == XACT_EVENT_PARALLEL_PRE_COMMIT ||
1062 : event == XACT_EVENT_PRE_COMMIT)
1063 : {
1064 : Assert(cancel_requested == NIL);
1065 26 : pgfdw_finish_pre_commit_cleanup(pending_entries);
1066 : }
1067 : else
1068 : {
1069 : Assert(event == XACT_EVENT_PARALLEL_ABORT ||
1070 : event == XACT_EVENT_ABORT);
1071 4 : pgfdw_finish_abort_cleanup(pending_entries, cancel_requested,
1072 : true);
1073 : }
1074 : }
1075 :
1076 : /*
1077 : * Regardless of the event type, we can now mark ourselves as out of the
1078 : * transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
1079 : * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
1080 : */
1081 1384 : xact_got_connection = false;
1082 :
1083 : /* Also reset cursor numbering for next transaction */
1084 1384 : cursor_number = 0;
1085 : }
1086 :
1087 : /*
1088 : * pgfdw_subxact_callback --- cleanup at subtransaction end.
1089 : */
1090 : static void
1091 76 : pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
1092 : SubTransactionId parentSubid, void *arg)
1093 : {
1094 : HASH_SEQ_STATUS scan;
1095 : ConnCacheEntry *entry;
1096 : int curlevel;
1097 76 : List *pending_entries = NIL;
1098 76 : List *cancel_requested = NIL;
1099 :
1100 : /* Nothing to do at subxact start, nor after commit. */
1101 76 : if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
1102 : event == SUBXACT_EVENT_ABORT_SUB))
1103 46 : return;
1104 :
1105 : /* Quick exit if no connections were touched in this transaction. */
1106 30 : if (!xact_got_connection)
1107 0 : return;
1108 :
1109 : /*
1110 : * Scan all connection cache entries to find open remote subtransactions
1111 : * of the current level, and close them.
1112 : */
1113 30 : curlevel = GetCurrentTransactionNestLevel();
1114 30 : hash_seq_init(&scan, ConnectionHash);
1115 204 : while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1116 : {
1117 : char sql[100];
1118 :
1119 : /*
1120 : * We only care about connections with open remote subtransactions of
1121 : * the current level.
1122 : */
1123 174 : if (entry->conn == NULL || entry->xact_depth < curlevel)
1124 158 : continue;
1125 :
1126 28 : if (entry->xact_depth > curlevel)
1127 0 : elog(ERROR, "missed cleaning up remote subtransaction at level %d",
1128 : entry->xact_depth);
1129 :
1130 28 : if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
1131 : {
1132 : /*
1133 : * If abort cleanup previously failed for this connection, we
1134 : * can't issue any more commands against it.
1135 : */
1136 14 : pgfdw_reject_incomplete_xact_state_change(entry);
1137 :
1138 : /* Commit all remote subtransactions during pre-commit */
1139 14 : snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
1140 14 : entry->changing_xact_state = true;
1141 14 : if (entry->parallel_commit)
1142 : {
1143 4 : do_sql_command_begin(entry->conn, sql);
1144 4 : pending_entries = lappend(pending_entries, entry);
1145 4 : continue;
1146 : }
1147 10 : do_sql_command(entry->conn, sql);
1148 10 : entry->changing_xact_state = false;
1149 : }
1150 : else
1151 : {
1152 : /* Rollback all remote subtransactions during abort */
1153 14 : if (entry->parallel_abort)
1154 : {
1155 8 : if (pgfdw_abort_cleanup_begin(entry, false,
1156 : &pending_entries,
1157 : &cancel_requested))
1158 8 : continue;
1159 : }
1160 : else
1161 6 : pgfdw_abort_cleanup(entry, false);
1162 : }
1163 :
1164 : /* OK, we're outta that level of subtransaction */
1165 16 : pgfdw_reset_xact_state(entry, false);
1166 : }
1167 :
1168 : /* If there are any pending connections, finish cleaning them up */
1169 30 : if (pending_entries || cancel_requested)
1170 : {
1171 6 : if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
1172 : {
1173 : Assert(cancel_requested == NIL);
1174 2 : pgfdw_finish_pre_subcommit_cleanup(pending_entries, curlevel);
1175 : }
1176 : else
1177 : {
1178 : Assert(event == SUBXACT_EVENT_ABORT_SUB);
1179 4 : pgfdw_finish_abort_cleanup(pending_entries, cancel_requested,
1180 : false);
1181 : }
1182 : }
1183 : }
1184 :
1185 : /*
1186 : * Connection invalidation callback function
1187 : *
1188 : * After a change to a pg_foreign_server or pg_user_mapping catalog entry,
1189 : * close connections depending on that entry immediately if current transaction
1190 : * has not used those connections yet. Otherwise, mark those connections as
1191 : * invalid and then make pgfdw_xact_callback() close them at the end of current
1192 : * transaction, since they cannot be closed in the midst of the transaction
1193 : * using them. Closed connections will be remade at the next opportunity if
1194 : * necessary.
1195 : *
1196 : * Although most cache invalidation callbacks blow away all the related stuff
1197 : * regardless of the given hashvalue, connections are expensive enough that
1198 : * it's worth trying to avoid that.
1199 : *
1200 : * NB: We could avoid unnecessary disconnection more strictly by examining
1201 : * individual option values, but it seems too much effort for the gain.
1202 : */
1203 : static void
1204 344 : pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
1205 : {
1206 : HASH_SEQ_STATUS scan;
1207 : ConnCacheEntry *entry;
1208 :
1209 : Assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID);
1210 :
1211 : /* ConnectionHash must exist already, if we're registered */
1212 344 : hash_seq_init(&scan, ConnectionHash);
1213 2332 : while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1214 : {
1215 : /* Ignore invalid entries */
1216 1988 : if (entry->conn == NULL)
1217 1624 : continue;
1218 :
1219 : /* hashvalue == 0 means a cache reset, must clear all state */
1220 364 : if (hashvalue == 0 ||
1221 258 : (cacheid == FOREIGNSERVEROID &&
1222 364 : entry->server_hashvalue == hashvalue) ||
1223 106 : (cacheid == USERMAPPINGOID &&
1224 106 : entry->mapping_hashvalue == hashvalue))
1225 : {
1226 : /*
1227 : * Close the connection immediately if it's not used yet in this
1228 : * transaction. Otherwise mark it as invalid so that
1229 : * pgfdw_xact_callback() can close it at the end of this
1230 : * transaction.
1231 : */
1232 98 : if (entry->xact_depth == 0)
1233 : {
1234 92 : elog(DEBUG3, "discarding connection %p", entry->conn);
1235 92 : disconnect_pg_server(entry);
1236 : }
1237 : else
1238 6 : entry->invalidated = true;
1239 : }
1240 : }
1241 344 : }
1242 :
1243 : /*
1244 : * Raise an error if the given connection cache entry is marked as being
1245 : * in the middle of an xact state change. This should be called at which no
1246 : * such change is expected to be in progress; if one is found to be in
1247 : * progress, it means that we aborted in the middle of a previous state change
1248 : * and now don't know what the remote transaction state actually is.
1249 : * Such connections can't safely be further used. Re-establishing the
1250 : * connection would change the snapshot and roll back any writes already
1251 : * performed, so that's not an option, either. Thus, we must abort.
1252 : */
1253 : static void
1254 5572 : pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
1255 : {
1256 : ForeignServer *server;
1257 :
1258 : /* nothing to do for inactive entries and entries of sane state */
1259 5572 : if (entry->conn == NULL || !entry->changing_xact_state)
1260 5572 : return;
1261 :
1262 : /* make sure this entry is inactive */
1263 0 : disconnect_pg_server(entry);
1264 :
1265 : /* find server name to be shown in the message below */
1266 0 : server = GetForeignServer(entry->serverid);
1267 :
1268 0 : ereport(ERROR,
1269 : (errcode(ERRCODE_CONNECTION_EXCEPTION),
1270 : errmsg("connection to server \"%s\" was lost",
1271 : server->servername)));
1272 : }
1273 :
1274 : /*
1275 : * Reset state to show we're out of a (sub)transaction.
1276 : */
1277 : static void
1278 2520 : pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel)
1279 : {
1280 2520 : if (toplevel)
1281 : {
1282 : /* Reset state to show we're out of a transaction */
1283 2492 : entry->xact_depth = 0;
1284 :
1285 : /*
1286 : * If the connection isn't in a good idle state, it is marked as
1287 : * invalid or keep_connections option of its server is disabled, then
1288 : * discard it to recover. Next GetConnection will open a new
1289 : * connection.
1290 : */
1291 4982 : if (PQstatus(entry->conn) != CONNECTION_OK ||
1292 2490 : PQtransactionStatus(entry->conn) != PQTRANS_IDLE ||
1293 2490 : entry->changing_xact_state ||
1294 2490 : entry->invalidated ||
1295 2486 : !entry->keep_connections)
1296 : {
1297 8 : elog(DEBUG3, "discarding connection %p", entry->conn);
1298 8 : disconnect_pg_server(entry);
1299 : }
1300 : }
1301 : else
1302 : {
1303 : /* Reset state to show we're out of a subtransaction */
1304 28 : entry->xact_depth--;
1305 : }
1306 2520 : }
1307 :
1308 : /*
1309 : * Cancel the currently-in-progress query (whose query text we do not have)
1310 : * and ignore the result. Returns true if we successfully cancel the query
1311 : * and discard any pending result, and false if not.
1312 : *
1313 : * It's not a huge problem if we throw an ERROR here, but if we get into error
1314 : * recursion trouble, we'll end up slamming the connection shut, which will
1315 : * necessitate failing the entire toplevel transaction even if subtransactions
1316 : * were used. Try to use WARNING where we can.
1317 : *
1318 : * XXX: if the query was one sent by fetch_more_data_begin(), we could get the
1319 : * query text from the pendingAreq saved in the per-connection state, then
1320 : * report the query using it.
1321 : */
1322 : static bool
1323 4 : pgfdw_cancel_query(PGconn *conn)
1324 : {
1325 : TimestampTz endtime;
1326 :
1327 : /*
1328 : * If it takes too long to cancel the query and discard the result, assume
1329 : * the connection is dead.
1330 : */
1331 4 : endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
1332 : CONNECTION_CLEANUP_TIMEOUT);
1333 :
1334 4 : if (!pgfdw_cancel_query_begin(conn, endtime))
1335 0 : return false;
1336 4 : return pgfdw_cancel_query_end(conn, endtime, false);
1337 : }
1338 :
1339 : /*
1340 : * Submit a cancel request to the given connection, waiting only until
1341 : * the given time.
1342 : *
1343 : * We sleep interruptibly until we receive confirmation that the cancel
1344 : * request has been accepted, and if it is, return true; if the timeout
1345 : * lapses without that, or the request fails for whatever reason, return
1346 : * false.
1347 : */
1348 : static bool
1349 4 : pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime)
1350 : {
1351 4 : const char *errormsg = libpqsrv_cancel(conn, endtime);
1352 :
1353 4 : if (errormsg != NULL)
1354 0 : ereport(WARNING,
1355 : errcode(ERRCODE_CONNECTION_FAILURE),
1356 : errmsg("could not send cancel request: %s", errormsg));
1357 :
1358 4 : return errormsg == NULL;
1359 : }
1360 :
1361 : static bool
1362 4 : pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime, bool consume_input)
1363 : {
1364 4 : PGresult *result = NULL;
1365 : bool timed_out;
1366 :
1367 : /*
1368 : * If requested, consume whatever data is available from the socket. (Note
1369 : * that if all data is available, this allows pgfdw_get_cleanup_result to
1370 : * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
1371 : * which would be large compared to the overhead of PQconsumeInput.)
1372 : */
1373 4 : if (consume_input && !PQconsumeInput(conn))
1374 : {
1375 0 : ereport(WARNING,
1376 : (errcode(ERRCODE_CONNECTION_FAILURE),
1377 : errmsg("could not get result of cancel request: %s",
1378 : pchomp(PQerrorMessage(conn)))));
1379 0 : return false;
1380 : }
1381 :
1382 : /* Get and discard the result of the query. */
1383 4 : if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out))
1384 : {
1385 0 : if (timed_out)
1386 0 : ereport(WARNING,
1387 : (errmsg("could not get result of cancel request due to timeout")));
1388 : else
1389 0 : ereport(WARNING,
1390 : (errcode(ERRCODE_CONNECTION_FAILURE),
1391 : errmsg("could not get result of cancel request: %s",
1392 : pchomp(PQerrorMessage(conn)))));
1393 :
1394 0 : return false;
1395 : }
1396 4 : PQclear(result);
1397 :
1398 4 : return true;
1399 : }
1400 :
1401 : /*
1402 : * Submit a query during (sub)abort cleanup and wait up to 30 seconds for the
1403 : * result. If the query is executed without error, the return value is true.
1404 : * If the query is executed successfully but returns an error, the return
1405 : * value is true if and only if ignore_errors is set. If the query can't be
1406 : * sent or times out, the return value is false.
1407 : *
1408 : * It's not a huge problem if we throw an ERROR here, but if we get into error
1409 : * recursion trouble, we'll end up slamming the connection shut, which will
1410 : * necessitate failing the entire toplevel transaction even if subtransactions
1411 : * were used. Try to use WARNING where we can.
1412 : */
1413 : static bool
1414 106 : pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
1415 : {
1416 : TimestampTz endtime;
1417 :
1418 : /*
1419 : * If it takes too long to execute a cleanup query, assume the connection
1420 : * is dead. It's fairly likely that this is why we aborted in the first
1421 : * place (e.g. statement timeout, user cancel), so the timeout shouldn't
1422 : * be too long.
1423 : */
1424 106 : endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
1425 : CONNECTION_CLEANUP_TIMEOUT);
1426 :
1427 106 : if (!pgfdw_exec_cleanup_query_begin(conn, query))
1428 0 : return false;
1429 106 : return pgfdw_exec_cleanup_query_end(conn, query, endtime,
1430 : false, ignore_errors);
1431 : }
1432 :
1433 : static bool
1434 130 : pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query)
1435 : {
1436 : Assert(query != NULL);
1437 :
1438 : /*
1439 : * Submit a query. Since we don't use non-blocking mode, this also can
1440 : * block. But its risk is relatively small, so we ignore that for now.
1441 : */
1442 130 : if (!PQsendQuery(conn, query))
1443 : {
1444 0 : pgfdw_report_error(WARNING, NULL, conn, false, query);
1445 0 : return false;
1446 : }
1447 :
1448 130 : return true;
1449 : }
1450 :
1451 : static bool
1452 130 : pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
1453 : TimestampTz endtime, bool consume_input,
1454 : bool ignore_errors)
1455 : {
1456 130 : PGresult *result = NULL;
1457 : bool timed_out;
1458 :
1459 : Assert(query != NULL);
1460 :
1461 : /*
1462 : * If requested, consume whatever data is available from the socket. (Note
1463 : * that if all data is available, this allows pgfdw_get_cleanup_result to
1464 : * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
1465 : * which would be large compared to the overhead of PQconsumeInput.)
1466 : */
1467 130 : if (consume_input && !PQconsumeInput(conn))
1468 : {
1469 0 : pgfdw_report_error(WARNING, NULL, conn, false, query);
1470 0 : return false;
1471 : }
1472 :
1473 : /* Get the result of the query. */
1474 130 : if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out))
1475 : {
1476 0 : if (timed_out)
1477 0 : ereport(WARNING,
1478 : (errmsg("could not get query result due to timeout"),
1479 : errcontext("remote SQL command: %s", query)));
1480 : else
1481 0 : pgfdw_report_error(WARNING, NULL, conn, false, query);
1482 :
1483 0 : return false;
1484 : }
1485 :
1486 : /* Issue a warning if not successful. */
1487 130 : if (PQresultStatus(result) != PGRES_COMMAND_OK)
1488 : {
1489 0 : pgfdw_report_error(WARNING, result, conn, true, query);
1490 0 : return ignore_errors;
1491 : }
1492 130 : PQclear(result);
1493 :
1494 130 : return true;
1495 : }
1496 :
1497 : /*
1498 : * Get, during abort cleanup, the result of a query that is in progress. This
1499 : * might be a query that is being interrupted by transaction abort, or it might
1500 : * be a query that was initiated as part of transaction abort to get the remote
1501 : * side back to the appropriate state.
1502 : *
1503 : * endtime is the time at which we should give up and assume the remote
1504 : * side is dead. Returns true if the timeout expired or connection trouble
1505 : * occurred, false otherwise. Sets *result except in case of a timeout.
1506 : * Sets timed_out to true only when the timeout expired.
1507 : */
1508 : static bool
1509 134 : pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result,
1510 : bool *timed_out)
1511 : {
1512 134 : volatile bool failed = false;
1513 134 : PGresult *volatile last_res = NULL;
1514 :
1515 134 : *timed_out = false;
1516 :
1517 : /* In what follows, do not leak any PGresults on an error. */
1518 134 : PG_TRY();
1519 : {
1520 : for (;;)
1521 148 : {
1522 : PGresult *res;
1523 :
1524 406 : while (PQisBusy(conn))
1525 : {
1526 : int wc;
1527 124 : TimestampTz now = GetCurrentTimestamp();
1528 : long cur_timeout;
1529 :
1530 : /* If timeout has expired, give up, else get sleep time. */
1531 124 : cur_timeout = TimestampDifferenceMilliseconds(now, endtime);
1532 124 : if (cur_timeout <= 0)
1533 : {
1534 0 : *timed_out = true;
1535 0 : failed = true;
1536 0 : goto exit;
1537 : }
1538 :
1539 : /* first time, allocate or get the custom wait event */
1540 124 : if (pgfdw_we_cleanup_result == 0)
1541 4 : pgfdw_we_cleanup_result = WaitEventExtensionNew("PostgresFdwCleanupResult");
1542 :
1543 : /* Sleep until there's something to do */
1544 124 : wc = WaitLatchOrSocket(MyLatch,
1545 : WL_LATCH_SET | WL_SOCKET_READABLE |
1546 : WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
1547 : PQsocket(conn),
1548 : cur_timeout, pgfdw_we_cleanup_result);
1549 124 : ResetLatch(MyLatch);
1550 :
1551 124 : CHECK_FOR_INTERRUPTS();
1552 :
1553 : /* Data available in socket? */
1554 124 : if (wc & WL_SOCKET_READABLE)
1555 : {
1556 124 : if (!PQconsumeInput(conn))
1557 : {
1558 : /* connection trouble */
1559 0 : failed = true;
1560 0 : goto exit;
1561 : }
1562 : }
1563 : }
1564 :
1565 282 : res = PQgetResult(conn);
1566 282 : if (res == NULL)
1567 134 : break; /* query is complete */
1568 :
1569 148 : PQclear(last_res);
1570 148 : last_res = res;
1571 : }
1572 134 : exit: ;
1573 : }
1574 0 : PG_CATCH();
1575 : {
1576 0 : PQclear(last_res);
1577 0 : PG_RE_THROW();
1578 : }
1579 134 : PG_END_TRY();
1580 :
1581 134 : if (failed)
1582 0 : PQclear(last_res);
1583 : else
1584 134 : *result = last_res;
1585 134 : return failed;
1586 : }
1587 :
1588 : /*
1589 : * Abort remote transaction or subtransaction.
1590 : *
1591 : * "toplevel" should be set to true if toplevel (main) transaction is
1592 : * rollbacked, false otherwise.
1593 : *
1594 : * Set entry->changing_xact_state to false on success, true on failure.
1595 : */
1596 : static void
1597 80 : pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel)
1598 : {
1599 : char sql[100];
1600 :
1601 : /*
1602 : * Don't try to clean up the connection if we're already in error
1603 : * recursion trouble.
1604 : */
1605 80 : if (in_error_recursion_trouble())
1606 0 : entry->changing_xact_state = true;
1607 :
1608 : /*
1609 : * If connection is already unsalvageable, don't touch it further.
1610 : */
1611 80 : if (entry->changing_xact_state)
1612 2 : return;
1613 :
1614 : /*
1615 : * Mark this connection as in the process of changing transaction state.
1616 : */
1617 78 : entry->changing_xact_state = true;
1618 :
1619 : /* Assume we might have lost track of prepared statements */
1620 78 : entry->have_error = true;
1621 :
1622 : /*
1623 : * If a command has been submitted to the remote server by using an
1624 : * asynchronous execution function, the command might not have yet
1625 : * completed. Check to see if a command is still being processed by the
1626 : * remote server, and if so, request cancellation of the command.
1627 : */
1628 78 : if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
1629 4 : !pgfdw_cancel_query(entry->conn))
1630 0 : return; /* Unable to cancel running query */
1631 :
1632 78 : CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
1633 78 : if (!pgfdw_exec_cleanup_query(entry->conn, sql, false))
1634 0 : return; /* Unable to abort remote (sub)transaction */
1635 :
1636 78 : if (toplevel)
1637 : {
1638 72 : if (entry->have_prep_stmt && entry->have_error &&
1639 28 : !pgfdw_exec_cleanup_query(entry->conn,
1640 : "DEALLOCATE ALL",
1641 : true))
1642 0 : return; /* Trouble clearing prepared statements */
1643 :
1644 72 : entry->have_prep_stmt = false;
1645 72 : entry->have_error = false;
1646 : }
1647 :
1648 : /*
1649 : * If pendingAreq of the per-connection state is not NULL, it means that
1650 : * an asynchronous fetch begun by fetch_more_data_begin() was not done
1651 : * successfully and thus the per-connection state was not reset in
1652 : * fetch_more_data(); in that case reset the per-connection state here.
1653 : */
1654 78 : if (entry->state.pendingAreq)
1655 2 : memset(&entry->state, 0, sizeof(entry->state));
1656 :
1657 : /* Disarm changing_xact_state if it all worked */
1658 78 : entry->changing_xact_state = false;
1659 : }
1660 :
1661 : /*
1662 : * Like pgfdw_abort_cleanup, submit an abort command or cancel request, but
1663 : * don't wait for the result.
1664 : *
1665 : * Returns true if the abort command or cancel request is successfully issued,
1666 : * false otherwise. If the abort command is successfully issued, the given
1667 : * connection cache entry is appended to *pending_entries. Otherwise, if the
1668 : * cancel request is successfully issued, it is appended to *cancel_requested.
1669 : */
1670 : static bool
1671 16 : pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel,
1672 : List **pending_entries, List **cancel_requested)
1673 : {
1674 : /*
1675 : * Don't try to clean up the connection if we're already in error
1676 : * recursion trouble.
1677 : */
1678 16 : if (in_error_recursion_trouble())
1679 0 : entry->changing_xact_state = true;
1680 :
1681 : /*
1682 : * If connection is already unsalvageable, don't touch it further.
1683 : */
1684 16 : if (entry->changing_xact_state)
1685 0 : return false;
1686 :
1687 : /*
1688 : * Mark this connection as in the process of changing transaction state.
1689 : */
1690 16 : entry->changing_xact_state = true;
1691 :
1692 : /* Assume we might have lost track of prepared statements */
1693 16 : entry->have_error = true;
1694 :
1695 : /*
1696 : * If a command has been submitted to the remote server by using an
1697 : * asynchronous execution function, the command might not have yet
1698 : * completed. Check to see if a command is still being processed by the
1699 : * remote server, and if so, request cancellation of the command.
1700 : */
1701 16 : if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
1702 : {
1703 : TimestampTz endtime;
1704 :
1705 0 : endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
1706 : CONNECTION_CLEANUP_TIMEOUT);
1707 0 : if (!pgfdw_cancel_query_begin(entry->conn, endtime))
1708 0 : return false; /* Unable to cancel running query */
1709 0 : *cancel_requested = lappend(*cancel_requested, entry);
1710 : }
1711 : else
1712 : {
1713 : char sql[100];
1714 :
1715 16 : CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
1716 16 : if (!pgfdw_exec_cleanup_query_begin(entry->conn, sql))
1717 0 : return false; /* Unable to abort remote transaction */
1718 16 : *pending_entries = lappend(*pending_entries, entry);
1719 : }
1720 :
1721 16 : return true;
1722 : }
1723 :
1724 : /*
1725 : * Finish pre-commit cleanup of connections on each of which we've sent a
1726 : * COMMIT command to the remote server.
1727 : */
1728 : static void
1729 26 : pgfdw_finish_pre_commit_cleanup(List *pending_entries)
1730 : {
1731 : ConnCacheEntry *entry;
1732 26 : List *pending_deallocs = NIL;
1733 : ListCell *lc;
1734 :
1735 : Assert(pending_entries);
1736 :
1737 : /*
1738 : * Get the result of the COMMIT command for each of the pending entries
1739 : */
1740 58 : foreach(lc, pending_entries)
1741 : {
1742 32 : entry = (ConnCacheEntry *) lfirst(lc);
1743 :
1744 : Assert(entry->changing_xact_state);
1745 :
1746 : /*
1747 : * We might already have received the result on the socket, so pass
1748 : * consume_input=true to try to consume it first
1749 : */
1750 32 : do_sql_command_end(entry->conn, "COMMIT TRANSACTION", true);
1751 32 : entry->changing_xact_state = false;
1752 :
1753 : /* Do a DEALLOCATE ALL in parallel if needed */
1754 32 : if (entry->have_prep_stmt && entry->have_error)
1755 : {
1756 : /* Ignore errors (see notes in pgfdw_xact_callback) */
1757 4 : if (PQsendQuery(entry->conn, "DEALLOCATE ALL"))
1758 : {
1759 4 : pending_deallocs = lappend(pending_deallocs, entry);
1760 4 : continue;
1761 : }
1762 : }
1763 28 : entry->have_prep_stmt = false;
1764 28 : entry->have_error = false;
1765 :
1766 28 : pgfdw_reset_xact_state(entry, true);
1767 : }
1768 :
1769 : /* No further work if no pending entries */
1770 26 : if (!pending_deallocs)
1771 24 : return;
1772 :
1773 : /*
1774 : * Get the result of the DEALLOCATE command for each of the pending
1775 : * entries
1776 : */
1777 6 : foreach(lc, pending_deallocs)
1778 : {
1779 : PGresult *res;
1780 :
1781 4 : entry = (ConnCacheEntry *) lfirst(lc);
1782 :
1783 : /* Ignore errors (see notes in pgfdw_xact_callback) */
1784 8 : while ((res = PQgetResult(entry->conn)) != NULL)
1785 : {
1786 4 : PQclear(res);
1787 : /* Stop if the connection is lost (else we'll loop infinitely) */
1788 4 : if (PQstatus(entry->conn) == CONNECTION_BAD)
1789 0 : break;
1790 : }
1791 4 : entry->have_prep_stmt = false;
1792 4 : entry->have_error = false;
1793 :
1794 4 : pgfdw_reset_xact_state(entry, true);
1795 : }
1796 : }
1797 :
1798 : /*
1799 : * Finish pre-subcommit cleanup of connections on each of which we've sent a
1800 : * RELEASE command to the remote server.
1801 : */
1802 : static void
1803 2 : pgfdw_finish_pre_subcommit_cleanup(List *pending_entries, int curlevel)
1804 : {
1805 : ConnCacheEntry *entry;
1806 : char sql[100];
1807 : ListCell *lc;
1808 :
1809 : Assert(pending_entries);
1810 :
1811 : /*
1812 : * Get the result of the RELEASE command for each of the pending entries
1813 : */
1814 2 : snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
1815 6 : foreach(lc, pending_entries)
1816 : {
1817 4 : entry = (ConnCacheEntry *) lfirst(lc);
1818 :
1819 : Assert(entry->changing_xact_state);
1820 :
1821 : /*
1822 : * We might already have received the result on the socket, so pass
1823 : * consume_input=true to try to consume it first
1824 : */
1825 4 : do_sql_command_end(entry->conn, sql, true);
1826 4 : entry->changing_xact_state = false;
1827 :
1828 4 : pgfdw_reset_xact_state(entry, false);
1829 : }
1830 2 : }
1831 :
1832 : /*
1833 : * Finish abort cleanup of connections on each of which we've sent an abort
1834 : * command or cancel request to the remote server.
1835 : */
1836 : static void
1837 8 : pgfdw_finish_abort_cleanup(List *pending_entries, List *cancel_requested,
1838 : bool toplevel)
1839 : {
1840 8 : List *pending_deallocs = NIL;
1841 : ListCell *lc;
1842 :
1843 : /*
1844 : * For each of the pending cancel requests (if any), get and discard the
1845 : * result of the query, and submit an abort command to the remote server.
1846 : */
1847 8 : if (cancel_requested)
1848 : {
1849 0 : foreach(lc, cancel_requested)
1850 : {
1851 0 : ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
1852 : TimestampTz endtime;
1853 : char sql[100];
1854 :
1855 : Assert(entry->changing_xact_state);
1856 :
1857 : /*
1858 : * Set end time. You might think we should do this before issuing
1859 : * cancel request like in normal mode, but that is problematic,
1860 : * because if, for example, it took longer than 30 seconds to
1861 : * process the first few entries in the cancel_requested list, it
1862 : * would cause a timeout error when processing each of the
1863 : * remaining entries in the list, leading to slamming that entry's
1864 : * connection shut.
1865 : */
1866 0 : endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
1867 : CONNECTION_CLEANUP_TIMEOUT);
1868 :
1869 0 : if (!pgfdw_cancel_query_end(entry->conn, endtime, true))
1870 : {
1871 : /* Unable to cancel running query */
1872 0 : pgfdw_reset_xact_state(entry, toplevel);
1873 0 : continue;
1874 : }
1875 :
1876 : /* Send an abort command in parallel if needed */
1877 0 : CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
1878 0 : if (!pgfdw_exec_cleanup_query_begin(entry->conn, sql))
1879 : {
1880 : /* Unable to abort remote (sub)transaction */
1881 0 : pgfdw_reset_xact_state(entry, toplevel);
1882 : }
1883 : else
1884 0 : pending_entries = lappend(pending_entries, entry);
1885 : }
1886 : }
1887 :
1888 : /* No further work if no pending entries */
1889 8 : if (!pending_entries)
1890 0 : return;
1891 :
1892 : /*
1893 : * Get the result of the abort command for each of the pending entries
1894 : */
1895 24 : foreach(lc, pending_entries)
1896 : {
1897 16 : ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
1898 : TimestampTz endtime;
1899 : char sql[100];
1900 :
1901 : Assert(entry->changing_xact_state);
1902 :
1903 : /*
1904 : * Set end time. We do this now, not before issuing the command like
1905 : * in normal mode, for the same reason as for the cancel_requested
1906 : * entries.
1907 : */
1908 16 : endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
1909 : CONNECTION_CLEANUP_TIMEOUT);
1910 :
1911 16 : CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
1912 16 : if (!pgfdw_exec_cleanup_query_end(entry->conn, sql, endtime,
1913 : true, false))
1914 : {
1915 : /* Unable to abort remote (sub)transaction */
1916 0 : pgfdw_reset_xact_state(entry, toplevel);
1917 8 : continue;
1918 : }
1919 :
1920 16 : if (toplevel)
1921 : {
1922 : /* Do a DEALLOCATE ALL in parallel if needed */
1923 8 : if (entry->have_prep_stmt && entry->have_error)
1924 : {
1925 8 : if (!pgfdw_exec_cleanup_query_begin(entry->conn,
1926 : "DEALLOCATE ALL"))
1927 : {
1928 : /* Trouble clearing prepared statements */
1929 0 : pgfdw_reset_xact_state(entry, toplevel);
1930 : }
1931 : else
1932 8 : pending_deallocs = lappend(pending_deallocs, entry);
1933 8 : continue;
1934 : }
1935 0 : entry->have_prep_stmt = false;
1936 0 : entry->have_error = false;
1937 : }
1938 :
1939 : /* Reset the per-connection state if needed */
1940 8 : if (entry->state.pendingAreq)
1941 0 : memset(&entry->state, 0, sizeof(entry->state));
1942 :
1943 : /* We're done with this entry; unset the changing_xact_state flag */
1944 8 : entry->changing_xact_state = false;
1945 8 : pgfdw_reset_xact_state(entry, toplevel);
1946 : }
1947 :
1948 : /* No further work if no pending entries */
1949 8 : if (!pending_deallocs)
1950 4 : return;
1951 : Assert(toplevel);
1952 :
1953 : /*
1954 : * Get the result of the DEALLOCATE command for each of the pending
1955 : * entries
1956 : */
1957 12 : foreach(lc, pending_deallocs)
1958 : {
1959 8 : ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
1960 : TimestampTz endtime;
1961 :
1962 : Assert(entry->changing_xact_state);
1963 : Assert(entry->have_prep_stmt);
1964 : Assert(entry->have_error);
1965 :
1966 : /*
1967 : * Set end time. We do this now, not before issuing the command like
1968 : * in normal mode, for the same reason as for the cancel_requested
1969 : * entries.
1970 : */
1971 8 : endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
1972 : CONNECTION_CLEANUP_TIMEOUT);
1973 :
1974 8 : if (!pgfdw_exec_cleanup_query_end(entry->conn, "DEALLOCATE ALL",
1975 : endtime, true, true))
1976 : {
1977 : /* Trouble clearing prepared statements */
1978 0 : pgfdw_reset_xact_state(entry, toplevel);
1979 0 : continue;
1980 : }
1981 8 : entry->have_prep_stmt = false;
1982 8 : entry->have_error = false;
1983 :
1984 : /* Reset the per-connection state if needed */
1985 8 : if (entry->state.pendingAreq)
1986 0 : memset(&entry->state, 0, sizeof(entry->state));
1987 :
1988 : /* We're done with this entry; unset the changing_xact_state flag */
1989 8 : entry->changing_xact_state = false;
1990 8 : pgfdw_reset_xact_state(entry, toplevel);
1991 : }
1992 : }
1993 :
/*
 * Number of output arguments (columns) of postgres_fdw_get_connections for
 * each supported API version.
 */
#define POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_1	2
#define POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_2	5

/* Maximum of the per-version counts above */
#define POSTGRES_FDW_GET_CONNECTIONS_COLS \
	POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_2
1998 :
1999 : /*
2000 : * Internal function used by postgres_fdw_get_connections variants.
2001 : *
2002 : * For API version 1.1, this function takes no input parameter and
2003 : * returns a set of records with the following values:
2004 : *
2005 : * - server_name - server name of active connection. In case the foreign server
2006 : * is dropped but still the connection is active, then the server name will
2007 : * be NULL in output.
2008 : * - valid - true/false representing whether the connection is valid or not.
2009 : * Note that connections can become invalid in pgfdw_inval_callback.
2010 : *
2011 : * For API version 1.2 and later, this function takes an input parameter
2012 : * to check a connection status and returns the following
2013 : * additional values along with the three values from version 1.1:
2014 : *
2015 : * - user_name - the local user name of the active connection. In case the
2016 : * user mapping is dropped but the connection is still active, then the
2017 : * user name will be NULL in the output.
2018 : * - used_in_xact - true if the connection is used in the current transaction.
2019 : * - closed - true if the connection is closed.
2020 : *
2021 : * No records are returned when there are no cached connections at all.
2022 : */
2023 : static void
2024 26 : postgres_fdw_get_connections_internal(FunctionCallInfo fcinfo,
2025 : enum pgfdwVersion api_version)
2026 : {
2027 26 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
2028 : HASH_SEQ_STATUS scan;
2029 : ConnCacheEntry *entry;
2030 :
2031 26 : InitMaterializedSRF(fcinfo, 0);
2032 :
2033 : /* If cache doesn't exist, we return no records */
2034 26 : if (!ConnectionHash)
2035 0 : return;
2036 :
2037 : /* Check we have the expected number of output arguments */
2038 26 : switch (rsinfo->setDesc->natts)
2039 : {
2040 0 : case POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_1:
2041 0 : if (api_version != PGFDW_V1_1)
2042 0 : elog(ERROR, "incorrect number of output arguments");
2043 0 : break;
2044 26 : case POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_2:
2045 26 : if (api_version != PGFDW_V1_2)
2046 0 : elog(ERROR, "incorrect number of output arguments");
2047 26 : break;
2048 0 : default:
2049 0 : elog(ERROR, "incorrect number of output arguments");
2050 : }
2051 :
2052 26 : hash_seq_init(&scan, ConnectionHash);
2053 226 : while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
2054 : {
2055 : ForeignServer *server;
2056 200 : Datum values[POSTGRES_FDW_GET_CONNECTIONS_COLS] = {0};
2057 200 : bool nulls[POSTGRES_FDW_GET_CONNECTIONS_COLS] = {0};
2058 200 : int i = 0;
2059 :
2060 : /* We only look for open remote connections */
2061 200 : if (!entry->conn)
2062 174 : continue;
2063 :
2064 26 : server = GetForeignServerExtended(entry->serverid, FSV_MISSING_OK);
2065 :
2066 : /*
2067 : * The foreign server may have been dropped in current explicit
2068 : * transaction. It is not possible to drop the server from another
2069 : * session when the connection associated with it is in use in the
2070 : * current transaction, if tried so, the drop query in another session
2071 : * blocks until the current transaction finishes.
2072 : *
2073 : * Even though the server is dropped in the current transaction, the
2074 : * cache can still have associated active connection entry, say we
2075 : * call such connections dangling. Since we can not fetch the server
2076 : * name from system catalogs for dangling connections, instead we show
2077 : * NULL value for server name in output.
2078 : *
2079 : * We could have done better by storing the server name in the cache
2080 : * entry instead of server oid so that it could be used in the output.
2081 : * But the server name in each cache entry requires 64 bytes of
2082 : * memory, which is huge, when there are many cached connections and
2083 : * the use case i.e. dropping the foreign server within the explicit
2084 : * current transaction seems rare. So, we chose to show NULL value for
2085 : * server name in output.
2086 : *
2087 : * Such dangling connections get closed either in next use or at the
2088 : * end of current explicit transaction in pgfdw_xact_callback.
2089 : */
2090 26 : if (!server)
2091 : {
2092 : /*
2093 : * If the server has been dropped in the current explicit
2094 : * transaction, then this entry would have been invalidated in
2095 : * pgfdw_inval_callback at the end of drop server command. Note
2096 : * that this connection would not have been closed in
2097 : * pgfdw_inval_callback because it is still being used in the
2098 : * current explicit transaction. So, assert that here.
2099 : */
2100 : Assert(entry->conn && entry->xact_depth > 0 && entry->invalidated);
2101 :
2102 : /* Show null, if no server name was found */
2103 2 : nulls[i++] = true;
2104 : }
2105 : else
2106 24 : values[i++] = CStringGetTextDatum(server->servername);
2107 :
2108 26 : if (api_version >= PGFDW_V1_2)
2109 : {
2110 : HeapTuple tp;
2111 :
2112 : /* Use the system cache to obtain the user mapping */
2113 26 : tp = SearchSysCache1(USERMAPPINGOID, ObjectIdGetDatum(entry->key));
2114 :
2115 : /*
2116 : * Just like in the foreign server case, user mappings can also be
2117 : * dropped in the current explicit transaction. Therefore, the
2118 : * similar check as in the server case is required.
2119 : */
2120 26 : if (!HeapTupleIsValid(tp))
2121 : {
2122 : /*
2123 : * If we reach here, this entry must have been invalidated in
2124 : * pgfdw_inval_callback, same as in the server case.
2125 : */
2126 : Assert(entry->conn && entry->xact_depth > 0 &&
2127 : entry->invalidated);
2128 :
2129 2 : nulls[i++] = true;
2130 : }
2131 : else
2132 : {
2133 : Oid userid;
2134 :
2135 24 : userid = ((Form_pg_user_mapping) GETSTRUCT(tp))->umuser;
2136 24 : values[i++] = CStringGetTextDatum(MappingUserName(userid));
2137 24 : ReleaseSysCache(tp);
2138 : }
2139 : }
2140 :
2141 26 : values[i++] = BoolGetDatum(!entry->invalidated);
2142 :
2143 26 : if (api_version >= PGFDW_V1_2)
2144 : {
2145 26 : bool check_conn = PG_GETARG_BOOL(0);
2146 :
2147 : /* Is this connection used in the current transaction? */
2148 26 : values[i++] = BoolGetDatum(entry->xact_depth > 0);
2149 :
2150 : /*
2151 : * If a connection status check is requested and supported, return
2152 : * whether the connection is closed. Otherwise, return NULL.
2153 : */
2154 26 : if (check_conn && pgfdw_conn_checkable())
2155 4 : values[i++] = BoolGetDatum(pgfdw_conn_check(entry->conn) != 0);
2156 : else
2157 22 : nulls[i++] = true;
2158 : }
2159 :
2160 26 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
2161 : }
2162 : }
2163 :
2164 : /*
2165 : * List active foreign server connections.
2166 : *
2167 : * The SQL API of this function has changed multiple times, and will likely
2168 : * do so again in future. To support the case where a newer version of this
2169 : * loadable module is being used with an old SQL declaration of the function,
2170 : * we continue to support the older API versions.
2171 : */
2172 : Datum
2173 26 : postgres_fdw_get_connections_1_2(PG_FUNCTION_ARGS)
2174 : {
2175 26 : postgres_fdw_get_connections_internal(fcinfo, PGFDW_V1_2);
2176 :
2177 26 : PG_RETURN_VOID();
2178 : }
2179 :
2180 : Datum
2181 0 : postgres_fdw_get_connections(PG_FUNCTION_ARGS)
2182 : {
2183 0 : postgres_fdw_get_connections_internal(fcinfo, PGFDW_V1_1);
2184 :
2185 0 : PG_RETURN_VOID();
2186 : }
2187 :
2188 : /*
2189 : * Disconnect the specified cached connections.
2190 : *
2191 : * This function discards the open connections that are established by
2192 : * postgres_fdw from the local session to the foreign server with
2193 : * the given name. Note that there can be multiple connections to
2194 : * the given server using different user mappings. If the connections
2195 : * are used in the current local transaction, they are not disconnected
2196 : * and warning messages are reported. This function returns true
2197 : * if it disconnects at least one connection, otherwise false. If no
2198 : * foreign server with the given name is found, an error is reported.
2199 : */
2200 : Datum
2201 8 : postgres_fdw_disconnect(PG_FUNCTION_ARGS)
2202 : {
2203 : ForeignServer *server;
2204 : char *servername;
2205 :
2206 8 : servername = text_to_cstring(PG_GETARG_TEXT_PP(0));
2207 8 : server = GetForeignServerByName(servername, false);
2208 :
2209 6 : PG_RETURN_BOOL(disconnect_cached_connections(server->serverid));
2210 : }
2211 :
2212 : /*
2213 : * Disconnect all the cached connections.
2214 : *
2215 : * This function discards all the open connections that are established by
2216 : * postgres_fdw from the local session to the foreign servers.
2217 : * If the connections are used in the current local transaction, they are
2218 : * not disconnected and warning messages are reported. This function
2219 : * returns true if it disconnects at least one connection, otherwise false.
2220 : */
2221 : Datum
2222 10 : postgres_fdw_disconnect_all(PG_FUNCTION_ARGS)
2223 : {
2224 10 : PG_RETURN_BOOL(disconnect_cached_connections(InvalidOid));
2225 : }
2226 :
2227 : /*
2228 : * Workhorse to disconnect cached connections.
2229 : *
2230 : * This function scans all the connection cache entries and disconnects
2231 : * the open connections whose foreign server OID matches with
2232 : * the specified one. If InvalidOid is specified, it disconnects all
2233 : * the cached connections.
2234 : *
2235 : * This function emits a warning for each connection that's used in
2236 : * the current transaction and doesn't close it. It returns true if
2237 : * it disconnects at least one connection, otherwise false.
2238 : *
2239 : * Note that this function disconnects even the connections that are
2240 : * established by other users in the same local session using different
2241 : * user mappings. This leads even non-superuser to be able to close
2242 : * the connections established by superusers in the same local session.
2243 : *
2244 : * XXX As of now we don't see any security risk doing this. But we should
2245 : * set some restrictions on that, for example, prevent non-superuser
2246 : * from closing the connections established by superusers even
2247 : * in the same session?
2248 : */
2249 : static bool
2250 16 : disconnect_cached_connections(Oid serverid)
2251 : {
2252 : HASH_SEQ_STATUS scan;
2253 : ConnCacheEntry *entry;
2254 16 : bool all = !OidIsValid(serverid);
2255 16 : bool result = false;
2256 :
2257 : /*
2258 : * Connection cache hashtable has not been initialized yet in this
2259 : * session, so return false.
2260 : */
2261 16 : if (!ConnectionHash)
2262 0 : return false;
2263 :
2264 16 : hash_seq_init(&scan, ConnectionHash);
2265 134 : while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
2266 : {
2267 : /* Ignore cache entry if no open connection right now. */
2268 118 : if (!entry->conn)
2269 96 : continue;
2270 :
2271 22 : if (all || entry->serverid == serverid)
2272 : {
2273 : /*
2274 : * Emit a warning because the connection to close is used in the
2275 : * current transaction and cannot be disconnected right now.
2276 : */
2277 16 : if (entry->xact_depth > 0)
2278 : {
2279 : ForeignServer *server;
2280 :
2281 6 : server = GetForeignServerExtended(entry->serverid,
2282 : FSV_MISSING_OK);
2283 :
2284 6 : if (!server)
2285 : {
2286 : /*
2287 : * If the foreign server was dropped while its connection
2288 : * was used in the current transaction, the connection
2289 : * must have been marked as invalid by
2290 : * pgfdw_inval_callback at the end of DROP SERVER command.
2291 : */
2292 : Assert(entry->invalidated);
2293 :
2294 0 : ereport(WARNING,
2295 : (errmsg("cannot close dropped server connection because it is still in use")));
2296 : }
2297 : else
2298 6 : ereport(WARNING,
2299 : (errmsg("cannot close connection for server \"%s\" because it is still in use",
2300 : server->servername)));
2301 : }
2302 : else
2303 : {
2304 10 : elog(DEBUG3, "discarding connection %p", entry->conn);
2305 10 : disconnect_pg_server(entry);
2306 10 : result = true;
2307 : }
2308 : }
2309 : }
2310 :
2311 16 : return result;
2312 : }
2313 :
2314 : /*
2315 : * Check if the remote server closed the connection.
2316 : *
2317 : * Returns 1 if the connection is closed, -1 if an error occurred,
2318 : * and 0 if it's not closed or if the connection check is unavailable
2319 : * on this platform.
2320 : */
2321 : static int
2322 4 : pgfdw_conn_check(PGconn *conn)
2323 : {
2324 4 : int sock = PQsocket(conn);
2325 :
2326 4 : if (PQstatus(conn) != CONNECTION_OK || sock == -1)
2327 0 : return -1;
2328 :
2329 : #if (defined(HAVE_POLL) && defined(POLLRDHUP))
2330 : {
2331 : struct pollfd input_fd;
2332 : int result;
2333 :
2334 4 : input_fd.fd = sock;
2335 4 : input_fd.events = POLLRDHUP;
2336 4 : input_fd.revents = 0;
2337 :
2338 : do
2339 4 : result = poll(&input_fd, 1, 0);
2340 4 : while (result < 0 && errno == EINTR);
2341 :
2342 4 : if (result < 0)
2343 0 : return -1;
2344 :
2345 4 : return (input_fd.revents &
2346 4 : (POLLRDHUP | POLLHUP | POLLERR | POLLNVAL)) ? 1 : 0;
2347 : }
2348 : #else
2349 : return 0;
2350 : #endif
2351 : }
2352 :
/*
 * Report whether connection status checking is available on this platform.
 *
 * Returns true when the build has both HAVE_POLL and POLLRDHUP defined,
 * false otherwise.
 */
static bool
pgfdw_conn_checkable(void)
{
	bool		checkable = false;

#if (defined(HAVE_POLL) && defined(POLLRDHUP))
	checkable = true;
#endif

	return checkable;
}
|