Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * connection.c
4 : * Connection management functions for postgres_fdw
5 : *
6 : * Portions Copyright (c) 2012-2025, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * contrib/postgres_fdw/connection.c
10 : *
11 : *-------------------------------------------------------------------------
12 : */
13 : #include "postgres.h"
14 :
15 : #if HAVE_POLL_H
16 : #include <poll.h>
17 : #endif
18 :
19 : #include "access/xact.h"
20 : #include "catalog/pg_user_mapping.h"
21 : #include "commands/defrem.h"
22 : #include "common/base64.h"
23 : #include "funcapi.h"
24 : #include "libpq/libpq-be.h"
25 : #include "libpq/libpq-be-fe-helpers.h"
26 : #include "mb/pg_wchar.h"
27 : #include "miscadmin.h"
28 : #include "pgstat.h"
29 : #include "postgres_fdw.h"
30 : #include "storage/latch.h"
31 : #include "utils/builtins.h"
32 : #include "utils/hsearch.h"
33 : #include "utils/inval.h"
34 : #include "utils/syscache.h"
35 :
36 : /*
37 : * Connection cache hash table entry
38 : *
39 : * The lookup key in this hash table is the user mapping OID. We use just one
40 : * connection per user mapping ID, which ensures that all the scans use the
41 : * same snapshot during a query. Using the user mapping OID rather than
42 : * the foreign server OID + user OID avoids creating multiple connections when
43 : * the public user mapping applies to all user OIDs.
44 : *
45 : * The "conn" pointer can be NULL if we don't currently have a live connection.
46 : * When we do have a connection, xact_depth tracks the current depth of
47 : * transactions and subtransactions open on the remote side. We need to issue
48 : * commands at the same nesting depth on the remote as we're executing at
49 : * ourselves, so that rolling back a subtransaction will kill the right
50 : * queries and not the wrong ones.
51 : */
52 : typedef Oid ConnCacheKey;
53 :
54 : typedef struct ConnCacheEntry
55 : {
56 : ConnCacheKey key; /* hash key (must be first) */
57 : PGconn *conn; /* connection to foreign server, or NULL */
58 : /* Remaining fields are invalid when conn is NULL: */
59 : int xact_depth; /* 0 = no xact open, 1 = main xact open, 2 =
60 : * one level of subxact open, etc */
61 : bool have_prep_stmt; /* have we prepared any stmts in this xact? */
62 : bool have_error; /* have any subxacts aborted in this xact? */
63 : bool changing_xact_state; /* xact state change in process */
64 : bool parallel_commit; /* do we commit (sub)xacts in parallel? */
65 : bool parallel_abort; /* do we abort (sub)xacts in parallel? */
66 : bool invalidated; /* true if reconnect is pending */
67 : bool keep_connections; /* setting value of keep_connections
68 : * server option */
69 : Oid serverid; /* foreign server OID used to get server name */
70 : uint32 server_hashvalue; /* hash value of foreign server OID */
71 : uint32 mapping_hashvalue; /* hash value of user mapping OID */
72 : PgFdwConnState state; /* extra per-connection state */
73 : } ConnCacheEntry;
74 :
75 : /*
76 : * Connection cache (initialized on first use)
77 : */
78 : static HTAB *ConnectionHash = NULL;
79 :
80 : /* for assigning cursor numbers and prepared statement numbers */
81 : static unsigned int cursor_number = 0;
82 : static unsigned int prep_stmt_number = 0;
83 :
84 : /* tracks whether any work is needed in callback functions */
85 : static bool xact_got_connection = false;
86 :
87 : /* custom wait event values, retrieved from shared memory */
88 : static uint32 pgfdw_we_cleanup_result = 0;
89 : static uint32 pgfdw_we_connect = 0;
90 : static uint32 pgfdw_we_get_result = 0;
91 :
92 : /*
93 : * Milliseconds to wait to cancel an in-progress query or execute a cleanup
94 : * query; if it takes longer than 30 seconds to do these, we assume the
95 : * connection is dead.
96 : */
97 : #define CONNECTION_CLEANUP_TIMEOUT 30000
98 :
99 : /*
100 : * Milliseconds to wait before issuing another cancel request. This covers
101 : * the race condition where the remote session ignored our cancel request
102 : * because it arrived while idle.
103 : */
104 : #define RETRY_CANCEL_TIMEOUT 1000
105 :
106 : /* Macro for constructing abort command to be sent */
107 : #define CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel) \
108 : do { \
109 : if (toplevel) \
110 : snprintf((sql), sizeof(sql), \
111 : "ABORT TRANSACTION"); \
112 : else \
113 : snprintf((sql), sizeof(sql), \
114 : "ROLLBACK TO SAVEPOINT s%d; RELEASE SAVEPOINT s%d", \
115 : (entry)->xact_depth, (entry)->xact_depth); \
116 : } while(0)
117 :
118 : /*
119 : * Extension version number, for supporting older extension versions' objects
120 : */
121 : enum pgfdwVersion
122 : {
123 : PGFDW_V1_1 = 0,
124 : PGFDW_V1_2,
125 : };
126 :
127 : /*
128 : * SQL functions
129 : */
130 4 : PG_FUNCTION_INFO_V1(postgres_fdw_get_connections);
131 6 : PG_FUNCTION_INFO_V1(postgres_fdw_get_connections_1_2);
132 6 : PG_FUNCTION_INFO_V1(postgres_fdw_disconnect);
133 6 : PG_FUNCTION_INFO_V1(postgres_fdw_disconnect_all);
134 :
135 : /* prototypes of private functions */
136 : static void make_new_connection(ConnCacheEntry *entry, UserMapping *user);
137 : static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
138 : static void disconnect_pg_server(ConnCacheEntry *entry);
139 : static void check_conn_params(const char **keywords, const char **values, UserMapping *user);
140 : static void configure_remote_session(PGconn *conn);
141 : static void do_sql_command_begin(PGconn *conn, const char *sql);
142 : static void do_sql_command_end(PGconn *conn, const char *sql,
143 : bool consume_input);
144 : static void begin_remote_xact(ConnCacheEntry *entry);
145 : static void pgfdw_xact_callback(XactEvent event, void *arg);
146 : static void pgfdw_subxact_callback(SubXactEvent event,
147 : SubTransactionId mySubid,
148 : SubTransactionId parentSubid,
149 : void *arg);
150 : static void pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue);
151 : static void pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry);
152 : static void pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel);
153 : static bool pgfdw_cancel_query(PGconn *conn);
154 : static bool pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime);
155 : static bool pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime,
156 : TimestampTz retrycanceltime,
157 : bool consume_input);
158 : static bool pgfdw_exec_cleanup_query(PGconn *conn, const char *query,
159 : bool ignore_errors);
160 : static bool pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query);
161 : static bool pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
162 : TimestampTz endtime,
163 : bool consume_input,
164 : bool ignore_errors);
165 : static bool pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
166 : TimestampTz retrycanceltime,
167 : PGresult **result, bool *timed_out);
168 : static void pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel);
169 : static bool pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel,
170 : List **pending_entries,
171 : List **cancel_requested);
172 : static void pgfdw_finish_pre_commit_cleanup(List *pending_entries);
173 : static void pgfdw_finish_pre_subcommit_cleanup(List *pending_entries,
174 : int curlevel);
175 : static void pgfdw_finish_abort_cleanup(List *pending_entries,
176 : List *cancel_requested,
177 : bool toplevel);
178 : static void pgfdw_security_check(const char **keywords, const char **values,
179 : UserMapping *user, PGconn *conn);
180 : static bool UserMappingPasswordRequired(UserMapping *user);
181 : static bool UseScramPassthrough(ForeignServer *server, UserMapping *user);
182 : static bool disconnect_cached_connections(Oid serverid);
183 : static void postgres_fdw_get_connections_internal(FunctionCallInfo fcinfo,
184 : enum pgfdwVersion api_version);
185 : static int pgfdw_conn_check(PGconn *conn);
186 : static bool pgfdw_conn_checkable(void);
187 :
188 : /*
189 : * Get a PGconn which can be used to execute queries on the remote PostgreSQL
190 : * server with the user's authorization. A new connection is established
191 : * if we don't already have a suitable one, and a transaction is opened at
192 : * the right subtransaction nesting depth if we didn't do that already.
193 : *
194 : * will_prep_stmt must be true if caller intends to create any prepared
195 : * statements. Since those don't go away automatically at transaction end
196 : * (not even on error), we need this flag to cue manual cleanup.
197 : *
198 : * If state is not NULL, *state receives the per-connection state associated
199 : * with the PGconn.
200 : */
201 : PGconn *
202 4262 : GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state)
203 : {
204 : bool found;
205 4262 : bool retry = false;
206 : ConnCacheEntry *entry;
207 : ConnCacheKey key;
208 4262 : MemoryContext ccxt = CurrentMemoryContext;
209 :
210 : /* First time through, initialize connection cache hashtable */
211 4262 : if (ConnectionHash == NULL)
212 : {
213 : HASHCTL ctl;
214 :
215 18 : if (pgfdw_we_get_result == 0)
216 18 : pgfdw_we_get_result =
217 18 : WaitEventExtensionNew("PostgresFdwGetResult");
218 :
219 18 : ctl.keysize = sizeof(ConnCacheKey);
220 18 : ctl.entrysize = sizeof(ConnCacheEntry);
221 18 : ConnectionHash = hash_create("postgres_fdw connections", 8,
222 : &ctl,
223 : HASH_ELEM | HASH_BLOBS);
224 :
225 : /*
226 : * Register some callback functions that manage connection cleanup.
227 : * This should be done just once in each backend.
228 : */
229 18 : RegisterXactCallback(pgfdw_xact_callback, NULL);
230 18 : RegisterSubXactCallback(pgfdw_subxact_callback, NULL);
231 18 : CacheRegisterSyscacheCallback(FOREIGNSERVEROID,
232 : pgfdw_inval_callback, (Datum) 0);
233 18 : CacheRegisterSyscacheCallback(USERMAPPINGOID,
234 : pgfdw_inval_callback, (Datum) 0);
235 : }
236 :
237 : /* Set flag that we did GetConnection during the current transaction */
238 4262 : xact_got_connection = true;
239 :
240 : /* Create hash key for the entry. Assume no pad bytes in key struct */
241 4262 : key = user->umid;
242 :
243 : /*
244 : * Find or create cached entry for requested connection.
245 : */
246 4262 : entry = hash_search(ConnectionHash, &key, HASH_ENTER, &found);
247 4262 : if (!found)
248 : {
249 : /*
250 : * We need only clear "conn" here; remaining fields will be filled
251 : * later when "conn" is set.
252 : */
253 34 : entry->conn = NULL;
254 : }
255 :
256 : /* Reject further use of connections which failed abort cleanup. */
257 4262 : pgfdw_reject_incomplete_xact_state_change(entry);
258 :
259 : /*
260 : * If the connection needs to be remade due to invalidation, disconnect as
261 : * soon as we're out of all transactions.
262 : */
263 4262 : if (entry->conn != NULL && entry->invalidated && entry->xact_depth == 0)
264 : {
265 0 : elog(DEBUG3, "closing connection %p for option changes to take effect",
266 : entry->conn);
267 0 : disconnect_pg_server(entry);
268 : }
269 :
270 : /*
271 : * If cache entry doesn't have a connection, we have to establish a new
272 : * connection. (If connect_pg_server throws an error, the cache entry
273 : * will remain in a valid empty state, ie conn == NULL.)
274 : */
275 4262 : if (entry->conn == NULL)
276 140 : make_new_connection(entry, user);
277 :
278 : /*
279 : * We check the health of the cached connection here when using it. In
280 : * cases where we're out of all transactions, if a broken connection is
281 : * detected, we try to reestablish a new connection later.
282 : */
283 4250 : PG_TRY();
284 : {
285 : /* Process a pending asynchronous request if any. */
286 4250 : if (entry->state.pendingAreq)
287 0 : process_pending_request(entry->state.pendingAreq);
288 : /* Start a new transaction or subtransaction if needed. */
289 4250 : begin_remote_xact(entry);
290 : }
291 4 : PG_CATCH();
292 : {
293 4 : MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
294 4 : ErrorData *errdata = CopyErrorData();
295 :
296 : /*
297 : * Determine whether to try to reestablish the connection.
298 : *
299 : * After a broken connection is detected in libpq, any error other
300 : * than connection failure (e.g., out-of-memory) can be thrown
301 : * somewhere between return from libpq and the expected ereport() call
302 : * in pgfdw_report_error(). In this case, since PQstatus() indicates
303 : * CONNECTION_BAD, checking only PQstatus() causes the false detection
304 : * of connection failure. To avoid this, we also verify that the
305 : * error's sqlstate is ERRCODE_CONNECTION_FAILURE. Note that also
306 : * checking only the sqlstate can cause another false detection
307 : * because pgfdw_report_error() may report ERRCODE_CONNECTION_FAILURE
308 : * for any libpq-originated error condition.
309 : */
310 4 : if (errdata->sqlerrcode != ERRCODE_CONNECTION_FAILURE ||
311 4 : PQstatus(entry->conn) != CONNECTION_BAD ||
312 4 : entry->xact_depth > 0)
313 : {
314 2 : MemoryContextSwitchTo(ecxt);
315 2 : PG_RE_THROW();
316 : }
317 :
318 : /* Clean up the error state */
319 2 : FlushErrorState();
320 2 : FreeErrorData(errdata);
321 2 : errdata = NULL;
322 :
323 2 : retry = true;
324 : }
325 4248 : PG_END_TRY();
326 :
327 : /*
328 : * If a broken connection is detected, disconnect it, reestablish a new
329 : * connection and retry a new remote transaction. If connection failure is
330 : * reported again, we give up getting a connection.
331 : */
332 4248 : if (retry)
333 : {
334 : Assert(entry->xact_depth == 0);
335 :
336 2 : ereport(DEBUG3,
337 : (errmsg_internal("could not start remote transaction on connection %p",
338 : entry->conn)),
339 : errdetail_internal("%s", pchomp(PQerrorMessage(entry->conn))));
340 :
341 2 : elog(DEBUG3, "closing connection %p to reestablish a new one",
342 : entry->conn);
343 2 : disconnect_pg_server(entry);
344 :
345 2 : make_new_connection(entry, user);
346 :
347 2 : begin_remote_xact(entry);
348 : }
349 :
350 : /* Remember if caller will prepare statements */
351 4248 : entry->have_prep_stmt |= will_prep_stmt;
352 :
353 : /* If caller needs access to the per-connection state, return it. */
354 4248 : if (state)
355 1454 : *state = &entry->state;
356 :
357 4248 : return entry->conn;
358 : }
359 :
360 : /*
361 : * Reset all transient state fields in the cached connection entry and
362 : * establish new connection to the remote server.
363 : */
364 : static void
365 142 : make_new_connection(ConnCacheEntry *entry, UserMapping *user)
366 : {
367 142 : ForeignServer *server = GetForeignServer(user->serverid);
368 : ListCell *lc;
369 :
370 : Assert(entry->conn == NULL);
371 :
372 : /* Reset all transient state fields, to be sure all are clean */
373 142 : entry->xact_depth = 0;
374 142 : entry->have_prep_stmt = false;
375 142 : entry->have_error = false;
376 142 : entry->changing_xact_state = false;
377 142 : entry->invalidated = false;
378 142 : entry->serverid = server->serverid;
379 142 : entry->server_hashvalue =
380 142 : GetSysCacheHashValue1(FOREIGNSERVEROID,
381 : ObjectIdGetDatum(server->serverid));
382 142 : entry->mapping_hashvalue =
383 142 : GetSysCacheHashValue1(USERMAPPINGOID,
384 : ObjectIdGetDatum(user->umid));
385 142 : memset(&entry->state, 0, sizeof(entry->state));
386 :
387 : /*
388 : * Determine whether to keep the connection that we're about to make here
389 : * open even after the transaction using it ends, so that the subsequent
390 : * transactions can re-use it.
391 : *
392 : * By default, all the connections to any foreign servers are kept open.
393 : *
394 : * Also determine whether to commit/abort (sub)transactions opened on the
395 : * remote server in parallel at (sub)transaction end, which is disabled by
396 : * default.
397 : *
398 : * Note: it's enough to determine these only when making a new connection
399 : * because if these settings for it are changed, it will be closed and
400 : * re-made later.
401 : */
402 142 : entry->keep_connections = true;
403 142 : entry->parallel_commit = false;
404 142 : entry->parallel_abort = false;
405 630 : foreach(lc, server->options)
406 : {
407 488 : DefElem *def = (DefElem *) lfirst(lc);
408 :
409 488 : if (strcmp(def->defname, "keep_connections") == 0)
410 16 : entry->keep_connections = defGetBoolean(def);
411 472 : else if (strcmp(def->defname, "parallel_commit") == 0)
412 4 : entry->parallel_commit = defGetBoolean(def);
413 468 : else if (strcmp(def->defname, "parallel_abort") == 0)
414 4 : entry->parallel_abort = defGetBoolean(def);
415 : }
416 :
417 : /* Now try to make the connection */
418 142 : entry->conn = connect_pg_server(server, user);
419 :
420 130 : elog(DEBUG3, "new postgres_fdw connection %p for server \"%s\" (user mapping oid %u, userid %u)",
421 : entry->conn, server->servername, user->umid, user->userid);
422 130 : }
423 :
424 : /*
425 : * Check that non-superuser has used password or delegated credentials
426 : * to establish connection; otherwise, he's piggybacking on the
427 : * postgres server's user identity. See also dblink_security_check()
428 : * in contrib/dblink and check_conn_params.
429 : */
430 : static void
431 126 : pgfdw_security_check(const char **keywords, const char **values, UserMapping *user, PGconn *conn)
432 : {
433 : /* Superusers bypass the check */
434 126 : if (superuser_arg(user->userid))
435 118 : return;
436 :
437 : #ifdef ENABLE_GSS
438 : /* Connected via GSSAPI with delegated credentials- all good. */
439 : if (PQconnectionUsedGSSAPI(conn) && be_gssapi_get_delegation(MyProcPort))
440 : return;
441 : #endif
442 :
443 : /* Ok if superuser set PW required false. */
444 8 : if (!UserMappingPasswordRequired(user))
445 4 : return;
446 :
447 : /* Connected via PW, with PW required true, and provided non-empty PW. */
448 4 : if (PQconnectionUsedPassword(conn))
449 : {
450 : /* ok if params contain a non-empty password */
451 0 : for (int i = 0; keywords[i] != NULL; i++)
452 : {
453 0 : if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
454 0 : return;
455 : }
456 : }
457 :
458 4 : ereport(ERROR,
459 : (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
460 : errmsg("password or GSSAPI delegated credentials required"),
461 : errdetail("Non-superuser cannot connect if the server does not request a password or use GSSAPI with delegated credentials."),
462 : errhint("Target server's authentication method must be changed or password_required=false set in the user mapping attributes.")));
463 : }
464 :
465 : /*
466 : * Connect to remote server using specified server and user mapping properties.
467 : */
468 : static PGconn *
469 142 : connect_pg_server(ForeignServer *server, UserMapping *user)
470 : {
471 142 : PGconn *volatile conn = NULL;
472 :
473 : /*
474 : * Use PG_TRY block to ensure closing connection on error.
475 : */
476 142 : PG_TRY();
477 : {
478 : const char **keywords;
479 : const char **values;
480 142 : char *appname = NULL;
481 : int n;
482 :
483 : /*
484 : * Construct connection params from generic options of ForeignServer
485 : * and UserMapping. (Some of them might not be libpq options, in
486 : * which case we'll just waste a few array slots.) Add 4 extra slots
487 : * for application_name, fallback_application_name, client_encoding,
488 : * end marker.
489 : */
490 142 : n = list_length(server->options) + list_length(user->options) + 4 + 2;
491 142 : keywords = (const char **) palloc(n * sizeof(char *));
492 142 : values = (const char **) palloc(n * sizeof(char *));
493 :
494 142 : n = 0;
495 284 : n += ExtractConnectionOptions(server->options,
496 142 : keywords + n, values + n);
497 284 : n += ExtractConnectionOptions(user->options,
498 142 : keywords + n, values + n);
499 :
500 : /*
501 : * Use pgfdw_application_name as application_name if set.
502 : *
503 : * PQconnectdbParams() processes the parameter arrays from start to
504 : * end. If any key word is repeated, the last value is used. Therefore
505 : * note that pgfdw_application_name must be added to the arrays after
506 : * options of ForeignServer are, so that it can override
507 : * application_name set in ForeignServer.
508 : */
509 142 : if (pgfdw_application_name && *pgfdw_application_name != '\0')
510 : {
511 2 : keywords[n] = "application_name";
512 2 : values[n] = pgfdw_application_name;
513 2 : n++;
514 : }
515 :
516 : /*
517 : * Search the parameter arrays to find application_name setting, and
518 : * replace escape sequences in it with status information if found.
519 : * The arrays are searched backwards because the last value is used if
520 : * application_name is repeatedly set.
521 : */
522 378 : for (int i = n - 1; i >= 0; i--)
523 : {
524 272 : if (strcmp(keywords[i], "application_name") == 0 &&
525 36 : *(values[i]) != '\0')
526 : {
527 : /*
528 : * Use this application_name setting if it's not empty string
529 : * even after any escape sequences in it are replaced.
530 : */
531 36 : appname = process_pgfdw_appname(values[i]);
532 36 : if (appname[0] != '\0')
533 : {
534 36 : values[i] = appname;
535 36 : break;
536 : }
537 :
538 : /*
539 : * This empty application_name is not used, so we set
540 : * values[i] to NULL and keep searching the array to find the
541 : * next one.
542 : */
543 0 : values[i] = NULL;
544 0 : pfree(appname);
545 0 : appname = NULL;
546 : }
547 : }
548 :
549 : /* Use "postgres_fdw" as fallback_application_name */
550 142 : keywords[n] = "fallback_application_name";
551 142 : values[n] = "postgres_fdw";
552 142 : n++;
553 :
554 : /* Set client_encoding so that libpq can convert encoding properly. */
555 142 : keywords[n] = "client_encoding";
556 142 : values[n] = GetDatabaseEncodingName();
557 142 : n++;
558 :
559 142 : if (MyProcPort->has_scram_keys && UseScramPassthrough(server, user))
560 : {
561 : int len;
562 : int encoded_len;
563 :
564 8 : keywords[n] = "scram_client_key";
565 8 : len = pg_b64_enc_len(sizeof(MyProcPort->scram_ClientKey));
566 : /* don't forget the zero-terminator */
567 8 : values[n] = palloc0(len + 1);
568 16 : encoded_len = pg_b64_encode((const char *) MyProcPort->scram_ClientKey,
569 : sizeof(MyProcPort->scram_ClientKey),
570 8 : (char *) values[n], len);
571 8 : if (encoded_len < 0)
572 0 : elog(ERROR, "could not encode SCRAM client key");
573 8 : n++;
574 :
575 8 : keywords[n] = "scram_server_key";
576 8 : len = pg_b64_enc_len(sizeof(MyProcPort->scram_ServerKey));
577 : /* don't forget the zero-terminator */
578 8 : values[n] = palloc0(len + 1);
579 16 : encoded_len = pg_b64_encode((const char *) MyProcPort->scram_ServerKey,
580 : sizeof(MyProcPort->scram_ServerKey),
581 8 : (char *) values[n], len);
582 8 : if (encoded_len < 0)
583 0 : elog(ERROR, "could not encode SCRAM server key");
584 8 : n++;
585 : }
586 :
587 142 : keywords[n] = values[n] = NULL;
588 :
589 : /*
590 : * Verify the set of connection parameters only if scram pass-through
591 : * is not being used because the password is not necessary.
592 : */
593 142 : if (!(MyProcPort->has_scram_keys && UseScramPassthrough(server, user)))
594 134 : check_conn_params(keywords, values, user);
595 :
596 : /* first time, allocate or get the custom wait event */
597 138 : if (pgfdw_we_connect == 0)
598 18 : pgfdw_we_connect = WaitEventExtensionNew("PostgresFdwConnect");
599 :
600 : /* OK to make connection */
601 138 : conn = libpqsrv_connect_params(keywords, values,
602 : false, /* expand_dbname */
603 : pgfdw_we_connect);
604 :
605 138 : if (!conn || PQstatus(conn) != CONNECTION_OK)
606 4 : ereport(ERROR,
607 : (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
608 : errmsg("could not connect to server \"%s\"",
609 : server->servername),
610 : errdetail_internal("%s", pchomp(PQerrorMessage(conn)))));
611 :
612 : /*
613 : * Perform post-connection security checks only if scram pass-through
614 : * is not being used because the password is not necessary.
615 : */
616 134 : if (!(MyProcPort->has_scram_keys && UseScramPassthrough(server, user)))
617 126 : pgfdw_security_check(keywords, values, user, conn);
618 :
619 : /* Prepare new session for use */
620 130 : configure_remote_session(conn);
621 :
622 130 : if (appname != NULL)
623 36 : pfree(appname);
624 130 : pfree(keywords);
625 130 : pfree(values);
626 : }
627 12 : PG_CATCH();
628 : {
629 12 : libpqsrv_disconnect(conn);
630 12 : PG_RE_THROW();
631 : }
632 130 : PG_END_TRY();
633 :
634 130 : return conn;
635 : }
636 :
637 : /*
638 : * Disconnect any open connection for a connection cache entry.
639 : */
640 : static void
641 112 : disconnect_pg_server(ConnCacheEntry *entry)
642 : {
643 112 : if (entry->conn != NULL)
644 : {
645 112 : libpqsrv_disconnect(entry->conn);
646 112 : entry->conn = NULL;
647 : }
648 112 : }
649 :
650 : /*
651 : * Return true if the password_required is defined and false for this user
652 : * mapping, otherwise false. The mapping has been pre-validated.
653 : */
654 : static bool
655 14 : UserMappingPasswordRequired(UserMapping *user)
656 : {
657 : ListCell *cell;
658 :
659 20 : foreach(cell, user->options)
660 : {
661 12 : DefElem *def = (DefElem *) lfirst(cell);
662 :
663 12 : if (strcmp(def->defname, "password_required") == 0)
664 6 : return defGetBoolean(def);
665 : }
666 :
667 8 : return true;
668 : }
669 :
670 : static bool
671 24 : UseScramPassthrough(ForeignServer *server, UserMapping *user)
672 : {
673 : ListCell *cell;
674 :
675 96 : foreach(cell, server->options)
676 : {
677 96 : DefElem *def = (DefElem *) lfirst(cell);
678 :
679 96 : if (strcmp(def->defname, "use_scram_passthrough") == 0)
680 24 : return defGetBoolean(def);
681 : }
682 :
683 0 : foreach(cell, user->options)
684 : {
685 0 : DefElem *def = (DefElem *) lfirst(cell);
686 :
687 0 : if (strcmp(def->defname, "use_scram_passthrough") == 0)
688 0 : return defGetBoolean(def);
689 : }
690 :
691 0 : return false;
692 : }
693 :
694 : /*
695 : * For non-superusers, insist that the connstr specify a password or that the
696 : * user provided their own GSSAPI delegated credentials. This
697 : * prevents a password from being picked up from .pgpass, a service file, the
698 : * environment, etc. We don't want the postgres user's passwords,
699 : * certificates, etc to be accessible to non-superusers. (See also
700 : * dblink_connstr_check in contrib/dblink.)
701 : */
702 : static void
703 134 : check_conn_params(const char **keywords, const char **values, UserMapping *user)
704 : {
705 : int i;
706 :
707 : /* no check required if superuser */
708 134 : if (superuser_arg(user->userid))
709 122 : return;
710 :
711 : #ifdef ENABLE_GSS
712 : /* ok if the user provided their own delegated credentials */
713 : if (be_gssapi_get_delegation(MyProcPort))
714 : return;
715 : #endif
716 :
717 : /* ok if params contain a non-empty password */
718 48 : for (i = 0; keywords[i] != NULL; i++)
719 : {
720 42 : if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
721 6 : return;
722 : }
723 :
724 : /* ok if the superuser explicitly said so at user mapping creation time */
725 6 : if (!UserMappingPasswordRequired(user))
726 2 : return;
727 :
728 4 : ereport(ERROR,
729 : (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
730 : errmsg("password or GSSAPI delegated credentials required"),
731 : errdetail("Non-superusers must delegate GSSAPI credentials, provide a password, or enable SCRAM pass-through in user mapping.")));
732 : }
733 :
734 : /*
735 : * Issue SET commands to make sure remote session is configured properly.
736 : *
737 : * We do this just once at connection, assuming nothing will change the
738 : * values later. Since we'll never send volatile function calls to the
739 : * remote, there shouldn't be any way to break this assumption from our end.
740 : * It's possible to think of ways to break it at the remote end, eg making
741 : * a foreign table point to a view that includes a set_config call ---
742 : * but once you admit the possibility of a malicious view definition,
743 : * there are any number of ways to break things.
744 : */
745 : static void
746 130 : configure_remote_session(PGconn *conn)
747 : {
748 130 : int remoteversion = PQserverVersion(conn);
749 :
750 : /* Force the search path to contain only pg_catalog (see deparse.c) */
751 130 : do_sql_command(conn, "SET search_path = pg_catalog");
752 :
753 : /*
754 : * Set remote timezone; this is basically just cosmetic, since all
755 : * transmitted and returned timestamptzs should specify a zone explicitly
756 : * anyway. However it makes the regression test outputs more predictable.
757 : *
758 : * We don't risk setting remote zone equal to ours, since the remote
759 : * server might use a different timezone database. Instead, use GMT
760 : * (quoted, because very old servers are picky about case). That's
761 : * guaranteed to work regardless of the remote's timezone database,
762 : * because pg_tzset() hard-wires it (at least in PG 9.2 and later).
763 : */
764 130 : do_sql_command(conn, "SET timezone = 'GMT'");
765 :
766 : /*
767 : * Set values needed to ensure unambiguous data output from remote. (This
768 : * logic should match what pg_dump does. See also set_transmission_modes
769 : * in postgres_fdw.c.)
770 : */
771 130 : do_sql_command(conn, "SET datestyle = ISO");
772 130 : if (remoteversion >= 80400)
773 130 : do_sql_command(conn, "SET intervalstyle = postgres");
774 130 : if (remoteversion >= 90000)
775 130 : do_sql_command(conn, "SET extra_float_digits = 3");
776 : else
777 0 : do_sql_command(conn, "SET extra_float_digits = 2");
778 130 : }
779 :
780 : /*
781 : * Convenience subroutine to issue a non-data-returning SQL command to remote
782 : */
783 : void
784 3520 : do_sql_command(PGconn *conn, const char *sql)
785 : {
786 3520 : do_sql_command_begin(conn, sql);
787 3520 : do_sql_command_end(conn, sql, false);
788 3514 : }
789 :
790 : static void
791 3556 : do_sql_command_begin(PGconn *conn, const char *sql)
792 : {
793 3556 : if (!PQsendQuery(conn, sql))
794 0 : pgfdw_report_error(ERROR, NULL, conn, false, sql);
795 3556 : }
796 :
797 : static void
798 3556 : do_sql_command_end(PGconn *conn, const char *sql, bool consume_input)
799 : {
800 : PGresult *res;
801 :
802 : /*
803 : * If requested, consume whatever data is available from the socket. (Note
804 : * that if all data is available, this allows pgfdw_get_result to call
805 : * PQgetResult without forcing the overhead of WaitLatchOrSocket, which
806 : * would be large compared to the overhead of PQconsumeInput.)
807 : */
808 3556 : if (consume_input && !PQconsumeInput(conn))
809 0 : pgfdw_report_error(ERROR, NULL, conn, false, sql);
810 3556 : res = pgfdw_get_result(conn);
811 3556 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
812 6 : pgfdw_report_error(ERROR, res, conn, true, sql);
813 3550 : PQclear(res);
814 3550 : }
815 :
816 : /*
817 : * Start remote transaction or subtransaction, if needed.
818 : *
819 : * Note that we always use at least REPEATABLE READ in the remote session.
820 : * This is so that, if a query initiates multiple scans of the same or
821 : * different foreign tables, we will get snapshot-consistent results from
822 : * those scans. A disadvantage is that we can't provide sane emulation of
823 : * READ COMMITTED behavior --- it would be nice if we had some other way to
824 : * control which remote queries share a snapshot.
825 : */
826 : static void
827 4252 : begin_remote_xact(ConnCacheEntry *entry)
828 : {
829 4252 : int curlevel = GetCurrentTransactionNestLevel();
830 :
831 : /* Start main transaction if we haven't yet */
832 4252 : if (entry->xact_depth <= 0)
833 : {
834 : const char *sql;
835 :
836 1464 : elog(DEBUG3, "starting remote transaction on connection %p",
837 : entry->conn);
838 :
839 1464 : if (IsolationIsSerializable())
840 0 : sql = "START TRANSACTION ISOLATION LEVEL SERIALIZABLE";
841 : else
842 1464 : sql = "START TRANSACTION ISOLATION LEVEL REPEATABLE READ";
843 1464 : entry->changing_xact_state = true;
844 1464 : do_sql_command(entry->conn, sql);
845 1462 : entry->xact_depth = 1;
846 1462 : entry->changing_xact_state = false;
847 : }
848 :
849 : /*
850 : * If we're in a subtransaction, stack up savepoints to match our level.
851 : * This ensures we can rollback just the desired effects when a
852 : * subtransaction aborts.
853 : */
854 4278 : while (entry->xact_depth < curlevel)
855 : {
856 : char sql[64];
857 :
858 30 : snprintf(sql, sizeof(sql), "SAVEPOINT s%d", entry->xact_depth + 1);
859 30 : entry->changing_xact_state = true;
860 30 : do_sql_command(entry->conn, sql);
861 28 : entry->xact_depth++;
862 28 : entry->changing_xact_state = false;
863 : }
864 4248 : }
865 :
866 : /*
867 : * Release connection reference count created by calling GetConnection.
868 : */
869 : void
870 4142 : ReleaseConnection(PGconn *conn)
871 : {
872 : /*
873 : * Currently, we don't actually track connection references because all
874 : * cleanup is managed on a transaction or subtransaction basis instead. So
875 : * there's nothing to do here.
876 : */
877 4142 : }
878 :
879 : /*
880 : * Assign a "unique" number for a cursor.
881 : *
882 : * These really only need to be unique per connection within a transaction.
883 : * For the moment we ignore the per-connection point and assign them across
884 : * all connections in the transaction, but we ask for the connection to be
885 : * supplied in case we want to refine that.
886 : *
887 : * Note that even if wraparound happens in a very long transaction, actual
888 : * collisions are highly improbable; just be sure to use %u not %d to print.
889 : */
890 : unsigned int
891 1048 : GetCursorNumber(PGconn *conn)
892 : {
893 1048 : return ++cursor_number;
894 : }
895 :
896 : /*
897 : * Assign a "unique" number for a prepared statement.
898 : *
899 : * This works much like GetCursorNumber, except that we never reset the counter
900 : * within a session. That's because we can't be 100% sure we've gotten rid
901 : * of all prepared statements on all connections, and it's not really worth
902 : * increasing the risk of prepared-statement name collisions by resetting.
903 : */
904 : unsigned int
905 356 : GetPrepStmtNumber(PGconn *conn)
906 : {
907 356 : return ++prep_stmt_number;
908 : }
909 :
910 : /*
911 : * Submit a query and wait for the result.
912 : *
913 : * Since we don't use non-blocking mode, this can't process interrupts while
914 : * pushing the query text to the server. That risk is relatively small, so we
915 : * ignore that for now.
916 : *
917 : * Caller is responsible for the error handling on the result.
918 : */
919 : PGresult *
920 7896 : pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state)
921 : {
922 : /* First, process a pending asynchronous request, if any. */
923 7896 : if (state && state->pendingAreq)
924 8 : process_pending_request(state->pendingAreq);
925 :
926 7896 : if (!PQsendQuery(conn, query))
927 0 : return NULL;
928 7896 : return pgfdw_get_result(conn);
929 : }
930 :
931 : /*
932 : * Wrap libpqsrv_get_result_last(), adding wait event.
933 : *
934 : * Caller is responsible for the error handling on the result.
935 : */
936 : PGresult *
937 15990 : pgfdw_get_result(PGconn *conn)
938 : {
939 15990 : return libpqsrv_get_result_last(conn, pgfdw_we_get_result);
940 : }
941 :
942 : /*
943 : * Report an error we got from the remote server.
944 : *
945 : * elevel: error level to use (typically ERROR, but might be less)
946 : * res: PGresult containing the error
947 : * conn: connection we did the query on
948 : * clear: if true, PQclear the result (otherwise caller will handle it)
949 : * sql: NULL, or text of remote command we tried to execute
950 : *
951 : * Note: callers that choose not to throw ERROR for a remote error are
952 : * responsible for making sure that the associated ConnCacheEntry gets
953 : * marked with have_error = true.
954 : */
955 : void
956 32 : pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
957 : bool clear, const char *sql)
958 : {
959 : /* If requested, PGresult must be released before leaving this function. */
960 32 : PG_TRY();
961 : {
962 32 : char *diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
963 32 : char *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
964 32 : char *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
965 32 : char *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
966 32 : char *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
967 : int sqlstate;
968 :
969 32 : if (diag_sqlstate)
970 28 : sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
971 : diag_sqlstate[1],
972 : diag_sqlstate[2],
973 : diag_sqlstate[3],
974 : diag_sqlstate[4]);
975 : else
976 4 : sqlstate = ERRCODE_CONNECTION_FAILURE;
977 :
978 : /*
979 : * If we don't get a message from the PGresult, try the PGconn. This
980 : * is needed because for connection-level failures, PQgetResult may
981 : * just return NULL, not a PGresult at all.
982 : */
983 32 : if (message_primary == NULL)
984 4 : message_primary = pchomp(PQerrorMessage(conn));
985 :
986 32 : ereport(elevel,
987 : (errcode(sqlstate),
988 : (message_primary != NULL && message_primary[0] != '\0') ?
989 : errmsg_internal("%s", message_primary) :
990 : errmsg("could not obtain message string for remote error"),
991 : message_detail ? errdetail_internal("%s", message_detail) : 0,
992 : message_hint ? errhint("%s", message_hint) : 0,
993 : message_context ? errcontext("%s", message_context) : 0,
994 : sql ? errcontext("remote SQL command: %s", sql) : 0));
995 : }
996 32 : PG_FINALLY();
997 : {
998 32 : if (clear)
999 30 : PQclear(res);
1000 : }
1001 32 : PG_END_TRY();
1002 0 : }
1003 :
1004 : /*
1005 : * pgfdw_xact_callback --- cleanup at main-transaction end.
1006 : *
1007 : * This runs just late enough that it must not enter user-defined code
1008 : * locally. (Entering such code on the remote side is fine. Its remote
1009 : * COMMIT TRANSACTION may run deferred triggers.)
1010 : */
1011 : static void
1012 7726 : pgfdw_xact_callback(XactEvent event, void *arg)
1013 : {
1014 : HASH_SEQ_STATUS scan;
1015 : ConnCacheEntry *entry;
1016 7726 : List *pending_entries = NIL;
1017 7726 : List *cancel_requested = NIL;
1018 :
1019 : /* Quick exit if no connections were touched in this transaction. */
1020 7726 : if (!xact_got_connection)
1021 6326 : return;
1022 :
1023 : /*
1024 : * Scan all connection cache entries to find open remote transactions, and
1025 : * close them.
1026 : */
1027 1400 : hash_seq_init(&scan, ConnectionHash);
1028 7192 : while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1029 : {
1030 : PGresult *res;
1031 :
1032 : /* Ignore cache entry if no open connection right now */
1033 5794 : if (entry->conn == NULL)
1034 3280 : continue;
1035 :
1036 : /* If it has an open remote transaction, try to close it */
1037 2514 : if (entry->xact_depth > 0)
1038 : {
1039 1464 : elog(DEBUG3, "closing remote transaction on connection %p",
1040 : entry->conn);
1041 :
1042 1464 : switch (event)
1043 : {
1044 1374 : case XACT_EVENT_PARALLEL_PRE_COMMIT:
1045 : case XACT_EVENT_PRE_COMMIT:
1046 :
1047 : /*
1048 : * If abort cleanup previously failed for this connection,
1049 : * we can't issue any more commands against it.
1050 : */
1051 1374 : pgfdw_reject_incomplete_xact_state_change(entry);
1052 :
1053 : /* Commit all remote transactions during pre-commit */
1054 1374 : entry->changing_xact_state = true;
1055 1374 : if (entry->parallel_commit)
1056 : {
1057 32 : do_sql_command_begin(entry->conn, "COMMIT TRANSACTION");
1058 32 : pending_entries = lappend(pending_entries, entry);
1059 32 : continue;
1060 : }
1061 1342 : do_sql_command(entry->conn, "COMMIT TRANSACTION");
1062 1342 : entry->changing_xact_state = false;
1063 :
1064 : /*
1065 : * If there were any errors in subtransactions, and we
1066 : * made prepared statements, do a DEALLOCATE ALL to make
1067 : * sure we get rid of all prepared statements. This is
1068 : * annoying and not terribly bulletproof, but it's
1069 : * probably not worth trying harder.
1070 : *
1071 : * DEALLOCATE ALL only exists in 8.3 and later, so this
1072 : * constrains how old a server postgres_fdw can
1073 : * communicate with. We intentionally ignore errors in
1074 : * the DEALLOCATE, so that we can hobble along to some
1075 : * extent with older servers (leaking prepared statements
1076 : * as we go; but we don't really support update operations
1077 : * pre-8.3 anyway).
1078 : */
1079 1342 : if (entry->have_prep_stmt && entry->have_error)
1080 : {
1081 0 : res = pgfdw_exec_query(entry->conn, "DEALLOCATE ALL",
1082 : NULL);
1083 0 : PQclear(res);
1084 : }
1085 1342 : entry->have_prep_stmt = false;
1086 1342 : entry->have_error = false;
1087 1342 : break;
1088 2 : case XACT_EVENT_PRE_PREPARE:
1089 :
1090 : /*
1091 : * We disallow any remote transactions, since it's not
1092 : * very reasonable to hold them open until the prepared
1093 : * transaction is committed. For the moment, throw error
1094 : * unconditionally; later we might allow read-only cases.
1095 : * Note that the error will cause us to come right back
1096 : * here with event == XACT_EVENT_ABORT, so we'll clean up
1097 : * the connection state at that point.
1098 : */
1099 2 : ereport(ERROR,
1100 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1101 : errmsg("cannot PREPARE a transaction that has operated on postgres_fdw foreign tables")));
1102 : break;
1103 0 : case XACT_EVENT_PARALLEL_COMMIT:
1104 : case XACT_EVENT_COMMIT:
1105 : case XACT_EVENT_PREPARE:
1106 : /* Pre-commit should have closed the open transaction */
1107 0 : elog(ERROR, "missed cleaning up connection during pre-commit");
1108 : break;
1109 88 : case XACT_EVENT_PARALLEL_ABORT:
1110 : case XACT_EVENT_ABORT:
1111 : /* Rollback all remote transactions during abort */
1112 88 : if (entry->parallel_abort)
1113 : {
1114 8 : if (pgfdw_abort_cleanup_begin(entry, true,
1115 : &pending_entries,
1116 : &cancel_requested))
1117 8 : continue;
1118 : }
1119 : else
1120 80 : pgfdw_abort_cleanup(entry, true);
1121 80 : break;
1122 : }
1123 1050 : }
1124 :
1125 : /* Reset state to show we're out of a transaction */
1126 2472 : pgfdw_reset_xact_state(entry, true);
1127 : }
1128 :
1129 : /* If there are any pending connections, finish cleaning them up */
1130 1398 : if (pending_entries || cancel_requested)
1131 : {
1132 30 : if (event == XACT_EVENT_PARALLEL_PRE_COMMIT ||
1133 : event == XACT_EVENT_PRE_COMMIT)
1134 : {
1135 : Assert(cancel_requested == NIL);
1136 26 : pgfdw_finish_pre_commit_cleanup(pending_entries);
1137 : }
1138 : else
1139 : {
1140 : Assert(event == XACT_EVENT_PARALLEL_ABORT ||
1141 : event == XACT_EVENT_ABORT);
1142 4 : pgfdw_finish_abort_cleanup(pending_entries, cancel_requested,
1143 : true);
1144 : }
1145 : }
1146 :
1147 : /*
1148 : * Regardless of the event type, we can now mark ourselves as out of the
1149 : * transaction. (Note: if we are here during PRE_COMMIT or PRE_PREPARE,
1150 : * this saves a useless scan of the hashtable during COMMIT or PREPARE.)
1151 : */
1152 1398 : xact_got_connection = false;
1153 :
1154 : /* Also reset cursor numbering for next transaction */
1155 1398 : cursor_number = 0;
1156 : }
1157 :
1158 : /*
1159 : * pgfdw_subxact_callback --- cleanup at subtransaction end.
1160 : */
1161 : static void
1162 76 : pgfdw_subxact_callback(SubXactEvent event, SubTransactionId mySubid,
1163 : SubTransactionId parentSubid, void *arg)
1164 : {
1165 : HASH_SEQ_STATUS scan;
1166 : ConnCacheEntry *entry;
1167 : int curlevel;
1168 76 : List *pending_entries = NIL;
1169 76 : List *cancel_requested = NIL;
1170 :
1171 : /* Nothing to do at subxact start, nor after commit. */
1172 76 : if (!(event == SUBXACT_EVENT_PRE_COMMIT_SUB ||
1173 : event == SUBXACT_EVENT_ABORT_SUB))
1174 46 : return;
1175 :
1176 : /* Quick exit if no connections were touched in this transaction. */
1177 30 : if (!xact_got_connection)
1178 0 : return;
1179 :
1180 : /*
1181 : * Scan all connection cache entries to find open remote subtransactions
1182 : * of the current level, and close them.
1183 : */
1184 30 : curlevel = GetCurrentTransactionNestLevel();
1185 30 : hash_seq_init(&scan, ConnectionHash);
1186 204 : while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1187 : {
1188 : char sql[100];
1189 :
1190 : /*
1191 : * We only care about connections with open remote subtransactions of
1192 : * the current level.
1193 : */
1194 174 : if (entry->conn == NULL || entry->xact_depth < curlevel)
1195 158 : continue;
1196 :
1197 28 : if (entry->xact_depth > curlevel)
1198 0 : elog(ERROR, "missed cleaning up remote subtransaction at level %d",
1199 : entry->xact_depth);
1200 :
1201 28 : if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
1202 : {
1203 : /*
1204 : * If abort cleanup previously failed for this connection, we
1205 : * can't issue any more commands against it.
1206 : */
1207 14 : pgfdw_reject_incomplete_xact_state_change(entry);
1208 :
1209 : /* Commit all remote subtransactions during pre-commit */
1210 14 : snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
1211 14 : entry->changing_xact_state = true;
1212 14 : if (entry->parallel_commit)
1213 : {
1214 4 : do_sql_command_begin(entry->conn, sql);
1215 4 : pending_entries = lappend(pending_entries, entry);
1216 4 : continue;
1217 : }
1218 10 : do_sql_command(entry->conn, sql);
1219 10 : entry->changing_xact_state = false;
1220 : }
1221 : else
1222 : {
1223 : /* Rollback all remote subtransactions during abort */
1224 14 : if (entry->parallel_abort)
1225 : {
1226 8 : if (pgfdw_abort_cleanup_begin(entry, false,
1227 : &pending_entries,
1228 : &cancel_requested))
1229 8 : continue;
1230 : }
1231 : else
1232 6 : pgfdw_abort_cleanup(entry, false);
1233 : }
1234 :
1235 : /* OK, we're outta that level of subtransaction */
1236 16 : pgfdw_reset_xact_state(entry, false);
1237 : }
1238 :
1239 : /* If there are any pending connections, finish cleaning them up */
1240 30 : if (pending_entries || cancel_requested)
1241 : {
1242 6 : if (event == SUBXACT_EVENT_PRE_COMMIT_SUB)
1243 : {
1244 : Assert(cancel_requested == NIL);
1245 2 : pgfdw_finish_pre_subcommit_cleanup(pending_entries, curlevel);
1246 : }
1247 : else
1248 : {
1249 : Assert(event == SUBXACT_EVENT_ABORT_SUB);
1250 4 : pgfdw_finish_abort_cleanup(pending_entries, cancel_requested,
1251 : false);
1252 : }
1253 : }
1254 : }
1255 :
1256 : /*
1257 : * Connection invalidation callback function
1258 : *
1259 : * After a change to a pg_foreign_server or pg_user_mapping catalog entry,
1260 : * close connections depending on that entry immediately if current transaction
1261 : * has not used those connections yet. Otherwise, mark those connections as
1262 : * invalid and then make pgfdw_xact_callback() close them at the end of current
1263 : * transaction, since they cannot be closed in the midst of the transaction
1264 : * using them. Closed connections will be remade at the next opportunity if
1265 : * necessary.
1266 : *
1267 : * Although most cache invalidation callbacks blow away all the related stuff
1268 : * regardless of the given hashvalue, connections are expensive enough that
1269 : * it's worth trying to avoid that.
1270 : *
1271 : * NB: We could avoid unnecessary disconnection more strictly by examining
1272 : * individual option values, but it seems too much effort for the gain.
1273 : */
1274 : static void
1275 344 : pgfdw_inval_callback(Datum arg, int cacheid, uint32 hashvalue)
1276 : {
1277 : HASH_SEQ_STATUS scan;
1278 : ConnCacheEntry *entry;
1279 :
1280 : Assert(cacheid == FOREIGNSERVEROID || cacheid == USERMAPPINGOID);
1281 :
1282 : /* ConnectionHash must exist already, if we're registered */
1283 344 : hash_seq_init(&scan, ConnectionHash);
1284 2332 : while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
1285 : {
1286 : /* Ignore invalid entries */
1287 1988 : if (entry->conn == NULL)
1288 1624 : continue;
1289 :
1290 : /* hashvalue == 0 means a cache reset, must clear all state */
1291 364 : if (hashvalue == 0 ||
1292 258 : (cacheid == FOREIGNSERVEROID &&
1293 364 : entry->server_hashvalue == hashvalue) ||
1294 106 : (cacheid == USERMAPPINGOID &&
1295 106 : entry->mapping_hashvalue == hashvalue))
1296 : {
1297 : /*
1298 : * Close the connection immediately if it's not used yet in this
1299 : * transaction. Otherwise mark it as invalid so that
1300 : * pgfdw_xact_callback() can close it at the end of this
1301 : * transaction.
1302 : */
1303 98 : if (entry->xact_depth == 0)
1304 : {
1305 92 : elog(DEBUG3, "discarding connection %p", entry->conn);
1306 92 : disconnect_pg_server(entry);
1307 : }
1308 : else
1309 6 : entry->invalidated = true;
1310 : }
1311 : }
1312 344 : }
1313 :
1314 : /*
1315 : * Raise an error if the given connection cache entry is marked as being
1316 : * in the middle of an xact state change. This should be called at which no
1317 : * such change is expected to be in progress; if one is found to be in
1318 : * progress, it means that we aborted in the middle of a previous state change
1319 : * and now don't know what the remote transaction state actually is.
1320 : * Such connections can't safely be further used. Re-establishing the
1321 : * connection would change the snapshot and roll back any writes already
1322 : * performed, so that's not an option, either. Thus, we must abort.
1323 : */
1324 : static void
1325 5650 : pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
1326 : {
1327 : ForeignServer *server;
1328 :
1329 : /* nothing to do for inactive entries and entries of sane state */
1330 5650 : if (entry->conn == NULL || !entry->changing_xact_state)
1331 5650 : return;
1332 :
1333 : /* make sure this entry is inactive */
1334 0 : disconnect_pg_server(entry);
1335 :
1336 : /* find server name to be shown in the message below */
1337 0 : server = GetForeignServer(entry->serverid);
1338 :
1339 0 : ereport(ERROR,
1340 : (errcode(ERRCODE_CONNECTION_EXCEPTION),
1341 : errmsg("connection to server \"%s\" was lost",
1342 : server->servername)));
1343 : }
1344 :
1345 : /*
1346 : * Reset state to show we're out of a (sub)transaction.
1347 : */
1348 : static void
1349 2540 : pgfdw_reset_xact_state(ConnCacheEntry *entry, bool toplevel)
1350 : {
1351 2540 : if (toplevel)
1352 : {
1353 : /* Reset state to show we're out of a transaction */
1354 2512 : entry->xact_depth = 0;
1355 :
1356 : /*
1357 : * If the connection isn't in a good idle state, it is marked as
1358 : * invalid or keep_connections option of its server is disabled, then
1359 : * discard it to recover. Next GetConnection will open a new
1360 : * connection.
1361 : */
1362 5022 : if (PQstatus(entry->conn) != CONNECTION_OK ||
1363 2510 : PQtransactionStatus(entry->conn) != PQTRANS_IDLE ||
1364 2510 : entry->changing_xact_state ||
1365 2510 : entry->invalidated ||
1366 2506 : !entry->keep_connections)
1367 : {
1368 8 : elog(DEBUG3, "discarding connection %p", entry->conn);
1369 8 : disconnect_pg_server(entry);
1370 : }
1371 : }
1372 : else
1373 : {
1374 : /* Reset state to show we're out of a subtransaction */
1375 28 : entry->xact_depth--;
1376 : }
1377 2540 : }
1378 :
1379 : /*
1380 : * Cancel the currently-in-progress query (whose query text we do not have)
1381 : * and ignore the result. Returns true if we successfully cancel the query
1382 : * and discard any pending result, and false if not.
1383 : *
1384 : * It's not a huge problem if we throw an ERROR here, but if we get into error
1385 : * recursion trouble, we'll end up slamming the connection shut, which will
1386 : * necessitate failing the entire toplevel transaction even if subtransactions
1387 : * were used. Try to use WARNING where we can.
1388 : *
1389 : * XXX: if the query was one sent by fetch_more_data_begin(), we could get the
1390 : * query text from the pendingAreq saved in the per-connection state, then
1391 : * report the query using it.
1392 : */
1393 : static bool
1394 4 : pgfdw_cancel_query(PGconn *conn)
1395 : {
1396 4 : TimestampTz now = GetCurrentTimestamp();
1397 : TimestampTz endtime;
1398 : TimestampTz retrycanceltime;
1399 :
1400 : /*
1401 : * If it takes too long to cancel the query and discard the result, assume
1402 : * the connection is dead.
1403 : */
1404 4 : endtime = TimestampTzPlusMilliseconds(now, CONNECTION_CLEANUP_TIMEOUT);
1405 :
1406 : /*
1407 : * Also, lose patience and re-issue the cancel request after a little bit.
1408 : * (This serves to close some race conditions.)
1409 : */
1410 4 : retrycanceltime = TimestampTzPlusMilliseconds(now, RETRY_CANCEL_TIMEOUT);
1411 :
1412 4 : if (!pgfdw_cancel_query_begin(conn, endtime))
1413 0 : return false;
1414 4 : return pgfdw_cancel_query_end(conn, endtime, retrycanceltime, false);
1415 : }
1416 :
1417 : /*
1418 : * Submit a cancel request to the given connection, waiting only until
1419 : * the given time.
1420 : *
1421 : * We sleep interruptibly until we receive confirmation that the cancel
1422 : * request has been accepted, and if it is, return true; if the timeout
1423 : * lapses without that, or the request fails for whatever reason, return
1424 : * false.
1425 : */
1426 : static bool
1427 4 : pgfdw_cancel_query_begin(PGconn *conn, TimestampTz endtime)
1428 : {
1429 4 : const char *errormsg = libpqsrv_cancel(conn, endtime);
1430 :
1431 4 : if (errormsg != NULL)
1432 0 : ereport(WARNING,
1433 : errcode(ERRCODE_CONNECTION_FAILURE),
1434 : errmsg("could not send cancel request: %s", errormsg));
1435 :
1436 4 : return errormsg == NULL;
1437 : }
1438 :
1439 : static bool
1440 4 : pgfdw_cancel_query_end(PGconn *conn, TimestampTz endtime,
1441 : TimestampTz retrycanceltime, bool consume_input)
1442 : {
1443 : PGresult *result;
1444 : bool timed_out;
1445 :
1446 : /*
1447 : * If requested, consume whatever data is available from the socket. (Note
1448 : * that if all data is available, this allows pgfdw_get_cleanup_result to
1449 : * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
1450 : * which would be large compared to the overhead of PQconsumeInput.)
1451 : */
1452 4 : if (consume_input && !PQconsumeInput(conn))
1453 : {
1454 0 : ereport(WARNING,
1455 : (errcode(ERRCODE_CONNECTION_FAILURE),
1456 : errmsg("could not get result of cancel request: %s",
1457 : pchomp(PQerrorMessage(conn)))));
1458 0 : return false;
1459 : }
1460 :
1461 : /* Get and discard the result of the query. */
1462 4 : if (pgfdw_get_cleanup_result(conn, endtime, retrycanceltime,
1463 : &result, &timed_out))
1464 : {
1465 0 : if (timed_out)
1466 0 : ereport(WARNING,
1467 : (errmsg("could not get result of cancel request due to timeout")));
1468 : else
1469 0 : ereport(WARNING,
1470 : (errcode(ERRCODE_CONNECTION_FAILURE),
1471 : errmsg("could not get result of cancel request: %s",
1472 : pchomp(PQerrorMessage(conn)))));
1473 :
1474 0 : return false;
1475 : }
1476 4 : PQclear(result);
1477 :
1478 4 : return true;
1479 : }
1480 :
1481 : /*
1482 : * Submit a query during (sub)abort cleanup and wait up to 30 seconds for the
1483 : * result. If the query is executed without error, the return value is true.
1484 : * If the query is executed successfully but returns an error, the return
1485 : * value is true if and only if ignore_errors is set. If the query can't be
1486 : * sent or times out, the return value is false.
1487 : *
1488 : * It's not a huge problem if we throw an ERROR here, but if we get into error
1489 : * recursion trouble, we'll end up slamming the connection shut, which will
1490 : * necessitate failing the entire toplevel transaction even if subtransactions
1491 : * were used. Try to use WARNING where we can.
1492 : */
1493 : static bool
1494 118 : pgfdw_exec_cleanup_query(PGconn *conn, const char *query, bool ignore_errors)
1495 : {
1496 : TimestampTz endtime;
1497 :
1498 : /*
1499 : * If it takes too long to execute a cleanup query, assume the connection
1500 : * is dead. It's fairly likely that this is why we aborted in the first
1501 : * place (e.g. statement timeout, user cancel), so the timeout shouldn't
1502 : * be too long.
1503 : */
1504 118 : endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
1505 : CONNECTION_CLEANUP_TIMEOUT);
1506 :
1507 118 : if (!pgfdw_exec_cleanup_query_begin(conn, query))
1508 0 : return false;
1509 118 : return pgfdw_exec_cleanup_query_end(conn, query, endtime,
1510 : false, ignore_errors);
1511 : }
1512 :
1513 : static bool
1514 142 : pgfdw_exec_cleanup_query_begin(PGconn *conn, const char *query)
1515 : {
1516 : Assert(query != NULL);
1517 :
1518 : /*
1519 : * Submit a query. Since we don't use non-blocking mode, this also can
1520 : * block. But its risk is relatively small, so we ignore that for now.
1521 : */
1522 142 : if (!PQsendQuery(conn, query))
1523 : {
1524 0 : pgfdw_report_error(WARNING, NULL, conn, false, query);
1525 0 : return false;
1526 : }
1527 :
1528 142 : return true;
1529 : }
1530 :
1531 : static bool
1532 142 : pgfdw_exec_cleanup_query_end(PGconn *conn, const char *query,
1533 : TimestampTz endtime, bool consume_input,
1534 : bool ignore_errors)
1535 : {
1536 : PGresult *result;
1537 : bool timed_out;
1538 :
1539 : Assert(query != NULL);
1540 :
1541 : /*
1542 : * If requested, consume whatever data is available from the socket. (Note
1543 : * that if all data is available, this allows pgfdw_get_cleanup_result to
1544 : * call PQgetResult without forcing the overhead of WaitLatchOrSocket,
1545 : * which would be large compared to the overhead of PQconsumeInput.)
1546 : */
1547 142 : if (consume_input && !PQconsumeInput(conn))
1548 : {
1549 0 : pgfdw_report_error(WARNING, NULL, conn, false, query);
1550 0 : return false;
1551 : }
1552 :
1553 : /* Get the result of the query. */
1554 142 : if (pgfdw_get_cleanup_result(conn, endtime, endtime, &result, &timed_out))
1555 : {
1556 0 : if (timed_out)
1557 0 : ereport(WARNING,
1558 : (errmsg("could not get query result due to timeout"),
1559 : errcontext("remote SQL command: %s", query)));
1560 : else
1561 0 : pgfdw_report_error(WARNING, NULL, conn, false, query);
1562 :
1563 0 : return false;
1564 : }
1565 :
1566 : /* Issue a warning if not successful. */
1567 142 : if (PQresultStatus(result) != PGRES_COMMAND_OK)
1568 : {
1569 0 : pgfdw_report_error(WARNING, result, conn, true, query);
1570 0 : return ignore_errors;
1571 : }
1572 142 : PQclear(result);
1573 :
1574 142 : return true;
1575 : }
1576 :
1577 : /*
1578 : * Get, during abort cleanup, the result of a query that is in progress.
1579 : * This might be a query that is being interrupted by a cancel request or by
1580 : * transaction abort, or it might be a query that was initiated as part of
1581 : * transaction abort to get the remote side back to the appropriate state.
1582 : *
1583 : * endtime is the time at which we should give up and assume the remote side
1584 : * is dead. retrycanceltime is the time at which we should issue a fresh
1585 : * cancel request (pass the same value as endtime if this is not wanted).
1586 : *
1587 : * Returns true if the timeout expired or connection trouble occurred,
1588 : * false otherwise. Sets *result except in case of a true result.
1589 : * Sets *timed_out to true only when the timeout expired.
1590 : */
1591 : static bool
1592 146 : pgfdw_get_cleanup_result(PGconn *conn, TimestampTz endtime,
1593 : TimestampTz retrycanceltime,
1594 : PGresult **result,
1595 : bool *timed_out)
1596 : {
1597 146 : volatile bool failed = false;
1598 146 : PGresult *volatile last_res = NULL;
1599 :
1600 146 : *result = NULL;
1601 146 : *timed_out = false;
1602 :
1603 : /* In what follows, do not leak any PGresults on an error. */
1604 146 : PG_TRY();
1605 : {
1606 146 : int canceldelta = RETRY_CANCEL_TIMEOUT * 2;
1607 :
1608 : for (;;)
1609 160 : {
1610 : PGresult *res;
1611 :
1612 442 : while (PQisBusy(conn))
1613 : {
1614 : int wc;
1615 136 : TimestampTz now = GetCurrentTimestamp();
1616 : long cur_timeout;
1617 :
1618 : /* If timeout has expired, give up. */
1619 136 : if (now >= endtime)
1620 : {
1621 0 : *timed_out = true;
1622 0 : failed = true;
1623 0 : goto exit;
1624 : }
1625 :
1626 : /* If we need to re-issue the cancel request, do that. */
1627 136 : if (now >= retrycanceltime)
1628 : {
1629 : /* We ignore failure to issue the repeated request. */
1630 0 : (void) libpqsrv_cancel(conn, endtime);
1631 :
1632 : /* Recompute "now" in case that took measurable time. */
1633 0 : now = GetCurrentTimestamp();
1634 :
1635 : /* Adjust re-cancel timeout in increasing steps. */
1636 0 : retrycanceltime = TimestampTzPlusMilliseconds(now,
1637 : canceldelta);
1638 0 : canceldelta += canceldelta;
1639 : }
1640 :
1641 : /* If timeout has expired, give up, else get sleep time. */
1642 136 : cur_timeout = TimestampDifferenceMilliseconds(now,
1643 : Min(endtime,
1644 : retrycanceltime));
1645 136 : if (cur_timeout <= 0)
1646 : {
1647 0 : *timed_out = true;
1648 0 : failed = true;
1649 0 : goto exit;
1650 : }
1651 :
1652 : /* first time, allocate or get the custom wait event */
1653 136 : if (pgfdw_we_cleanup_result == 0)
1654 4 : pgfdw_we_cleanup_result = WaitEventExtensionNew("PostgresFdwCleanupResult");
1655 :
1656 : /* Sleep until there's something to do */
1657 136 : wc = WaitLatchOrSocket(MyLatch,
1658 : WL_LATCH_SET | WL_SOCKET_READABLE |
1659 : WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
1660 : PQsocket(conn),
1661 : cur_timeout, pgfdw_we_cleanup_result);
1662 136 : ResetLatch(MyLatch);
1663 :
1664 136 : CHECK_FOR_INTERRUPTS();
1665 :
1666 : /* Data available in socket? */
1667 136 : if (wc & WL_SOCKET_READABLE)
1668 : {
1669 136 : if (!PQconsumeInput(conn))
1670 : {
1671 : /* connection trouble */
1672 0 : failed = true;
1673 0 : goto exit;
1674 : }
1675 : }
1676 : }
1677 :
1678 306 : res = PQgetResult(conn);
1679 306 : if (res == NULL)
1680 146 : break; /* query is complete */
1681 :
1682 160 : PQclear(last_res);
1683 160 : last_res = res;
1684 : }
1685 146 : exit: ;
1686 : }
1687 0 : PG_CATCH();
1688 : {
1689 0 : PQclear(last_res);
1690 0 : PG_RE_THROW();
1691 : }
1692 146 : PG_END_TRY();
1693 :
1694 146 : if (failed)
1695 0 : PQclear(last_res);
1696 : else
1697 146 : *result = last_res;
1698 146 : return failed;
1699 : }
1700 :
1701 : /*
1702 : * Abort remote transaction or subtransaction.
1703 : *
1704 : * "toplevel" should be set to true if toplevel (main) transaction is
1705 : * rollbacked, false otherwise.
1706 : *
1707 : * Set entry->changing_xact_state to false on success, true on failure.
1708 : */
1709 : static void
1710 86 : pgfdw_abort_cleanup(ConnCacheEntry *entry, bool toplevel)
1711 : {
1712 : char sql[100];
1713 :
1714 : /*
1715 : * Don't try to clean up the connection if we're already in error
1716 : * recursion trouble.
1717 : */
1718 86 : if (in_error_recursion_trouble())
1719 0 : entry->changing_xact_state = true;
1720 :
1721 : /*
1722 : * If connection is already unsalvageable, don't touch it further.
1723 : */
1724 86 : if (entry->changing_xact_state)
1725 2 : return;
1726 :
1727 : /*
1728 : * Mark this connection as in the process of changing transaction state.
1729 : */
1730 84 : entry->changing_xact_state = true;
1731 :
1732 : /* Assume we might have lost track of prepared statements */
1733 84 : entry->have_error = true;
1734 :
1735 : /*
1736 : * If a command has been submitted to the remote server by using an
1737 : * asynchronous execution function, the command might not have yet
1738 : * completed. Check to see if a command is still being processed by the
1739 : * remote server, and if so, request cancellation of the command.
1740 : */
1741 84 : if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE &&
1742 4 : !pgfdw_cancel_query(entry->conn))
1743 0 : return; /* Unable to cancel running query */
1744 :
1745 84 : CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
1746 84 : if (!pgfdw_exec_cleanup_query(entry->conn, sql, false))
1747 0 : return; /* Unable to abort remote (sub)transaction */
1748 :
1749 84 : if (toplevel)
1750 : {
1751 78 : if (entry->have_prep_stmt && entry->have_error &&
1752 34 : !pgfdw_exec_cleanup_query(entry->conn,
1753 : "DEALLOCATE ALL",
1754 : true))
1755 0 : return; /* Trouble clearing prepared statements */
1756 :
1757 78 : entry->have_prep_stmt = false;
1758 78 : entry->have_error = false;
1759 : }
1760 :
1761 : /*
1762 : * If pendingAreq of the per-connection state is not NULL, it means that
1763 : * an asynchronous fetch begun by fetch_more_data_begin() was not done
1764 : * successfully and thus the per-connection state was not reset in
1765 : * fetch_more_data(); in that case reset the per-connection state here.
1766 : */
1767 84 : if (entry->state.pendingAreq)
1768 2 : memset(&entry->state, 0, sizeof(entry->state));
1769 :
1770 : /* Disarm changing_xact_state if it all worked */
1771 84 : entry->changing_xact_state = false;
1772 : }
1773 :
1774 : /*
1775 : * Like pgfdw_abort_cleanup, submit an abort command or cancel request, but
1776 : * don't wait for the result.
1777 : *
1778 : * Returns true if the abort command or cancel request is successfully issued,
1779 : * false otherwise. If the abort command is successfully issued, the given
1780 : * connection cache entry is appended to *pending_entries. Otherwise, if the
1781 : * cancel request is successfully issued, it is appended to *cancel_requested.
1782 : */
1783 : static bool
1784 16 : pgfdw_abort_cleanup_begin(ConnCacheEntry *entry, bool toplevel,
1785 : List **pending_entries, List **cancel_requested)
1786 : {
1787 : /*
1788 : * Don't try to clean up the connection if we're already in error
1789 : * recursion trouble.
1790 : */
1791 16 : if (in_error_recursion_trouble())
1792 0 : entry->changing_xact_state = true;
1793 :
1794 : /*
1795 : * If connection is already unsalvageable, don't touch it further.
1796 : */
1797 16 : if (entry->changing_xact_state)
1798 0 : return false;
1799 :
1800 : /*
1801 : * Mark this connection as in the process of changing transaction state.
1802 : */
1803 16 : entry->changing_xact_state = true;
1804 :
1805 : /* Assume we might have lost track of prepared statements */
1806 16 : entry->have_error = true;
1807 :
1808 : /*
1809 : * If a command has been submitted to the remote server by using an
1810 : * asynchronous execution function, the command might not have yet
1811 : * completed. Check to see if a command is still being processed by the
1812 : * remote server, and if so, request cancellation of the command.
1813 : */
1814 16 : if (PQtransactionStatus(entry->conn) == PQTRANS_ACTIVE)
1815 : {
1816 : TimestampTz endtime;
1817 :
1818 0 : endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
1819 : CONNECTION_CLEANUP_TIMEOUT);
1820 0 : if (!pgfdw_cancel_query_begin(entry->conn, endtime))
1821 0 : return false; /* Unable to cancel running query */
1822 0 : *cancel_requested = lappend(*cancel_requested, entry);
1823 : }
1824 : else
1825 : {
1826 : char sql[100];
1827 :
1828 16 : CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
1829 16 : if (!pgfdw_exec_cleanup_query_begin(entry->conn, sql))
1830 0 : return false; /* Unable to abort remote transaction */
1831 16 : *pending_entries = lappend(*pending_entries, entry);
1832 : }
1833 :
1834 16 : return true;
1835 : }
1836 :
1837 : /*
1838 : * Finish pre-commit cleanup of connections on each of which we've sent a
1839 : * COMMIT command to the remote server.
1840 : */
1841 : static void
1842 26 : pgfdw_finish_pre_commit_cleanup(List *pending_entries)
1843 : {
1844 : ConnCacheEntry *entry;
1845 26 : List *pending_deallocs = NIL;
1846 : ListCell *lc;
1847 :
1848 : Assert(pending_entries);
1849 :
1850 : /*
1851 : * Get the result of the COMMIT command for each of the pending entries
1852 : */
1853 58 : foreach(lc, pending_entries)
1854 : {
1855 32 : entry = (ConnCacheEntry *) lfirst(lc);
1856 :
1857 : Assert(entry->changing_xact_state);
1858 :
1859 : /*
1860 : * We might already have received the result on the socket, so pass
1861 : * consume_input=true to try to consume it first
1862 : */
1863 32 : do_sql_command_end(entry->conn, "COMMIT TRANSACTION", true);
1864 32 : entry->changing_xact_state = false;
1865 :
1866 : /* Do a DEALLOCATE ALL in parallel if needed */
1867 32 : if (entry->have_prep_stmt && entry->have_error)
1868 : {
1869 : /* Ignore errors (see notes in pgfdw_xact_callback) */
1870 4 : if (PQsendQuery(entry->conn, "DEALLOCATE ALL"))
1871 : {
1872 4 : pending_deallocs = lappend(pending_deallocs, entry);
1873 4 : continue;
1874 : }
1875 : }
1876 28 : entry->have_prep_stmt = false;
1877 28 : entry->have_error = false;
1878 :
1879 28 : pgfdw_reset_xact_state(entry, true);
1880 : }
1881 :
1882 : /* No further work if no pending entries */
1883 26 : if (!pending_deallocs)
1884 24 : return;
1885 :
1886 : /*
1887 : * Get the result of the DEALLOCATE command for each of the pending
1888 : * entries
1889 : */
1890 6 : foreach(lc, pending_deallocs)
1891 : {
1892 : PGresult *res;
1893 :
1894 4 : entry = (ConnCacheEntry *) lfirst(lc);
1895 :
1896 : /* Ignore errors (see notes in pgfdw_xact_callback) */
1897 8 : while ((res = PQgetResult(entry->conn)) != NULL)
1898 : {
1899 4 : PQclear(res);
1900 : /* Stop if the connection is lost (else we'll loop infinitely) */
1901 4 : if (PQstatus(entry->conn) == CONNECTION_BAD)
1902 0 : break;
1903 : }
1904 4 : entry->have_prep_stmt = false;
1905 4 : entry->have_error = false;
1906 :
1907 4 : pgfdw_reset_xact_state(entry, true);
1908 : }
1909 : }
1910 :
1911 : /*
1912 : * Finish pre-subcommit cleanup of connections on each of which we've sent a
1913 : * RELEASE command to the remote server.
1914 : */
1915 : static void
1916 2 : pgfdw_finish_pre_subcommit_cleanup(List *pending_entries, int curlevel)
1917 : {
1918 : ConnCacheEntry *entry;
1919 : char sql[100];
1920 : ListCell *lc;
1921 :
1922 : Assert(pending_entries);
1923 :
1924 : /*
1925 : * Get the result of the RELEASE command for each of the pending entries
1926 : */
1927 2 : snprintf(sql, sizeof(sql), "RELEASE SAVEPOINT s%d", curlevel);
1928 6 : foreach(lc, pending_entries)
1929 : {
1930 4 : entry = (ConnCacheEntry *) lfirst(lc);
1931 :
1932 : Assert(entry->changing_xact_state);
1933 :
1934 : /*
1935 : * We might already have received the result on the socket, so pass
1936 : * consume_input=true to try to consume it first
1937 : */
1938 4 : do_sql_command_end(entry->conn, sql, true);
1939 4 : entry->changing_xact_state = false;
1940 :
1941 4 : pgfdw_reset_xact_state(entry, false);
1942 : }
1943 2 : }
1944 :
1945 : /*
1946 : * Finish abort cleanup of connections on each of which we've sent an abort
1947 : * command or cancel request to the remote server.
1948 : */
1949 : static void
1950 8 : pgfdw_finish_abort_cleanup(List *pending_entries, List *cancel_requested,
1951 : bool toplevel)
1952 : {
1953 8 : List *pending_deallocs = NIL;
1954 : ListCell *lc;
1955 :
1956 : /*
1957 : * For each of the pending cancel requests (if any), get and discard the
1958 : * result of the query, and submit an abort command to the remote server.
1959 : */
1960 8 : if (cancel_requested)
1961 : {
1962 0 : foreach(lc, cancel_requested)
1963 : {
1964 0 : ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
1965 0 : TimestampTz now = GetCurrentTimestamp();
1966 : TimestampTz endtime;
1967 : TimestampTz retrycanceltime;
1968 : char sql[100];
1969 :
1970 : Assert(entry->changing_xact_state);
1971 :
1972 : /*
1973 : * Set end time. You might think we should do this before issuing
1974 : * cancel request like in normal mode, but that is problematic,
1975 : * because if, for example, it took longer than 30 seconds to
1976 : * process the first few entries in the cancel_requested list, it
1977 : * would cause a timeout error when processing each of the
1978 : * remaining entries in the list, leading to slamming that entry's
1979 : * connection shut.
1980 : */
1981 0 : endtime = TimestampTzPlusMilliseconds(now,
1982 : CONNECTION_CLEANUP_TIMEOUT);
1983 0 : retrycanceltime = TimestampTzPlusMilliseconds(now,
1984 : RETRY_CANCEL_TIMEOUT);
1985 :
1986 0 : if (!pgfdw_cancel_query_end(entry->conn, endtime,
1987 : retrycanceltime, true))
1988 : {
1989 : /* Unable to cancel running query */
1990 0 : pgfdw_reset_xact_state(entry, toplevel);
1991 0 : continue;
1992 : }
1993 :
1994 : /* Send an abort command in parallel if needed */
1995 0 : CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
1996 0 : if (!pgfdw_exec_cleanup_query_begin(entry->conn, sql))
1997 : {
1998 : /* Unable to abort remote (sub)transaction */
1999 0 : pgfdw_reset_xact_state(entry, toplevel);
2000 : }
2001 : else
2002 0 : pending_entries = lappend(pending_entries, entry);
2003 : }
2004 : }
2005 :
2006 : /* No further work if no pending entries */
2007 8 : if (!pending_entries)
2008 0 : return;
2009 :
2010 : /*
2011 : * Get the result of the abort command for each of the pending entries
2012 : */
2013 24 : foreach(lc, pending_entries)
2014 : {
2015 16 : ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
2016 : TimestampTz endtime;
2017 : char sql[100];
2018 :
2019 : Assert(entry->changing_xact_state);
2020 :
2021 : /*
2022 : * Set end time. We do this now, not before issuing the command like
2023 : * in normal mode, for the same reason as for the cancel_requested
2024 : * entries.
2025 : */
2026 16 : endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
2027 : CONNECTION_CLEANUP_TIMEOUT);
2028 :
2029 16 : CONSTRUCT_ABORT_COMMAND(sql, entry, toplevel);
2030 16 : if (!pgfdw_exec_cleanup_query_end(entry->conn, sql, endtime,
2031 : true, false))
2032 : {
2033 : /* Unable to abort remote (sub)transaction */
2034 0 : pgfdw_reset_xact_state(entry, toplevel);
2035 8 : continue;
2036 : }
2037 :
2038 16 : if (toplevel)
2039 : {
2040 : /* Do a DEALLOCATE ALL in parallel if needed */
2041 8 : if (entry->have_prep_stmt && entry->have_error)
2042 : {
2043 8 : if (!pgfdw_exec_cleanup_query_begin(entry->conn,
2044 : "DEALLOCATE ALL"))
2045 : {
2046 : /* Trouble clearing prepared statements */
2047 0 : pgfdw_reset_xact_state(entry, toplevel);
2048 : }
2049 : else
2050 8 : pending_deallocs = lappend(pending_deallocs, entry);
2051 8 : continue;
2052 : }
2053 0 : entry->have_prep_stmt = false;
2054 0 : entry->have_error = false;
2055 : }
2056 :
2057 : /* Reset the per-connection state if needed */
2058 8 : if (entry->state.pendingAreq)
2059 0 : memset(&entry->state, 0, sizeof(entry->state));
2060 :
2061 : /* We're done with this entry; unset the changing_xact_state flag */
2062 8 : entry->changing_xact_state = false;
2063 8 : pgfdw_reset_xact_state(entry, toplevel);
2064 : }
2065 :
2066 : /* No further work if no pending entries */
2067 8 : if (!pending_deallocs)
2068 4 : return;
2069 : Assert(toplevel);
2070 :
2071 : /*
2072 : * Get the result of the DEALLOCATE command for each of the pending
2073 : * entries
2074 : */
2075 12 : foreach(lc, pending_deallocs)
2076 : {
2077 8 : ConnCacheEntry *entry = (ConnCacheEntry *) lfirst(lc);
2078 : TimestampTz endtime;
2079 :
2080 : Assert(entry->changing_xact_state);
2081 : Assert(entry->have_prep_stmt);
2082 : Assert(entry->have_error);
2083 :
2084 : /*
2085 : * Set end time. We do this now, not before issuing the command like
2086 : * in normal mode, for the same reason as for the cancel_requested
2087 : * entries.
2088 : */
2089 8 : endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
2090 : CONNECTION_CLEANUP_TIMEOUT);
2091 :
2092 8 : if (!pgfdw_exec_cleanup_query_end(entry->conn, "DEALLOCATE ALL",
2093 : endtime, true, true))
2094 : {
2095 : /* Trouble clearing prepared statements */
2096 0 : pgfdw_reset_xact_state(entry, toplevel);
2097 0 : continue;
2098 : }
2099 8 : entry->have_prep_stmt = false;
2100 8 : entry->have_error = false;
2101 :
2102 : /* Reset the per-connection state if needed */
2103 8 : if (entry->state.pendingAreq)
2104 0 : memset(&entry->state, 0, sizeof(entry->state));
2105 :
2106 : /* We're done with this entry; unset the changing_xact_state flag */
2107 8 : entry->changing_xact_state = false;
2108 8 : pgfdw_reset_xact_state(entry, toplevel);
2109 : }
2110 : }
2111 :
2112 : /* Number of output arguments (columns) for various API versions */
2113 : #define POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_1 2
2114 : #define POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_2 5
2115 : #define POSTGRES_FDW_GET_CONNECTIONS_COLS 5 /* maximum of above */
2116 :
2117 : /*
2118 : * Internal function used by postgres_fdw_get_connections variants.
2119 : *
2120 : * For API version 1.1, this function takes no input parameter and
2121 : * returns a set of records with the following values:
2122 : *
2123 : * - server_name - server name of active connection. In case the foreign server
2124 : * is dropped but still the connection is active, then the server name will
2125 : * be NULL in output.
2126 : * - valid - true/false representing whether the connection is valid or not.
2127 : * Note that connections can become invalid in pgfdw_inval_callback.
2128 : *
2129 : * For API version 1.2 and later, this function takes an input parameter
2130 : * to check a connection status and returns the following
2131 : * additional values along with the three values from version 1.1:
2132 : *
2133 : * - user_name - the local user name of the active connection. In case the
2134 : * user mapping is dropped but the connection is still active, then the
2135 : * user name will be NULL in the output.
2136 : * - used_in_xact - true if the connection is used in the current transaction.
2137 : * - closed - true if the connection is closed.
2138 : *
2139 : * No records are returned when there are no cached connections at all.
2140 : */
2141 : static void
2142 26 : postgres_fdw_get_connections_internal(FunctionCallInfo fcinfo,
2143 : enum pgfdwVersion api_version)
2144 : {
2145 26 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
2146 : HASH_SEQ_STATUS scan;
2147 : ConnCacheEntry *entry;
2148 :
2149 26 : InitMaterializedSRF(fcinfo, 0);
2150 :
2151 : /* If cache doesn't exist, we return no records */
2152 26 : if (!ConnectionHash)
2153 0 : return;
2154 :
2155 : /* Check we have the expected number of output arguments */
2156 26 : switch (rsinfo->setDesc->natts)
2157 : {
2158 0 : case POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_1:
2159 0 : if (api_version != PGFDW_V1_1)
2160 0 : elog(ERROR, "incorrect number of output arguments");
2161 0 : break;
2162 26 : case POSTGRES_FDW_GET_CONNECTIONS_COLS_V1_2:
2163 26 : if (api_version != PGFDW_V1_2)
2164 0 : elog(ERROR, "incorrect number of output arguments");
2165 26 : break;
2166 0 : default:
2167 0 : elog(ERROR, "incorrect number of output arguments");
2168 : }
2169 :
2170 26 : hash_seq_init(&scan, ConnectionHash);
2171 226 : while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
2172 : {
2173 : ForeignServer *server;
2174 200 : Datum values[POSTGRES_FDW_GET_CONNECTIONS_COLS] = {0};
2175 200 : bool nulls[POSTGRES_FDW_GET_CONNECTIONS_COLS] = {0};
2176 200 : int i = 0;
2177 :
2178 : /* We only look for open remote connections */
2179 200 : if (!entry->conn)
2180 174 : continue;
2181 :
2182 26 : server = GetForeignServerExtended(entry->serverid, FSV_MISSING_OK);
2183 :
2184 : /*
2185 : * The foreign server may have been dropped in current explicit
2186 : * transaction. It is not possible to drop the server from another
2187 : * session when the connection associated with it is in use in the
2188 : * current transaction, if tried so, the drop query in another session
2189 : * blocks until the current transaction finishes.
2190 : *
2191 : * Even though the server is dropped in the current transaction, the
2192 : * cache can still have associated active connection entry, say we
2193 : * call such connections dangling. Since we can not fetch the server
2194 : * name from system catalogs for dangling connections, instead we show
2195 : * NULL value for server name in output.
2196 : *
2197 : * We could have done better by storing the server name in the cache
2198 : * entry instead of server oid so that it could be used in the output.
2199 : * But the server name in each cache entry requires 64 bytes of
2200 : * memory, which is huge, when there are many cached connections and
2201 : * the use case i.e. dropping the foreign server within the explicit
2202 : * current transaction seems rare. So, we chose to show NULL value for
2203 : * server name in output.
2204 : *
2205 : * Such dangling connections get closed either in next use or at the
2206 : * end of current explicit transaction in pgfdw_xact_callback.
2207 : */
2208 26 : if (!server)
2209 : {
2210 : /*
2211 : * If the server has been dropped in the current explicit
2212 : * transaction, then this entry would have been invalidated in
2213 : * pgfdw_inval_callback at the end of drop server command. Note
2214 : * that this connection would not have been closed in
2215 : * pgfdw_inval_callback because it is still being used in the
2216 : * current explicit transaction. So, assert that here.
2217 : */
2218 : Assert(entry->conn && entry->xact_depth > 0 && entry->invalidated);
2219 :
2220 : /* Show null, if no server name was found */
2221 2 : nulls[i++] = true;
2222 : }
2223 : else
2224 24 : values[i++] = CStringGetTextDatum(server->servername);
2225 :
2226 26 : if (api_version >= PGFDW_V1_2)
2227 : {
2228 : HeapTuple tp;
2229 :
2230 : /* Use the system cache to obtain the user mapping */
2231 26 : tp = SearchSysCache1(USERMAPPINGOID, ObjectIdGetDatum(entry->key));
2232 :
2233 : /*
2234 : * Just like in the foreign server case, user mappings can also be
2235 : * dropped in the current explicit transaction. Therefore, the
2236 : * similar check as in the server case is required.
2237 : */
2238 26 : if (!HeapTupleIsValid(tp))
2239 : {
2240 : /*
2241 : * If we reach here, this entry must have been invalidated in
2242 : * pgfdw_inval_callback, same as in the server case.
2243 : */
2244 : Assert(entry->conn && entry->xact_depth > 0 &&
2245 : entry->invalidated);
2246 :
2247 2 : nulls[i++] = true;
2248 : }
2249 : else
2250 : {
2251 : Oid userid;
2252 :
2253 24 : userid = ((Form_pg_user_mapping) GETSTRUCT(tp))->umuser;
2254 24 : values[i++] = CStringGetTextDatum(MappingUserName(userid));
2255 24 : ReleaseSysCache(tp);
2256 : }
2257 : }
2258 :
2259 26 : values[i++] = BoolGetDatum(!entry->invalidated);
2260 :
2261 26 : if (api_version >= PGFDW_V1_2)
2262 : {
2263 26 : bool check_conn = PG_GETARG_BOOL(0);
2264 :
2265 : /* Is this connection used in the current transaction? */
2266 26 : values[i++] = BoolGetDatum(entry->xact_depth > 0);
2267 :
2268 : /*
2269 : * If a connection status check is requested and supported, return
2270 : * whether the connection is closed. Otherwise, return NULL.
2271 : */
2272 26 : if (check_conn && pgfdw_conn_checkable())
2273 4 : values[i++] = BoolGetDatum(pgfdw_conn_check(entry->conn) != 0);
2274 : else
2275 22 : nulls[i++] = true;
2276 : }
2277 :
2278 26 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
2279 : }
2280 : }
2281 :
2282 : /*
2283 : * List active foreign server connections.
2284 : *
2285 : * The SQL API of this function has changed multiple times, and will likely
2286 : * do so again in future. To support the case where a newer version of this
2287 : * loadable module is being used with an old SQL declaration of the function,
2288 : * we continue to support the older API versions.
2289 : */
2290 : Datum
2291 26 : postgres_fdw_get_connections_1_2(PG_FUNCTION_ARGS)
2292 : {
2293 26 : postgres_fdw_get_connections_internal(fcinfo, PGFDW_V1_2);
2294 :
2295 26 : PG_RETURN_VOID();
2296 : }
2297 :
2298 : Datum
2299 0 : postgres_fdw_get_connections(PG_FUNCTION_ARGS)
2300 : {
2301 0 : postgres_fdw_get_connections_internal(fcinfo, PGFDW_V1_1);
2302 :
2303 0 : PG_RETURN_VOID();
2304 : }
2305 :
2306 : /*
2307 : * Disconnect the specified cached connections.
2308 : *
2309 : * This function discards the open connections that are established by
2310 : * postgres_fdw from the local session to the foreign server with
2311 : * the given name. Note that there can be multiple connections to
2312 : * the given server using different user mappings. If the connections
2313 : * are used in the current local transaction, they are not disconnected
2314 : * and warning messages are reported. This function returns true
2315 : * if it disconnects at least one connection, otherwise false. If no
2316 : * foreign server with the given name is found, an error is reported.
2317 : */
2318 : Datum
2319 8 : postgres_fdw_disconnect(PG_FUNCTION_ARGS)
2320 : {
2321 : ForeignServer *server;
2322 : char *servername;
2323 :
2324 8 : servername = text_to_cstring(PG_GETARG_TEXT_PP(0));
2325 8 : server = GetForeignServerByName(servername, false);
2326 :
2327 6 : PG_RETURN_BOOL(disconnect_cached_connections(server->serverid));
2328 : }
2329 :
2330 : /*
2331 : * Disconnect all the cached connections.
2332 : *
2333 : * This function discards all the open connections that are established by
2334 : * postgres_fdw from the local session to the foreign servers.
2335 : * If the connections are used in the current local transaction, they are
2336 : * not disconnected and warning messages are reported. This function
2337 : * returns true if it disconnects at least one connection, otherwise false.
2338 : */
2339 : Datum
2340 10 : postgres_fdw_disconnect_all(PG_FUNCTION_ARGS)
2341 : {
2342 10 : PG_RETURN_BOOL(disconnect_cached_connections(InvalidOid));
2343 : }
2344 :
2345 : /*
2346 : * Workhorse to disconnect cached connections.
2347 : *
2348 : * This function scans all the connection cache entries and disconnects
2349 : * the open connections whose foreign server OID matches with
2350 : * the specified one. If InvalidOid is specified, it disconnects all
2351 : * the cached connections.
2352 : *
2353 : * This function emits a warning for each connection that's used in
2354 : * the current transaction and doesn't close it. It returns true if
2355 : * it disconnects at least one connection, otherwise false.
2356 : *
2357 : * Note that this function disconnects even the connections that are
2358 : * established by other users in the same local session using different
2359 : * user mappings. This leads even non-superuser to be able to close
2360 : * the connections established by superusers in the same local session.
2361 : *
2362 : * XXX As of now we don't see any security risk doing this. But we should
2363 : * set some restrictions on that, for example, prevent non-superuser
2364 : * from closing the connections established by superusers even
2365 : * in the same session?
2366 : */
2367 : static bool
2368 16 : disconnect_cached_connections(Oid serverid)
2369 : {
2370 : HASH_SEQ_STATUS scan;
2371 : ConnCacheEntry *entry;
2372 16 : bool all = !OidIsValid(serverid);
2373 16 : bool result = false;
2374 :
2375 : /*
2376 : * Connection cache hashtable has not been initialized yet in this
2377 : * session, so return false.
2378 : */
2379 16 : if (!ConnectionHash)
2380 0 : return false;
2381 :
2382 16 : hash_seq_init(&scan, ConnectionHash);
2383 134 : while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
2384 : {
2385 : /* Ignore cache entry if no open connection right now. */
2386 118 : if (!entry->conn)
2387 96 : continue;
2388 :
2389 22 : if (all || entry->serverid == serverid)
2390 : {
2391 : /*
2392 : * Emit a warning because the connection to close is used in the
2393 : * current transaction and cannot be disconnected right now.
2394 : */
2395 16 : if (entry->xact_depth > 0)
2396 : {
2397 : ForeignServer *server;
2398 :
2399 6 : server = GetForeignServerExtended(entry->serverid,
2400 : FSV_MISSING_OK);
2401 :
2402 6 : if (!server)
2403 : {
2404 : /*
2405 : * If the foreign server was dropped while its connection
2406 : * was used in the current transaction, the connection
2407 : * must have been marked as invalid by
2408 : * pgfdw_inval_callback at the end of DROP SERVER command.
2409 : */
2410 : Assert(entry->invalidated);
2411 :
2412 0 : ereport(WARNING,
2413 : (errmsg("cannot close dropped server connection because it is still in use")));
2414 : }
2415 : else
2416 6 : ereport(WARNING,
2417 : (errmsg("cannot close connection for server \"%s\" because it is still in use",
2418 : server->servername)));
2419 : }
2420 : else
2421 : {
2422 10 : elog(DEBUG3, "discarding connection %p", entry->conn);
2423 10 : disconnect_pg_server(entry);
2424 10 : result = true;
2425 : }
2426 : }
2427 : }
2428 :
2429 16 : return result;
2430 : }
2431 :
2432 : /*
2433 : * Check if the remote server closed the connection.
2434 : *
2435 : * Returns 1 if the connection is closed, -1 if an error occurred,
2436 : * and 0 if it's not closed or if the connection check is unavailable
2437 : * on this platform.
2438 : */
2439 : static int
2440 4 : pgfdw_conn_check(PGconn *conn)
2441 : {
2442 4 : int sock = PQsocket(conn);
2443 :
2444 4 : if (PQstatus(conn) != CONNECTION_OK || sock == -1)
2445 0 : return -1;
2446 :
2447 : #if (defined(HAVE_POLL) && defined(POLLRDHUP))
2448 : {
2449 : struct pollfd input_fd;
2450 : int result;
2451 :
2452 4 : input_fd.fd = sock;
2453 4 : input_fd.events = POLLRDHUP;
2454 4 : input_fd.revents = 0;
2455 :
2456 : do
2457 4 : result = poll(&input_fd, 1, 0);
2458 4 : while (result < 0 && errno == EINTR);
2459 :
2460 4 : if (result < 0)
2461 0 : return -1;
2462 :
2463 4 : return (input_fd.revents &
2464 4 : (POLLRDHUP | POLLHUP | POLLERR | POLLNVAL)) ? 1 : 0;
2465 : }
2466 : #else
2467 : return 0;
2468 : #endif
2469 : }
2470 :
2471 : /*
2472 : * Check if connection status checking is available on this platform.
2473 : *
2474 : * Returns true if available, false otherwise.
2475 : */
2476 : static bool
2477 4 : pgfdw_conn_checkable(void)
2478 : {
2479 : #if (defined(HAVE_POLL) && defined(POLLRDHUP))
2480 4 : return true;
2481 : #else
2482 : return false;
2483 : #endif
2484 : }
|