Line data Source code
1 : /*
2 : * dblink.c
3 : *
4 : * Functions returning results from a remote database
5 : *
6 : * Joe Conway <mail@joeconway.com>
7 : * And contributors:
8 : * Darko Prenosil <Darko.Prenosil@finteh.hr>
9 : * Shridhar Daithankar <shridhar_daithankar@persistent.co.in>
10 : *
11 : * contrib/dblink/dblink.c
12 : * Copyright (c) 2001-2026, PostgreSQL Global Development Group
13 : * ALL RIGHTS RESERVED;
14 : *
15 : * Permission to use, copy, modify, and distribute this software and its
16 : * documentation for any purpose, without fee, and without a written agreement
17 : * is hereby granted, provided that the above copyright notice and this
18 : * paragraph and the following two paragraphs appear in all copies.
19 : *
20 : * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
21 : * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
22 : * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
23 : * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
24 : * POSSIBILITY OF SUCH DAMAGE.
25 : *
26 : * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
27 : * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
28 : * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
29 : * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
30 : * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
31 : *
32 : */
33 : #include "postgres.h"
34 :
35 : #include <limits.h>
36 :
37 : #include "access/htup_details.h"
38 : #include "access/relation.h"
39 : #include "access/reloptions.h"
40 : #include "access/table.h"
41 : #include "catalog/namespace.h"
42 : #include "catalog/pg_foreign_data_wrapper.h"
43 : #include "catalog/pg_foreign_server.h"
44 : #include "catalog/pg_type.h"
45 : #include "catalog/pg_user_mapping.h"
46 : #include "commands/defrem.h"
47 : #include "common/base64.h"
48 : #include "executor/spi.h"
49 : #include "foreign/foreign.h"
50 : #include "funcapi.h"
51 : #include "lib/stringinfo.h"
52 : #include "libpq-fe.h"
53 : #include "libpq/libpq-be.h"
54 : #include "libpq/libpq-be-fe-helpers.h"
55 : #include "mb/pg_wchar.h"
56 : #include "miscadmin.h"
57 : #include "parser/scansup.h"
58 : #include "utils/acl.h"
59 : #include "utils/builtins.h"
60 : #include "utils/fmgroids.h"
61 : #include "utils/guc.h"
62 : #include "utils/hsearch.h"
63 : #include "utils/lsyscache.h"
64 : #include "utils/memutils.h"
65 : #include "utils/rel.h"
66 : #include "utils/tuplestore.h"
67 : #include "utils/varlena.h"
68 : #include "utils/wait_event.h"
69 :
70 17 : PG_MODULE_MAGIC_EXT(
71 : .name = "dblink",
72 : .version = PG_VERSION
73 : );
74 :
75 : typedef struct remoteConn
76 : {
77 : PGconn *conn; /* Hold the remote connection */
78 : int openCursorCount; /* The number of open cursors */
79 : bool newXactForCursor; /* Opened a transaction for a cursor */
80 : } remoteConn;
81 :
82 : typedef struct storeInfo
83 : {
84 : FunctionCallInfo fcinfo;
85 : Tuplestorestate *tuplestore;
86 : AttInMetadata *attinmeta;
87 : MemoryContext tmpcontext;
88 : char **cstrs;
89 : /* temp storage for results to avoid leaks on exception */
90 : PGresult *last_res;
91 : PGresult *cur_res;
92 : } storeInfo;
93 :
94 : /*
95 : * Internal declarations
96 : */
97 : static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
98 : static void prepTuplestoreResult(FunctionCallInfo fcinfo);
99 : static void materializeResult(FunctionCallInfo fcinfo, PGconn *conn,
100 : PGresult *res);
101 : static void materializeQueryResult(FunctionCallInfo fcinfo,
102 : PGconn *conn,
103 : const char *conname,
104 : const char *sql,
105 : bool fail);
106 : static PGresult *storeQueryResult(storeInfo *sinfo, PGconn *conn, const char *sql);
107 : static void storeRow(storeInfo *sinfo, PGresult *res, bool first);
108 : static remoteConn *getConnectionByName(const char *name);
109 : static HTAB *createConnHash(void);
110 : static remoteConn *createNewConnection(const char *name);
111 : static void deleteConnection(const char *name);
112 : static char **get_pkey_attnames(Relation rel, int16 *indnkeyatts);
113 : static char **get_text_array_contents(ArrayType *array, int *numitems);
114 : static char *get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
115 : static char *get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals);
116 : static char *get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
117 : static char *quote_ident_cstr(char *rawstr);
118 : static int get_attnum_pk_pos(int *pkattnums, int pknumatts, int key);
119 : static HeapTuple get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals);
120 : static Relation get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode);
121 : static char *generate_relation_name(Relation rel);
122 : static void dblink_connstr_check(const char *connstr);
123 : static bool dblink_connstr_has_pw(const char *connstr);
124 : static void dblink_security_check(PGconn *conn, const char *connname,
125 : const char *connstr);
126 : static void dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
127 : bool fail, const char *fmt,...) pg_attribute_printf(5, 6);
128 : static char *get_connect_string(const char *servername);
129 : static char *escape_param_str(const char *str);
130 : static void validate_pkattnums(Relation rel,
131 : int2vector *pkattnums_arg, int32 pknumatts_arg,
132 : int **pkattnums, int *pknumatts);
133 : static bool is_valid_dblink_option(const PQconninfoOption *options,
134 : const char *option, Oid context);
135 : static int applyRemoteGucs(PGconn *conn);
136 : static void restoreLocalGucs(int nestlevel);
137 : static bool UseScramPassthrough(ForeignServer *foreign_server, UserMapping *user);
138 : static void appendSCRAMKeysInfo(StringInfo buf);
139 : static bool is_valid_dblink_fdw_option(const PQconninfoOption *options, const char *option,
140 : Oid context);
141 : static bool dblink_connstr_has_required_scram_options(const char *connstr);
142 :
143 : /* Global */
144 : static remoteConn *pconn = NULL;
145 : static HTAB *remoteConnHash = NULL;
146 :
147 : /* custom wait event values, retrieved from shared memory */
148 : static uint32 dblink_we_connect = 0;
149 : static uint32 dblink_we_get_conn = 0;
150 : static uint32 dblink_we_get_result = 0;
151 :
152 : /*
153 : * Following is hash that holds multiple remote connections.
154 : * Calling convention of each dblink function changes to accept
155 : * connection name as the first parameter. The connection hash is
156 : * much like ecpg e.g. a mapping between a name and a PGconn object.
157 : *
158 : * To avoid potentially leaking a PGconn object in case of out-of-memory
159 : * errors, we first create the hash entry, then open the PGconn.
160 : * Hence, a hash entry whose rconn.conn pointer is NULL must be
161 : * understood as a leftover from a failed create; it should be ignored
162 : * by lookup operations, and silently replaced by create operations.
163 : */
164 :
165 : typedef struct remoteConnHashEnt
166 : {
167 : char name[NAMEDATALEN];
168 : remoteConn rconn;
169 : } remoteConnHashEnt;
170 :
171 : /* initial number of connection hashes */
172 : #define NUMCONN 16
173 :
174 : pg_noreturn static void
175 0 : dblink_res_internalerror(PGconn *conn, PGresult *res, const char *p2)
176 : {
177 0 : char *msg = pchomp(PQerrorMessage(conn));
178 :
179 0 : PQclear(res);
180 0 : elog(ERROR, "%s: %s", p2, msg);
181 : }
182 :
183 : pg_noreturn static void
184 3 : dblink_conn_not_avail(const char *conname)
185 : {
186 3 : if (conname)
187 1 : ereport(ERROR,
188 : (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
189 : errmsg("connection \"%s\" not available", conname)));
190 : else
191 2 : ereport(ERROR,
192 : (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
193 : errmsg("connection not available")));
194 : }
195 :
196 : static void
197 37 : dblink_get_conn(char *conname_or_str,
198 : PGconn *volatile *conn_p, char **conname_p, volatile bool *freeconn_p)
199 : {
200 37 : remoteConn *rconn = getConnectionByName(conname_or_str);
201 : PGconn *conn;
202 : char *conname;
203 : bool freeconn;
204 :
205 37 : if (rconn)
206 : {
207 27 : conn = rconn->conn;
208 27 : conname = conname_or_str;
209 27 : freeconn = false;
210 : }
211 : else
212 : {
213 : const char *connstr;
214 :
215 10 : connstr = get_connect_string(conname_or_str);
216 10 : if (connstr == NULL)
217 6 : connstr = conname_or_str;
218 10 : dblink_connstr_check(connstr);
219 :
220 : /* first time, allocate or get the custom wait event */
221 9 : if (dblink_we_get_conn == 0)
222 5 : dblink_we_get_conn = WaitEventExtensionNew("DblinkGetConnect");
223 :
224 : /* OK to make connection */
225 9 : conn = libpqsrv_connect(connstr, dblink_we_get_conn);
226 :
227 9 : if (PQstatus(conn) == CONNECTION_BAD)
228 : {
229 3 : char *msg = pchomp(PQerrorMessage(conn));
230 :
231 3 : libpqsrv_disconnect(conn);
232 3 : ereport(ERROR,
233 : (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
234 : errmsg("could not establish connection"),
235 : errdetail_internal("%s", msg)));
236 : }
237 :
238 6 : PQsetNoticeReceiver(conn, libpqsrv_notice_receiver,
239 : "received message via remote connection");
240 :
241 6 : dblink_security_check(conn, NULL, connstr);
242 6 : if (PQclientEncoding(conn) != GetDatabaseEncoding())
243 0 : PQsetClientEncoding(conn, GetDatabaseEncodingName());
244 6 : freeconn = true;
245 6 : conname = NULL;
246 : }
247 :
248 33 : *conn_p = conn;
249 33 : *conname_p = conname;
250 33 : *freeconn_p = freeconn;
251 33 : }
252 :
253 : static PGconn *
254 17 : dblink_get_named_conn(const char *conname)
255 : {
256 17 : remoteConn *rconn = getConnectionByName(conname);
257 :
258 17 : if (rconn)
259 17 : return rconn->conn;
260 :
261 0 : dblink_conn_not_avail(conname);
262 : return NULL; /* keep compiler quiet */
263 : }
264 :
265 : static void
266 124 : dblink_init(void)
267 : {
268 124 : if (!pconn)
269 : {
270 7 : if (dblink_we_get_result == 0)
271 7 : dblink_we_get_result = WaitEventExtensionNew("DblinkGetResult");
272 :
273 7 : pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn));
274 7 : pconn->conn = NULL;
275 7 : pconn->openCursorCount = 0;
276 7 : pconn->newXactForCursor = false;
277 : }
278 124 : }
279 :
280 : /*
281 : * Create a persistent connection to another database
282 : */
283 13 : PG_FUNCTION_INFO_V1(dblink_connect);
284 : Datum
285 16 : dblink_connect(PG_FUNCTION_ARGS)
286 : {
287 16 : char *conname_or_str = NULL;
288 16 : char *connstr = NULL;
289 16 : char *connname = NULL;
290 : char *msg;
291 16 : PGconn *conn = NULL;
292 16 : remoteConn *rconn = NULL;
293 :
294 16 : dblink_init();
295 :
296 16 : if (PG_NARGS() == 2)
297 : {
298 11 : conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(1));
299 11 : connname = text_to_cstring(PG_GETARG_TEXT_PP(0));
300 : }
301 5 : else if (PG_NARGS() == 1)
302 5 : conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0));
303 :
304 : /* first check for valid foreign data server */
305 16 : connstr = get_connect_string(conname_or_str);
306 16 : if (connstr == NULL)
307 14 : connstr = conname_or_str;
308 :
309 : /* check password in connection string if not superuser */
310 16 : dblink_connstr_check(connstr);
311 :
312 : /* first time, allocate or get the custom wait event */
313 15 : if (dblink_we_connect == 0)
314 2 : dblink_we_connect = WaitEventExtensionNew("DblinkConnect");
315 :
316 : /* if we need a hashtable entry, make that first, since it might fail */
317 15 : if (connname)
318 : {
319 10 : rconn = createNewConnection(connname);
320 : Assert(rconn->conn == NULL);
321 : }
322 :
323 : /* OK to make connection */
324 14 : conn = libpqsrv_connect(connstr, dblink_we_connect);
325 :
326 14 : if (PQstatus(conn) == CONNECTION_BAD)
327 : {
328 0 : msg = pchomp(PQerrorMessage(conn));
329 0 : libpqsrv_disconnect(conn);
330 0 : if (connname)
331 0 : deleteConnection(connname);
332 :
333 0 : ereport(ERROR,
334 : (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
335 : errmsg("could not establish connection"),
336 : errdetail_internal("%s", msg)));
337 : }
338 :
339 14 : PQsetNoticeReceiver(conn, libpqsrv_notice_receiver,
340 : "received message via remote connection");
341 :
342 : /* check password actually used if not superuser */
343 14 : dblink_security_check(conn, connname, connstr);
344 :
345 : /* attempt to set client encoding to match server encoding, if needed */
346 14 : if (PQclientEncoding(conn) != GetDatabaseEncoding())
347 0 : PQsetClientEncoding(conn, GetDatabaseEncodingName());
348 :
349 : /* all OK, save away the conn */
350 14 : if (connname)
351 : {
352 9 : rconn->conn = conn;
353 : }
354 : else
355 : {
356 5 : if (pconn->conn)
357 1 : libpqsrv_disconnect(pconn->conn);
358 5 : pconn->conn = conn;
359 : }
360 :
361 14 : PG_RETURN_TEXT_P(cstring_to_text("OK"));
362 : }
363 :
364 : /*
365 : * Clear a persistent connection to another database
366 : */
367 8 : PG_FUNCTION_INFO_V1(dblink_disconnect);
368 : Datum
369 13 : dblink_disconnect(PG_FUNCTION_ARGS)
370 : {
371 13 : char *conname = NULL;
372 13 : remoteConn *rconn = NULL;
373 13 : PGconn *conn = NULL;
374 :
375 13 : dblink_init();
376 :
377 13 : if (PG_NARGS() == 1)
378 : {
379 9 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
380 9 : rconn = getConnectionByName(conname);
381 9 : if (rconn)
382 8 : conn = rconn->conn;
383 : }
384 : else
385 4 : conn = pconn->conn;
386 :
387 13 : if (!conn)
388 1 : dblink_conn_not_avail(conname);
389 :
390 12 : libpqsrv_disconnect(conn);
391 12 : if (rconn)
392 8 : deleteConnection(conname);
393 : else
394 4 : pconn->conn = NULL;
395 :
396 12 : PG_RETURN_TEXT_P(cstring_to_text("OK"));
397 : }
398 :
399 : /*
400 : * opens a cursor using a persistent connection
401 : */
402 13 : PG_FUNCTION_INFO_V1(dblink_open);
403 : Datum
404 9 : dblink_open(PG_FUNCTION_ARGS)
405 : {
406 9 : PGresult *res = NULL;
407 : PGconn *conn;
408 9 : char *curname = NULL;
409 9 : char *sql = NULL;
410 9 : char *conname = NULL;
411 : StringInfoData buf;
412 9 : remoteConn *rconn = NULL;
413 9 : bool fail = true; /* default to backward compatible behavior */
414 :
415 9 : dblink_init();
416 9 : initStringInfo(&buf);
417 :
418 9 : if (PG_NARGS() == 2)
419 : {
420 : /* text,text */
421 2 : curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
422 2 : sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
423 2 : rconn = pconn;
424 : }
425 7 : else if (PG_NARGS() == 3)
426 : {
427 : /* might be text,text,text or text,text,bool */
428 6 : if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
429 : {
430 1 : curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
431 1 : sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
432 1 : fail = PG_GETARG_BOOL(2);
433 1 : rconn = pconn;
434 : }
435 : else
436 : {
437 5 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
438 5 : curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
439 5 : sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
440 5 : rconn = getConnectionByName(conname);
441 : }
442 : }
443 1 : else if (PG_NARGS() == 4)
444 : {
445 : /* text,text,text,bool */
446 1 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
447 1 : curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
448 1 : sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
449 1 : fail = PG_GETARG_BOOL(3);
450 1 : rconn = getConnectionByName(conname);
451 : }
452 :
453 9 : if (!rconn || !rconn->conn)
454 0 : dblink_conn_not_avail(conname);
455 :
456 9 : conn = rconn->conn;
457 :
458 : /* If we are not in a transaction, start one */
459 9 : if (PQtransactionStatus(conn) == PQTRANS_IDLE)
460 : {
461 7 : res = libpqsrv_exec(conn, "BEGIN", dblink_we_get_result);
462 7 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
463 0 : dblink_res_internalerror(conn, res, "begin error");
464 7 : PQclear(res);
465 7 : rconn->newXactForCursor = true;
466 :
467 : /*
468 : * Since transaction state was IDLE, we force cursor count to
469 : * initially be 0. This is needed as a previous ABORT might have wiped
470 : * out our transaction without maintaining the cursor count for us.
471 : */
472 7 : rconn->openCursorCount = 0;
473 : }
474 :
475 : /* if we started a transaction, increment cursor count */
476 9 : if (rconn->newXactForCursor)
477 9 : (rconn->openCursorCount)++;
478 :
479 9 : appendStringInfo(&buf, "DECLARE %s CURSOR FOR %s", curname, sql);
480 9 : res = libpqsrv_exec(conn, buf.data, dblink_we_get_result);
481 9 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
482 : {
483 2 : dblink_res_error(conn, conname, res, fail,
484 : "while opening cursor \"%s\"", curname);
485 2 : PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
486 : }
487 :
488 7 : PQclear(res);
489 7 : PG_RETURN_TEXT_P(cstring_to_text("OK"));
490 : }
491 :
492 : /*
493 : * closes a cursor
494 : */
495 10 : PG_FUNCTION_INFO_V1(dblink_close);
496 : Datum
497 5 : dblink_close(PG_FUNCTION_ARGS)
498 : {
499 : PGconn *conn;
500 5 : PGresult *res = NULL;
501 5 : char *curname = NULL;
502 5 : char *conname = NULL;
503 : StringInfoData buf;
504 5 : remoteConn *rconn = NULL;
505 5 : bool fail = true; /* default to backward compatible behavior */
506 :
507 5 : dblink_init();
508 5 : initStringInfo(&buf);
509 :
510 5 : if (PG_NARGS() == 1)
511 : {
512 : /* text */
513 0 : curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
514 0 : rconn = pconn;
515 : }
516 5 : else if (PG_NARGS() == 2)
517 : {
518 : /* might be text,text or text,bool */
519 5 : if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
520 : {
521 2 : curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
522 2 : fail = PG_GETARG_BOOL(1);
523 2 : rconn = pconn;
524 : }
525 : else
526 : {
527 3 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
528 3 : curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
529 3 : rconn = getConnectionByName(conname);
530 : }
531 : }
532 5 : if (PG_NARGS() == 3)
533 : {
534 : /* text,text,bool */
535 0 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
536 0 : curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
537 0 : fail = PG_GETARG_BOOL(2);
538 0 : rconn = getConnectionByName(conname);
539 : }
540 :
541 5 : if (!rconn || !rconn->conn)
542 0 : dblink_conn_not_avail(conname);
543 :
544 5 : conn = rconn->conn;
545 :
546 5 : appendStringInfo(&buf, "CLOSE %s", curname);
547 :
548 : /* close the cursor */
549 5 : res = libpqsrv_exec(conn, buf.data, dblink_we_get_result);
550 5 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
551 : {
552 1 : dblink_res_error(conn, conname, res, fail,
553 : "while closing cursor \"%s\"", curname);
554 1 : PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
555 : }
556 :
557 4 : PQclear(res);
558 :
559 : /* if we started a transaction, decrement cursor count */
560 4 : if (rconn->newXactForCursor)
561 : {
562 4 : (rconn->openCursorCount)--;
563 :
564 : /* if count is zero, commit the transaction */
565 4 : if (rconn->openCursorCount == 0)
566 : {
567 2 : rconn->newXactForCursor = false;
568 :
569 2 : res = libpqsrv_exec(conn, "COMMIT", dblink_we_get_result);
570 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
571 0 : dblink_res_internalerror(conn, res, "commit error");
572 2 : PQclear(res);
573 : }
574 : }
575 :
576 4 : PG_RETURN_TEXT_P(cstring_to_text("OK"));
577 : }
578 :
579 : /*
580 : * Fetch results from an open cursor
581 : */
582 13 : PG_FUNCTION_INFO_V1(dblink_fetch);
583 : Datum
584 13 : dblink_fetch(PG_FUNCTION_ARGS)
585 : {
586 13 : PGresult *res = NULL;
587 13 : char *conname = NULL;
588 13 : remoteConn *rconn = NULL;
589 13 : PGconn *conn = NULL;
590 : StringInfoData buf;
591 13 : char *curname = NULL;
592 13 : int howmany = 0;
593 13 : bool fail = true; /* default to backward compatible */
594 :
595 13 : prepTuplestoreResult(fcinfo);
596 :
597 13 : dblink_init();
598 :
599 13 : if (PG_NARGS() == 4)
600 : {
601 : /* text,text,int,bool */
602 1 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
603 1 : curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
604 1 : howmany = PG_GETARG_INT32(2);
605 1 : fail = PG_GETARG_BOOL(3);
606 :
607 1 : rconn = getConnectionByName(conname);
608 1 : if (rconn)
609 1 : conn = rconn->conn;
610 : }
611 12 : else if (PG_NARGS() == 3)
612 : {
613 : /* text,text,int or text,int,bool */
614 8 : if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
615 : {
616 2 : curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
617 2 : howmany = PG_GETARG_INT32(1);
618 2 : fail = PG_GETARG_BOOL(2);
619 2 : conn = pconn->conn;
620 : }
621 : else
622 : {
623 6 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
624 6 : curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
625 6 : howmany = PG_GETARG_INT32(2);
626 :
627 6 : rconn = getConnectionByName(conname);
628 6 : if (rconn)
629 6 : conn = rconn->conn;
630 : }
631 : }
632 4 : else if (PG_NARGS() == 2)
633 : {
634 : /* text,int */
635 4 : curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
636 4 : howmany = PG_GETARG_INT32(1);
637 4 : conn = pconn->conn;
638 : }
639 :
640 13 : if (!conn)
641 0 : dblink_conn_not_avail(conname);
642 :
643 13 : initStringInfo(&buf);
644 13 : appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
645 :
646 : /*
647 : * Try to execute the query. Note that since libpq uses malloc, the
648 : * PGresult will be long-lived even though we are still in a short-lived
649 : * memory context.
650 : */
651 13 : res = libpqsrv_exec(conn, buf.data, dblink_we_get_result);
652 26 : if (!res ||
653 26 : (PQresultStatus(res) != PGRES_COMMAND_OK &&
654 13 : PQresultStatus(res) != PGRES_TUPLES_OK))
655 : {
656 5 : dblink_res_error(conn, conname, res, fail,
657 : "while fetching from cursor \"%s\"", curname);
658 3 : return (Datum) 0;
659 : }
660 8 : else if (PQresultStatus(res) == PGRES_COMMAND_OK)
661 : {
662 : /* cursor does not exist - closed already or bad name */
663 0 : PQclear(res);
664 0 : ereport(ERROR,
665 : (errcode(ERRCODE_INVALID_CURSOR_NAME),
666 : errmsg("cursor \"%s\" does not exist", curname)));
667 : }
668 :
669 8 : materializeResult(fcinfo, conn, res);
670 7 : return (Datum) 0;
671 : }
672 :
673 : /*
674 : * Note: this is the new preferred version of dblink
675 : */
676 19 : PG_FUNCTION_INFO_V1(dblink_record);
677 : Datum
678 29 : dblink_record(PG_FUNCTION_ARGS)
679 : {
680 29 : return dblink_record_internal(fcinfo, false);
681 : }
682 :
683 4 : PG_FUNCTION_INFO_V1(dblink_send_query);
684 : Datum
685 6 : dblink_send_query(PG_FUNCTION_ARGS)
686 : {
687 : PGconn *conn;
688 : char *sql;
689 : int retval;
690 :
691 6 : if (PG_NARGS() == 2)
692 : {
693 6 : conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
694 6 : sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
695 : }
696 : else
697 : /* shouldn't happen */
698 0 : elog(ERROR, "wrong number of arguments");
699 :
700 : /* async query send */
701 6 : retval = PQsendQuery(conn, sql);
702 6 : if (retval != 1)
703 0 : elog(NOTICE, "could not send query: %s", pchomp(PQerrorMessage(conn)));
704 :
705 6 : PG_RETURN_INT32(retval);
706 : }
707 :
708 6 : PG_FUNCTION_INFO_V1(dblink_get_result);
709 : Datum
710 8 : dblink_get_result(PG_FUNCTION_ARGS)
711 : {
712 8 : return dblink_record_internal(fcinfo, true);
713 : }
714 :
715 : static Datum
716 37 : dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
717 : {
718 37 : PGconn *volatile conn = NULL;
719 37 : volatile bool freeconn = false;
720 :
721 37 : prepTuplestoreResult(fcinfo);
722 :
723 37 : dblink_init();
724 :
725 37 : PG_TRY();
726 : {
727 37 : char *sql = NULL;
728 37 : char *conname = NULL;
729 37 : bool fail = true; /* default to backward compatible */
730 :
731 37 : if (!is_async)
732 : {
733 29 : if (PG_NARGS() == 3)
734 : {
735 : /* text,text,bool */
736 1 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
737 1 : sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
738 1 : fail = PG_GETARG_BOOL(2);
739 1 : dblink_get_conn(conname, &conn, &conname, &freeconn);
740 : }
741 28 : else if (PG_NARGS() == 2)
742 : {
743 : /* text,text or text,bool */
744 21 : if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
745 : {
746 1 : sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
747 1 : fail = PG_GETARG_BOOL(1);
748 1 : conn = pconn->conn;
749 : }
750 : else
751 : {
752 20 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
753 20 : sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
754 20 : dblink_get_conn(conname, &conn, &conname, &freeconn);
755 : }
756 : }
757 7 : else if (PG_NARGS() == 1)
758 : {
759 : /* text */
760 7 : conn = pconn->conn;
761 7 : sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
762 : }
763 : else
764 : /* shouldn't happen */
765 0 : elog(ERROR, "wrong number of arguments");
766 : }
767 : else /* is_async */
768 : {
769 : /* get async result */
770 8 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
771 :
772 8 : if (PG_NARGS() == 2)
773 : {
774 : /* text,bool */
775 0 : fail = PG_GETARG_BOOL(1);
776 0 : conn = dblink_get_named_conn(conname);
777 : }
778 8 : else if (PG_NARGS() == 1)
779 : {
780 : /* text */
781 8 : conn = dblink_get_named_conn(conname);
782 : }
783 : else
784 : /* shouldn't happen */
785 0 : elog(ERROR, "wrong number of arguments");
786 : }
787 :
788 33 : if (!conn)
789 2 : dblink_conn_not_avail(conname);
790 :
791 31 : if (!is_async)
792 : {
793 : /* synchronous query, use efficient tuple collection method */
794 23 : materializeQueryResult(fcinfo, conn, conname, sql, fail);
795 : }
796 : else
797 : {
798 : /* async result retrieval, do it the old way */
799 8 : PGresult *res = libpqsrv_get_result(conn, dblink_we_get_result);
800 :
801 : /* NULL means we're all done with the async results */
802 8 : if (res)
803 : {
804 5 : if (PQresultStatus(res) != PGRES_COMMAND_OK &&
805 5 : PQresultStatus(res) != PGRES_TUPLES_OK)
806 : {
807 0 : dblink_res_error(conn, conname, res, fail,
808 : "while executing query");
809 : /* if fail isn't set, we'll return an empty query result */
810 : }
811 : else
812 : {
813 5 : materializeResult(fcinfo, conn, res);
814 : }
815 : }
816 : }
817 : }
818 6 : PG_FINALLY();
819 : {
820 : /* if needed, close the connection to the database */
821 37 : if (freeconn)
822 5 : libpqsrv_disconnect(conn);
823 : }
824 37 : PG_END_TRY();
825 :
826 31 : return (Datum) 0;
827 : }
828 :
829 : /*
830 : * Verify function caller can handle a tuplestore result, and set up for that.
831 : *
832 : * Note: if the caller returns without actually creating a tuplestore, the
833 : * executor will treat the function result as an empty set.
834 : */
835 : static void
836 50 : prepTuplestoreResult(FunctionCallInfo fcinfo)
837 : {
838 50 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
839 :
840 : /* check to see if query supports us returning a tuplestore */
841 50 : if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
842 0 : ereport(ERROR,
843 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
844 : errmsg("set-valued function called in context that cannot accept a set")));
845 50 : if (!(rsinfo->allowedModes & SFRM_Materialize))
846 0 : ereport(ERROR,
847 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
848 : errmsg("materialize mode required, but it is not allowed in this context")));
849 :
850 : /* let the executor know we're sending back a tuplestore */
851 50 : rsinfo->returnMode = SFRM_Materialize;
852 :
853 : /* caller must fill these to return a non-empty result */
854 50 : rsinfo->setResult = NULL;
855 50 : rsinfo->setDesc = NULL;
856 50 : }
857 :
858 : /*
859 : * Copy the contents of the PGresult into a tuplestore to be returned
860 : * as the result of the current function.
861 : * The PGresult will be released in this function.
862 : */
863 : static void
864 13 : materializeResult(FunctionCallInfo fcinfo, PGconn *conn, PGresult *res)
865 : {
866 13 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
867 : TupleDesc tupdesc;
868 : bool is_sql_cmd;
869 : int ntuples;
870 : int nfields;
871 :
872 : /* prepTuplestoreResult must have been called previously */
873 : Assert(rsinfo->returnMode == SFRM_Materialize);
874 :
875 13 : if (PQresultStatus(res) == PGRES_COMMAND_OK)
876 : {
877 0 : is_sql_cmd = true;
878 :
879 : /*
880 : * need a tuple descriptor representing one TEXT column to return the
881 : * command status string as our result tuple
882 : */
883 0 : tupdesc = CreateTemplateTupleDesc(1);
884 0 : TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
885 : TEXTOID, -1, 0);
886 0 : TupleDescFinalize(tupdesc);
887 0 : ntuples = 1;
888 0 : nfields = 1;
889 : }
890 : else
891 : {
892 : Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
893 :
894 13 : is_sql_cmd = false;
895 :
896 : /* get a tuple descriptor for our result type */
897 13 : switch (get_call_result_type(fcinfo, NULL, &tupdesc))
898 : {
899 13 : case TYPEFUNC_COMPOSITE:
900 : /* success */
901 13 : break;
902 0 : case TYPEFUNC_RECORD:
903 : /* failed to determine actual type of RECORD */
904 0 : ereport(ERROR,
905 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
906 : errmsg("function returning record called in context "
907 : "that cannot accept type record")));
908 : break;
909 0 : default:
910 : /* result type isn't composite */
911 0 : elog(ERROR, "return type must be a row type");
912 : break;
913 : }
914 :
915 : /* make sure we have a persistent copy of the tupdesc */
916 13 : tupdesc = CreateTupleDescCopy(tupdesc);
917 13 : ntuples = PQntuples(res);
918 13 : nfields = PQnfields(res);
919 : }
920 :
921 : /*
922 : * check result and tuple descriptor have the same number of columns
923 : */
924 13 : if (nfields != tupdesc->natts)
925 0 : ereport(ERROR,
926 : (errcode(ERRCODE_DATATYPE_MISMATCH),
927 : errmsg("remote query result rowtype does not match "
928 : "the specified FROM clause rowtype")));
929 :
930 13 : if (ntuples > 0)
931 : {
932 : AttInMetadata *attinmeta;
933 13 : int nestlevel = -1;
934 : Tuplestorestate *tupstore;
935 : MemoryContext oldcontext;
936 : int row;
937 : char **values;
938 :
939 13 : attinmeta = TupleDescGetAttInMetadata(tupdesc);
940 :
941 : /* Set GUCs to ensure we read GUC-sensitive data types correctly */
942 13 : if (!is_sql_cmd)
943 13 : nestlevel = applyRemoteGucs(conn);
944 :
945 13 : oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
946 13 : tupstore = tuplestore_begin_heap(true, false, work_mem);
947 13 : rsinfo->setResult = tupstore;
948 13 : rsinfo->setDesc = tupdesc;
949 13 : MemoryContextSwitchTo(oldcontext);
950 :
951 13 : values = palloc_array(char *, nfields);
952 :
953 : /* put all tuples into the tuplestore */
954 49 : for (row = 0; row < ntuples; row++)
955 : {
956 : HeapTuple tuple;
957 :
958 37 : if (!is_sql_cmd)
959 : {
960 : int i;
961 :
962 138 : for (i = 0; i < nfields; i++)
963 : {
964 101 : if (PQgetisnull(res, row, i))
965 0 : values[i] = NULL;
966 : else
967 101 : values[i] = PQgetvalue(res, row, i);
968 : }
969 : }
970 : else
971 : {
972 0 : values[0] = PQcmdStatus(res);
973 : }
974 :
975 : /* build the tuple and put it into the tuplestore. */
976 37 : tuple = BuildTupleFromCStrings(attinmeta, values);
977 36 : tuplestore_puttuple(tupstore, tuple);
978 : }
979 :
980 : /* clean up GUC settings, if we changed any */
981 12 : restoreLocalGucs(nestlevel);
982 : }
983 :
984 12 : PQclear(res);
985 12 : }
986 :
987 : /*
988 : * Execute the given SQL command and store its results into a tuplestore
989 : * to be returned as the result of the current function.
990 : *
991 : * This is equivalent to PQexec followed by materializeResult, but we make
992 : * use of libpq's single-row mode to avoid accumulating the whole result
993 : * inside libpq before it gets transferred to the tuplestore.
994 : */
995 : static void
996 23 : materializeQueryResult(FunctionCallInfo fcinfo,
997 : PGconn *conn,
998 : const char *conname,
999 : const char *sql,
1000 : bool fail)
1001 : {
1002 23 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1003 :
1004 : /* prepTuplestoreResult must have been called previously */
1005 : Assert(rsinfo->returnMode == SFRM_Materialize);
1006 :
1007 : /* Use a PG_TRY block to ensure we pump libpq dry of results */
1008 23 : PG_TRY();
1009 : {
1010 23 : storeInfo sinfo = {0};
1011 : PGresult *res;
1012 :
1013 23 : sinfo.fcinfo = fcinfo;
1014 : /* Create short-lived memory context for data conversions */
1015 23 : sinfo.tmpcontext = AllocSetContextCreate(CurrentMemoryContext,
1016 : "dblink temporary context",
1017 : ALLOCSET_DEFAULT_SIZES);
1018 :
1019 : /* execute query, collecting any tuples into the tuplestore */
1020 23 : res = storeQueryResult(&sinfo, conn, sql);
1021 :
1022 23 : if (!res ||
1023 23 : (PQresultStatus(res) != PGRES_COMMAND_OK &&
1024 23 : PQresultStatus(res) != PGRES_TUPLES_OK))
1025 : {
1026 2 : dblink_res_error(conn, conname, res, fail,
1027 : "while executing query");
1028 : /* if fail isn't set, we'll return an empty query result */
1029 : }
1030 21 : else if (PQresultStatus(res) == PGRES_COMMAND_OK)
1031 : {
1032 : /*
1033 : * storeRow didn't get called, so we need to convert the command
1034 : * status string to a tuple manually
1035 : */
1036 : TupleDesc tupdesc;
1037 : AttInMetadata *attinmeta;
1038 : Tuplestorestate *tupstore;
1039 : HeapTuple tuple;
1040 : char *values[1];
1041 : MemoryContext oldcontext;
1042 :
1043 : /*
1044 : * need a tuple descriptor representing one TEXT column to return
1045 : * the command status string as our result tuple
1046 : */
1047 0 : tupdesc = CreateTemplateTupleDesc(1);
1048 0 : TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
1049 : TEXTOID, -1, 0);
1050 0 : TupleDescFinalize(tupdesc);
1051 0 : attinmeta = TupleDescGetAttInMetadata(tupdesc);
1052 :
1053 0 : oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
1054 0 : tupstore = tuplestore_begin_heap(true, false, work_mem);
1055 0 : rsinfo->setResult = tupstore;
1056 0 : rsinfo->setDesc = tupdesc;
1057 0 : MemoryContextSwitchTo(oldcontext);
1058 :
1059 0 : values[0] = PQcmdStatus(res);
1060 :
1061 : /* build the tuple and put it into the tuplestore. */
1062 0 : tuple = BuildTupleFromCStrings(attinmeta, values);
1063 0 : tuplestore_puttuple(tupstore, tuple);
1064 :
1065 0 : PQclear(res);
1066 : }
1067 : else
1068 : {
1069 : Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
1070 : /* storeRow should have created a tuplestore */
1071 : Assert(rsinfo->setResult != NULL);
1072 :
1073 21 : PQclear(res);
1074 : }
1075 :
1076 : /* clean up data conversion short-lived memory context */
1077 23 : if (sinfo.tmpcontext != NULL)
1078 23 : MemoryContextDelete(sinfo.tmpcontext);
1079 :
1080 23 : PQclear(sinfo.last_res);
1081 23 : PQclear(sinfo.cur_res);
1082 : }
1083 0 : PG_CATCH();
1084 : {
1085 : PGresult *res;
1086 :
1087 : /* be sure to clear out any pending data in libpq */
1088 0 : while ((res = libpqsrv_get_result(conn, dblink_we_get_result)) !=
1089 : NULL)
1090 0 : PQclear(res);
1091 0 : PG_RE_THROW();
1092 : }
1093 23 : PG_END_TRY();
1094 23 : }
1095 :
1096 : /*
1097 : * Execute query, and send any result rows to sinfo->tuplestore.
1098 : */
1099 : static PGresult *
1100 23 : storeQueryResult(storeInfo *sinfo, PGconn *conn, const char *sql)
1101 : {
1102 23 : bool first = true;
1103 23 : int nestlevel = -1;
1104 : PGresult *res;
1105 :
1106 23 : if (!PQsendQuery(conn, sql))
1107 0 : elog(ERROR, "could not send query: %s", pchomp(PQerrorMessage(conn)));
1108 :
1109 23 : if (!PQsetSingleRowMode(conn)) /* shouldn't fail */
1110 0 : elog(ERROR, "failed to set single-row mode for dblink query");
1111 :
1112 : for (;;)
1113 : {
1114 198 : CHECK_FOR_INTERRUPTS();
1115 :
1116 198 : sinfo->cur_res = libpqsrv_get_result(conn, dblink_we_get_result);
1117 198 : if (!sinfo->cur_res)
1118 23 : break;
1119 :
1120 175 : if (PQresultStatus(sinfo->cur_res) == PGRES_SINGLE_TUPLE)
1121 : {
1122 : /* got one row from possibly-bigger resultset */
1123 :
1124 : /*
1125 : * Set GUCs to ensure we read GUC-sensitive data types correctly.
1126 : * We shouldn't do this until we have a row in hand, to ensure
1127 : * libpq has seen any earlier ParameterStatus protocol messages.
1128 : */
1129 152 : if (first && nestlevel < 0)
1130 21 : nestlevel = applyRemoteGucs(conn);
1131 :
1132 152 : storeRow(sinfo, sinfo->cur_res, first);
1133 :
1134 152 : PQclear(sinfo->cur_res);
1135 152 : sinfo->cur_res = NULL;
1136 152 : first = false;
1137 : }
1138 : else
1139 : {
1140 : /* if empty resultset, fill tuplestore header */
1141 23 : if (first && PQresultStatus(sinfo->cur_res) == PGRES_TUPLES_OK)
1142 0 : storeRow(sinfo, sinfo->cur_res, first);
1143 :
1144 : /* store completed result at last_res */
1145 23 : PQclear(sinfo->last_res);
1146 23 : sinfo->last_res = sinfo->cur_res;
1147 23 : sinfo->cur_res = NULL;
1148 23 : first = true;
1149 : }
1150 : }
1151 :
1152 : /* clean up GUC settings, if we changed any */
1153 23 : restoreLocalGucs(nestlevel);
1154 :
1155 : /* return last_res */
1156 23 : res = sinfo->last_res;
1157 23 : sinfo->last_res = NULL;
1158 23 : return res;
1159 : }
1160 :
1161 : /*
1162 : * Send single row to sinfo->tuplestore.
1163 : *
1164 : * If "first" is true, create the tuplestore using PGresult's metadata
1165 : * (in this case the PGresult might contain either zero or one row).
1166 : */
1167 : static void
1168 152 : storeRow(storeInfo *sinfo, PGresult *res, bool first)
1169 : {
1170 152 : int nfields = PQnfields(res);
1171 : HeapTuple tuple;
1172 : int i;
1173 : MemoryContext oldcontext;
1174 :
1175 152 : if (first)
1176 : {
1177 : /* Prepare for new result set */
1178 21 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) sinfo->fcinfo->resultinfo;
1179 : TupleDesc tupdesc;
1180 :
1181 : /*
1182 : * It's possible to get more than one result set if the query string
1183 : * contained multiple SQL commands. In that case, we follow PQexec's
1184 : * traditional behavior of throwing away all but the last result.
1185 : */
1186 21 : if (sinfo->tuplestore)
1187 0 : tuplestore_end(sinfo->tuplestore);
1188 21 : sinfo->tuplestore = NULL;
1189 :
1190 : /* get a tuple descriptor for our result type */
1191 21 : switch (get_call_result_type(sinfo->fcinfo, NULL, &tupdesc))
1192 : {
1193 21 : case TYPEFUNC_COMPOSITE:
1194 : /* success */
1195 21 : break;
1196 0 : case TYPEFUNC_RECORD:
1197 : /* failed to determine actual type of RECORD */
1198 0 : ereport(ERROR,
1199 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1200 : errmsg("function returning record called in context "
1201 : "that cannot accept type record")));
1202 : break;
1203 0 : default:
1204 : /* result type isn't composite */
1205 0 : elog(ERROR, "return type must be a row type");
1206 : break;
1207 : }
1208 :
1209 : /* make sure we have a persistent copy of the tupdesc */
1210 21 : tupdesc = CreateTupleDescCopy(tupdesc);
1211 :
1212 : /* check result and tuple descriptor have the same number of columns */
1213 21 : if (nfields != tupdesc->natts)
1214 0 : ereport(ERROR,
1215 : (errcode(ERRCODE_DATATYPE_MISMATCH),
1216 : errmsg("remote query result rowtype does not match "
1217 : "the specified FROM clause rowtype")));
1218 :
1219 : /* Prepare attinmeta for later data conversions */
1220 21 : sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
1221 :
1222 : /* Create a new, empty tuplestore */
1223 21 : oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
1224 21 : sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
1225 21 : rsinfo->setResult = sinfo->tuplestore;
1226 21 : rsinfo->setDesc = tupdesc;
1227 21 : MemoryContextSwitchTo(oldcontext);
1228 :
1229 : /* Done if empty resultset */
1230 21 : if (PQntuples(res) == 0)
1231 0 : return;
1232 :
1233 : /*
1234 : * Set up sufficiently-wide string pointers array; this won't change
1235 : * in size so it's easy to preallocate.
1236 : */
1237 21 : if (sinfo->cstrs)
1238 0 : pfree(sinfo->cstrs);
1239 21 : sinfo->cstrs = palloc_array(char *, nfields);
1240 : }
1241 :
1242 : /* Should have a single-row result if we get here */
1243 : Assert(PQntuples(res) == 1);
1244 :
1245 : /*
1246 : * Do the following work in a temp context that we reset after each tuple.
1247 : * This cleans up not only the data we have direct access to, but any
1248 : * cruft the I/O functions might leak.
1249 : */
1250 152 : oldcontext = MemoryContextSwitchTo(sinfo->tmpcontext);
1251 :
1252 : /*
1253 : * Fill cstrs with null-terminated strings of column values.
1254 : */
1255 570 : for (i = 0; i < nfields; i++)
1256 : {
1257 418 : if (PQgetisnull(res, 0, i))
1258 0 : sinfo->cstrs[i] = NULL;
1259 : else
1260 418 : sinfo->cstrs[i] = PQgetvalue(res, 0, i);
1261 : }
1262 :
1263 : /* Convert row to a tuple, and add it to the tuplestore */
1264 152 : tuple = BuildTupleFromCStrings(sinfo->attinmeta, sinfo->cstrs);
1265 :
1266 152 : tuplestore_puttuple(sinfo->tuplestore, tuple);
1267 :
1268 : /* Clean up */
1269 152 : MemoryContextSwitchTo(oldcontext);
1270 152 : MemoryContextReset(sinfo->tmpcontext);
1271 : }
1272 :
1273 : /*
1274 : * List all open dblink connections by name.
1275 : * Returns an array of all connection names.
1276 : * Takes no params
1277 : */
1278 3 : PG_FUNCTION_INFO_V1(dblink_get_connections);
1279 : Datum
1280 1 : dblink_get_connections(PG_FUNCTION_ARGS)
1281 : {
1282 : HASH_SEQ_STATUS status;
1283 : remoteConnHashEnt *hentry;
1284 1 : ArrayBuildState *astate = NULL;
1285 :
1286 1 : if (remoteConnHash)
1287 : {
1288 1 : hash_seq_init(&status, remoteConnHash);
1289 4 : while ((hentry = (remoteConnHashEnt *) hash_seq_search(&status)) != NULL)
1290 : {
1291 : /* ignore it if it's not an open connection */
1292 3 : if (hentry->rconn.conn == NULL)
1293 0 : continue;
1294 : /* stash away current value */
1295 3 : astate = accumArrayResult(astate,
1296 3 : CStringGetTextDatum(hentry->name),
1297 : false, TEXTOID, CurrentMemoryContext);
1298 : }
1299 : }
1300 :
1301 1 : if (astate)
1302 1 : PG_RETURN_DATUM(makeArrayResult(astate,
1303 : CurrentMemoryContext));
1304 : else
1305 0 : PG_RETURN_NULL();
1306 : }
1307 :
1308 : /*
1309 : * Checks if a given remote connection is busy
1310 : *
1311 : * Returns 1 if the connection is busy, 0 otherwise
1312 : * Params:
1313 : * text connection_name - name of the connection to check
1314 : *
1315 : */
1316 3 : PG_FUNCTION_INFO_V1(dblink_is_busy);
1317 : Datum
1318 1 : dblink_is_busy(PG_FUNCTION_ARGS)
1319 : {
1320 : PGconn *conn;
1321 :
1322 1 : dblink_init();
1323 1 : conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
1324 :
1325 1 : PQconsumeInput(conn);
1326 1 : PG_RETURN_INT32(PQisBusy(conn));
1327 : }
1328 :
1329 : /*
1330 : * Cancels a running request on a connection
1331 : *
1332 : * Returns text:
1333 : * "OK" if the cancel request has been sent correctly,
1334 : * an error message otherwise
1335 : *
1336 : * Params:
1337 : * text connection_name - name of the connection to check
1338 : *
1339 : */
1340 3 : PG_FUNCTION_INFO_V1(dblink_cancel_query);
1341 : Datum
1342 1 : dblink_cancel_query(PG_FUNCTION_ARGS)
1343 : {
1344 : PGconn *conn;
1345 : const char *msg;
1346 : TimestampTz endtime;
1347 :
1348 1 : dblink_init();
1349 1 : conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
1350 1 : endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
1351 : 30000);
1352 1 : msg = libpqsrv_cancel(conn, endtime);
1353 1 : if (msg == NULL)
1354 1 : msg = "OK";
1355 :
1356 1 : PG_RETURN_TEXT_P(cstring_to_text(msg));
1357 : }
1358 :
1359 :
1360 : /*
1361 : * Get error message from a connection
1362 : *
1363 : * Returns text:
1364 : * "OK" if no error, an error message otherwise
1365 : *
1366 : * Params:
1367 : * text connection_name - name of the connection to check
1368 : *
1369 : */
1370 3 : PG_FUNCTION_INFO_V1(dblink_error_message);
1371 : Datum
1372 1 : dblink_error_message(PG_FUNCTION_ARGS)
1373 : {
1374 : char *msg;
1375 : PGconn *conn;
1376 :
1377 1 : dblink_init();
1378 1 : conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
1379 :
1380 1 : msg = PQerrorMessage(conn);
1381 1 : if (msg == NULL || msg[0] == '\0')
1382 1 : PG_RETURN_TEXT_P(cstring_to_text("OK"));
1383 : else
1384 0 : PG_RETURN_TEXT_P(cstring_to_text(pchomp(msg)));
1385 : }
1386 :
1387 : /*
1388 : * Execute an SQL non-SELECT command
1389 : */
1390 13 : PG_FUNCTION_INFO_V1(dblink_exec);
1391 : Datum
1392 26 : dblink_exec(PG_FUNCTION_ARGS)
1393 : {
1394 26 : text *volatile sql_cmd_status = NULL;
1395 26 : PGconn *volatile conn = NULL;
1396 26 : volatile bool freeconn = false;
1397 :
1398 26 : dblink_init();
1399 :
1400 26 : PG_TRY();
1401 : {
1402 26 : PGresult *res = NULL;
1403 26 : char *sql = NULL;
1404 26 : char *conname = NULL;
1405 26 : bool fail = true; /* default to backward compatible behavior */
1406 :
1407 26 : if (PG_NARGS() == 3)
1408 : {
1409 : /* must be text,text,bool */
1410 0 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
1411 0 : sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
1412 0 : fail = PG_GETARG_BOOL(2);
1413 0 : dblink_get_conn(conname, &conn, &conname, &freeconn);
1414 : }
1415 26 : else if (PG_NARGS() == 2)
1416 : {
1417 : /* might be text,text or text,bool */
1418 17 : if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
1419 : {
1420 1 : sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
1421 1 : fail = PG_GETARG_BOOL(1);
1422 1 : conn = pconn->conn;
1423 : }
1424 : else
1425 : {
1426 16 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
1427 16 : sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
1428 16 : dblink_get_conn(conname, &conn, &conname, &freeconn);
1429 : }
1430 : }
1431 9 : else if (PG_NARGS() == 1)
1432 : {
1433 : /* must be single text argument */
1434 9 : conn = pconn->conn;
1435 9 : sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
1436 : }
1437 : else
1438 : /* shouldn't happen */
1439 0 : elog(ERROR, "wrong number of arguments");
1440 :
1441 26 : if (!conn)
1442 0 : dblink_conn_not_avail(conname);
1443 :
1444 26 : res = libpqsrv_exec(conn, sql, dblink_we_get_result);
1445 26 : if (!res ||
1446 26 : (PQresultStatus(res) != PGRES_COMMAND_OK &&
1447 2 : PQresultStatus(res) != PGRES_TUPLES_OK))
1448 : {
1449 2 : dblink_res_error(conn, conname, res, fail,
1450 : "while executing command");
1451 :
1452 : /*
1453 : * and save a copy of the command status string to return as our
1454 : * result tuple
1455 : */
1456 1 : sql_cmd_status = cstring_to_text("ERROR");
1457 : }
1458 24 : else if (PQresultStatus(res) == PGRES_COMMAND_OK)
1459 : {
1460 : /*
1461 : * and save a copy of the command status string to return as our
1462 : * result tuple
1463 : */
1464 24 : sql_cmd_status = cstring_to_text(PQcmdStatus(res));
1465 24 : PQclear(res);
1466 : }
1467 : else
1468 : {
1469 0 : PQclear(res);
1470 0 : ereport(ERROR,
1471 : (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
1472 : errmsg("statement returning results not allowed")));
1473 : }
1474 : }
1475 1 : PG_FINALLY();
1476 : {
1477 : /* if needed, close the connection to the database */
1478 26 : if (freeconn)
1479 1 : libpqsrv_disconnect(conn);
1480 : }
1481 26 : PG_END_TRY();
1482 :
1483 25 : PG_RETURN_TEXT_P(sql_cmd_status);
1484 : }
1485 :
1486 :
1487 : /*
1488 : * dblink_get_pkey
1489 : *
1490 : * Return list of primary key fields for the supplied relation,
1491 : * or NULL if none exists.
1492 : */
1493 3 : PG_FUNCTION_INFO_V1(dblink_get_pkey);
1494 : Datum
1495 9 : dblink_get_pkey(PG_FUNCTION_ARGS)
1496 : {
1497 : int16 indnkeyatts;
1498 : char **results;
1499 : FuncCallContext *funcctx;
1500 : int32 call_cntr;
1501 : int32 max_calls;
1502 : AttInMetadata *attinmeta;
1503 : MemoryContext oldcontext;
1504 :
1505 : /* stuff done only on the first call of the function */
1506 9 : if (SRF_IS_FIRSTCALL())
1507 : {
1508 : Relation rel;
1509 : TupleDesc tupdesc;
1510 :
1511 : /* create a function context for cross-call persistence */
1512 3 : funcctx = SRF_FIRSTCALL_INIT();
1513 :
1514 : /*
1515 : * switch to memory context appropriate for multiple function calls
1516 : */
1517 3 : oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1518 :
1519 : /* open target relation */
1520 3 : rel = get_rel_from_relname(PG_GETARG_TEXT_PP(0), AccessShareLock, ACL_SELECT);
1521 :
1522 : /* get the array of attnums */
1523 3 : results = get_pkey_attnames(rel, &indnkeyatts);
1524 :
1525 3 : relation_close(rel, AccessShareLock);
1526 :
1527 : /*
1528 : * need a tuple descriptor representing one INT and one TEXT column
1529 : */
1530 3 : tupdesc = CreateTemplateTupleDesc(2);
1531 3 : TupleDescInitEntry(tupdesc, (AttrNumber) 1, "position",
1532 : INT4OID, -1, 0);
1533 3 : TupleDescInitEntry(tupdesc, (AttrNumber) 2, "colname",
1534 : TEXTOID, -1, 0);
1535 :
1536 3 : TupleDescFinalize(tupdesc);
1537 :
1538 : /*
1539 : * Generate attribute metadata needed later to produce tuples from raw
1540 : * C strings
1541 : */
1542 3 : attinmeta = TupleDescGetAttInMetadata(tupdesc);
1543 3 : funcctx->attinmeta = attinmeta;
1544 :
1545 3 : if ((results != NULL) && (indnkeyatts > 0))
1546 : {
1547 3 : funcctx->max_calls = indnkeyatts;
1548 :
1549 : /* got results, keep track of them */
1550 3 : funcctx->user_fctx = results;
1551 : }
1552 : else
1553 : {
1554 : /* fast track when no results */
1555 0 : MemoryContextSwitchTo(oldcontext);
1556 0 : SRF_RETURN_DONE(funcctx);
1557 : }
1558 :
1559 3 : MemoryContextSwitchTo(oldcontext);
1560 : }
1561 :
1562 : /* stuff done on every call of the function */
1563 9 : funcctx = SRF_PERCALL_SETUP();
1564 :
1565 : /*
1566 : * initialize per-call variables
1567 : */
1568 9 : call_cntr = funcctx->call_cntr;
1569 9 : max_calls = funcctx->max_calls;
1570 :
1571 9 : results = (char **) funcctx->user_fctx;
1572 9 : attinmeta = funcctx->attinmeta;
1573 :
1574 9 : if (call_cntr < max_calls) /* do when there is more left to send */
1575 : {
1576 : char **values;
1577 : HeapTuple tuple;
1578 : Datum result;
1579 :
1580 6 : values = palloc_array(char *, 2);
1581 6 : values[0] = psprintf("%d", call_cntr + 1);
1582 6 : values[1] = results[call_cntr];
1583 :
1584 : /* build the tuple */
1585 6 : tuple = BuildTupleFromCStrings(attinmeta, values);
1586 :
1587 : /* make the tuple into a datum */
1588 6 : result = HeapTupleGetDatum(tuple);
1589 :
1590 6 : SRF_RETURN_NEXT(funcctx, result);
1591 : }
1592 : else
1593 : {
1594 : /* do when there is no more left */
1595 3 : SRF_RETURN_DONE(funcctx);
1596 : }
1597 : }
1598 :
1599 :
1600 : /*
1601 : * dblink_build_sql_insert
1602 : *
1603 : * Used to generate an SQL insert statement
1604 : * based on an existing tuple in a local relation.
1605 : * This is useful for selectively replicating data
1606 : * to another server via dblink.
1607 : *
1608 : * API:
1609 : * <relname> - name of local table of interest
1610 : * <pkattnums> - an int2vector of attnums which will be used
1611 : * to identify the local tuple of interest
1612 : * <pknumatts> - number of attnums in pkattnums
1613 : * <src_pkattvals_arry> - text array of key values which will be used
1614 : * to identify the local tuple of interest
1615 : * <tgt_pkattvals_arry> - text array of key values which will be used
1616 : * to build the string for execution remotely. These are substituted
1617 : * for their counterparts in src_pkattvals_arry
1618 : */
1619 4 : PG_FUNCTION_INFO_V1(dblink_build_sql_insert);
1620 : Datum
1621 6 : dblink_build_sql_insert(PG_FUNCTION_ARGS)
1622 : {
1623 6 : text *relname_text = PG_GETARG_TEXT_PP(0);
1624 6 : int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
1625 6 : int32 pknumatts_arg = PG_GETARG_INT32(2);
1626 6 : ArrayType *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1627 6 : ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
1628 : Relation rel;
1629 : int *pkattnums;
1630 : int pknumatts;
1631 : char **src_pkattvals;
1632 : char **tgt_pkattvals;
1633 : int src_nitems;
1634 : int tgt_nitems;
1635 : char *sql;
1636 :
1637 : /*
1638 : * Open target relation.
1639 : */
1640 6 : rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
1641 :
1642 : /*
1643 : * Process pkattnums argument.
1644 : */
1645 6 : validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
1646 : &pkattnums, &pknumatts);
1647 :
1648 : /*
1649 : * Source array is made up of key values that will be used to locate the
1650 : * tuple of interest from the local system.
1651 : */
1652 4 : src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
1653 :
1654 : /*
1655 : * There should be one source array key value for each key attnum
1656 : */
1657 4 : if (src_nitems != pknumatts)
1658 0 : ereport(ERROR,
1659 : (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1660 : errmsg("source key array length must match number of key attributes")));
1661 :
1662 : /*
1663 : * Target array is made up of key values that will be used to build the
1664 : * SQL string for use on the remote system.
1665 : */
1666 4 : tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1667 :
1668 : /*
1669 : * There should be one target array key value for each key attnum
1670 : */
1671 4 : if (tgt_nitems != pknumatts)
1672 0 : ereport(ERROR,
1673 : (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1674 : errmsg("target key array length must match number of key attributes")));
1675 :
1676 : /*
1677 : * Prep work is finally done. Go get the SQL string.
1678 : */
1679 4 : sql = get_sql_insert(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
1680 :
1681 : /*
1682 : * Now we can close the relation.
1683 : */
1684 4 : relation_close(rel, AccessShareLock);
1685 :
1686 : /*
1687 : * And send it
1688 : */
1689 4 : PG_RETURN_TEXT_P(cstring_to_text(sql));
1690 : }
1691 :
1692 :
1693 : /*
1694 : * dblink_build_sql_delete
1695 : *
1696 : * Used to generate an SQL delete statement.
1697 : * This is useful for selectively replicating a
1698 : * delete to another server via dblink.
1699 : *
1700 : * API:
1701 : * <relname> - name of remote table of interest
1702 : * <pkattnums> - an int2vector of attnums which will be used
1703 : * to identify the remote tuple of interest
1704 : * <pknumatts> - number of attnums in pkattnums
1705 : * <tgt_pkattvals_arry> - text array of key values which will be used
1706 : * to build the string for execution remotely.
1707 : */
1708 4 : PG_FUNCTION_INFO_V1(dblink_build_sql_delete);
1709 : Datum
1710 6 : dblink_build_sql_delete(PG_FUNCTION_ARGS)
1711 : {
1712 6 : text *relname_text = PG_GETARG_TEXT_PP(0);
1713 6 : int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
1714 6 : int32 pknumatts_arg = PG_GETARG_INT32(2);
1715 6 : ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1716 : Relation rel;
1717 : int *pkattnums;
1718 : int pknumatts;
1719 : char **tgt_pkattvals;
1720 : int tgt_nitems;
1721 : char *sql;
1722 :
1723 : /*
1724 : * Open target relation.
1725 : */
1726 6 : rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
1727 :
1728 : /*
1729 : * Process pkattnums argument.
1730 : */
1731 6 : validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
1732 : &pkattnums, &pknumatts);
1733 :
1734 : /*
1735 : * Target array is made up of key values that will be used to build the
1736 : * SQL string for use on the remote system.
1737 : */
1738 4 : tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1739 :
1740 : /*
1741 : * There should be one target array key value for each key attnum
1742 : */
1743 4 : if (tgt_nitems != pknumatts)
1744 0 : ereport(ERROR,
1745 : (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1746 : errmsg("target key array length must match number of key attributes")));
1747 :
1748 : /*
1749 : * Prep work is finally done. Go get the SQL string.
1750 : */
1751 4 : sql = get_sql_delete(rel, pkattnums, pknumatts, tgt_pkattvals);
1752 :
1753 : /*
1754 : * Now we can close the relation.
1755 : */
1756 4 : relation_close(rel, AccessShareLock);
1757 :
1758 : /*
1759 : * And send it
1760 : */
1761 4 : PG_RETURN_TEXT_P(cstring_to_text(sql));
1762 : }
1763 :
1764 :
1765 : /*
1766 : * dblink_build_sql_update
1767 : *
1768 : * Used to generate an SQL update statement
1769 : * based on an existing tuple in a local relation.
1770 : * This is useful for selectively replicating data
1771 : * to another server via dblink.
1772 : *
1773 : * API:
1774 : * <relname> - name of local table of interest
1775 : * <pkattnums> - an int2vector of attnums which will be used
1776 : * to identify the local tuple of interest
1777 : * <pknumatts> - number of attnums in pkattnums
1778 : * <src_pkattvals_arry> - text array of key values which will be used
1779 : * to identify the local tuple of interest
1780 : * <tgt_pkattvals_arry> - text array of key values which will be used
1781 : * to build the string for execution remotely. These are substituted
1782 : * for their counterparts in src_pkattvals_arry
1783 : */
1784 4 : PG_FUNCTION_INFO_V1(dblink_build_sql_update);
1785 : Datum
1786 6 : dblink_build_sql_update(PG_FUNCTION_ARGS)
1787 : {
1788 6 : text *relname_text = PG_GETARG_TEXT_PP(0);
1789 6 : int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
1790 6 : int32 pknumatts_arg = PG_GETARG_INT32(2);
1791 6 : ArrayType *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1792 6 : ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
1793 : Relation rel;
1794 : int *pkattnums;
1795 : int pknumatts;
1796 : char **src_pkattvals;
1797 : char **tgt_pkattvals;
1798 : int src_nitems;
1799 : int tgt_nitems;
1800 : char *sql;
1801 :
1802 : /*
1803 : * Open target relation.
1804 : */
1805 6 : rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
1806 :
1807 : /*
1808 : * Process pkattnums argument.
1809 : */
1810 6 : validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
1811 : &pkattnums, &pknumatts);
1812 :
1813 : /*
1814 : * Source array is made up of key values that will be used to locate the
1815 : * tuple of interest from the local system.
1816 : */
1817 4 : src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
1818 :
1819 : /*
1820 : * There should be one source array key value for each key attnum
1821 : */
1822 4 : if (src_nitems != pknumatts)
1823 0 : ereport(ERROR,
1824 : (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1825 : errmsg("source key array length must match number of key attributes")));
1826 :
1827 : /*
1828 : * Target array is made up of key values that will be used to build the
1829 : * SQL string for use on the remote system.
1830 : */
1831 4 : tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1832 :
1833 : /*
1834 : * There should be one target array key value for each key attnum
1835 : */
1836 4 : if (tgt_nitems != pknumatts)
1837 0 : ereport(ERROR,
1838 : (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1839 : errmsg("target key array length must match number of key attributes")));
1840 :
1841 : /*
1842 : * Prep work is finally done. Go get the SQL string.
1843 : */
1844 4 : sql = get_sql_update(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
1845 :
1846 : /*
1847 : * Now we can close the relation.
1848 : */
1849 4 : relation_close(rel, AccessShareLock);
1850 :
1851 : /*
1852 : * And send it
1853 : */
1854 4 : PG_RETURN_TEXT_P(cstring_to_text(sql));
1855 : }
1856 :
1857 : /*
1858 : * dblink_current_query
1859 : * return the current query string
1860 : * to allow its use in (among other things)
1861 : * rewrite rules
1862 : */
1863 2 : PG_FUNCTION_INFO_V1(dblink_current_query);
1864 : Datum
1865 0 : dblink_current_query(PG_FUNCTION_ARGS)
1866 : {
1867 : /* This is now just an alias for the built-in function current_query() */
1868 0 : PG_RETURN_DATUM(current_query(fcinfo));
1869 : }
1870 :
1871 : /*
1872 : * Retrieve async notifications for a connection.
1873 : *
1874 : * Returns a setof record of notifications, or an empty set if none received.
1875 : * Can optionally take a named connection as parameter, but uses the unnamed
1876 : * connection per default.
1877 : *
1878 : */
1879 : #define DBLINK_NOTIFY_COLS 3
1880 :
1881 5 : PG_FUNCTION_INFO_V1(dblink_get_notify);
1882 : Datum
1883 2 : dblink_get_notify(PG_FUNCTION_ARGS)
1884 : {
1885 : PGconn *conn;
1886 : PGnotify *notify;
1887 2 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1888 :
1889 2 : dblink_init();
1890 2 : if (PG_NARGS() == 1)
1891 0 : conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
1892 : else
1893 2 : conn = pconn->conn;
1894 :
1895 2 : InitMaterializedSRF(fcinfo, 0);
1896 :
1897 2 : PQconsumeInput(conn);
1898 4 : while ((notify = PQnotifies(conn)) != NULL)
1899 : {
1900 : Datum values[DBLINK_NOTIFY_COLS];
1901 : bool nulls[DBLINK_NOTIFY_COLS];
1902 :
1903 2 : memset(values, 0, sizeof(values));
1904 2 : memset(nulls, 0, sizeof(nulls));
1905 :
1906 2 : if (notify->relname != NULL)
1907 2 : values[0] = CStringGetTextDatum(notify->relname);
1908 : else
1909 0 : nulls[0] = true;
1910 :
1911 2 : values[1] = Int32GetDatum(notify->be_pid);
1912 :
1913 2 : if (notify->extra != NULL)
1914 2 : values[2] = CStringGetTextDatum(notify->extra);
1915 : else
1916 0 : nulls[2] = true;
1917 :
1918 2 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
1919 :
1920 2 : PQfreemem(notify);
1921 2 : PQconsumeInput(conn);
1922 : }
1923 :
1924 2 : return (Datum) 0;
1925 : }
1926 :
1927 : /*
1928 : * Validate the options given to a dblink foreign server or user mapping.
1929 : * Raise an error if any option is invalid.
1930 : *
1931 : * We just check the names of options here, so semantic errors in options,
1932 : * such as invalid numeric format, will be detected at the attempt to connect.
1933 : */
1934 14 : PG_FUNCTION_INFO_V1(dblink_fdw_validator);
1935 : Datum
1936 19 : dblink_fdw_validator(PG_FUNCTION_ARGS)
1937 : {
1938 19 : List *options_list = untransformRelOptions(PG_GETARG_DATUM(0));
1939 19 : Oid context = PG_GETARG_OID(1);
1940 : ListCell *cell;
1941 :
1942 : static const PQconninfoOption *options = NULL;
1943 :
1944 : /*
1945 : * Get list of valid libpq options.
1946 : *
1947 : * To avoid unnecessary work, we get the list once and use it throughout
1948 : * the lifetime of this backend process. We don't need to care about
1949 : * memory context issues, because PQconndefaults allocates with malloc.
1950 : */
1951 19 : if (!options)
1952 : {
1953 12 : options = PQconndefaults();
1954 12 : if (!options) /* assume reason for failure is OOM */
1955 0 : ereport(ERROR,
1956 : (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
1957 : errmsg("out of memory"),
1958 : errdetail("Could not get libpq's default connection options.")));
1959 : }
1960 :
1961 : /* Validate each supplied option. */
1962 50 : foreach(cell, options_list)
1963 : {
1964 39 : DefElem *def = (DefElem *) lfirst(cell);
1965 :
1966 39 : if (!is_valid_dblink_fdw_option(options, def->defname, context))
1967 : {
1968 : /*
1969 : * Unknown option, or invalid option for the context specified, so
1970 : * complain about it. Provide a hint with a valid option that
1971 : * looks similar, if there is one.
1972 : */
1973 : const PQconninfoOption *opt;
1974 : const char *closest_match;
1975 : ClosestMatchState match_state;
1976 8 : bool has_valid_options = false;
1977 :
1978 8 : initClosestMatch(&match_state, def->defname, 4);
1979 424 : for (opt = options; opt->keyword; opt++)
1980 : {
1981 416 : if (is_valid_dblink_option(options, opt->keyword, context))
1982 : {
1983 93 : has_valid_options = true;
1984 93 : updateClosestMatch(&match_state, opt->keyword);
1985 : }
1986 : }
1987 :
1988 8 : closest_match = getClosestMatch(&match_state);
1989 8 : ereport(ERROR,
1990 : (errcode(ERRCODE_FDW_OPTION_NAME_NOT_FOUND),
1991 : errmsg("invalid option \"%s\"", def->defname),
1992 : has_valid_options ? closest_match ?
1993 : errhint("Perhaps you meant the option \"%s\".",
1994 : closest_match) : 0 :
1995 : errhint("There are no valid options in this context.")));
1996 : }
1997 : }
1998 :
1999 11 : PG_RETURN_VOID();
2000 : }
2001 :
2002 :
2003 : /*************************************************************
2004 : * internal functions
2005 : */
2006 :
2007 :
2008 : /*
2009 : * get_pkey_attnames
2010 : *
2011 : * Get the primary key attnames for the given relation.
2012 : * Return NULL, and set indnkeyatts = 0, if no primary key exists.
2013 : */
2014 : static char **
2015 3 : get_pkey_attnames(Relation rel, int16 *indnkeyatts)
2016 : {
2017 : Relation indexRelation;
2018 : ScanKeyData skey;
2019 : SysScanDesc scan;
2020 : HeapTuple indexTuple;
2021 : int i;
2022 3 : char **result = NULL;
2023 : TupleDesc tupdesc;
2024 :
2025 : /* initialize indnkeyatts to 0 in case no primary key exists */
2026 3 : *indnkeyatts = 0;
2027 :
2028 3 : tupdesc = rel->rd_att;
2029 :
2030 : /* Prepare to scan pg_index for entries having indrelid = this rel. */
2031 3 : indexRelation = table_open(IndexRelationId, AccessShareLock);
2032 3 : ScanKeyInit(&skey,
2033 : Anum_pg_index_indrelid,
2034 : BTEqualStrategyNumber, F_OIDEQ,
2035 : ObjectIdGetDatum(RelationGetRelid(rel)));
2036 :
2037 3 : scan = systable_beginscan(indexRelation, IndexIndrelidIndexId, true,
2038 : NULL, 1, &skey);
2039 :
2040 3 : while (HeapTupleIsValid(indexTuple = systable_getnext(scan)))
2041 : {
2042 3 : Form_pg_index index = (Form_pg_index) GETSTRUCT(indexTuple);
2043 :
2044 : /* we're only interested if it is the primary key */
2045 3 : if (index->indisprimary)
2046 : {
2047 3 : *indnkeyatts = index->indnkeyatts;
2048 3 : if (*indnkeyatts > 0)
2049 : {
2050 3 : result = palloc_array(char *, *indnkeyatts);
2051 :
2052 9 : for (i = 0; i < *indnkeyatts; i++)
2053 6 : result[i] = SPI_fname(tupdesc, index->indkey.values[i]);
2054 : }
2055 3 : break;
2056 : }
2057 : }
2058 :
2059 3 : systable_endscan(scan);
2060 3 : table_close(indexRelation, AccessShareLock);
2061 :
2062 3 : return result;
2063 : }
2064 :
2065 : /*
2066 : * Deconstruct a text[] into C-strings (note any NULL elements will be
2067 : * returned as NULL pointers)
2068 : */
2069 : static char **
2070 20 : get_text_array_contents(ArrayType *array, int *numitems)
2071 : {
2072 20 : int ndim = ARR_NDIM(array);
2073 20 : int *dims = ARR_DIMS(array);
2074 : int nitems;
2075 : int16 typlen;
2076 : bool typbyval;
2077 : char typalign;
2078 : uint8 typalignby;
2079 : char **values;
2080 : char *ptr;
2081 : uint8 *bitmap;
2082 : int bitmask;
2083 : int i;
2084 :
2085 : Assert(ARR_ELEMTYPE(array) == TEXTOID);
2086 :
2087 20 : *numitems = nitems = ArrayGetNItems(ndim, dims);
2088 :
2089 20 : get_typlenbyvalalign(ARR_ELEMTYPE(array),
2090 : &typlen, &typbyval, &typalign);
2091 20 : typalignby = typalign_to_alignby(typalign);
2092 :
2093 20 : values = palloc_array(char *, nitems);
2094 :
2095 20 : ptr = ARR_DATA_PTR(array);
2096 20 : bitmap = ARR_NULLBITMAP(array);
2097 20 : bitmask = 1;
2098 :
2099 55 : for (i = 0; i < nitems; i++)
2100 : {
2101 35 : if (bitmap && (*bitmap & bitmask) == 0)
2102 : {
2103 0 : values[i] = NULL;
2104 : }
2105 : else
2106 : {
2107 35 : values[i] = TextDatumGetCString(PointerGetDatum(ptr));
2108 35 : ptr = att_addlength_pointer(ptr, typlen, ptr);
2109 35 : ptr = (char *) att_nominal_alignby(ptr, typalignby);
2110 : }
2111 :
2112 : /* advance bitmap pointer if any */
2113 35 : if (bitmap)
2114 : {
2115 0 : bitmask <<= 1;
2116 0 : if (bitmask == 0x100)
2117 : {
2118 0 : bitmap++;
2119 0 : bitmask = 1;
2120 : }
2121 : }
2122 : }
2123 :
2124 20 : return values;
2125 : }
2126 :
2127 : static char *
2128 4 : get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
2129 : {
2130 : char *relname;
2131 : HeapTuple tuple;
2132 : TupleDesc tupdesc;
2133 : int natts;
2134 : StringInfoData buf;
2135 : char *val;
2136 : int key;
2137 : int i;
2138 : bool needComma;
2139 :
2140 4 : initStringInfo(&buf);
2141 :
2142 : /* get relation name including any needed schema prefix and quoting */
2143 4 : relname = generate_relation_name(rel);
2144 :
2145 4 : tupdesc = rel->rd_att;
2146 4 : natts = tupdesc->natts;
2147 :
2148 4 : tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
2149 4 : if (!tuple)
2150 0 : ereport(ERROR,
2151 : (errcode(ERRCODE_CARDINALITY_VIOLATION),
2152 : errmsg("source row not found")));
2153 :
2154 4 : appendStringInfo(&buf, "INSERT INTO %s(", relname);
2155 :
2156 4 : needComma = false;
2157 19 : for (i = 0; i < natts; i++)
2158 : {
2159 15 : Form_pg_attribute att = TupleDescAttr(tupdesc, i);
2160 :
2161 15 : if (att->attisdropped)
2162 2 : continue;
2163 :
2164 13 : if (needComma)
2165 9 : appendStringInfoChar(&buf, ',');
2166 :
2167 13 : appendStringInfoString(&buf,
2168 13 : quote_ident_cstr(NameStr(att->attname)));
2169 13 : needComma = true;
2170 : }
2171 :
2172 4 : appendStringInfoString(&buf, ") VALUES(");
2173 :
2174 : /*
2175 : * Note: i is physical column number (counting from 0).
2176 : */
2177 4 : needComma = false;
2178 19 : for (i = 0; i < natts; i++)
2179 : {
2180 15 : if (TupleDescAttr(tupdesc, i)->attisdropped)
2181 2 : continue;
2182 :
2183 13 : if (needComma)
2184 9 : appendStringInfoChar(&buf, ',');
2185 :
2186 13 : key = get_attnum_pk_pos(pkattnums, pknumatts, i);
2187 :
2188 13 : if (key >= 0)
2189 7 : val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
2190 : else
2191 6 : val = SPI_getvalue(tuple, tupdesc, i + 1);
2192 :
2193 13 : if (val != NULL)
2194 : {
2195 13 : appendStringInfoString(&buf, quote_literal_cstr(val));
2196 13 : pfree(val);
2197 : }
2198 : else
2199 0 : appendStringInfoString(&buf, "NULL");
2200 13 : needComma = true;
2201 : }
2202 4 : appendStringInfoChar(&buf, ')');
2203 :
2204 4 : return buf.data;
2205 : }
2206 :
2207 : static char *
2208 4 : get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals)
2209 : {
2210 : char *relname;
2211 : TupleDesc tupdesc;
2212 : StringInfoData buf;
2213 : int i;
2214 :
2215 4 : initStringInfo(&buf);
2216 :
2217 : /* get relation name including any needed schema prefix and quoting */
2218 4 : relname = generate_relation_name(rel);
2219 :
2220 4 : tupdesc = rel->rd_att;
2221 :
2222 4 : appendStringInfo(&buf, "DELETE FROM %s WHERE ", relname);
2223 11 : for (i = 0; i < pknumatts; i++)
2224 : {
2225 7 : int pkattnum = pkattnums[i];
2226 7 : Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
2227 :
2228 7 : if (i > 0)
2229 3 : appendStringInfoString(&buf, " AND ");
2230 :
2231 7 : appendStringInfoString(&buf,
2232 7 : quote_ident_cstr(NameStr(attr->attname)));
2233 :
2234 7 : if (tgt_pkattvals[i] != NULL)
2235 7 : appendStringInfo(&buf, " = %s",
2236 7 : quote_literal_cstr(tgt_pkattvals[i]));
2237 : else
2238 0 : appendStringInfoString(&buf, " IS NULL");
2239 : }
2240 :
2241 4 : return buf.data;
2242 : }
2243 :
2244 : static char *
2245 4 : get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
2246 : {
2247 : char *relname;
2248 : HeapTuple tuple;
2249 : TupleDesc tupdesc;
2250 : int natts;
2251 : StringInfoData buf;
2252 : char *val;
2253 : int key;
2254 : int i;
2255 : bool needComma;
2256 :
2257 4 : initStringInfo(&buf);
2258 :
2259 : /* get relation name including any needed schema prefix and quoting */
2260 4 : relname = generate_relation_name(rel);
2261 :
2262 4 : tupdesc = rel->rd_att;
2263 4 : natts = tupdesc->natts;
2264 :
2265 4 : tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
2266 4 : if (!tuple)
2267 0 : ereport(ERROR,
2268 : (errcode(ERRCODE_CARDINALITY_VIOLATION),
2269 : errmsg("source row not found")));
2270 :
2271 4 : appendStringInfo(&buf, "UPDATE %s SET ", relname);
2272 :
2273 : /*
2274 : * Note: i is physical column number (counting from 0).
2275 : */
2276 4 : needComma = false;
2277 19 : for (i = 0; i < natts; i++)
2278 : {
2279 15 : Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
2280 :
2281 15 : if (attr->attisdropped)
2282 2 : continue;
2283 :
2284 13 : if (needComma)
2285 9 : appendStringInfoString(&buf, ", ");
2286 :
2287 13 : appendStringInfo(&buf, "%s = ",
2288 13 : quote_ident_cstr(NameStr(attr->attname)));
2289 :
2290 13 : key = get_attnum_pk_pos(pkattnums, pknumatts, i);
2291 :
2292 13 : if (key >= 0)
2293 7 : val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
2294 : else
2295 6 : val = SPI_getvalue(tuple, tupdesc, i + 1);
2296 :
2297 13 : if (val != NULL)
2298 : {
2299 13 : appendStringInfoString(&buf, quote_literal_cstr(val));
2300 13 : pfree(val);
2301 : }
2302 : else
2303 0 : appendStringInfoString(&buf, "NULL");
2304 13 : needComma = true;
2305 : }
2306 :
2307 4 : appendStringInfoString(&buf, " WHERE ");
2308 :
2309 11 : for (i = 0; i < pknumatts; i++)
2310 : {
2311 7 : int pkattnum = pkattnums[i];
2312 7 : Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
2313 :
2314 7 : if (i > 0)
2315 3 : appendStringInfoString(&buf, " AND ");
2316 :
2317 7 : appendStringInfoString(&buf,
2318 7 : quote_ident_cstr(NameStr(attr->attname)));
2319 :
2320 7 : val = tgt_pkattvals[i];
2321 :
2322 7 : if (val != NULL)
2323 7 : appendStringInfo(&buf, " = %s", quote_literal_cstr(val));
2324 : else
2325 0 : appendStringInfoString(&buf, " IS NULL");
2326 : }
2327 :
2328 4 : return buf.data;
2329 : }
2330 :
2331 : /*
2332 : * Return a properly quoted identifier.
2333 : * Uses quote_ident in quote.c
2334 : */
2335 : static char *
2336 80 : quote_ident_cstr(char *rawstr)
2337 : {
2338 : text *rawstr_text;
2339 : text *result_text;
2340 : char *result;
2341 :
2342 80 : rawstr_text = cstring_to_text(rawstr);
2343 80 : result_text = DatumGetTextPP(DirectFunctionCall1(quote_ident,
2344 : PointerGetDatum(rawstr_text)));
2345 80 : result = text_to_cstring(result_text);
2346 :
2347 80 : return result;
2348 : }
2349 :
2350 : static int
2351 26 : get_attnum_pk_pos(int *pkattnums, int pknumatts, int key)
2352 : {
2353 : int i;
2354 :
2355 : /*
2356 : * Not likely a long list anyway, so just scan for the value
2357 : */
2358 50 : for (i = 0; i < pknumatts; i++)
2359 38 : if (key == pkattnums[i])
2360 14 : return i;
2361 :
2362 12 : return -1;
2363 : }
2364 :
2365 : static HeapTuple
2366 8 : get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals)
2367 : {
2368 : char *relname;
2369 : TupleDesc tupdesc;
2370 : int natts;
2371 : StringInfoData buf;
2372 : int ret;
2373 : HeapTuple tuple;
2374 : int i;
2375 :
2376 : /*
2377 : * Connect to SPI manager
2378 : */
2379 8 : SPI_connect();
2380 :
2381 8 : initStringInfo(&buf);
2382 :
2383 : /* get relation name including any needed schema prefix and quoting */
2384 8 : relname = generate_relation_name(rel);
2385 :
2386 8 : tupdesc = rel->rd_att;
2387 8 : natts = tupdesc->natts;
2388 :
2389 : /*
2390 : * Build sql statement to look up tuple of interest, ie, the one matching
2391 : * src_pkattvals. We used to use "SELECT *" here, but it's simpler to
2392 : * generate a result tuple that matches the table's physical structure,
2393 : * with NULLs for any dropped columns. Otherwise we have to deal with two
2394 : * different tupdescs and everything's very confusing.
2395 : */
2396 8 : appendStringInfoString(&buf, "SELECT ");
2397 :
2398 38 : for (i = 0; i < natts; i++)
2399 : {
2400 30 : Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
2401 :
2402 30 : if (i > 0)
2403 22 : appendStringInfoString(&buf, ", ");
2404 :
2405 30 : if (attr->attisdropped)
2406 4 : appendStringInfoString(&buf, "NULL");
2407 : else
2408 26 : appendStringInfoString(&buf,
2409 26 : quote_ident_cstr(NameStr(attr->attname)));
2410 : }
2411 :
2412 8 : appendStringInfo(&buf, " FROM %s WHERE ", relname);
2413 :
2414 22 : for (i = 0; i < pknumatts; i++)
2415 : {
2416 14 : int pkattnum = pkattnums[i];
2417 14 : Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
2418 :
2419 14 : if (i > 0)
2420 6 : appendStringInfoString(&buf, " AND ");
2421 :
2422 14 : appendStringInfoString(&buf,
2423 14 : quote_ident_cstr(NameStr(attr->attname)));
2424 :
2425 14 : if (src_pkattvals[i] != NULL)
2426 14 : appendStringInfo(&buf, " = %s",
2427 14 : quote_literal_cstr(src_pkattvals[i]));
2428 : else
2429 0 : appendStringInfoString(&buf, " IS NULL");
2430 : }
2431 :
2432 : /*
2433 : * Retrieve the desired tuple
2434 : */
2435 8 : ret = SPI_exec(buf.data, 0);
2436 8 : pfree(buf.data);
2437 :
2438 : /*
2439 : * Only allow one qualifying tuple
2440 : */
2441 8 : if ((ret == SPI_OK_SELECT) && (SPI_processed > 1))
2442 0 : ereport(ERROR,
2443 : (errcode(ERRCODE_CARDINALITY_VIOLATION),
2444 : errmsg("source criteria matched more than one record")));
2445 :
2446 8 : else if (ret == SPI_OK_SELECT && SPI_processed == 1)
2447 : {
2448 8 : SPITupleTable *tuptable = SPI_tuptable;
2449 :
2450 8 : tuple = SPI_copytuple(tuptable->vals[0]);
2451 8 : SPI_finish();
2452 :
2453 8 : return tuple;
2454 : }
2455 : else
2456 : {
2457 : /*
2458 : * no qualifying tuples
2459 : */
2460 0 : SPI_finish();
2461 :
2462 0 : return NULL;
2463 : }
2464 :
2465 : /*
2466 : * never reached, but keep compiler quiet
2467 : */
2468 : return NULL;
2469 : }
2470 :
2471 : static void
2472 21 : RangeVarCallbackForDblink(const RangeVar *relation,
2473 : Oid relId, Oid oldRelId, void *arg)
2474 : {
2475 : AclResult aclresult;
2476 :
2477 21 : if (!OidIsValid(relId))
2478 0 : return;
2479 :
2480 21 : aclresult = pg_class_aclcheck(relId, GetUserId(), *((AclMode *) arg));
2481 21 : if (aclresult != ACLCHECK_OK)
2482 0 : aclcheck_error(aclresult, get_relkind_objtype(get_rel_relkind(relId)),
2483 0 : relation->relname);
2484 : }
2485 :
2486 : /*
2487 : * Open the relation named by relname_text, acquire specified type of lock,
2488 : * verify we have specified permissions.
2489 : * Caller must close rel when done with it.
2490 : */
2491 : static Relation
2492 21 : get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode)
2493 : {
2494 : RangeVar *relvar;
2495 : Oid relid;
2496 :
2497 21 : relvar = makeRangeVarFromNameList(textToQualifiedNameList(relname_text));
2498 21 : relid = RangeVarGetRelidExtended(relvar, lockmode, 0,
2499 : RangeVarCallbackForDblink, &aclmode);
2500 :
2501 21 : return table_open(relid, NoLock);
2502 : }
2503 :
2504 : /*
2505 : * generate_relation_name - copied from ruleutils.c
2506 : * Compute the name to display for a relation
2507 : *
2508 : * The result includes all necessary quoting and schema-prefixing.
2509 : */
2510 : static char *
2511 20 : generate_relation_name(Relation rel)
2512 : {
2513 : char *nspname;
2514 : char *result;
2515 :
2516 : /* Qualify the name if not visible in search path */
2517 20 : if (RelationIsVisible(RelationGetRelid(rel)))
2518 15 : nspname = NULL;
2519 : else
2520 5 : nspname = get_namespace_name(rel->rd_rel->relnamespace);
2521 :
2522 20 : result = quote_qualified_identifier(nspname, RelationGetRelationName(rel));
2523 :
2524 20 : return result;
2525 : }
2526 :
2527 :
2528 : static remoteConn *
2529 79 : getConnectionByName(const char *name)
2530 : {
2531 : remoteConnHashEnt *hentry;
2532 : char *key;
2533 :
2534 79 : if (!remoteConnHash)
2535 6 : remoteConnHash = createConnHash();
2536 :
2537 79 : key = pstrdup(name);
2538 79 : truncate_identifier(key, strlen(key), false);
2539 79 : hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
2540 : key, HASH_FIND, NULL);
2541 :
2542 79 : if (hentry && hentry->rconn.conn != NULL)
2543 68 : return &hentry->rconn;
2544 :
2545 11 : return NULL;
2546 : }
2547 :
2548 : static HTAB *
2549 7 : createConnHash(void)
2550 : {
2551 : HASHCTL ctl;
2552 :
2553 7 : ctl.keysize = NAMEDATALEN;
2554 7 : ctl.entrysize = sizeof(remoteConnHashEnt);
2555 :
2556 7 : return hash_create("Remote Con hash", NUMCONN, &ctl,
2557 : HASH_ELEM | HASH_STRINGS);
2558 : }
2559 :
2560 : static remoteConn *
2561 10 : createNewConnection(const char *name)
2562 : {
2563 : remoteConnHashEnt *hentry;
2564 : bool found;
2565 : char *key;
2566 :
2567 10 : if (!remoteConnHash)
2568 1 : remoteConnHash = createConnHash();
2569 :
2570 10 : key = pstrdup(name);
2571 10 : truncate_identifier(key, strlen(key), true);
2572 10 : hentry = (remoteConnHashEnt *) hash_search(remoteConnHash, key,
2573 : HASH_ENTER, &found);
2574 :
2575 10 : if (found && hentry->rconn.conn != NULL)
2576 1 : ereport(ERROR,
2577 : (errcode(ERRCODE_DUPLICATE_OBJECT),
2578 : errmsg("duplicate connection name")));
2579 :
2580 : /* New, or reusable, so initialize the rconn struct to zeroes */
2581 9 : memset(&hentry->rconn, 0, sizeof(remoteConn));
2582 :
2583 9 : return &hentry->rconn;
2584 : }
2585 :
2586 : static void
2587 8 : deleteConnection(const char *name)
2588 : {
2589 : remoteConnHashEnt *hentry;
2590 : bool found;
2591 : char *key;
2592 :
2593 8 : if (!remoteConnHash)
2594 0 : remoteConnHash = createConnHash();
2595 :
2596 8 : key = pstrdup(name);
2597 8 : truncate_identifier(key, strlen(key), false);
2598 8 : hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
2599 : key, HASH_REMOVE, &found);
2600 :
2601 8 : if (!hentry)
2602 0 : ereport(ERROR,
2603 : (errcode(ERRCODE_UNDEFINED_OBJECT),
2604 : errmsg("undefined connection name")));
2605 8 : }
2606 :
2607 : /*
2608 : * Ensure that require_auth and SCRAM keys are correctly set on connstr.
2609 : * SCRAM keys used to pass-through are coming from the initial connection
2610 : * from the client with the server.
2611 : *
2612 : * All required SCRAM options are set by dblink, so we just need to ensure
2613 : * that these options are not overwritten by the user.
2614 : *
2615 : * See appendSCRAMKeysInfo and its usage for more.
2616 : */
2617 : bool
2618 6 : dblink_connstr_has_required_scram_options(const char *connstr)
2619 : {
2620 : PQconninfoOption *options;
2621 6 : bool has_scram_server_key = false;
2622 6 : bool has_scram_client_key = false;
2623 6 : bool has_require_auth = false;
2624 6 : bool has_scram_keys = false;
2625 :
2626 6 : options = PQconninfoParse(connstr, NULL);
2627 6 : if (options)
2628 : {
2629 : /*
2630 : * Continue iterating even if we found the keys that we need to
2631 : * validate to make sure that there is no other declaration of these
2632 : * keys that can overwrite the first.
2633 : */
2634 318 : for (PQconninfoOption *option = options; option->keyword != NULL; option++)
2635 : {
2636 312 : if (strcmp(option->keyword, "require_auth") == 0)
2637 : {
2638 6 : if (option->val != NULL && strcmp(option->val, "scram-sha-256") == 0)
2639 5 : has_require_auth = true;
2640 : else
2641 1 : has_require_auth = false;
2642 : }
2643 :
2644 312 : if (strcmp(option->keyword, "scram_client_key") == 0)
2645 : {
2646 6 : if (option->val != NULL && option->val[0] != '\0')
2647 6 : has_scram_client_key = true;
2648 : else
2649 0 : has_scram_client_key = false;
2650 : }
2651 :
2652 312 : if (strcmp(option->keyword, "scram_server_key") == 0)
2653 : {
2654 6 : if (option->val != NULL && option->val[0] != '\0')
2655 6 : has_scram_server_key = true;
2656 : else
2657 0 : has_scram_server_key = false;
2658 : }
2659 : }
2660 6 : PQconninfoFree(options);
2661 : }
2662 :
2663 6 : has_scram_keys = has_scram_client_key && has_scram_server_key && MyProcPort != NULL && MyProcPort->has_scram_keys;
2664 :
2665 6 : return (has_scram_keys && has_require_auth);
2666 : }
2667 :
2668 : /*
2669 : * We need to make sure that the connection made used credentials
2670 : * which were provided by the user, so check what credentials were
2671 : * used to connect and then make sure that they came from the user.
2672 : *
2673 : * On failure, we close "conn" and also delete the hashtable entry
2674 : * identified by "connname" (if that's not NULL).
2675 : */
2676 : static void
2677 20 : dblink_security_check(PGconn *conn, const char *connname, const char *connstr)
2678 : {
2679 : /* Superuser bypasses security check */
2680 20 : if (superuser())
2681 18 : return;
2682 :
2683 : /* If password was used to connect, make sure it was one provided */
2684 2 : if (PQconnectionUsedPassword(conn) && dblink_connstr_has_pw(connstr))
2685 0 : return;
2686 :
2687 : /*
2688 : * Password was not used to connect, check if SCRAM pass-through is in
2689 : * use.
2690 : *
2691 : * If dblink_connstr_has_required_scram_options is true we assume that
2692 : * UseScramPassthrough is also true because the required SCRAM keys are
2693 : * only added if UseScramPassthrough is set, and the user is not allowed
2694 : * to add the SCRAM keys on fdw and user mapping options.
2695 : */
2696 2 : if (MyProcPort != NULL && MyProcPort->has_scram_keys && dblink_connstr_has_required_scram_options(connstr))
2697 2 : return;
2698 :
2699 : #ifdef ENABLE_GSS
2700 : /* If GSSAPI creds used to connect, make sure it was one delegated */
2701 : if (PQconnectionUsedGSSAPI(conn) && be_gssapi_get_delegation(MyProcPort))
2702 : return;
2703 : #endif
2704 :
2705 : /* Otherwise, fail out */
2706 0 : libpqsrv_disconnect(conn);
2707 0 : if (connname)
2708 0 : deleteConnection(connname);
2709 :
2710 0 : ereport(ERROR,
2711 : (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
2712 : errmsg("password or GSSAPI delegated credentials required"),
2713 : errdetail("Non-superusers may only connect using credentials they provide, eg: password in connection string or delegated GSSAPI credentials"),
2714 : errhint("Ensure provided credentials match target server's authentication method.")));
2715 : }
2716 :
2717 : /*
2718 : * Function to check if the connection string includes an explicit
2719 : * password, needed to ensure that non-superuser password-based auth
2720 : * is using a provided password and not one picked up from the
2721 : * environment.
2722 : */
2723 : static bool
2724 7 : dblink_connstr_has_pw(const char *connstr)
2725 : {
2726 : PQconninfoOption *options;
2727 : PQconninfoOption *option;
2728 7 : bool connstr_gives_password = false;
2729 :
2730 7 : options = PQconninfoParse(connstr, NULL);
2731 7 : if (options)
2732 : {
2733 371 : for (option = options; option->keyword != NULL; option++)
2734 : {
2735 364 : if (strcmp(option->keyword, "password") == 0)
2736 : {
2737 7 : if (option->val != NULL && option->val[0] != '\0')
2738 : {
2739 0 : connstr_gives_password = true;
2740 0 : break;
2741 : }
2742 : }
2743 : }
2744 7 : PQconninfoFree(options);
2745 : }
2746 :
2747 7 : return connstr_gives_password;
2748 : }
2749 :
2750 : /*
2751 : * For non-superusers, insist that the connstr specify a password, except if
2752 : * GSSAPI credentials have been delegated (and we check that they are used for
2753 : * the connection in dblink_security_check later) or if SCRAM pass-through is
2754 : * being used. This prevents a password or GSSAPI credentials from being
2755 : * picked up from .pgpass, a service file, the environment, etc. We don't want
2756 : * the postgres user's passwords or Kerberos credentials to be accessible to
2757 : * non-superusers. In case of SCRAM pass-through insist that the connstr
2758 : * has the required SCRAM pass-through options.
2759 : */
2760 : static void
2761 26 : dblink_connstr_check(const char *connstr)
2762 : {
2763 26 : if (superuser())
2764 21 : return;
2765 :
2766 5 : if (dblink_connstr_has_pw(connstr))
2767 0 : return;
2768 :
2769 5 : if (MyProcPort != NULL && MyProcPort->has_scram_keys && dblink_connstr_has_required_scram_options(connstr))
2770 3 : return;
2771 :
2772 : #ifdef ENABLE_GSS
2773 : if (be_gssapi_get_delegation(MyProcPort))
2774 : return;
2775 : #endif
2776 :
2777 2 : ereport(ERROR,
2778 : (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
2779 : errmsg("password or GSSAPI delegated credentials required"),
2780 : errdetail("Non-superusers must provide a password in the connection string or send delegated GSSAPI credentials.")));
2781 : }
2782 :
2783 : /*
2784 : * Report an error received from the remote server
2785 : *
2786 : * res: the received error result
2787 : * fail: true for ERROR ereport, false for NOTICE
2788 : * fmt and following args: sprintf-style format and values for errcontext;
2789 : * the resulting string should be worded like "while <some action>"
2790 : *
2791 : * If "res" is not NULL, it'll be PQclear'ed here (unless we throw error,
2792 : * in which case memory context cleanup will clear it eventually).
2793 : */
2794 : static void
2795 12 : dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
2796 : bool fail, const char *fmt,...)
2797 : {
2798 : int level;
2799 12 : char *pg_diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
2800 12 : char *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
2801 12 : char *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
2802 12 : char *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
2803 12 : char *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
2804 : int sqlstate;
2805 : va_list ap;
2806 : char dblink_context_msg[512];
2807 :
2808 12 : if (fail)
2809 3 : level = ERROR;
2810 : else
2811 9 : level = NOTICE;
2812 :
2813 12 : if (pg_diag_sqlstate)
2814 12 : sqlstate = MAKE_SQLSTATE(pg_diag_sqlstate[0],
2815 : pg_diag_sqlstate[1],
2816 : pg_diag_sqlstate[2],
2817 : pg_diag_sqlstate[3],
2818 : pg_diag_sqlstate[4]);
2819 : else
2820 0 : sqlstate = ERRCODE_CONNECTION_FAILURE;
2821 :
2822 : /*
2823 : * If we don't get a message from the PGresult, try the PGconn. This is
2824 : * needed because for connection-level failures, PQgetResult may just
2825 : * return NULL, not a PGresult at all.
2826 : */
2827 12 : if (message_primary == NULL)
2828 0 : message_primary = pchomp(PQerrorMessage(conn));
2829 :
2830 : /*
2831 : * Format the basic errcontext string. Below, we'll add on something
2832 : * about the connection name. That's a violation of the translatability
2833 : * guidelines about constructing error messages out of parts, but since
2834 : * there's no translation support for dblink, there's no need to worry
2835 : * about that (yet).
2836 : */
2837 12 : va_start(ap, fmt);
2838 12 : vsnprintf(dblink_context_msg, sizeof(dblink_context_msg), fmt, ap);
2839 12 : va_end(ap);
2840 :
2841 12 : ereport(level,
2842 : (errcode(sqlstate),
2843 : (message_primary != NULL && message_primary[0] != '\0') ?
2844 : errmsg_internal("%s", message_primary) :
2845 : errmsg("could not obtain message string for remote error"),
2846 : message_detail ? errdetail_internal("%s", message_detail) : 0,
2847 : message_hint ? errhint("%s", message_hint) : 0,
2848 : message_context ? (errcontext("%s", message_context)) : 0,
2849 : conname ?
2850 : (errcontext("%s on dblink connection named \"%s\"",
2851 : dblink_context_msg, conname)) :
2852 : (errcontext("%s on unnamed dblink connection",
2853 : dblink_context_msg))));
2854 9 : PQclear(res);
2855 9 : }
2856 :
2857 : /*
2858 : * Obtain connection string for a foreign server
2859 : */
2860 : static char *
2861 26 : get_connect_string(const char *servername)
2862 : {
2863 26 : ForeignServer *foreign_server = NULL;
2864 : UserMapping *user_mapping;
2865 : ListCell *cell;
2866 : StringInfoData buf;
2867 : ForeignDataWrapper *fdw;
2868 : AclResult aclresult;
2869 : char *srvname;
2870 :
2871 : static const PQconninfoOption *options = NULL;
2872 :
2873 26 : initStringInfo(&buf);
2874 :
2875 : /*
2876 : * Get list of valid libpq options.
2877 : *
2878 : * To avoid unnecessary work, we get the list once and use it throughout
2879 : * the lifetime of this backend process. We don't need to care about
2880 : * memory context issues, because PQconndefaults allocates with malloc.
2881 : */
2882 26 : if (!options)
2883 : {
2884 7 : options = PQconndefaults();
2885 7 : if (!options) /* assume reason for failure is OOM */
2886 0 : ereport(ERROR,
2887 : (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
2888 : errmsg("out of memory"),
2889 : errdetail("Could not get libpq's default connection options.")));
2890 : }
2891 :
2892 : /* first gather the server connstr options */
2893 26 : srvname = pstrdup(servername);
2894 26 : truncate_identifier(srvname, strlen(srvname), false);
2895 26 : foreign_server = GetForeignServerByName(srvname, true);
2896 :
2897 26 : if (foreign_server)
2898 : {
2899 6 : Oid serverid = foreign_server->serverid;
2900 6 : Oid fdwid = foreign_server->fdwid;
2901 6 : Oid userid = GetUserId();
2902 :
2903 6 : user_mapping = GetUserMapping(userid, serverid);
2904 6 : fdw = GetForeignDataWrapper(fdwid);
2905 :
2906 : /* Check permissions, user must have usage on the server. */
2907 6 : aclresult = object_aclcheck(ForeignServerRelationId, serverid, userid, ACL_USAGE);
2908 6 : if (aclresult != ACLCHECK_OK)
2909 0 : aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER, foreign_server->servername);
2910 :
2911 : /*
2912 : * First append hardcoded options needed for SCRAM pass-through, so if
2913 : * the user overwrites these options we can ereport on
2914 : * dblink_connstr_check and dblink_security_check.
2915 : */
2916 6 : if (MyProcPort != NULL && MyProcPort->has_scram_keys && UseScramPassthrough(foreign_server, user_mapping))
2917 4 : appendSCRAMKeysInfo(&buf);
2918 :
2919 6 : foreach(cell, fdw->options)
2920 : {
2921 0 : DefElem *def = lfirst(cell);
2922 :
2923 0 : if (is_valid_dblink_option(options, def->defname, ForeignDataWrapperRelationId))
2924 0 : appendStringInfo(&buf, "%s='%s' ", def->defname,
2925 0 : escape_param_str(strVal(def->arg)));
2926 : }
2927 :
2928 27 : foreach(cell, foreign_server->options)
2929 : {
2930 21 : DefElem *def = lfirst(cell);
2931 :
2932 21 : if (is_valid_dblink_option(options, def->defname, ForeignServerRelationId))
2933 17 : appendStringInfo(&buf, "%s='%s' ", def->defname,
2934 17 : escape_param_str(strVal(def->arg)));
2935 : }
2936 :
2937 12 : foreach(cell, user_mapping->options)
2938 : {
2939 :
2940 6 : DefElem *def = lfirst(cell);
2941 :
2942 6 : if (is_valid_dblink_option(options, def->defname, UserMappingRelationId))
2943 6 : appendStringInfo(&buf, "%s='%s' ", def->defname,
2944 6 : escape_param_str(strVal(def->arg)));
2945 : }
2946 :
2947 6 : return buf.data;
2948 : }
2949 : else
2950 20 : return NULL;
2951 : }
2952 :
2953 : /*
2954 : * Escaping libpq connect parameter strings.
2955 : *
2956 : * Replaces "'" with "\'" and "\" with "\\".
2957 : */
2958 : static char *
2959 23 : escape_param_str(const char *str)
2960 : {
2961 : const char *cp;
2962 : StringInfoData buf;
2963 :
2964 23 : initStringInfo(&buf);
2965 :
2966 205 : for (cp = str; *cp; cp++)
2967 : {
2968 182 : if (*cp == '\\' || *cp == '\'')
2969 0 : appendStringInfoChar(&buf, '\\');
2970 182 : appendStringInfoChar(&buf, *cp);
2971 : }
2972 :
2973 23 : return buf.data;
2974 : }
2975 :
2976 : /*
2977 : * Validate the PK-attnums argument for dblink_build_sql_insert() and related
2978 : * functions, and translate to the internal representation.
2979 : *
2980 : * The user supplies an int2vector of 1-based logical attnums, plus a count
2981 : * argument (the need for the separate count argument is historical, but we
2982 : * still check it). We check that each attnum corresponds to a valid,
2983 : * non-dropped attribute of the rel. We do *not* prevent attnums from being
2984 : * listed twice, though the actual use-case for such things is dubious.
2985 : * Note that before Postgres 9.0, the user's attnums were interpreted as
2986 : * physical not logical column numbers; this was changed for future-proofing.
2987 : *
2988 : * The internal representation is a palloc'd int array of 0-based physical
2989 : * attnums.
2990 : */
2991 : static void
2992 18 : validate_pkattnums(Relation rel,
2993 : int2vector *pkattnums_arg, int32 pknumatts_arg,
2994 : int **pkattnums, int *pknumatts)
2995 : {
2996 18 : TupleDesc tupdesc = rel->rd_att;
2997 18 : int natts = tupdesc->natts;
2998 : int i;
2999 :
3000 : /* Don't take more array elements than there are */
3001 18 : pknumatts_arg = Min(pknumatts_arg, pkattnums_arg->dim1);
3002 :
3003 : /* Must have at least one pk attnum selected */
3004 18 : if (pknumatts_arg <= 0)
3005 0 : ereport(ERROR,
3006 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3007 : errmsg("number of key attributes must be > 0")));
3008 :
3009 : /* Allocate output array */
3010 18 : *pkattnums = palloc_array(int, pknumatts_arg);
3011 18 : *pknumatts = pknumatts_arg;
3012 :
3013 : /* Validate attnums and convert to internal form */
3014 57 : for (i = 0; i < pknumatts_arg; i++)
3015 : {
3016 45 : int pkattnum = pkattnums_arg->values[i];
3017 : int lnum;
3018 : int j;
3019 :
3020 : /* Can throw error immediately if out of range */
3021 45 : if (pkattnum <= 0 || pkattnum > natts)
3022 6 : ereport(ERROR,
3023 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3024 : errmsg("invalid attribute number %d", pkattnum)));
3025 :
3026 : /* Identify which physical column has this logical number */
3027 39 : lnum = 0;
3028 69 : for (j = 0; j < natts; j++)
3029 : {
3030 : /* dropped columns don't count */
3031 69 : if (TupleDescCompactAttr(tupdesc, j)->attisdropped)
3032 3 : continue;
3033 :
3034 66 : if (++lnum == pkattnum)
3035 39 : break;
3036 : }
3037 :
3038 39 : if (j < natts)
3039 39 : (*pkattnums)[i] = j;
3040 : else
3041 0 : ereport(ERROR,
3042 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3043 : errmsg("invalid attribute number %d", pkattnum)));
3044 : }
3045 12 : }
3046 :
3047 : /*
3048 : * Check if the specified connection option is valid.
3049 : *
3050 : * We basically allow whatever libpq thinks is an option, with these
3051 : * restrictions:
3052 : * debug options: disallowed
3053 : * "client_encoding": disallowed
3054 : * "user": valid only in USER MAPPING options
3055 : * secure options (eg password): valid only in USER MAPPING options
3056 : * others: valid only in FOREIGN SERVER options
3057 : *
3058 : * We disallow client_encoding because it would be overridden anyway via
3059 : * PQclientEncoding; allowing it to be specified would merely promote
3060 : * confusion.
3061 : */
3062 : static bool
3063 478 : is_valid_dblink_option(const PQconninfoOption *options, const char *option,
3064 : Oid context)
3065 : {
3066 : const PQconninfoOption *opt;
3067 :
3068 : /* Look up the option in libpq result */
3069 12050 : for (opt = options; opt->keyword; opt++)
3070 : {
3071 12044 : if (strcmp(opt->keyword, option) == 0)
3072 472 : break;
3073 : }
3074 478 : if (opt->keyword == NULL)
3075 6 : return false;
3076 :
3077 : /* Disallow debug options (particularly "replication") */
3078 472 : if (strchr(opt->dispchar, 'D'))
3079 34 : return false;
3080 :
3081 : /* Disallow "client_encoding" */
3082 438 : if (strcmp(opt->keyword, "client_encoding") == 0)
3083 8 : return false;
3084 :
3085 : /*
3086 : * Disallow OAuth options for now, since the builtin flow communicates on
3087 : * stderr by default and can't cache tokens yet.
3088 : */
3089 430 : if (strncmp(opt->keyword, "oauth_", strlen("oauth_")) == 0)
3090 44 : return false;
3091 :
3092 : /*
3093 : * If the option is "user" or marked secure, it should be specified only
3094 : * in USER MAPPING. Others should be specified only in SERVER.
3095 : */
3096 386 : if (strcmp(opt->keyword, "user") == 0 || strchr(opt->dispchar, '*'))
3097 : {
3098 38 : if (context != UserMappingRelationId)
3099 9 : return false;
3100 : }
3101 : else
3102 : {
3103 348 : if (context != ForeignServerRelationId)
3104 234 : return false;
3105 : }
3106 :
3107 143 : return true;
3108 : }
3109 :
3110 : /*
3111 : * Same as is_valid_dblink_option but also check for only dblink_fdw specific
3112 : * options.
3113 : */
3114 : static bool
3115 39 : is_valid_dblink_fdw_option(const PQconninfoOption *options, const char *option,
3116 : Oid context)
3117 : {
3118 39 : if (strcmp(option, "use_scram_passthrough") == 0)
3119 4 : return true;
3120 :
3121 35 : return is_valid_dblink_option(options, option, context);
3122 : }
3123 :
3124 : /*
3125 : * Copy the remote session's values of GUCs that affect datatype I/O
3126 : * and apply them locally in a new GUC nesting level. Returns the new
3127 : * nestlevel (which is needed by restoreLocalGucs to undo the settings),
3128 : * or -1 if no new nestlevel was needed.
3129 : *
3130 : * We use the equivalent of a function SET option to allow the settings to
3131 : * persist only until the caller calls restoreLocalGucs. If an error is
3132 : * thrown in between, guc.c will take care of undoing the settings.
3133 : */
3134 : static int
3135 34 : applyRemoteGucs(PGconn *conn)
3136 : {
3137 : static const char *const GUCsAffectingIO[] = {
3138 : "DateStyle",
3139 : "IntervalStyle"
3140 : };
3141 :
3142 34 : int nestlevel = -1;
3143 : int i;
3144 :
3145 102 : for (i = 0; i < lengthof(GUCsAffectingIO); i++)
3146 : {
3147 68 : const char *gucName = GUCsAffectingIO[i];
3148 68 : const char *remoteVal = PQparameterStatus(conn, gucName);
3149 : const char *localVal;
3150 :
3151 : /*
3152 : * If the remote server is pre-8.4, it won't have IntervalStyle, but
3153 : * that's okay because its output format won't be ambiguous. So just
3154 : * skip the GUC if we don't get a value for it. (We might eventually
3155 : * need more complicated logic with remote-version checks here.)
3156 : */
3157 68 : if (remoteVal == NULL)
3158 0 : continue;
3159 :
3160 : /*
3161 : * Avoid GUC-setting overhead if the remote and local GUCs already
3162 : * have the same value.
3163 : */
3164 68 : localVal = GetConfigOption(gucName, false, false);
3165 : Assert(localVal != NULL);
3166 :
3167 68 : if (strcmp(remoteVal, localVal) == 0)
3168 51 : continue;
3169 :
3170 : /* Create new GUC nest level if we didn't already */
3171 17 : if (nestlevel < 0)
3172 9 : nestlevel = NewGUCNestLevel();
3173 :
3174 : /* Apply the option (this will throw error on failure) */
3175 17 : (void) set_config_option(gucName, remoteVal,
3176 : PGC_USERSET, PGC_S_SESSION,
3177 : GUC_ACTION_SAVE, true, 0, false);
3178 : }
3179 :
3180 34 : return nestlevel;
3181 : }
3182 :
3183 : /*
3184 : * Restore local GUCs after they have been overlaid with remote settings.
3185 : */
3186 : static void
3187 35 : restoreLocalGucs(int nestlevel)
3188 : {
3189 : /* Do nothing if no new nestlevel was created */
3190 35 : if (nestlevel > 0)
3191 8 : AtEOXact_GUC(true, nestlevel);
3192 35 : }
3193 :
3194 : /*
3195 : * Append SCRAM client key and server key information from the global
3196 : * MyProcPort into the given StringInfo buffer.
3197 : */
3198 : static void
3199 4 : appendSCRAMKeysInfo(StringInfo buf)
3200 : {
3201 : int len;
3202 : int encoded_len;
3203 : char *client_key;
3204 : char *server_key;
3205 :
3206 4 : len = pg_b64_enc_len(sizeof(MyProcPort->scram_ClientKey));
3207 : /* don't forget the zero-terminator */
3208 4 : client_key = palloc0(len + 1);
3209 4 : encoded_len = pg_b64_encode(MyProcPort->scram_ClientKey,
3210 : sizeof(MyProcPort->scram_ClientKey),
3211 : client_key, len);
3212 4 : if (encoded_len < 0)
3213 0 : elog(ERROR, "could not encode SCRAM client key");
3214 :
3215 4 : len = pg_b64_enc_len(sizeof(MyProcPort->scram_ServerKey));
3216 : /* don't forget the zero-terminator */
3217 4 : server_key = palloc0(len + 1);
3218 4 : encoded_len = pg_b64_encode(MyProcPort->scram_ServerKey,
3219 : sizeof(MyProcPort->scram_ServerKey),
3220 : server_key, len);
3221 4 : if (encoded_len < 0)
3222 0 : elog(ERROR, "could not encode SCRAM server key");
3223 :
3224 4 : appendStringInfo(buf, "scram_client_key='%s' ", client_key);
3225 4 : appendStringInfo(buf, "scram_server_key='%s' ", server_key);
3226 4 : appendStringInfoString(buf, "require_auth='scram-sha-256' ");
3227 :
3228 4 : pfree(client_key);
3229 4 : pfree(server_key);
3230 4 : }
3231 :
3232 :
3233 : static bool
3234 4 : UseScramPassthrough(ForeignServer *foreign_server, UserMapping *user)
3235 : {
3236 : ListCell *cell;
3237 :
3238 16 : foreach(cell, foreign_server->options)
3239 : {
3240 16 : DefElem *def = lfirst(cell);
3241 :
3242 16 : if (strcmp(def->defname, "use_scram_passthrough") == 0)
3243 4 : return defGetBoolean(def);
3244 : }
3245 :
3246 0 : foreach(cell, user->options)
3247 : {
3248 0 : DefElem *def = (DefElem *) lfirst(cell);
3249 :
3250 0 : if (strcmp(def->defname, "use_scram_passthrough") == 0)
3251 0 : return defGetBoolean(def);
3252 : }
3253 :
3254 0 : return false;
3255 : }
|