Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * connection.c
4 : * Connection management functions for postgres_fdw
5 : *
6 : * Portions Copyright (c) 2012-2024, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * contrib/postgres_fdw/connection.c
10 : *
11 : *-------------------------------------------------------------------------
12 : */
13 : #include "postgres.h"
14 :
15 : #include "access/htup_details.h"
16 : #include "access/xact.h"
17 : #include "catalog/pg_user_mapping.h"
18 : #include "commands/defrem.h"
19 : #include "funcapi.h"
20 : #include "libpq/libpq-be.h"
21 : #include "libpq/libpq-be-fe-helpers.h"
22 : #include "mb/pg_wchar.h"
23 : #include "miscadmin.h"
24 : #include "pgstat.h"
25 : #include "postgres_fdw.h"
26 : #include "storage/fd.h"
27 : #include "storage/latch.h"
28 : #include "utils/builtins.h"
29 : #include "utils/datetime.h"
30 : #include "utils/hsearch.h"
31 : #include "utils/inval.h"
32 : #include "utils/memutils.h"
33 : #include "utils/syscache.h"
34 :
/*
 * Connection cache hash table entry
 *
 * The lookup key in this hash table is the user mapping OID.  We use just one
 * connection per user mapping ID, which ensures that all the scans use the
 * same snapshot during a query.  Using the user mapping OID rather than
 * the foreign server OID + user OID avoids creating multiple connections when
 * the public user mapping applies to all user OIDs.
 *
 * The "conn" pointer can be NULL if we don't currently have a live connection.
 * When we do have a connection, xact_depth tracks the current depth of
 * transactions and subtransactions open on the remote side.  We need to issue
 * commands at the same nesting depth on the remote as we're executing at
 * ourselves, so that rolling back a subtransaction will kill the right
 * queries and not the wrong ones.
 *
 * make_new_connection() (re)initializes every field below "conn" each time a
 * new connection is opened for the entry.
 */
typedef Oid ConnCacheKey;

typedef struct ConnCacheEntry
{
	ConnCacheKey key;			/* hash key (must be first) */
	PGconn	   *conn;			/* connection to foreign server, or NULL */
	/* Remaining fields are invalid when conn is NULL: */
	int			xact_depth;		/* 0 = no xact open, 1 = main xact open, 2 =
								 * one level of subxact open, etc */
	bool		have_prep_stmt; /* have we prepared any stmts in this xact? */
	bool		have_error;		/* have any subxacts aborted in this xact? */
	bool		changing_xact_state;	/* xact state change in process; set
										 * around remote START TRANSACTION /
										 * SAVEPOINT commands */
	bool		parallel_commit;	/* do we commit (sub)xacts in parallel? */
	bool		parallel_abort; /* do we abort (sub)xacts in parallel? */
	bool		invalidated;	/* true if reconnect is pending; honored by
								 * GetConnection() once xact_depth is 0 */
	bool		keep_connections;	/* setting value of keep_connections
									 * server option */
	Oid			serverid;		/* foreign server OID used to get server name */
	uint32		server_hashvalue;	/* hash value of foreign server OID
									 * (syscache hash; presumably matched by
									 * the invalidation callback) */
	uint32		mapping_hashvalue;	/* hash value of user mapping OID */
	PgFdwConnState state;		/* extra per-connection state */
} ConnCacheEntry;
73 :
/*
 * Connection cache (initialized on first use)
 *
 * GetConnection() creates it the first time it runs in a backend.
 */
static HTAB *ConnectionHash = NULL;

/*
 * For assigning cursor numbers and prepared statement numbers; see
 * GetCursorNumber() and GetPrepStmtNumber().
 */
static unsigned int cursor_number = 0;
static unsigned int prep_stmt_number = 0;

/*
 * Tracks whether any work is needed in callback functions.  GetConnection()
 * sets this whenever it hands out a connection in the current transaction.
 */
static bool xact_got_connection = false;

/*
 * Custom wait event values, retrieved from shared memory.  Zero means "not
 * allocated yet".  pgfdw_we_get_result and pgfdw_we_connect are allocated
 * lazily via WaitEventExtensionNew() on first use.  NOTE(review):
 * pgfdw_we_cleanup_result is presumably handled the same way outside this
 * excerpt.
 */
static uint32 pgfdw_we_cleanup_result = 0;
static uint32 pgfdw_we_connect = 0;
static uint32 pgfdw_we_get_result = 0;

/*
 * Milliseconds to wait to cancel an in-progress query or execute a cleanup
 * query; if it takes longer than 30 seconds to do these, we assume the
 * connection is dead.
 */
#define CONNECTION_CLEANUP_TIMEOUT 30000
97 :
/*
 * CONSTRUCT_ABORT_COMMAND
 *		Write into "sql" the remote command that undoes the current
 *		(sub)transaction of "entry".
 *
 * "sql" must be a real char array, not a pointer, because its capacity is
 * taken with sizeof.  A non-toplevel abort rolls back to, and then releases,
 * the savepoint numbered entry->xact_depth.  A toplevel abort aborts the
 * whole remote transaction.
 */
#define CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel) \
	do { \
		if (!(toplevel)) \
			snprintf((sql), sizeof(sql), \
					 "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d", \
					 (entry)->xact_depth, (entry)->xact_depth); \
		else \
			snprintf((sql), sizeof(sql), \
					 "ABORT TRANSACTION"); \
	} while (0)
109 :
/*
 * SQL functions
 *
 * Declare the SQL-callable functions of this module as using the version-1
 * function-call convention.  Their definitions are not part of this
 * excerpt.
 */
PG_FUNCTION_INFO_V1(postgres_fdw_get_connections);
PG_FUNCTION_INFO_V1(postgres_fdw_disconnect);
PG_FUNCTION_INFO_V1(postgres_fdw_disconnect_all);
116 :
/*
 * prototypes of private functions
 *
 * These must stay in sync with the static definitions in this file.
 */

/* connection establishment and teardown */
static void make_new_connection(ConnCacheEntry *entry, UserMapping *user);
static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
static void disconnect_pg_server(ConnCacheEntry *entry);
static void check_conn_params(const char **keywords, const char **values, UserMapping *user);
static void configure_remote_session(PGconn *conn);

/* split (send / collect result) form of do_sql_command() */
static void do_sql_command_begin(PGconn *conn, const char *sql);
static void do_sql_command_end(PGconn *conn, const char *sql,
							   bool consume_input);
static void begin_remote_xact(ConnCacheEntry *entry);

/* callbacks registered by GetConnection() */
static void pgfdw_xact_callback(XactEvent event, void *arg);
static void pgfdw_subxact_callback(SubXactEvent event,
								   SubTransactionId mySubid,
								   SubTransactionId parentSubid,
								   void *arg);
static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue);

/* (sub)transaction-end cleanup helpers */
static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry);
static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel);
static bool pgfdw_cancel_query(PGconn *conn);
static bool pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime);
static bool pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime,
								   bool consume_input);
static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
									 bool ignore_errors);
static bool pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query);
static bool pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
										 TimestampTz endtime,
										 bool consume_input,
										 bool ignore_errors);
static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
									 PGresult **result, bool *timed_out);
static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel);
static bool pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel,
									  List **pending_entries,
									  List **cancel_requested);
static void pgfdw_finish_pre_commit_cleanup(List *pending_entries);
static void pgfdw_finish_pre_subcommit_cleanup(List *pending_entries,
											   int curlevel);
static void pgfdw_finish_abort_cleanup(List *pending_entries,
									   List *cancel_requested,
									   bool toplevel);

/* security checks and miscellany */
static void pgfdw_security_check(const char **keywords, const char **values,
								 UserMapping *user, PGconn *conn);
static bool UserMappingPasswordRequired(UserMapping *user);
static bool disconnect_cached_connections(Oid serverid);
162 :
/*
 * Get a PGconn which can be used to execute queries on the remote PostgreSQL
 * server with the user's authorization.  A new connection is established
 * if we don't already have a suitable one, and a transaction is opened at
 * the right subtransaction nesting depth if we didn't do that already.
 *
 * will_prep_stmt must be true if caller intends to create any prepared
 * statements.  Since those don't go away automatically at transaction end
 * (not even on error), we need this flag to cue manual cleanup.
 *
 * If state is not NULL, *state receives the per-connection state associated
 * with the PGconn.
 *
 * On the first call in a backend, this also creates the connection cache and
 * registers the transaction, subtransaction and syscache-invalidation
 * callbacks.  Suppose starting the remote (sub)transaction fails with
 * ERRCODE_CONNECTION_FAILURE on a CONNECTION_BAD connection while no remote
 * transaction is open.  Then the connection is re-made once and the start is
 * retried.  Every other error is re-thrown to the caller.
 */
PGconn *
GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state)
{
	bool		found;
	bool		retry = false;	/* only ever assigned in PG_CATCH below */
	ConnCacheEntry *entry;
	ConnCacheKey key;
	/* caller's context; the error handler copies ErrorData into it */
	MemoryContext ccxt = CurrentMemoryContext;

	/* First time through, initialize connection cache hashtable */
	if (ConnectionHash == NULL)
	{
		HASHCTL		ctl;

		/* wait event reported by pgfdw_get_result() */
		if (pgfdw_we_get_result == 0)
			pgfdw_we_get_result =
				WaitEventExtensionNew("PostgresFdwGetResult");

		ctl.keysize = sizeof(ConnCacheKey);
		ctl.entrysize = sizeof(ConnCacheEntry);
		ConnectionHash = hash_create("postgres_fdw connections", 8,
									 &ctl,
									 HASH_ELEM | HASH_BLOBS);

		/*
		 * Register some callback functions that manage connection cleanup.
		 * This should be done just once in each backend.
		 */
		RegisterXactCallback(pgfdw_xact_callback, NULL);
		RegisterSubXactCallback(pgfdw_subxact_callback, NULL);
		CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
									  pgfdw_inval_callback, (Datum) 0);
		CacheRegisterSyscacheCallback(USERMAPPINGOID,
									  pgfdw_inval_callback, (Datum) 0);
	}

	/* Set flag that we did GetConnection during the current transaction */
	xact_got_connection = true;

	/* The hash key is simply the user mapping OID (a plain Oid). */
	key = user->umid;

	/*
	 * Find or create cached entry for requested connection.
	 */
	entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
	if (!found)
	{
		/*
		 * We need only clear "conn" here; remaining fields will be filled
		 * later when "conn" is set.
		 */
		entry->conn = NULL;
	}

	/* Reject further use of connections which failed abort cleanup. */
	pgfdw_reject_incomplete_xact_state_change(entry);

	/*
	 * If the connection needs to be remade due to invalidation, disconnect as
	 * soon as we're out of all transactions.
	 */
	if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
	{
		elog(DEBUG3, "closing connection %p for option changes to take effect",
			 entry->conn);
		disconnect_pg_server(entry);
	}

	/*
	 * If cache entry doesn't have a connection, we have to establish a new
	 * connection.  (If connect_pg_server throws an error, the cache entry
	 * will remain in a valid empty state, ie conn == NULL.)
	 */
	if (entry->conn == NULL)
		make_new_connection(entry, user);

	/*
	 * We check the health of the cached connection here when using it.  In
	 * cases where we're out of all transactions, if a broken connection is
	 * detected, we try to reestablish a new connection later.
	 */
	PG_TRY();
	{
		/* Process a pending asynchronous request if any. */
		if (entry->state.pendingAreq)
			process_pending_request(entry->state.pendingAreq);
		/* Start a new transaction or subtransaction if needed. */
		begin_remote_xact(entry);
	}
	PG_CATCH();
	{
		/* Copy the error into the caller's context, remembering where we were */
		MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
		ErrorData  *errdata = CopyErrorData();

		/*
		 * Determine whether to try to reestablish the connection.
		 *
		 * After a broken connection is detected in libpq, any error other
		 * than connection failure (e.g., out-of-memory) can be thrown
		 * somewhere between return from libpq and the expected ereport() call
		 * in pgfdw_report_error().  In this case, since PQstatus() indicates
		 * CONNECTION_BAD, checking only PQstatus() causes the false detection
		 * of connection failure.  To avoid this, we also verify that the
		 * error's sqlstate is ERRCODE_CONNECTION_FAILURE.  Note that also
		 * checking only the sqlstate can cause another false detection
		 * because pgfdw_report_error() may report ERRCODE_CONNECTION_FAILURE
		 * for any libpq-originated error condition.
		 *
		 * A retry is also impossible once a remote transaction is open
		 * (xact_depth > 0): its state would be lost with the connection.
		 */
		if (errdata->sqlerrcode != ERRCODE_CONNECTION_FAILURE ||
			PQstatus(entry->conn) != CONNECTION_BAD ||
			entry->xact_depth > 0)
		{
			MemoryContextSwitchTo(ecxt);
			PG_RE_THROW();
		}

		/* Clean up the error state */
		FlushErrorState();
		FreeErrorData(errdata);
		errdata = NULL;

		retry = true;
	}
	PG_END_TRY();

	/*
	 * If a broken connection is detected, disconnect it, reestablish a new
	 * connection and retry a new remote transaction.  If connection failure is
	 * reported again, we give up getting a connection: this second attempt is
	 * not wrapped in PG_TRY, so any error propagates to the caller.
	 */
	if (retry)
	{
		Assert(entry->xact_depth == 0);

		ereport(DEBUG3,
				(errmsg_internal("could not start remote transaction on connection %p",
								 entry->conn)),
				errdetail_internal("%s", pchomp(PQerrorMessage(entry->conn))));

		elog(DEBUG3, "closing connection %p to reestablish a new one",
			 entry->conn);
		disconnect_pg_server(entry);

		make_new_connection(entry, user);

		begin_remote_xact(entry);
	}

	/* Remember if caller will prepare statements */
	entry->have_prep_stmt |= will_prep_stmt;

	/* If caller needs access to the per-connection state, return it. */
	if (state)
		*state = &entry->state;

	return entry->conn;
}
334 :
335 : /*
336 : * Reset all transient state fields in the cached connection entry and
337 : * establish new connection to the remote server.
338 : */
339 : static void
340 130 : make_new_connection(ConnCacheEntry *entry, UserMapping *user)
341 : {
342 130 : ForeignServer *server = GetForeignServer(user->serverid);
343 : ListCell *lc;
344 :
345 : Assert(entry->conn == NULL);
346 :
347 : /* Reset all transient state fields, to be sure all are clean */
348 130 : entry->xact_depth = 0;
349 130 : entry->have_prep_stmt = false;
350 130 : entry->have_error = false;
351 130 : entry->changing_xact_state = false;
352 130 : entry->invalidated = false;
353 130 : entry->serverid = server->serverid;
354 130 : entry->server_hashvalue =
355 130 : GetSysCacheHashValue1(FOREIGNSERVEROID,
356 : ObjectIdGetDatum(server->serverid));
357 130 : entry->mapping_hashvalue =
358 130 : GetSysCacheHashValue1(USERMAPPINGOID,
359 : ObjectIdGetDatum(user->umid));
360 130 : memset(&entry->state, 0, sizeof(entry->state));
361 :
362 : /*
363 : * Determine whether to keep the connection that we're about to make here
364 : * open even after the transaction using it ends, so that the subsequent
365 : * transactions can re-use it.
366 : *
367 : * By default, all the connections to any foreign servers are kept open.
368 : *
369 : * Also determine whether to commit/abort (sub)transactions opened on the
370 : * remote server in parallel at (sub)transaction end, which is disabled by
371 : * default.
372 : *
373 : * Note: it's enough to determine these only when making a new connection
374 : * because if these settings for it are changed, it will be closed and
375 : * re-made later.
376 : */
377 130 : entry->keep_connections = true;
378 130 : entry->parallel_commit = false;
379 130 : entry->parallel_abort = false;
380 562 : foreach(lc, server->options)
381 : {
382 432 : DefElem *def = (DefElem *) lfirst(lc);
383 :
384 432 : if (strcmp(def->defname, "keep_connections") == 0)
385 12 : entry->keep_connections = defGetBoolean(def);
386 420 : else if (strcmp(def->defname, "parallel_commit") == 0)
387 4 : entry->parallel_commit = defGetBoolean(def);
388 416 : else if (strcmp(def->defname, "parallel_abort") == 0)
389 4 : entry->parallel_abort = defGetBoolean(def);
390 : }
391 :
392 : /* Now try to make the connection */
393 130 : entry->conn = connect_pg_server(server, user);
394 :
395 118 : elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
396 : entry->conn, server->servername, user->umid, user->userid);
397 118 : }
398 :
399 : /*
400 : * Check that non-superuser has used password or delegated credentials
401 : * to establish connection; otherwise, he's piggybacking on the
402 : * postgres server's user identity. See also dblink_security_check()
403 : * in contrib/dblink and check_conn_params.
404 : */
405 : static void
406 122 : pgfdw_security_check(const char **keywords, const char **values, UserMapping *user, PGconn *conn)
407 : {
408 : /* Superusers bypass the check */
409 122 : if (superuser_arg(user->userid))
410 114 : return;
411 :
412 : #ifdef ENABLE_GSS
413 : /* Connected via GSSAPI with delegated credentials- all good. */
414 : if (PQconnectionUsedGSSAPI(conn) && be_gssapi_get_delegation(MyProcPort))
415 : return;
416 : #endif
417 :
418 : /* Ok if superuser set PW required false. */
419 8 : if (!UserMappingPasswordRequired(user))
420 4 : return;
421 :
422 : /* Connected via PW, with PW required true, and provided non-empty PW. */
423 4 : if (PQconnectionUsedPassword(conn))
424 : {
425 : /* ok if params contain a non-empty password */
426 0 : for (int i = 0; keywords[i] != NULL; i++)
427 : {
428 0 : if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
429 0 : return;
430 : }
431 : }
432 :
433 4 : ereport(ERROR,
434 : (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
435 : errmsg("password or GSSAPI delegated credentials required"),
436 : errdetail("Non-superuser cannot connect if the server does not request a password or use GSSAPI with delegated credentials."),
437 : errhint("Target server's authentication method must be changed or password_required=false set in the user mapping attributes.")));
438 : }
439 :
/*
 * Connect to remote server using specified server and user mapping properties.
 *
 * The libpq parameter arrays are built in this order (for a repeated keyword,
 * libpq uses the last value):
 *	1. the foreign server's options;
 *	2. the user mapping's options;
 *	3. postgres_fdw.application_name, if set and non-empty;
 *	4. fallback_application_name = "postgres_fdw";
 *	5. client_encoding = the local database encoding.
 * Before connecting, check_conn_params() validates the parameters.  After
 * connecting, pgfdw_security_check() runs and the remote session is
 * configured.  Any error closes the partially made connection before it is
 * re-thrown.
 */
static PGconn *
connect_pg_server(ForeignServer *server, UserMapping *user)
{
	/* volatile: assigned inside PG_TRY and read again in PG_CATCH */
	PGconn	   *volatile conn = NULL;

	/*
	 * Use PG_TRY block to ensure closing connection on error.
	 */
	PG_TRY();
	{
		const char **keywords;
		const char **values;
		char	   *appname = NULL;
		int			n;

		/*
		 * Construct connection params from generic options of ForeignServer
		 * and UserMapping.  (Some of them might not be libpq options, in
		 * which case we'll just waste a few array slots.)  Add 4 extra slots
		 * for application_name, fallback_application_name, client_encoding,
		 * end marker.
		 */
		n = list_length(server->options) + list_length(user->options) + 4;
		keywords = (const char **) palloc(n * sizeof(char *));
		values = (const char **) palloc(n * sizeof(char *));

		n = 0;
		n += ExtractConnectionOptions(server->options,
									  keywords + n, values + n);
		n += ExtractConnectionOptions(user->options,
									  keywords + n, values + n);

		/*
		 * Use pgfdw_application_name as application_name if set.
		 *
		 * PQconnectdbParams() processes the parameter arrays from start to
		 * end.  If any key word is repeated, the last value is used.
		 * Therefore pgfdw_application_name must be added to the arrays after
		 * the options of ForeignServer, so that it can override any
		 * application_name set in ForeignServer.
		 */
		if (pgfdw_application_name && *pgfdw_application_name != '\0')
		{
			keywords[n] = "application_name";
			values[n] = pgfdw_application_name;
			n++;
		}

		/*
		 * Search the parameter arrays to find application_name setting, and
		 * replace escape sequences in it with status information if found.
		 * The arrays are searched backwards because the last value is used if
		 * application_name is repeatedly set.
		 */
		for (int i = n - 1; i >= 0; i--)
		{
			if (strcmp(keywords[i], "application_name") == 0 &&
				*(values[i]) != '\0')
			{
				/*
				 * Use this application_name setting if it's not empty string
				 * even after any escape sequences in it are replaced.
				 */
				appname = process_pgfdw_appname(values[i]);
				if (appname[0] != '\0')
				{
					values[i] = appname;
					break;
				}

				/*
				 * This empty application_name is not used, so we set
				 * values[i] to NULL and keep searching the array to find the
				 * next one.
				 */
				values[i] = NULL;
				pfree(appname);
				appname = NULL;
			}
		}

		/* Use "postgres_fdw" as fallback_application_name */
		keywords[n] = "fallback_application_name";
		values[n] = "postgres_fdw";
		n++;

		/* Set client_encoding so that libpq can convert encoding properly. */
		keywords[n] = "client_encoding";
		values[n] = GetDatabaseEncodingName();
		n++;

		/* NULL-terminate both arrays (the 4th extra slot) */
		keywords[n] = values[n] = NULL;

		/* verify the set of connection parameters; raises ERROR on failure */
		check_conn_params(keywords, values, user);

		/* first time, allocate or get the custom wait event */
		if (pgfdw_we_connect == 0)
			pgfdw_we_connect = WaitEventExtensionNew("PostgresFdwConnect");

		/* OK to make connection */
		conn = libpqsrv_connect_params(keywords, values,
									   false,	/* expand_dbname */
									   pgfdw_we_connect);

		if (!conn || PQstatus(conn) != CONNECTION_OK)
			ereport(ERROR,
					(errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
					 errmsg("could not connect to server \"%s\"",
							server->servername),
					 errdetail_internal("%s", pchomp(PQerrorMessage(conn)))));

		/* Perform post-connection security checks; may raise ERROR */
		pgfdw_security_check(keywords, values, user, conn);

		/* Prepare new session for use */
		configure_remote_session(conn);

		if (appname != NULL)
			pfree(appname);
		pfree(keywords);
		pfree(values);
	}
	PG_CATCH();
	{
		/* close whatever was opened (conn may still be NULL) and re-throw */
		libpqsrv_disconnect(conn);
		PG_RE_THROW();
	}
	PG_END_TRY();

	return conn;
}
575 :
576 : /*
577 : * Disconnect any open connection for a connection cache entry.
578 : */
579 : static void
580 112 : disconnect_pg_server(ConnCacheEntry *entry)
581 : {
582 112 : if (entry->conn != NULL)
583 : {
584 112 : libpqsrv_disconnect(entry->conn);
585 112 : entry->conn = NULL;
586 : }
587 112 : }
588 :
589 : /*
590 : * Return true if the password_required is defined and false for this user
591 : * mapping, otherwise false. The mapping has been pre-validated.
592 : */
593 : static bool
594 14 : UserMappingPasswordRequired(UserMapping *user)
595 : {
596 : ListCell *cell;
597 :
598 20 : foreach(cell, user->options)
599 : {
600 12 : DefElem *def = (DefElem *) lfirst(cell);
601 :
602 12 : if (strcmp(def->defname, "password_required") == 0)
603 6 : return defGetBoolean(def);
604 : }
605 :
606 8 : return true;
607 : }
608 :
609 : /*
610 : * For non-superusers, insist that the connstr specify a password or that the
611 : * user provided their own GSSAPI delegated credentials. This
612 : * prevents a password from being picked up from .pgpass, a service file, the
613 : * environment, etc. We don't want the postgres user's passwords,
614 : * certificates, etc to be accessible to non-superusers. (See also
615 : * dblink_connstr_check in contrib/dblink.)
616 : */
617 : static void
618 130 : check_conn_params(const char **keywords, const char **values, UserMapping *user)
619 : {
620 : int i;
621 :
622 : /* no check required if superuser */
623 130 : if (superuser_arg(user->userid))
624 118 : return;
625 :
626 : #ifdef ENABLE_GSS
627 : /* ok if the user provided their own delegated credentials */
628 : if (be_gssapi_get_delegation(MyProcPort))
629 : return;
630 : #endif
631 :
632 : /* ok if params contain a non-empty password */
633 48 : for (i = 0; keywords[i] != NULL; i++)
634 : {
635 42 : if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
636 6 : return;
637 : }
638 :
639 : /* ok if the superuser explicitly said so at user mapping creation time */
640 6 : if (!UserMappingPasswordRequired(user))
641 2 : return;
642 :
643 4 : ereport(ERROR,
644 : (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
645 : errmsg("password or GSSAPI delegated credentials required"),
646 : errdetail("Non-superusers must delegate GSSAPI credentials or provide a password in the user mapping.")));
647 : }
648 :
649 : /*
650 : * Issue SET commands to make sure remote session is configured properly.
651 : *
652 : * We do this just once at connection, assuming nothing will change the
653 : * values later. Since we'll never send volatile function calls to the
654 : * remote, there shouldn't be any way to break this assumption from our end.
655 : * It's possible to think of ways to break it at the remote end, eg making
656 : * a foreign table point to a view that includes a set_config call ---
657 : * but once you admit the possibility of a malicious view definition,
658 : * there are any number of ways to break things.
659 : */
660 : static void
661 118 : configure_remote_session(PGconn *conn)
662 : {
663 118 : int remoteversion = PQserverVersion(conn);
664 :
665 : /* Force the search path to contain only pg_catalog (see deparse.c) */
666 118 : do_sql_command(conn, "SET search_path = pg_catalog");
667 :
668 : /*
669 : * Set remote timezone; this is basically just cosmetic, since all
670 : * transmitted and returned timestamptzs should specify a zone explicitly
671 : * anyway. However it makes the regression test outputs more predictable.
672 : *
673 : * We don't risk setting remote zone equal to ours, since the remote
674 : * server might use a different timezone database. Instead, use GMT
675 : * (quoted, because very old servers are picky about case). That's
676 : * guaranteed to work regardless of the remote's timezone database,
677 : * because pg_tzset() hard-wires it (at least in PG 9.2 and later).
678 : */
679 118 : do_sql_command(conn, "SET timezone = 'GMT'");
680 :
681 : /*
682 : * Set values needed to ensure unambiguous data output from remote. (This
683 : * logic should match what pg_dump does. See also set_transmission_modes
684 : * in postgres_fdw.c.)
685 : */
686 118 : do_sql_command(conn, "SET datestyle = ISO");
687 118 : if (remoteversion >= 80400)
688 118 : do_sql_command(conn, "SET intervalstyle = postgres");
689 118 : if (remoteversion >= 90000)
690 118 : do_sql_command(conn, "SET extra_float_digits = 3");
691 : else
692 0 : do_sql_command(conn, "SET extra_float_digits = 2");
693 118 : }
694 :
695 : /*
696 : * Convenience subroutine to issue a non-data-returning SQL command to remote
697 : */
698 : void
699 3422 : do_sql_command(PGconn *conn, const char *sql)
700 : {
701 3422 : do_sql_command_begin(conn, sql);
702 3422 : do_sql_command_end(conn, sql, false);
703 3416 : }
704 :
705 : static void
706 3458 : do_sql_command_begin(PGconn *conn, const char *sql)
707 : {
708 3458 : if (!PQsendQuery(conn, sql))
709 0 : pgfdw_report_error(ERROR, NULL, conn, false, sql);
710 3458 : }
711 :
712 : static void
713 3458 : do_sql_command_end(PGconn *conn, const char *sql, bool consume_input)
714 : {
715 : PGresult *res;
716 :
717 : /*
718 : * If requested, consume whatever data is available from the socket. (Note
719 : * that if all data is available, this allows pgfdw_get_result to call
720 : * PQgetResult without forcing the overhead of WaitLatchOrSocket, which
721 : * would be large compared to the overhead of PQconsumeInput.)
722 : */
723 3458 : if (consume_input && !PQconsumeInput(conn))
724 0 : pgfdw_report_error(ERROR, NULL, conn, false, sql);
725 3458 : res = pgfdw_get_result(conn);
726 3458 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
727 6 : pgfdw_report_error(ERROR, res, conn, true, sql);
728 3452 : PQclear(res);
729 3452 : }
730 :
731 : /*
732 : * Start remote transaction or subtransaction, if needed.
733 : *
734 : * Note that we always use at least REPEATABLE READ in the remote session.
735 : * This is so that, if a query initiates multiple scans of the same or
736 : * different foreign tables, we will get snapshot-consistent results from
737 : * those scans. A disadvantage is that we can't provide sane emulation of
738 : * READ COMMITTED behavior --- it would be nice if we had some other way to
739 : * control which remote queries share a snapshot.
740 : */
741 : static void
742 4178 : begin_remote_xact(ConnCacheEntry *entry)
743 : {
744 4178 : int curlevel = GetCurrentTransactionNestLevel();
745 :
746 : /* Start main transaction if we haven't yet */
747 4178 : if (entry->xact_depth <= 0)
748 : {
749 : const char *sql;
750 :
751 1442 : elog(DEBUG3, "starting remote transaction on connection %p",
752 : entry->conn);
753 :
754 1442 : if (IsolationIsSerializable())
755 0 : sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
756 : else
757 1442 : sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
758 1442 : entry->changing_xact_state = true;
759 1442 : do_sql_command(entry->conn, sql);
760 1440 : entry->xact_depth = 1;
761 1440 : entry->changing_xact_state = false;
762 : }
763 :
764 : /*
765 : * If we're in a subtransaction, stack up savepoints to match our level.
766 : * This ensures we can rollback just the desired effects when a
767 : * subtransaction aborts.
768 : */
769 4204 : while (entry->xact_depth < curlevel)
770 : {
771 : char sql[64];
772 :
773 30 : snprintf(sql, sizeof(sql), "SAVEPOINT s%d", entry->xact_depth + 1);
774 30 : entry->changing_xact_state = true;
775 30 : do_sql_command(entry->conn, sql);
776 28 : entry->xact_depth++;
777 28 : entry->changing_xact_state = false;
778 : }
779 4174 : }
780 :
781 : /*
782 : * Release connection reference count created by calling GetConnection.
783 : */
784 : void
785 4068 : ReleaseConnection(PGconn *conn)
786 : {
787 : /*
788 : * Currently, we don't actually track connection references because all
789 : * cleanup is managed on a transaction or subtransaction basis instead. So
790 : * there's nothing to do here.
791 : */
792 4068 : }
793 :
794 : /*
795 : * Assign a "unique" number for a cursor.
796 : *
797 : * These really only need to be unique per connection within a transaction.
798 : * For the moment we ignore the per-connection point and assign them across
799 : * all connections in the transaction, but we ask for the connection to be
800 : * supplied in case we want to refine that.
801 : *
802 : * Note that even if wraparound happens in a very long transaction, actual
803 : * collisions are highly improbable; just be sure to use %u not %d to print.
804 : */
805 : unsigned int
806 1018 : GetCursorNumber(PGconn *conn)
807 : {
808 1018 : return ++cursor_number;
809 : }
810 :
811 : /*
812 : * Assign a "unique" number for a prepared statement.
813 : *
814 : * This works much like GetCursorNumber, except that we never reset the counter
815 : * within a session. That's because we can't be 100% sure we've gotten rid
816 : * of all prepared statements on all connections, and it's not really worth
817 : * increasing the risk of prepared-statement name collisions by resetting.
818 : */
819 : unsigned int
820 348 : GetPrepStmtNumber(PGconn *conn)
821 : {
822 348 : return ++prep_stmt_number;
823 : }
824 :
825 : /*
826 : * Submit a query and wait for the result.
827 : *
828 : * Since we don't use non-blocking mode, this can't process interrupts while
829 : * pushing the query text to the server. That risk is relatively small, so we
830 : * ignore that for now.
831 : *
832 : * Caller is responsible for the error handling on the result.
833 : */
834 : PGresult *
835 7716 : pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state)
836 : {
837 : /* First, process a pending asynchronous request, if any. */
838 7716 : if (state && state->pendingAreq)
839 8 : process_pending_request(state->pendingAreq);
840 :
841 7716 : if (!PQsendQuery(conn, query))
842 0 : return NULL;
843 7716 : return pgfdw_get_result(conn);
844 : }
845 :
846 : /*
847 : * Wrap libpqsrv_get_result_last(), adding wait event.
848 : *
849 : * Caller is responsible for the error handling on the result.
850 : */
851 : PGresult *
852 15598 : pgfdw_get_result(PGconn *conn)
853 : {
854 15598 : return libpqsrv_get_result_last(conn, pgfdw_we_get_result);
855 : }
856 :
857 : /*
858 : * Report an error we got from the remote server.
859 : *
860 : * elevel: error level to use (typically ERROR, but might be less)
861 : * res: PGresult containing the error
862 : * conn: connection we did the query on
863 : * clear: if true, PQclear the result (otherwise caller will handle it)
864 : * sql: NULL, or text of remote command we tried to execute
865 : *
866 : * Note: callers that choose not to throw ERROR for a remote error are
867 : * responsible for making sure that the associated ConnCacheEntry gets
868 : * marked with have_error = true.
869 : */
870 : void
871 32 : pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
872 : bool clear, const char *sql)
873 : {
874 : /* If requested, PGresult must be released before leaving this function. */
875 32 : PG_TRY();
876 : {
877 32 : char *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
878 32 : char *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
879 32 : char *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
880 32 : char *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
881 32 : char *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
882 : int sqlstate;
883 :
884 32 : if (diag_sqlstate)
885 28 : sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
886 : diag_sqlstate[1],
887 : diag_sqlstate[2],
888 : diag_sqlstate[3],
889 : diag_sqlstate[4]);
890 : else
891 4 : sqlstate = ERRCODE_CONNECTION_FAILURE;
892 :
893 : /*
894 : * If we don't get a message from the PGresult, try the PGconn. This
895 : * is needed because for connection-level failures, PQgetResult may
896 : * just return NULL, not a PGresult at all.
897 : */
898 32 : if (message_primary == NULL)
899 4 : message_primary = pchomp(PQerrorMessage(conn));
900 :
901 32 : ereport(elevel,
902 : (errcode(sqlstate),
903 : (message_primary != NULL && message_primary[0] != '\0') ?
904 : errmsg_internal("%s", message_primary) :
905 : errmsg("could not obtain message string for remote error"),
906 : message_detail ? errdetail_internal("%s", message_detail) : 0,
907 : message_hint ? errhint("%s", message_hint) : 0,
908 : message_context ? errcontext("%s", message_context) : 0,
909 : sql ? errcontext("remote SQL command: %s", sql) : 0));
910 : }
911 32 : PG_FINALLY();
912 : {
913 32 : if (clear)
914 30 : PQclear(res);
915 : }
916 32 : PG_END_TRY();
917 0 : }
918 :
/*
 * pgfdw_xact_callback --- cleanup at main-transaction end.
 *
 * Does nothing unless this transaction touched a remote connection
 * (xact_got_connection).  Otherwise it scans the connection cache.  For each
 * entry with an open remote transaction (xact_depth > 0) it acts on "event":
 *
 * - PRE_COMMIT / PARALLEL_PRE_COMMIT: send COMMIT TRANSACTION, and if
 *   prepared statements may have been lost track of (have_prep_stmt &&
 *   have_error), DEALLOCATE ALL.  Entries with parallel_commit only get the
 *   COMMIT started here; pgfdw_finish_pre_commit_cleanup() completes them.
 * - PRE_PREPARE: always raise ERROR; preparing a transaction that used
 *   postgres_fdw is not supported.
 * - COMMIT / PARALLEL_COMMIT / PREPARE: an open remote transaction here
 *   means pre-commit failed to clean up, which is an internal ERROR.
 * - ABORT / PARALLEL_ABORT: roll back the remote transaction.  Entries with
 *   parallel_abort whose cleanup could be started are finished later by
 *   pgfdw_finish_abort_cleanup().
 *
 * Every live connection that was not deferred to one of those "finish"
 * routines is passed to pgfdw_reset_xact_state() right away.  Finally the
 * per-transaction flag and the cursor counter are reset.
 *
 * This runs just late enough that it must not enter user-defined code
 * locally.  (Entering such code on the remote side is fine.  Its remote
 * COMMIT TRANSACTION may run deferred triggers.)
 */
static void
pgfdw_xact_callback(XactEvent event, void *arg)
{
	HASH_SEQ_STATUS scan;
	ConnCacheEntry *entry;
	/* entries whose COMMIT or abort command was sent but not yet awaited */
	List	   *pending_entries = NIL;
	/* entries that had a running query canceled during parallel abort */
	List	   *cancel_requested = NIL;

	/* Quick exit if no connections were touched in this transaction. */
	if (!xact_got_connection)
		return;

	/*
	 * Scan all connection cache entries to find open remote transactions, and
	 * close them.
	 */
	hash_seq_init(&scan, ConnectionHash);
	while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
	{
		PGresult   *res;

		/* Ignore cache entry if no open connection right now */
		if (entry->conn == NULL)
			continue;

		/* If it has an open remote transaction, try to close it */
		if (entry->xact_depth > 0)
		{
			elog(DEBUG3, "closing remote transaction on connection %p",
				 entry->conn);

			switch (event)
			{
				case XACT_EVENT_PARALLEL_PRE_COMMIT:
				case XACT_EVENT_PRE_COMMIT:

					/*
					 * If abort cleanup previously failed for this connection,
					 * we can't issue any more commands against it.
					 */
					pgfdw_reject_incomplete_xact_state_change(entry);

					/* Commit all remote transactions during pre-commit */
					entry->changing_xact_state = true;
					if (entry->parallel_commit)
					{
						/*
						 * Only send the COMMIT now; the result is collected
						 * (and the entry reset) in
						 * pgfdw_finish_pre_commit_cleanup().
						 */
						do_sql_command_begin(entry->conn, "COMMIT TRANSACTION");
						pending_entries = lappend(pending_entries, entry);
						continue;
					}
					do_sql_command(entry->conn, "COMMIT TRANSACTION");
					entry->changing_xact_state = false;

					/*
					 * If there were any errors in subtransactions, and we
					 * made prepared statements, do a DEALLOCATE ALL to make
					 * sure we get rid of all prepared statements. This is
					 * annoying and not terribly bulletproof, but it's
					 * probably not worth trying harder.
					 *
					 * DEALLOCATE ALL only exists in 8.3 and later, so this
					 * constrains how old a server postgres_fdw can
					 * communicate with.  We intentionally ignore errors in
					 * the DEALLOCATE, so that we can hobble along to some
					 * extent with older servers (leaking prepared statements
					 * as we go; but we don't really support update operations
					 * pre-8.3 anyway).
					 */
					if (entry->have_prep_stmt && entry->have_error)
					{
						res = pgfdw_exec_query(entry->conn, "DEALLOCATE ALL",
											   NULL);
						PQclear(res);
					}
					entry->have_prep_stmt = false;
					entry->have_error = false;
					break;
				case XACT_EVENT_PRE_PREPARE:

					/*
					 * We disallow any remote transactions, since it's not
					 * very reasonable to hold them open until the prepared
					 * transaction is committed.  For the moment, throw error
					 * unconditionally; later we might allow read-only cases.
					 * Note that the error will cause us to come right back
					 * here with event == XACT_EVENT_ABORT, so we'll clean up
					 * the connection state at that point.
					 */
					ereport(ERROR,
							(errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
							 errmsg("cannot PREPARE a transaction that has operated on postgres_fdw foreign tables")));
					break;
				case XACT_EVENT_PARALLEL_COMMIT:
				case XACT_EVENT_COMMIT:
				case XACT_EVENT_PREPARE:
					/* Pre-commit should have closed the open transaction */
					elog(ERROR, "missed cleaning up connection during pre-commit");
					break;
				case XACT_EVENT_PARALLEL_ABORT:
				case XACT_EVENT_ABORT:
					/* Rollback all remote transactions during abort */
					if (entry->parallel_abort)
					{
						/*
						 * If the abort command or cancel request was sent,
						 * the entry has been queued and is finished in
						 * pgfdw_finish_abort_cleanup().  If not, fall through
						 * to the reset below, which discards the connection
						 * because changing_xact_state is still set.
						 */
						if (pgfdw_abort_cleanup_begin(entry, true,
													  &pending_entries,
													  &cancel_requested))
							continue;
					}
					else
						pgfdw_abort_cleanup(entry, true);
					break;
			}
		}

		/* Reset state to show we're out of a transaction */
		pgfdw_reset_xact_state(entry, true);
	}

	/* If there are any pending connections, finish cleaning them up */
	if (pending_entries || cancel_requested)
	{
		if (event == XACT_EVENT_PARALLEL_PRE_COMMIT ||
			event == XACT_EVENT_PRE_COMMIT)
		{
			Assert(cancel_requested == NIL);
			pgfdw_finish_pre_commit_cleanup(pending_entries);
		}
		else
		{
			Assert(event == XACT_EVENT_PARALLEL_ABORT ||
				   event == XACT_EVENT_ABORT);
			pgfdw_finish_abort_cleanup(pending_entries, cancel_requested,
									   true);
		}
	}

	/*
	 * Regardless of the event type, we can now mark ourselves as out of the
	 * transaction.  (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
	 * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
	 */
	xact_got_connection = false;

	/* Also reset cursor numbering for next transaction */
	cursor_number = 0;
}
1072 :
/*
 * pgfdw_subxact_callback --- cleanup at subtransaction end.
 *
 * Only SUBXACT_EVENT_PRE_COMMIT_SUB and SUBXACT_EVENT_ABORT_SUB are acted on,
 * and only if this transaction touched a remote connection.  Each live
 * connection is examined against the current local nesting level:
 *
 * - remote depth below the current level: skipped;
 * - remote depth above it: a deeper remote subtransaction was never cleaned
 *   up, which is reported as an internal ERROR;
 * - otherwise the remote subtransaction is closed.  On pre-commit this
 *   releases savepoint "s<level>"; on abort it is rolled back through
 *   pgfdw_abort_cleanup() or pgfdw_abort_cleanup_begin().
 *
 * Connections with parallel_commit / parallel_abort only get their command
 * started inside the loop.  They are completed together afterwards by
 * pgfdw_finish_pre_subcommit_cleanup() or pgfdw_finish_abort_cleanup().
 */
static void
pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
					   SubTransactionId parentSubid, void *arg)
{
	HASH_SEQ_STATUS scan;
	ConnCacheEntry *entry;
	int			curlevel;
	/* entries whose RELEASE or abort command was sent but not yet awaited */
	List	   *pending_entries = NIL;
	/* entries that had a running query canceled during parallel abort */
	List	   *cancel_requested = NIL;

	/* Nothing to do at subxact start, nor after commit. */
	if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
		  event == SUBXACT_EVENT_ABORT_SUB))
		return;

	/* Quick exit if no connections were touched in this transaction. */
	if (!xact_got_connection)
		return;

	/*
	 * Scan all connection cache entries to find open remote subtransactions
	 * of the current level, and close them.
	 */
	curlevel = GetCurrentTransactionNestLevel();
	hash_seq_init(&scan, ConnectionHash);
	while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
	{
		char		sql[100];

		/*
		 * We only care about connections with open remote subtransactions of
		 * the current level.
		 */
		if (entry->conn == NULL || entry->xact_depth < curlevel)
			continue;

		if (entry->xact_depth > curlevel)
			elog(ERROR, "missed cleaning up remote subtransaction at level %d",
				 entry->xact_depth);

		if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
		{
			/*
			 * If abort cleanup previously failed for this connection, we
			 * can't issue any more commands against it.
			 */
			pgfdw_reject_incomplete_xact_state_change(entry);

			/* Commit all remote subtransactions during pre-commit */
			snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
			entry->changing_xact_state = true;
			if (entry->parallel_commit)
			{
				/* Result is collected in pgfdw_finish_pre_subcommit_cleanup() */
				do_sql_command_begin(entry->conn, sql);
				pending_entries = lappend(pending_entries, entry);
				continue;
			}
			do_sql_command(entry->conn, sql);
			entry->changing_xact_state = false;
		}
		else
		{
			/* Rollback all remote subtransactions during abort */
			if (entry->parallel_abort)
			{
				/*
				 * Queued entries are finished in pgfdw_finish_abort_cleanup();
				 * if nothing could be sent, just reset the state below.
				 */
				if (pgfdw_abort_cleanup_begin(entry, false,
											  &pending_entries,
											  &cancel_requested))
					continue;
			}
			else
				pgfdw_abort_cleanup(entry, false);
		}

		/* OK, we're outta that level of subtransaction */
		pgfdw_reset_xact_state(entry, false);
	}

	/* If there are any pending connections, finish cleaning them up */
	if (pending_entries || cancel_requested)
	{
		if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
		{
			Assert(cancel_requested == NIL);
			pgfdw_finish_pre_subcommit_cleanup(pending_entries, curlevel);
		}
		else
		{
			Assert(event == SUBXACT_EVENT_ABORT_SUB);
			pgfdw_finish_abort_cleanup(pending_entries, cancel_requested,
									   false);
		}
	}
}
1170 :
1171 : /*
1172 : * Connection invalidation callback function
1173 : *
1174 : * After a change to a pg_foreign_server or pg_user_mapping catalog entry,
1175 : * close connections depending on that entry immediately if current transaction
1176 : * has not used those connections yet. Otherwise, mark those connections as
1177 : * invalid and then make pgfdw_xact_callback() close them at the end of current
1178 : * transaction, since they cannot be closed in the midst of the transaction
1179 : * using them. Closed connections will be remade at the next opportunity if
1180 : * necessary.
1181 : *
1182 : * Although most cache invalidation callbacks blow away all the related stuff
1183 : * regardless of the given hashvalue, connections are expensive enough that
1184 : * it's worth trying to avoid that.
1185 : *
1186 : * NB: We could avoid unnecessary disconnection more strictly by examining
1187 : * individual option values, but it seems too much effort for the gain.
1188 : */
1189 : static void
1190 340 : pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
1191 : {
1192 : HASH_SEQ_STATUS scan;
1193 : ConnCacheEntry *entry;
1194 :
1195 : Assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID);
1196 :
1197 : /* ConnectionHash must exist already, if we're registered */
1198 340 : hash_seq_init(&scan, ConnectionHash);
1199 2292 : while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1200 : {
1201 : /* Ignore invalid entries */
1202 1952 : if (entry->conn == NULL)
1203 1588 : continue;
1204 :
1205 : /* hashvalue == 0 means a cache reset, must clear all state */
1206 364 : if (hashvalue == 0 ||
1207 258 : (cacheid == FOREIGNSERVEROID &&
1208 364 : entry->server_hashvalue == hashvalue) ||
1209 106 : (cacheid == USERMAPPINGOID &&
1210 106 : entry->mapping_hashvalue == hashvalue))
1211 : {
1212 : /*
1213 : * Close the connection immediately if it's not used yet in this
1214 : * transaction. Otherwise mark it as invalid so that
1215 : * pgfdw_xact_callback() can close it at the end of this
1216 : * transaction.
1217 : */
1218 98 : if (entry->xact_depth == 0)
1219 : {
1220 92 : elog(DEBUG3, "discarding connection %p", entry->conn);
1221 92 : disconnect_pg_server(entry);
1222 : }
1223 : else
1224 6 : entry->invalidated = true;
1225 : }
1226 : }
1227 340 : }
1228 :
1229 : /*
1230 : * Raise an error if the given connection cache entry is marked as being
1231 : * in the middle of an xact state change. This should be called at which no
1232 : * such change is expected to be in progress; if one is found to be in
1233 : * progress, it means that we aborted in the middle of a previous state change
1234 : * and now don't know what the remote transaction state actually is.
1235 : * Such connections can't safely be further used. Re-establishing the
1236 : * connection would change the snapshot and roll back any writes already
1237 : * performed, so that's not an option, either. Thus, we must abort.
1238 : */
1239 : static void
1240 5560 : pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
1241 : {
1242 : ForeignServer *server;
1243 :
1244 : /* nothing to do for inactive entries and entries of sane state */
1245 5560 : if (entry->conn == NULL || !entry->changing_xact_state)
1246 5560 : return;
1247 :
1248 : /* make sure this entry is inactive */
1249 0 : disconnect_pg_server(entry);
1250 :
1251 : /* find server name to be shown in the message below */
1252 0 : server = GetForeignServer(entry->serverid);
1253 :
1254 0 : ereport(ERROR,
1255 : (errcode(ERRCODE_CONNECTION_EXCEPTION),
1256 : errmsg("connection to server \"%s\" was lost",
1257 : server->servername)));
1258 : }
1259 :
1260 : /*
1261 : * Reset state to show we're out of a (sub)transaction.
1262 : */
1263 : static void
1264 2514 : pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel)
1265 : {
1266 2514 : if (toplevel)
1267 : {
1268 : /* Reset state to show we're out of a transaction */
1269 2486 : entry->xact_depth = 0;
1270 :
1271 : /*
1272 : * If the connection isn't in a good idle state, it is marked as
1273 : * invalid or keep_connections option of its server is disabled, then
1274 : * discard it to recover. Next GetConnection will open a new
1275 : * connection.
1276 : */
1277 4970 : if (PQstatus(entry->conn) != CONNECTION_OK ||
1278 2484 : PQtransactionStatus(entry->conn) != PQTRANS_IDLE ||
1279 2484 : entry->changing_xact_state ||
1280 2484 : entry->invalidated ||
1281 2480 : !entry->keep_connections)
1282 : {
1283 8 : elog(DEBUG3, "discarding connection %p", entry->conn);
1284 8 : disconnect_pg_server(entry);
1285 : }
1286 : }
1287 : else
1288 : {
1289 : /* Reset state to show we're out of a subtransaction */
1290 28 : entry->xact_depth--;
1291 : }
1292 2514 : }
1293 :
1294 : /*
1295 : * Cancel the currently-in-progress query (whose query text we do not have)
1296 : * and ignore the result. Returns true if we successfully cancel the query
1297 : * and discard any pending result, and false if not.
1298 : *
1299 : * It's not a huge problem if we throw an ERROR here, but if we get into error
1300 : * recursion trouble, we'll end up slamming the connection shut, which will
1301 : * necessitate failing the entire toplevel transaction even if subtransactions
1302 : * were used. Try to use WARNING where we can.
1303 : *
1304 : * XXX: if the query was one sent by fetch_more_data_begin(), we could get the
1305 : * query text from the pendingAreq saved in the per-connection state, then
1306 : * report the query using it.
1307 : */
1308 : static bool
1309 2 : pgfdw_cancel_query(PGconn *conn)
1310 : {
1311 : TimestampTz endtime;
1312 :
1313 : /*
1314 : * If it takes too long to cancel the query and discard the result, assume
1315 : * the connection is dead.
1316 : */
1317 2 : endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
1318 : CONNECTION_CLEANUP_TIMEOUT);
1319 :
1320 2 : if (!pgfdw_cancel_query_begin(conn, endtime))
1321 0 : return false;
1322 2 : return pgfdw_cancel_query_end(conn, endtime, false);
1323 : }
1324 :
1325 : /*
1326 : * Submit a cancel request to the given connection, waiting only until
1327 : * the given time.
1328 : *
1329 : * We sleep interruptibly until we receive confirmation that the cancel
1330 : * request has been accepted, and if it is, return true; if the timeout
1331 : * lapses without that, or the request fails for whatever reason, return
1332 : * false.
1333 : */
1334 : static bool
1335 2 : pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime)
1336 : {
1337 2 : const char *errormsg = libpqsrv_cancel(conn, endtime);
1338 :
1339 2 : if (errormsg != NULL)
1340 0 : ereport(WARNING,
1341 : errcode(ERRCODE_CONNECTION_FAILURE),
1342 : errmsg("could not send cancel request: %s", errormsg));
1343 :
1344 2 : return errormsg == NULL;
1345 : }
1346 :
1347 : static bool
1348 2 : pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime, bool consume_input)
1349 : {
1350 2 : PGresult *result = NULL;
1351 : bool timed_out;
1352 :
1353 : /*
1354 : * If requested, consume whatever data is available from the socket. (Note
1355 : * that if all data is available, this allows pgfdw_get_cleanup_result to
1356 : * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
1357 : * which would be large compared to the overhead of PQconsumeInput.)
1358 : */
1359 2 : if (consume_input && !PQconsumeInput(conn))
1360 : {
1361 0 : ereport(WARNING,
1362 : (errcode(ERRCODE_CONNECTION_FAILURE),
1363 : errmsg("could not get result of cancel request: %s",
1364 : pchomp(PQerrorMessage(conn)))));
1365 0 : return false;
1366 : }
1367 :
1368 : /* Get and discard the result of the query. */
1369 2 : if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out))
1370 : {
1371 0 : if (timed_out)
1372 0 : ereport(WARNING,
1373 : (errmsg("could not get result of cancel request due to timeout")));
1374 : else
1375 0 : ereport(WARNING,
1376 : (errcode(ERRCODE_CONNECTION_FAILURE),
1377 : errmsg("could not get result of cancel request: %s",
1378 : pchomp(PQerrorMessage(conn)))));
1379 :
1380 0 : return false;
1381 : }
1382 2 : PQclear(result);
1383 :
1384 2 : return true;
1385 : }
1386 :
1387 : /*
1388 : * Submit a query during (sub)abort cleanup and wait up to 30 seconds for the
1389 : * result. If the query is executed without error, the return value is true.
1390 : * If the query is executed successfully but returns an error, the return
1391 : * value is true if and only if ignore_errors is set. If the query can't be
1392 : * sent or times out, the return value is false.
1393 : *
1394 : * It's not a huge problem if we throw an ERROR here, but if we get into error
1395 : * recursion trouble, we'll end up slamming the connection shut, which will
1396 : * necessitate failing the entire toplevel transaction even if subtransactions
1397 : * were used. Try to use WARNING where we can.
1398 : */
1399 : static bool
1400 106 : pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
1401 : {
1402 : TimestampTz endtime;
1403 :
1404 : /*
1405 : * If it takes too long to execute a cleanup query, assume the connection
1406 : * is dead. It's fairly likely that this is why we aborted in the first
1407 : * place (e.g. statement timeout, user cancel), so the timeout shouldn't
1408 : * be too long.
1409 : */
1410 106 : endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
1411 : CONNECTION_CLEANUP_TIMEOUT);
1412 :
1413 106 : if (!pgfdw_exec_cleanup_query_begin(conn, query))
1414 0 : return false;
1415 106 : return pgfdw_exec_cleanup_query_end(conn, query, endtime,
1416 : false, ignore_errors);
1417 : }
1418 :
1419 : static bool
1420 130 : pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query)
1421 : {
1422 : Assert(query != NULL);
1423 :
1424 : /*
1425 : * Submit a query. Since we don't use non-blocking mode, this also can
1426 : * block. But its risk is relatively small, so we ignore that for now.
1427 : */
1428 130 : if (!PQsendQuery(conn, query))
1429 : {
1430 0 : pgfdw_report_error(WARNING, NULL, conn, false, query);
1431 0 : return false;
1432 : }
1433 :
1434 130 : return true;
1435 : }
1436 :
1437 : static bool
1438 130 : pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
1439 : TimestampTz endtime, bool consume_input,
1440 : bool ignore_errors)
1441 : {
1442 130 : PGresult *result = NULL;
1443 : bool timed_out;
1444 :
1445 : Assert(query != NULL);
1446 :
1447 : /*
1448 : * If requested, consume whatever data is available from the socket. (Note
1449 : * that if all data is available, this allows pgfdw_get_cleanup_result to
1450 : * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
1451 : * which would be large compared to the overhead of PQconsumeInput.)
1452 : */
1453 130 : if (consume_input && !PQconsumeInput(conn))
1454 : {
1455 0 : pgfdw_report_error(WARNING, NULL, conn, false, query);
1456 0 : return false;
1457 : }
1458 :
1459 : /* Get the result of the query. */
1460 130 : if (pgfdw_get_cleanup_result(conn, endtime, &result, &timed_out))
1461 : {
1462 0 : if (timed_out)
1463 0 : ereport(WARNING,
1464 : (errmsg("could not get query result due to timeout"),
1465 : errcontext("remote SQL command: %s", query)));
1466 : else
1467 0 : pgfdw_report_error(WARNING, NULL, conn, false, query);
1468 :
1469 0 : return false;
1470 : }
1471 :
1472 : /* Issue a warning if not successful. */
1473 130 : if (PQresultStatus(result) != PGRES_COMMAND_OK)
1474 : {
1475 0 : pgfdw_report_error(WARNING, result, conn, true, query);
1476 0 : return ignore_errors;
1477 : }
1478 130 : PQclear(result);
1479 :
1480 130 : return true;
1481 : }
1482 :
/*
 * Get, during abort cleanup, the result of a query that is in progress.  This
 * might be a query that is being interrupted by transaction abort, or it might
 * be a query that was initiated as part of transaction abort to get the remote
 * side back to the appropriate state.
 *
 * endtime is the time at which we should give up and assume the remote
 * side is dead.  Returns true if the timeout expired or connection trouble
 * occurred, false otherwise.
 *
 * *result is set only on success (return value false).  It receives the last
 * PGresult produced by the query, which may be NULL if the query produced
 * none.  On timeout or connection trouble, any result collected so far is
 * freed and *result is left untouched.
 *
 * *timed_out is set to true only when the timeout expired, else false.
 *
 * Interrupts are checked while waiting.  If an ERROR escapes, the
 * last collected PGresult is freed before re-throwing.
 */
static bool
pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime, PGresult **result,
						 bool *timed_out)
{
	/* volatile: modified inside PG_TRY and read after it */
	volatile bool failed = false;
	PGresult   *volatile last_res = NULL;

	*timed_out = false;

	/* In what follows, do not leak any PGresults on an error. */
	PG_TRY();
	{
		for (;;)
		{
			PGresult   *res;

			/* Wait until libpq can return a result without blocking */
			while (PQisBusy(conn))
			{
				int			wc;
				TimestampTz now = GetCurrentTimestamp();
				long		cur_timeout;

				/* If timeout has expired, give up, else get sleep time. */
				cur_timeout = TimestampDifferenceMilliseconds(now, endtime);
				if (cur_timeout <= 0)
				{
					*timed_out = true;
					failed = true;
					goto exit;
				}

				/* first time, allocate or get the custom wait event */
				if (pgfdw_we_cleanup_result == 0)
					pgfdw_we_cleanup_result = WaitEventExtensionNew("PostgresFdwCleanupResult");

				/* Sleep until there's something to do */
				wc = WaitLatchOrSocket(MyLatch,
									   WL_LATCH_SET | WL_SOCKET_READABLE |
									   WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
									   PQsocket(conn),
									   cur_timeout, pgfdw_we_cleanup_result);
				ResetLatch(MyLatch);

				CHECK_FOR_INTERRUPTS();

				/* Data available in socket? */
				if (wc & WL_SOCKET_READABLE)
				{
					if (!PQconsumeInput(conn))
					{
						/* connection trouble */
						failed = true;
						goto exit;
					}
				}
			}

			res = PQgetResult(conn);
			if (res == NULL)
				break;			/* query is complete */

			/* Keep only the most recent result, as PQexec() would */
			PQclear(last_res);
			last_res = res;
		}
exit:	;
	}
	PG_CATCH();
	{
		PQclear(last_res);
		PG_RE_THROW();
	}
	PG_END_TRY();

	if (failed)
		PQclear(last_res);
	else
		*result = last_res;
	return failed;
}
1573 :
/*
 * Abort remote transaction or subtransaction.
 *
 * "toplevel" should be set to true if the toplevel (main) transaction is
 * being rolled back, false otherwise.
 *
 * Steps, stopping at the first failure:
 * 1. Cancel any query still running remotely.
 * 2. Send the abort command built by CONSTRUCT_ABORT_COMMAND.
 * 3. At toplevel only, DEALLOCATE ALL if prepared statements may have been
 *    lost track of (errors from DEALLOCATE ALL are ignored).
 * 4. Clear leftover asynchronous-fetch state.
 *
 * Sets entry->changing_xact_state to false on success and leaves it true on
 * failure, so the connection is treated as unsalvageable afterwards.
 */
static void
pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel)
{
	char		sql[100];

	/*
	 * Don't try to clean up the connection if we're already in error
	 * recursion trouble.
	 */
	if (in_error_recursion_trouble())
		entry->changing_xact_state = true;

	/*
	 * If connection is already unsalvageable, don't touch it further.
	 */
	if (entry->changing_xact_state)
		return;

	/*
	 * Mark this connection as in the process of changing transaction state.
	 * Every early return below leaves this set, marking the connection as
	 * unsalvageable.
	 */
	entry->changing_xact_state = true;

	/* Assume we might have lost track of prepared statements */
	entry->have_error = true;

	/*
	 * If a command has been submitted to the remote server by using an
	 * asynchronous execution function, the command might not have yet
	 * completed.  Check to see if a command is still being processed by the
	 * remote server, and if so, request cancellation of the command.
	 */
	if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
		!pgfdw_cancel_query(entry->conn))
		return;					/* Unable to cancel running query */

	CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
	if (!pgfdw_exec_cleanup_query(entry->conn, sql, false))
		return;					/* Unable to abort remote (sub)transaction */

	if (toplevel)
	{
		if (entry->have_prep_stmt && entry->have_error &&
			!pgfdw_exec_cleanup_query(entry->conn,
									  "DEALLOCATE ALL",
									  true))
			return;				/* Trouble clearing prepared statements */

		entry->have_prep_stmt = false;
		entry->have_error = false;
	}

	/*
	 * If pendingAreq of the per-connection state is not NULL, it means that
	 * an asynchronous fetch begun by fetch_more_data_begin() was not done
	 * successfully and thus the per-connection state was not reset in
	 * fetch_more_data(); in that case reset the per-connection state here.
	 */
	if (entry->state.pendingAreq)
		memset(&entry->state, 0, sizeof(entry->state));

	/* Disarm changing_xact_state if it all worked */
	entry->changing_xact_state = false;
}
1646 :
1647 : /*
1648 : * Like pgfdw_abort_cleanup, submit an abort command or cancel request, but
1649 : * don't wait for the result.
1650 : *
1651 : * Returns true if the abort command or cancel request is successfully issued,
1652 : * false otherwise. If the abort command is successfully issued, the given
1653 : * connection cache entry is appended to *pending_entries. Otherwise, if the
1654 : * cancel request is successfully issued, it is appended to *cancel_requested.
1655 : */
1656 : static bool
1657 16 : pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel,
1658 : List **pending_entries, List **cancel_requested)
1659 : {
1660 : /*
1661 : * Don't try to clean up the connection if we're already in error
1662 : * recursion trouble.
1663 : */
1664 16 : if (in_error_recursion_trouble())
1665 0 : entry->changing_xact_state = true;
1666 :
1667 : /*
1668 : * If connection is already unsalvageable, don't touch it further.
1669 : */
1670 16 : if (entry->changing_xact_state)
1671 0 : return false;
1672 :
1673 : /*
1674 : * Mark this connection as in the process of changing transaction state.
1675 : */
1676 16 : entry->changing_xact_state = true;
1677 :
1678 : /* Assume we might have lost track of prepared statements */
1679 16 : entry->have_error = true;
1680 :
1681 : /*
1682 : * If a command has been submitted to the remote server by using an
1683 : * asynchronous execution function, the command might not have yet
1684 : * completed. Check to see if a command is still being processed by the
1685 : * remote server, and if so, request cancellation of the command.
1686 : */
1687 16 : if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
1688 : {
1689 : TimestampTz endtime;
1690 :
1691 0 : endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
1692 : CONNECTION_CLEANUP_TIMEOUT);
1693 0 : if (!pgfdw_cancel_query_begin(entry->conn, endtime))
1694 0 : return false; /* Unable to cancel running query */
1695 0 : *cancel_requested = lappend(*cancel_requested, entry);
1696 : }
1697 : else
1698 : {
1699 : char sql[100];
1700 :
1701 16 : CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
1702 16 : if (!pgfdw_exec_cleanup_query_begin(entry->conn, sql))
1703 0 : return false; /* Unable to abort remote transaction */
1704 16 : *pending_entries = lappend(*pending_entries, entry);
1705 : }
1706 :
1707 16 : return true;
1708 : }
1709 :
1710 : /*
1711 : * Finish pre-commit cleanup of connections on each of which we've sent a
1712 : * COMMIT command to the remote server.
1713 : */
1714 : static void
1715 26 : pgfdw_finish_pre_commit_cleanup(List *pending_entries)
1716 : {
1717 : ConnCacheEntry *entry;
1718 26 : List *pending_deallocs = NIL;
1719 : ListCell *lc;
1720 :
1721 : Assert(pending_entries);
1722 :
1723 : /*
1724 : * Get the result of the COMMIT command for each of the pending entries
1725 : */
1726 58 : foreach(lc, pending_entries)
1727 : {
1728 32 : entry = (ConnCacheEntry *) lfirst(lc);
1729 :
1730 : Assert(entry->changing_xact_state);
1731 :
1732 : /*
1733 : * We might already have received the result on the socket, so pass
1734 : * consume_input=true to try to consume it first
1735 : */
1736 32 : do_sql_command_end(entry->conn, "COMMIT TRANSACTION", true);
1737 32 : entry->changing_xact_state = false;
1738 :
1739 : /* Do a DEALLOCATE ALL in parallel if needed */
1740 32 : if (entry->have_prep_stmt && entry->have_error)
1741 : {
1742 : /* Ignore errors (see notes in pgfdw_xact_callback) */
1743 4 : if (PQsendQuery(entry->conn, "DEALLOCATE ALL"))
1744 : {
1745 4 : pending_deallocs = lappend(pending_deallocs, entry);
1746 4 : continue;
1747 : }
1748 : }
1749 28 : entry->have_prep_stmt = false;
1750 28 : entry->have_error = false;
1751 :
1752 28 : pgfdw_reset_xact_state(entry, true);
1753 : }
1754 :
1755 : /* No further work if no pending entries */
1756 26 : if (!pending_deallocs)
1757 24 : return;
1758 :
1759 : /*
1760 : * Get the result of the DEALLOCATE command for each of the pending
1761 : * entries
1762 : */
1763 6 : foreach(lc, pending_deallocs)
1764 : {
1765 : PGresult *res;
1766 :
1767 4 : entry = (ConnCacheEntry *) lfirst(lc);
1768 :
1769 : /* Ignore errors (see notes in pgfdw_xact_callback) */
1770 8 : while ((res = PQgetResult(entry->conn)) != NULL)
1771 : {
1772 4 : PQclear(res);
1773 : /* Stop if the connection is lost (else we'll loop infinitely) */
1774 4 : if (PQstatus(entry->conn) == CONNECTION_BAD)
1775 0 : break;
1776 : }
1777 4 : entry->have_prep_stmt = false;
1778 4 : entry->have_error = false;
1779 :
1780 4 : pgfdw_reset_xact_state(entry, true);
1781 : }
1782 : }
1783 :
1784 : /*
1785 : * Finish pre-subcommit cleanup of connections on each of which we've sent a
1786 : * RELEASE command to the remote server.
1787 : */
1788 : static void
1789 2 : pgfdw_finish_pre_subcommit_cleanup(List *pending_entries, int curlevel)
1790 : {
1791 : ConnCacheEntry *entry;
1792 : char sql[100];
1793 : ListCell *lc;
1794 :
1795 : Assert(pending_entries);
1796 :
1797 : /*
1798 : * Get the result of the RELEASE command for each of the pending entries
1799 : */
1800 2 : snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
1801 6 : foreach(lc, pending_entries)
1802 : {
1803 4 : entry = (ConnCacheEntry *) lfirst(lc);
1804 :
1805 : Assert(entry->changing_xact_state);
1806 :
1807 : /*
1808 : * We might already have received the result on the socket, so pass
1809 : * consume_input=true to try to consume it first
1810 : */
1811 4 : do_sql_command_end(entry->conn, sql, true);
1812 4 : entry->changing_xact_state = false;
1813 :
1814 4 : pgfdw_reset_xact_state(entry, false);
1815 : }
1816 2 : }
1817 :
1818 : /*
1819 : * Finish abort cleanup of connections on each of which we've sent an abort
1820 : * command or cancel request to the remote server.
1821 : */
1822 : static void
1823 8 : pgfdw_finish_abort_cleanup(List *pending_entries, List *cancel_requested,
1824 : bool toplevel)
1825 : {
1826 8 : List *pending_deallocs = NIL;
1827 : ListCell *lc;
1828 :
1829 : /*
1830 : * For each of the pending cancel requests (if any), get and discard the
1831 : * result of the query, and submit an abort command to the remote server.
1832 : */
1833 8 : if (cancel_requested)
1834 : {
1835 0 : foreach(lc, cancel_requested)
1836 : {
1837 0 : ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
1838 : TimestampTz endtime;
1839 : char sql[100];
1840 :
1841 : Assert(entry->changing_xact_state);
1842 :
1843 : /*
1844 : * Set end time. You might think we should do this before issuing
1845 : * cancel request like in normal mode, but that is problematic,
1846 : * because if, for example, it took longer than 30 seconds to
1847 : * process the first few entries in the cancel_requested list, it
1848 : * would cause a timeout error when processing each of the
1849 : * remaining entries in the list, leading to slamming that entry's
1850 : * connection shut.
1851 : */
1852 0 : endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
1853 : CONNECTION_CLEANUP_TIMEOUT);
1854 :
1855 0 : if (!pgfdw_cancel_query_end(entry->conn, endtime, true))
1856 : {
1857 : /* Unable to cancel running query */
1858 0 : pgfdw_reset_xact_state(entry, toplevel);
1859 0 : continue;
1860 : }
1861 :
1862 : /* Send an abort command in parallel if needed */
1863 0 : CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
1864 0 : if (!pgfdw_exec_cleanup_query_begin(entry->conn, sql))
1865 : {
1866 : /* Unable to abort remote (sub)transaction */
1867 0 : pgfdw_reset_xact_state(entry, toplevel);
1868 : }
1869 : else
1870 0 : pending_entries = lappend(pending_entries, entry);
1871 : }
1872 : }
1873 :
1874 : /* No further work if no pending entries */
1875 8 : if (!pending_entries)
1876 0 : return;
1877 :
1878 : /*
1879 : * Get the result of the abort command for each of the pending entries
1880 : */
1881 24 : foreach(lc, pending_entries)
1882 : {
1883 16 : ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
1884 : TimestampTz endtime;
1885 : char sql[100];
1886 :
1887 : Assert(entry->changing_xact_state);
1888 :
1889 : /*
1890 : * Set end time. We do this now, not before issuing the command like
1891 : * in normal mode, for the same reason as for the cancel_requested
1892 : * entries.
1893 : */
1894 16 : endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
1895 : CONNECTION_CLEANUP_TIMEOUT);
1896 :
1897 16 : CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
1898 16 : if (!pgfdw_exec_cleanup_query_end(entry->conn, sql, endtime,
1899 : true, false))
1900 : {
1901 : /* Unable to abort remote (sub)transaction */
1902 0 : pgfdw_reset_xact_state(entry, toplevel);
1903 8 : continue;
1904 : }
1905 :
1906 16 : if (toplevel)
1907 : {
1908 : /* Do a DEALLOCATE ALL in parallel if needed */
1909 8 : if (entry->have_prep_stmt && entry->have_error)
1910 : {
1911 8 : if (!pgfdw_exec_cleanup_query_begin(entry->conn,
1912 : "DEALLOCATE ALL"))
1913 : {
1914 : /* Trouble clearing prepared statements */
1915 0 : pgfdw_reset_xact_state(entry, toplevel);
1916 : }
1917 : else
1918 8 : pending_deallocs = lappend(pending_deallocs, entry);
1919 8 : continue;
1920 : }
1921 0 : entry->have_prep_stmt = false;
1922 0 : entry->have_error = false;
1923 : }
1924 :
1925 : /* Reset the per-connection state if needed */
1926 8 : if (entry->state.pendingAreq)
1927 0 : memset(&entry->state, 0, sizeof(entry->state));
1928 :
1929 : /* We're done with this entry; unset the changing_xact_state flag */
1930 8 : entry->changing_xact_state = false;
1931 8 : pgfdw_reset_xact_state(entry, toplevel);
1932 : }
1933 :
1934 : /* No further work if no pending entries */
1935 8 : if (!pending_deallocs)
1936 4 : return;
1937 : Assert(toplevel);
1938 :
1939 : /*
1940 : * Get the result of the DEALLOCATE command for each of the pending
1941 : * entries
1942 : */
1943 12 : foreach(lc, pending_deallocs)
1944 : {
1945 8 : ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
1946 : TimestampTz endtime;
1947 :
1948 : Assert(entry->changing_xact_state);
1949 : Assert(entry->have_prep_stmt);
1950 : Assert(entry->have_error);
1951 :
1952 : /*
1953 : * Set end time. We do this now, not before issuing the command like
1954 : * in normal mode, for the same reason as for the cancel_requested
1955 : * entries.
1956 : */
1957 8 : endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
1958 : CONNECTION_CLEANUP_TIMEOUT);
1959 :
1960 8 : if (!pgfdw_exec_cleanup_query_end(entry->conn, "DEALLOCATE ALL",
1961 : endtime, true, true))
1962 : {
1963 : /* Trouble clearing prepared statements */
1964 0 : pgfdw_reset_xact_state(entry, toplevel);
1965 0 : continue;
1966 : }
1967 8 : entry->have_prep_stmt = false;
1968 8 : entry->have_error = false;
1969 :
1970 : /* Reset the per-connection state if needed */
1971 8 : if (entry->state.pendingAreq)
1972 0 : memset(&entry->state, 0, sizeof(entry->state));
1973 :
1974 : /* We're done with this entry; unset the changing_xact_state flag */
1975 8 : entry->changing_xact_state = false;
1976 8 : pgfdw_reset_xact_state(entry, toplevel);
1977 : }
1978 : }
1979 :
1980 : /*
1981 : * List active foreign server connections.
1982 : *
1983 : * This function takes no input parameter and returns setof record made of
1984 : * following values:
1985 : * - server_name - server name of active connection. In case the foreign server
1986 : * is dropped but still the connection is active, then the server name will
1987 : * be NULL in output.
1988 : * - valid - true/false representing whether the connection is valid or not.
1989 : * Note that the connections can get invalidated in pgfdw_inval_callback.
1990 : *
1991 : * No records are returned when there are no cached connections at all.
1992 : */
1993 : Datum
1994 22 : postgres_fdw_get_connections(PG_FUNCTION_ARGS)
1995 : {
1996 : #define POSTGRES_FDW_GET_CONNECTIONS_COLS 2
1997 22 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1998 : HASH_SEQ_STATUS scan;
1999 : ConnCacheEntry *entry;
2000 :
2001 22 : InitMaterializedSRF(fcinfo, 0);
2002 :
2003 : /* If cache doesn't exist, we return no records */
2004 22 : if (!ConnectionHash)
2005 0 : PG_RETURN_VOID();
2006 :
2007 22 : hash_seq_init(&scan, ConnectionHash);
2008 186 : while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
2009 : {
2010 : ForeignServer *server;
2011 164 : Datum values[POSTGRES_FDW_GET_CONNECTIONS_COLS] = {0};
2012 164 : bool nulls[POSTGRES_FDW_GET_CONNECTIONS_COLS] = {0};
2013 :
2014 : /* We only look for open remote connections */
2015 164 : if (!entry->conn)
2016 142 : continue;
2017 :
2018 22 : server = GetForeignServerExtended(entry->serverid, FSV_MISSING_OK);
2019 :
2020 : /*
2021 : * The foreign server may have been dropped in current explicit
2022 : * transaction. It is not possible to drop the server from another
2023 : * session when the connection associated with it is in use in the
2024 : * current transaction, if tried so, the drop query in another session
2025 : * blocks until the current transaction finishes.
2026 : *
2027 : * Even though the server is dropped in the current transaction, the
2028 : * cache can still have associated active connection entry, say we
2029 : * call such connections dangling. Since we can not fetch the server
2030 : * name from system catalogs for dangling connections, instead we show
2031 : * NULL value for server name in output.
2032 : *
2033 : * We could have done better by storing the server name in the cache
2034 : * entry instead of server oid so that it could be used in the output.
2035 : * But the server name in each cache entry requires 64 bytes of
2036 : * memory, which is huge, when there are many cached connections and
2037 : * the use case i.e. dropping the foreign server within the explicit
2038 : * current transaction seems rare. So, we chose to show NULL value for
2039 : * server name in output.
2040 : *
2041 : * Such dangling connections get closed either in next use or at the
2042 : * end of current explicit transaction in pgfdw_xact_callback.
2043 : */
2044 22 : if (!server)
2045 : {
2046 : /*
2047 : * If the server has been dropped in the current explicit
2048 : * transaction, then this entry would have been invalidated in
2049 : * pgfdw_inval_callback at the end of drop server command. Note
2050 : * that this connection would not have been closed in
2051 : * pgfdw_inval_callback because it is still being used in the
2052 : * current explicit transaction. So, assert that here.
2053 : */
2054 : Assert(entry->conn && entry->xact_depth > 0 && entry->invalidated);
2055 :
2056 : /* Show null, if no server name was found */
2057 2 : nulls[0] = true;
2058 : }
2059 : else
2060 20 : values[0] = CStringGetTextDatum(server->servername);
2061 :
2062 22 : values[1] = BoolGetDatum(!entry->invalidated);
2063 :
2064 22 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
2065 : }
2066 :
2067 22 : PG_RETURN_VOID();
2068 : }
2069 :
2070 : /*
2071 : * Disconnect the specified cached connections.
2072 : *
2073 : * This function discards the open connections that are established by
2074 : * postgres_fdw from the local session to the foreign server with
2075 : * the given name. Note that there can be multiple connections to
2076 : * the given server using different user mappings. If the connections
2077 : * are used in the current local transaction, they are not disconnected
2078 : * and warning messages are reported. This function returns true
2079 : * if it disconnects at least one connection, otherwise false. If no
2080 : * foreign server with the given name is found, an error is reported.
2081 : */
2082 : Datum
2083 8 : postgres_fdw_disconnect(PG_FUNCTION_ARGS)
2084 : {
2085 : ForeignServer *server;
2086 : char *servername;
2087 :
2088 8 : servername = text_to_cstring(PG_GETARG_TEXT_PP(0));
2089 8 : server = GetForeignServerByName(servername, false);
2090 :
2091 6 : PG_RETURN_BOOL(disconnect_cached_connections(server->serverid));
2092 : }
2093 :
2094 : /*
2095 : * Disconnect all the cached connections.
2096 : *
2097 : * This function discards all the open connections that are established by
2098 : * postgres_fdw from the local session to the foreign servers.
2099 : * If the connections are used in the current local transaction, they are
2100 : * not disconnected and warning messages are reported. This function
2101 : * returns true if it disconnects at least one connection, otherwise false.
2102 : */
2103 : Datum
2104 8 : postgres_fdw_disconnect_all(PG_FUNCTION_ARGS)
2105 : {
2106 8 : PG_RETURN_BOOL(disconnect_cached_connections(InvalidOid));
2107 : }
2108 :
2109 : /*
2110 : * Workhorse to disconnect cached connections.
2111 : *
2112 : * This function scans all the connection cache entries and disconnects
2113 : * the open connections whose foreign server OID matches with
2114 : * the specified one. If InvalidOid is specified, it disconnects all
2115 : * the cached connections.
2116 : *
2117 : * This function emits a warning for each connection that's used in
2118 : * the current transaction and doesn't close it. It returns true if
2119 : * it disconnects at least one connection, otherwise false.
2120 : *
2121 : * Note that this function disconnects even the connections that are
2122 : * established by other users in the same local session using different
2123 : * user mappings. This leads even non-superuser to be able to close
2124 : * the connections established by superusers in the same local session.
2125 : *
2126 : * XXX As of now we don't see any security risk doing this. But we should
2127 : * set some restrictions on that, for example, prevent non-superuser
2128 : * from closing the connections established by superusers even
2129 : * in the same session?
2130 : */
2131 : static bool
2132 14 : disconnect_cached_connections(Oid serverid)
2133 : {
2134 : HASH_SEQ_STATUS scan;
2135 : ConnCacheEntry *entry;
2136 14 : bool all = !OidIsValid(serverid);
2137 14 : bool result = false;
2138 :
2139 : /*
2140 : * Connection cache hashtable has not been initialized yet in this
2141 : * session, so return false.
2142 : */
2143 14 : if (!ConnectionHash)
2144 0 : return false;
2145 :
2146 14 : hash_seq_init(&scan, ConnectionHash);
2147 114 : while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
2148 : {
2149 : /* Ignore cache entry if no open connection right now. */
2150 100 : if (!entry->conn)
2151 78 : continue;
2152 :
2153 22 : if (all || entry->serverid == serverid)
2154 : {
2155 : /*
2156 : * Emit a warning because the connection to close is used in the
2157 : * current transaction and cannot be disconnected right now.
2158 : */
2159 16 : if (entry->xact_depth > 0)
2160 : {
2161 : ForeignServer *server;
2162 :
2163 6 : server = GetForeignServerExtended(entry->serverid,
2164 : FSV_MISSING_OK);
2165 :
2166 6 : if (!server)
2167 : {
2168 : /*
2169 : * If the foreign server was dropped while its connection
2170 : * was used in the current transaction, the connection
2171 : * must have been marked as invalid by
2172 : * pgfdw_inval_callback at the end of DROP SERVER command.
2173 : */
2174 : Assert(entry->invalidated);
2175 :
2176 0 : ereport(WARNING,
2177 : (errmsg("cannot close dropped server connection because it is still in use")));
2178 : }
2179 : else
2180 6 : ereport(WARNING,
2181 : (errmsg("cannot close connection for server \"%s\" because it is still in use",
2182 : server->servername)));
2183 : }
2184 : else
2185 : {
2186 10 : elog(DEBUG3, "discarding connection %p", entry->conn);
2187 10 : disconnect_pg_server(entry);
2188 10 : result = true;
2189 : }
2190 : }
2191 : }
2192 :
2193 14 : return result;
2194 : }
|