Line data Source code
1 : /*
2 : * dblink.c
3 : *
4 : * Functions returning results from a remote database
5 : *
6 : * Joe Conway <mail@joeconway.com>
7 : * And contributors:
8 : * Darko Prenosil <Darko.Prenosil@finteh.hr>
9 : * Shridhar Daithankar <shridhar_daithankar@persistent.co.in>
10 : *
11 : * contrib/dblink/dblink.c
12 : * Copyright (c) 2001-2026, PostgreSQL Global Development Group
13 : * ALL RIGHTS RESERVED;
14 : *
15 : * Permission to use, copy, modify, and distribute this software and its
16 : * documentation for any purpose, without fee, and without a written agreement
17 : * is hereby granted, provided that the above copyright notice and this
18 : * paragraph and the following two paragraphs appear in all copies.
19 : *
20 : * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
21 : * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
22 : * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
23 : * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
24 : * POSSIBILITY OF SUCH DAMAGE.
25 : *
26 : * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
27 : * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
28 : * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
29 : * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
30 : * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
31 : *
32 : */
33 : #include "postgres.h"
34 :
35 : #include <limits.h>
36 :
37 : #include "access/htup_details.h"
38 : #include "access/relation.h"
39 : #include "access/reloptions.h"
40 : #include "access/table.h"
41 : #include "catalog/namespace.h"
42 : #include "catalog/pg_foreign_data_wrapper.h"
43 : #include "catalog/pg_foreign_server.h"
44 : #include "catalog/pg_type.h"
45 : #include "catalog/pg_user_mapping.h"
46 : #include "commands/defrem.h"
47 : #include "common/base64.h"
48 : #include "executor/spi.h"
49 : #include "foreign/foreign.h"
50 : #include "funcapi.h"
51 : #include "lib/stringinfo.h"
52 : #include "libpq-fe.h"
53 : #include "libpq/libpq-be.h"
54 : #include "libpq/libpq-be-fe-helpers.h"
55 : #include "mb/pg_wchar.h"
56 : #include "miscadmin.h"
57 : #include "parser/scansup.h"
58 : #include "utils/acl.h"
59 : #include "utils/builtins.h"
60 : #include "utils/fmgroids.h"
61 : #include "utils/guc.h"
62 : #include "utils/hsearch.h"
63 : #include "utils/lsyscache.h"
64 : #include "utils/memutils.h"
65 : #include "utils/rel.h"
66 : #include "utils/tuplestore.h"
67 : #include "utils/varlena.h"
68 : #include "utils/wait_event.h"
69 :
70 17 : PG_MODULE_MAGIC_EXT(
71 : .name = "dblink",
72 : .version = PG_VERSION
73 : );
74 :
75 : typedef struct remoteConn
76 : {
77 : PGconn *conn; /* Hold the remote connection */
78 : int openCursorCount; /* The number of open cursors */
79 : bool newXactForCursor; /* Opened a transaction for a cursor */
80 : } remoteConn;
81 :
82 : typedef struct storeInfo
83 : {
84 : FunctionCallInfo fcinfo;
85 : Tuplestorestate *tuplestore;
86 : AttInMetadata *attinmeta;
87 : MemoryContext tmpcontext;
88 : char **cstrs;
89 : /* temp storage for results to avoid leaks on exception */
90 : PGresult *last_res;
91 : PGresult *cur_res;
92 : } storeInfo;
93 :
94 : /*
95 : * Internal declarations
96 : */
97 : static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
98 : static void prepTuplestoreResult(FunctionCallInfo fcinfo);
99 : static void materializeResult(FunctionCallInfo fcinfo, PGconn *conn,
100 : PGresult *res);
101 : static void materializeQueryResult(FunctionCallInfo fcinfo,
102 : PGconn *conn,
103 : const char *conname,
104 : const char *sql,
105 : bool fail);
106 : static PGresult *storeQueryResult(storeInfo *sinfo, PGconn *conn, const char *sql);
107 : static void storeRow(storeInfo *sinfo, PGresult *res, bool first);
108 : static remoteConn *getConnectionByName(const char *name);
109 : static HTAB *createConnHash(void);
110 : static remoteConn *createNewConnection(const char *name);
111 : static void deleteConnection(const char *name);
112 : static char **get_pkey_attnames(Relation rel, int16 *indnkeyatts);
113 : static char **get_text_array_contents(ArrayType *array, int *numitems);
114 : static char *get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
115 : static char *get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals);
116 : static char *get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
117 : static char *quote_ident_cstr(char *rawstr);
118 : static int get_attnum_pk_pos(int *pkattnums, int pknumatts, int key);
119 : static HeapTuple get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals);
120 : static Relation get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode);
121 : static char *generate_relation_name(Relation rel);
122 : static void dblink_connstr_check(const char *connstr);
123 : static bool dblink_connstr_has_pw(const char *connstr);
124 : static void dblink_security_check(PGconn *conn, const char *connname,
125 : const char *connstr);
126 : static void dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
127 : bool fail, const char *fmt, ...) pg_attribute_printf(5, 6);
128 : static char *get_connect_string(const char *servername);
129 : static char *escape_param_str(const char *str);
130 : static void validate_pkattnums(Relation rel,
131 : int2vector *pkattnums_arg, int32 pknumatts_arg,
132 : int **pkattnums, int *pknumatts);
133 : static bool is_valid_dblink_option(const PQconninfoOption *options,
134 : const char *option, Oid context);
135 : static int applyRemoteGucs(PGconn *conn);
136 : static void restoreLocalGucs(int nestlevel);
137 : static bool UseScramPassthrough(ForeignServer *foreign_server, UserMapping *user);
138 : static void appendSCRAMKeysInfo(StringInfo buf);
139 : static bool is_valid_dblink_fdw_option(const PQconninfoOption *options, const char *option,
140 : Oid context);
141 : static bool dblink_connstr_has_required_scram_options(const char *connstr);
142 :
143 : /* Global */
144 : static remoteConn *pconn = NULL;
145 : static HTAB *remoteConnHash = NULL;
146 :
147 : /* custom wait event values, retrieved from shared memory */
148 : static uint32 dblink_we_connect = 0;
149 : static uint32 dblink_we_get_conn = 0;
150 : static uint32 dblink_we_get_result = 0;
151 :
152 : /*
153 : * Following is hash that holds multiple remote connections.
154 : * Calling convention of each dblink function changes to accept
155 : * connection name as the first parameter. The connection hash is
156 : * much like ecpg e.g. a mapping between a name and a PGconn object.
157 : *
158 : * To avoid potentially leaking a PGconn object in case of out-of-memory
159 : * errors, we first create the hash entry, then open the PGconn.
160 : * Hence, a hash entry whose rconn.conn pointer is NULL must be
161 : * understood as a leftover from a failed create; it should be ignored
162 : * by lookup operations, and silently replaced by create operations.
163 : */
164 :
165 : typedef struct remoteConnHashEnt
166 : {
167 : char name[NAMEDATALEN];
168 : remoteConn rconn;
169 : } remoteConnHashEnt;
170 :
171 : /* initial number of connection hashes */
172 : #define NUMCONN 16
173 :
174 : pg_noreturn static void
175 0 : dblink_res_internalerror(PGconn *conn, PGresult *res, const char *p2)
176 : {
177 0 : char *msg = pchomp(PQerrorMessage(conn));
178 :
179 0 : PQclear(res);
180 0 : elog(ERROR, "%s: %s", p2, msg);
181 : }
182 :
183 : pg_noreturn static void
184 3 : dblink_conn_not_avail(const char *conname)
185 : {
186 3 : if (conname)
187 1 : ereport(ERROR,
188 : (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
189 : errmsg("connection \"%s\" not available", conname)));
190 : else
191 2 : ereport(ERROR,
192 : (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
193 : errmsg("connection not available")));
194 : }
195 :
196 : static void
197 37 : dblink_get_conn(char *conname_or_str,
198 : PGconn *volatile *conn_p, char **conname_p, volatile bool *freeconn_p)
199 : {
200 37 : remoteConn *rconn = getConnectionByName(conname_or_str);
201 : PGconn *conn;
202 : char *conname;
203 : bool freeconn;
204 :
205 37 : if (rconn)
206 : {
207 27 : conn = rconn->conn;
208 27 : conname = conname_or_str;
209 27 : freeconn = false;
210 : }
211 : else
212 : {
213 : const char *connstr;
214 :
215 10 : connstr = get_connect_string(conname_or_str);
216 10 : if (connstr == NULL)
217 6 : connstr = conname_or_str;
218 10 : dblink_connstr_check(connstr);
219 :
220 : /* first time, allocate or get the custom wait event */
221 9 : if (dblink_we_get_conn == 0)
222 5 : dblink_we_get_conn = WaitEventExtensionNew("DblinkGetConnect");
223 :
224 : /* OK to make connection */
225 9 : conn = libpqsrv_connect_start(connstr);
226 9 : PQsetNoticeReceiver(conn, libpqsrv_notice_receiver,
227 : "received message via remote connection");
228 9 : libpqsrv_connect_complete(conn, dblink_we_get_conn);
229 :
230 9 : if (PQstatus(conn) == CONNECTION_BAD)
231 : {
232 3 : char *msg = pchomp(PQerrorMessage(conn));
233 :
234 3 : libpqsrv_disconnect(conn);
235 3 : ereport(ERROR,
236 : (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
237 : errmsg("could not establish connection"),
238 : errdetail_internal("%s", msg)));
239 : }
240 :
241 6 : dblink_security_check(conn, NULL, connstr);
242 6 : if (PQclientEncoding(conn) != GetDatabaseEncoding())
243 0 : PQsetClientEncoding(conn, GetDatabaseEncodingName());
244 6 : freeconn = true;
245 6 : conname = NULL;
246 : }
247 :
248 33 : *conn_p = conn;
249 33 : *conname_p = conname;
250 33 : *freeconn_p = freeconn;
251 33 : }
252 :
253 : static PGconn *
254 17 : dblink_get_named_conn(const char *conname)
255 : {
256 17 : remoteConn *rconn = getConnectionByName(conname);
257 :
258 17 : if (rconn)
259 17 : return rconn->conn;
260 :
261 0 : dblink_conn_not_avail(conname);
262 : return NULL; /* keep compiler quiet */
263 : }
264 :
265 : static void
266 124 : dblink_init(void)
267 : {
268 124 : if (!pconn)
269 : {
270 7 : if (dblink_we_get_result == 0)
271 7 : dblink_we_get_result = WaitEventExtensionNew("DblinkGetResult");
272 :
273 7 : pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn));
274 7 : pconn->conn = NULL;
275 7 : pconn->openCursorCount = 0;
276 7 : pconn->newXactForCursor = false;
277 : }
278 124 : }
279 :
280 : /*
281 : * Create a persistent connection to another database
282 : */
283 13 : PG_FUNCTION_INFO_V1(dblink_connect);
284 : Datum
285 16 : dblink_connect(PG_FUNCTION_ARGS)
286 : {
287 16 : char *conname_or_str = NULL;
288 16 : char *connstr = NULL;
289 16 : char *connname = NULL;
290 : char *msg;
291 16 : PGconn *conn = NULL;
292 16 : remoteConn *rconn = NULL;
293 :
294 16 : dblink_init();
295 :
296 16 : if (PG_NARGS() == 2)
297 : {
298 11 : conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(1));
299 11 : connname = text_to_cstring(PG_GETARG_TEXT_PP(0));
300 : }
301 5 : else if (PG_NARGS() == 1)
302 5 : conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0));
303 :
304 : /* first check for valid foreign data server */
305 16 : connstr = get_connect_string(conname_or_str);
306 16 : if (connstr == NULL)
307 14 : connstr = conname_or_str;
308 :
309 : /* check password in connection string if not superuser */
310 16 : dblink_connstr_check(connstr);
311 :
312 : /* first time, allocate or get the custom wait event */
313 15 : if (dblink_we_connect == 0)
314 2 : dblink_we_connect = WaitEventExtensionNew("DblinkConnect");
315 :
316 : /* if we need a hashtable entry, make that first, since it might fail */
317 15 : if (connname)
318 : {
319 10 : rconn = createNewConnection(connname);
320 : Assert(rconn->conn == NULL);
321 : }
322 :
323 : /* OK to make connection */
324 14 : conn = libpqsrv_connect_start(connstr);
325 14 : if (conn != NULL)
326 14 : PQsetNoticeReceiver(conn, libpqsrv_notice_receiver,
327 : "received message via remote connection");
328 14 : libpqsrv_connect_complete(conn, dblink_we_connect);
329 :
330 14 : if (PQstatus(conn) == CONNECTION_BAD)
331 : {
332 0 : msg = pchomp(PQerrorMessage(conn));
333 0 : libpqsrv_disconnect(conn);
334 0 : if (connname)
335 0 : deleteConnection(connname);
336 :
337 0 : ereport(ERROR,
338 : (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
339 : errmsg("could not establish connection"),
340 : errdetail_internal("%s", msg)));
341 : }
342 :
343 : /* check password actually used if not superuser */
344 14 : dblink_security_check(conn, connname, connstr);
345 :
346 : /* attempt to set client encoding to match server encoding, if needed */
347 14 : if (PQclientEncoding(conn) != GetDatabaseEncoding())
348 0 : PQsetClientEncoding(conn, GetDatabaseEncodingName());
349 :
350 : /* all OK, save away the conn */
351 14 : if (connname)
352 : {
353 9 : rconn->conn = conn;
354 : }
355 : else
356 : {
357 5 : if (pconn->conn)
358 1 : libpqsrv_disconnect(pconn->conn);
359 5 : pconn->conn = conn;
360 : }
361 :
362 14 : PG_RETURN_TEXT_P(cstring_to_text("OK"));
363 : }
364 :
365 : /*
366 : * Clear a persistent connection to another database
367 : */
368 8 : PG_FUNCTION_INFO_V1(dblink_disconnect);
369 : Datum
370 13 : dblink_disconnect(PG_FUNCTION_ARGS)
371 : {
372 13 : char *conname = NULL;
373 13 : remoteConn *rconn = NULL;
374 13 : PGconn *conn = NULL;
375 :
376 13 : dblink_init();
377 :
378 13 : if (PG_NARGS() == 1)
379 : {
380 9 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
381 9 : rconn = getConnectionByName(conname);
382 9 : if (rconn)
383 8 : conn = rconn->conn;
384 : }
385 : else
386 4 : conn = pconn->conn;
387 :
388 13 : if (!conn)
389 1 : dblink_conn_not_avail(conname);
390 :
391 12 : libpqsrv_disconnect(conn);
392 12 : if (rconn)
393 8 : deleteConnection(conname);
394 : else
395 4 : pconn->conn = NULL;
396 :
397 12 : PG_RETURN_TEXT_P(cstring_to_text("OK"));
398 : }
399 :
400 : /*
401 : * opens a cursor using a persistent connection
402 : */
403 13 : PG_FUNCTION_INFO_V1(dblink_open);
404 : Datum
405 9 : dblink_open(PG_FUNCTION_ARGS)
406 : {
407 9 : PGresult *res = NULL;
408 : PGconn *conn;
409 9 : char *curname = NULL;
410 9 : char *sql = NULL;
411 9 : char *conname = NULL;
412 : StringInfoData buf;
413 9 : remoteConn *rconn = NULL;
414 9 : bool fail = true; /* default to backward compatible behavior */
415 :
416 9 : dblink_init();
417 9 : initStringInfo(&buf);
418 :
419 9 : if (PG_NARGS() == 2)
420 : {
421 : /* text,text */
422 2 : curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
423 2 : sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
424 2 : rconn = pconn;
425 : }
426 7 : else if (PG_NARGS() == 3)
427 : {
428 : /* might be text,text,text or text,text,bool */
429 6 : if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
430 : {
431 1 : curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
432 1 : sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
433 1 : fail = PG_GETARG_BOOL(2);
434 1 : rconn = pconn;
435 : }
436 : else
437 : {
438 5 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
439 5 : curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
440 5 : sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
441 5 : rconn = getConnectionByName(conname);
442 : }
443 : }
444 1 : else if (PG_NARGS() == 4)
445 : {
446 : /* text,text,text,bool */
447 1 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
448 1 : curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
449 1 : sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
450 1 : fail = PG_GETARG_BOOL(3);
451 1 : rconn = getConnectionByName(conname);
452 : }
453 :
454 9 : if (!rconn || !rconn->conn)
455 0 : dblink_conn_not_avail(conname);
456 :
457 9 : conn = rconn->conn;
458 :
459 : /* If we are not in a transaction, start one */
460 9 : if (PQtransactionStatus(conn) == PQTRANS_IDLE)
461 : {
462 7 : res = libpqsrv_exec(conn, "BEGIN", dblink_we_get_result);
463 7 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
464 0 : dblink_res_internalerror(conn, res, "begin error");
465 7 : PQclear(res);
466 7 : rconn->newXactForCursor = true;
467 :
468 : /*
469 : * Since transaction state was IDLE, we force cursor count to
470 : * initially be 0. This is needed as a previous ABORT might have wiped
471 : * out our transaction without maintaining the cursor count for us.
472 : */
473 7 : rconn->openCursorCount = 0;
474 : }
475 :
476 : /* if we started a transaction, increment cursor count */
477 9 : if (rconn->newXactForCursor)
478 9 : (rconn->openCursorCount)++;
479 :
480 9 : appendStringInfo(&buf, "DECLARE %s CURSOR FOR %s", curname, sql);
481 9 : res = libpqsrv_exec(conn, buf.data, dblink_we_get_result);
482 9 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
483 : {
484 2 : dblink_res_error(conn, conname, res, fail,
485 : "while opening cursor \"%s\"", curname);
486 2 : PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
487 : }
488 :
489 7 : PQclear(res);
490 7 : PG_RETURN_TEXT_P(cstring_to_text("OK"));
491 : }
492 :
493 : /*
494 : * closes a cursor
495 : */
496 10 : PG_FUNCTION_INFO_V1(dblink_close);
497 : Datum
498 5 : dblink_close(PG_FUNCTION_ARGS)
499 : {
500 : PGconn *conn;
501 5 : PGresult *res = NULL;
502 5 : char *curname = NULL;
503 5 : char *conname = NULL;
504 : StringInfoData buf;
505 5 : remoteConn *rconn = NULL;
506 5 : bool fail = true; /* default to backward compatible behavior */
507 :
508 5 : dblink_init();
509 5 : initStringInfo(&buf);
510 :
511 5 : if (PG_NARGS() == 1)
512 : {
513 : /* text */
514 0 : curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
515 0 : rconn = pconn;
516 : }
517 5 : else if (PG_NARGS() == 2)
518 : {
519 : /* might be text,text or text,bool */
520 5 : if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
521 : {
522 2 : curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
523 2 : fail = PG_GETARG_BOOL(1);
524 2 : rconn = pconn;
525 : }
526 : else
527 : {
528 3 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
529 3 : curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
530 3 : rconn = getConnectionByName(conname);
531 : }
532 : }
533 5 : if (PG_NARGS() == 3)
534 : {
535 : /* text,text,bool */
536 0 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
537 0 : curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
538 0 : fail = PG_GETARG_BOOL(2);
539 0 : rconn = getConnectionByName(conname);
540 : }
541 :
542 5 : if (!rconn || !rconn->conn)
543 0 : dblink_conn_not_avail(conname);
544 :
545 5 : conn = rconn->conn;
546 :
547 5 : appendStringInfo(&buf, "CLOSE %s", curname);
548 :
549 : /* close the cursor */
550 5 : res = libpqsrv_exec(conn, buf.data, dblink_we_get_result);
551 5 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
552 : {
553 1 : dblink_res_error(conn, conname, res, fail,
554 : "while closing cursor \"%s\"", curname);
555 1 : PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
556 : }
557 :
558 4 : PQclear(res);
559 :
560 : /* if we started a transaction, decrement cursor count */
561 4 : if (rconn->newXactForCursor)
562 : {
563 4 : (rconn->openCursorCount)--;
564 :
565 : /* if count is zero, commit the transaction */
566 4 : if (rconn->openCursorCount == 0)
567 : {
568 2 : rconn->newXactForCursor = false;
569 :
570 2 : res = libpqsrv_exec(conn, "COMMIT", dblink_we_get_result);
571 2 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
572 0 : dblink_res_internalerror(conn, res, "commit error");
573 2 : PQclear(res);
574 : }
575 : }
576 :
577 4 : PG_RETURN_TEXT_P(cstring_to_text("OK"));
578 : }
579 :
580 : /*
581 : * Fetch results from an open cursor
582 : */
583 13 : PG_FUNCTION_INFO_V1(dblink_fetch);
584 : Datum
585 13 : dblink_fetch(PG_FUNCTION_ARGS)
586 : {
587 13 : PGresult *res = NULL;
588 13 : char *conname = NULL;
589 13 : remoteConn *rconn = NULL;
590 13 : PGconn *conn = NULL;
591 : StringInfoData buf;
592 13 : char *curname = NULL;
593 13 : int howmany = 0;
594 13 : bool fail = true; /* default to backward compatible */
595 :
596 13 : prepTuplestoreResult(fcinfo);
597 :
598 13 : dblink_init();
599 :
600 13 : if (PG_NARGS() == 4)
601 : {
602 : /* text,text,int,bool */
603 1 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
604 1 : curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
605 1 : howmany = PG_GETARG_INT32(2);
606 1 : fail = PG_GETARG_BOOL(3);
607 :
608 1 : rconn = getConnectionByName(conname);
609 1 : if (rconn)
610 1 : conn = rconn->conn;
611 : }
612 12 : else if (PG_NARGS() == 3)
613 : {
614 : /* text,text,int or text,int,bool */
615 8 : if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
616 : {
617 2 : curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
618 2 : howmany = PG_GETARG_INT32(1);
619 2 : fail = PG_GETARG_BOOL(2);
620 2 : conn = pconn->conn;
621 : }
622 : else
623 : {
624 6 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
625 6 : curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
626 6 : howmany = PG_GETARG_INT32(2);
627 :
628 6 : rconn = getConnectionByName(conname);
629 6 : if (rconn)
630 6 : conn = rconn->conn;
631 : }
632 : }
633 4 : else if (PG_NARGS() == 2)
634 : {
635 : /* text,int */
636 4 : curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
637 4 : howmany = PG_GETARG_INT32(1);
638 4 : conn = pconn->conn;
639 : }
640 :
641 13 : if (!conn)
642 0 : dblink_conn_not_avail(conname);
643 :
644 13 : initStringInfo(&buf);
645 13 : appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
646 :
647 : /*
648 : * Try to execute the query. Note that since libpq uses malloc, the
649 : * PGresult will be long-lived even though we are still in a short-lived
650 : * memory context.
651 : */
652 13 : res = libpqsrv_exec(conn, buf.data, dblink_we_get_result);
653 26 : if (!res ||
654 26 : (PQresultStatus(res) != PGRES_COMMAND_OK &&
655 13 : PQresultStatus(res) != PGRES_TUPLES_OK))
656 : {
657 5 : dblink_res_error(conn, conname, res, fail,
658 : "while fetching from cursor \"%s\"", curname);
659 3 : return (Datum) 0;
660 : }
661 8 : else if (PQresultStatus(res) == PGRES_COMMAND_OK)
662 : {
663 : /* cursor does not exist - closed already or bad name */
664 0 : PQclear(res);
665 0 : ereport(ERROR,
666 : (errcode(ERRCODE_INVALID_CURSOR_NAME),
667 : errmsg("cursor \"%s\" does not exist", curname)));
668 : }
669 :
670 8 : materializeResult(fcinfo, conn, res);
671 7 : return (Datum) 0;
672 : }
673 :
674 : /*
675 : * Note: this is the new preferred version of dblink
676 : */
677 19 : PG_FUNCTION_INFO_V1(dblink_record);
678 : Datum
679 29 : dblink_record(PG_FUNCTION_ARGS)
680 : {
681 29 : return dblink_record_internal(fcinfo, false);
682 : }
683 :
684 4 : PG_FUNCTION_INFO_V1(dblink_send_query);
685 : Datum
686 6 : dblink_send_query(PG_FUNCTION_ARGS)
687 : {
688 : PGconn *conn;
689 : char *sql;
690 : int retval;
691 :
692 6 : if (PG_NARGS() == 2)
693 : {
694 6 : conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
695 6 : sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
696 : }
697 : else
698 : /* shouldn't happen */
699 0 : elog(ERROR, "wrong number of arguments");
700 :
701 : /* async query send */
702 6 : retval = PQsendQuery(conn, sql);
703 6 : if (retval != 1)
704 0 : elog(NOTICE, "could not send query: %s", pchomp(PQerrorMessage(conn)));
705 :
706 6 : PG_RETURN_INT32(retval);
707 : }
708 :
709 6 : PG_FUNCTION_INFO_V1(dblink_get_result);
710 : Datum
711 8 : dblink_get_result(PG_FUNCTION_ARGS)
712 : {
713 8 : return dblink_record_internal(fcinfo, true);
714 : }
715 :
716 : static Datum
717 37 : dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
718 : {
719 37 : PGconn *volatile conn = NULL;
720 37 : volatile bool freeconn = false;
721 :
722 37 : prepTuplestoreResult(fcinfo);
723 :
724 37 : dblink_init();
725 :
726 37 : PG_TRY();
727 : {
728 37 : char *sql = NULL;
729 37 : char *conname = NULL;
730 37 : bool fail = true; /* default to backward compatible */
731 :
732 37 : if (!is_async)
733 : {
734 29 : if (PG_NARGS() == 3)
735 : {
736 : /* text,text,bool */
737 1 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
738 1 : sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
739 1 : fail = PG_GETARG_BOOL(2);
740 1 : dblink_get_conn(conname, &conn, &conname, &freeconn);
741 : }
742 28 : else if (PG_NARGS() == 2)
743 : {
744 : /* text,text or text,bool */
745 21 : if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
746 : {
747 1 : sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
748 1 : fail = PG_GETARG_BOOL(1);
749 1 : conn = pconn->conn;
750 : }
751 : else
752 : {
753 20 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
754 20 : sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
755 20 : dblink_get_conn(conname, &conn, &conname, &freeconn);
756 : }
757 : }
758 7 : else if (PG_NARGS() == 1)
759 : {
760 : /* text */
761 7 : conn = pconn->conn;
762 7 : sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
763 : }
764 : else
765 : /* shouldn't happen */
766 0 : elog(ERROR, "wrong number of arguments");
767 : }
768 : else /* is_async */
769 : {
770 : /* get async result */
771 8 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
772 :
773 8 : if (PG_NARGS() == 2)
774 : {
775 : /* text,bool */
776 0 : fail = PG_GETARG_BOOL(1);
777 0 : conn = dblink_get_named_conn(conname);
778 : }
779 8 : else if (PG_NARGS() == 1)
780 : {
781 : /* text */
782 8 : conn = dblink_get_named_conn(conname);
783 : }
784 : else
785 : /* shouldn't happen */
786 0 : elog(ERROR, "wrong number of arguments");
787 : }
788 :
789 33 : if (!conn)
790 2 : dblink_conn_not_avail(conname);
791 :
792 31 : if (!is_async)
793 : {
794 : /* synchronous query, use efficient tuple collection method */
795 23 : materializeQueryResult(fcinfo, conn, conname, sql, fail);
796 : }
797 : else
798 : {
799 : /* async result retrieval, do it the old way */
800 8 : PGresult *res = libpqsrv_get_result(conn, dblink_we_get_result);
801 :
802 : /* NULL means we're all done with the async results */
803 8 : if (res)
804 : {
805 5 : if (PQresultStatus(res) != PGRES_COMMAND_OK &&
806 5 : PQresultStatus(res) != PGRES_TUPLES_OK)
807 : {
808 0 : dblink_res_error(conn, conname, res, fail,
809 : "while executing query");
810 : /* if fail isn't set, we'll return an empty query result */
811 : }
812 : else
813 : {
814 5 : materializeResult(fcinfo, conn, res);
815 : }
816 : }
817 : }
818 : }
819 6 : PG_FINALLY();
820 : {
821 : /* if needed, close the connection to the database */
822 37 : if (freeconn)
823 5 : libpqsrv_disconnect(conn);
824 : }
825 37 : PG_END_TRY();
826 :
827 31 : return (Datum) 0;
828 : }
829 :
830 : /*
831 : * Verify function caller can handle a tuplestore result, and set up for that.
832 : *
833 : * Note: if the caller returns without actually creating a tuplestore, the
834 : * executor will treat the function result as an empty set.
835 : */
836 : static void
837 50 : prepTuplestoreResult(FunctionCallInfo fcinfo)
838 : {
839 50 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
840 :
841 : /* check to see if query supports us returning a tuplestore */
842 50 : if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
843 0 : ereport(ERROR,
844 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
845 : errmsg("set-valued function called in context that cannot accept a set")));
846 50 : if (!(rsinfo->allowedModes & SFRM_Materialize))
847 0 : ereport(ERROR,
848 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
849 : errmsg("materialize mode required, but it is not allowed in this context")));
850 :
851 : /* let the executor know we're sending back a tuplestore */
852 50 : rsinfo->returnMode = SFRM_Materialize;
853 :
854 : /* caller must fill these to return a non-empty result */
855 50 : rsinfo->setResult = NULL;
856 50 : rsinfo->setDesc = NULL;
857 50 : }
858 :
859 : /*
860 : * Copy the contents of the PGresult into a tuplestore to be returned
861 : * as the result of the current function.
862 : * The PGresult will be released in this function.
863 : */
864 : static void
865 13 : materializeResult(FunctionCallInfo fcinfo, PGconn *conn, PGresult *res)
866 : {
867 13 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
868 : TupleDesc tupdesc;
869 : bool is_sql_cmd;
870 : int ntuples;
871 : int nfields;
872 :
873 : /* prepTuplestoreResult must have been called previously */
874 : Assert(rsinfo->returnMode == SFRM_Materialize);
875 :
876 13 : if (PQresultStatus(res) == PGRES_COMMAND_OK)
877 : {
878 0 : is_sql_cmd = true;
879 :
880 : /*
881 : * need a tuple descriptor representing one TEXT column to return the
882 : * command status string as our result tuple
883 : */
884 0 : tupdesc = CreateTemplateTupleDesc(1);
885 0 : TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
886 : TEXTOID, -1, 0);
887 0 : TupleDescFinalize(tupdesc);
888 0 : ntuples = 1;
889 0 : nfields = 1;
890 : }
891 : else
892 : {
893 : Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
894 :
895 13 : is_sql_cmd = false;
896 :
897 : /* get a tuple descriptor for our result type */
898 13 : switch (get_call_result_type(fcinfo, NULL, &tupdesc))
899 : {
900 13 : case TYPEFUNC_COMPOSITE:
901 : /* success */
902 13 : break;
903 0 : case TYPEFUNC_RECORD:
904 : /* failed to determine actual type of RECORD */
905 0 : ereport(ERROR,
906 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
907 : errmsg("function returning record called in context "
908 : "that cannot accept type record")));
909 : break;
910 0 : default:
911 : /* result type isn't composite */
912 0 : elog(ERROR, "return type must be a row type");
913 : break;
914 : }
915 :
916 : /* make sure we have a persistent copy of the tupdesc */
917 13 : tupdesc = CreateTupleDescCopy(tupdesc);
918 13 : ntuples = PQntuples(res);
919 13 : nfields = PQnfields(res);
920 : }
921 :
922 : /*
923 : * check result and tuple descriptor have the same number of columns
924 : */
925 13 : if (nfields != tupdesc->natts)
926 0 : ereport(ERROR,
927 : (errcode(ERRCODE_DATATYPE_MISMATCH),
928 : errmsg("remote query result rowtype does not match "
929 : "the specified FROM clause rowtype")));
930 :
931 13 : if (ntuples > 0)
932 : {
933 : AttInMetadata *attinmeta;
934 13 : int nestlevel = -1;
935 : Tuplestorestate *tupstore;
936 : MemoryContext oldcontext;
937 : int row;
938 : char **values;
939 :
940 13 : attinmeta = TupleDescGetAttInMetadata(tupdesc);
941 :
942 : /* Set GUCs to ensure we read GUC-sensitive data types correctly */
943 13 : if (!is_sql_cmd)
944 13 : nestlevel = applyRemoteGucs(conn);
945 :
946 13 : oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
947 13 : tupstore = tuplestore_begin_heap(true, false, work_mem);
948 13 : rsinfo->setResult = tupstore;
949 13 : rsinfo->setDesc = tupdesc;
950 13 : MemoryContextSwitchTo(oldcontext);
951 :
952 13 : values = palloc_array(char *, nfields);
953 :
954 : /* put all tuples into the tuplestore */
955 49 : for (row = 0; row < ntuples; row++)
956 : {
957 : HeapTuple tuple;
958 :
959 37 : if (!is_sql_cmd)
960 : {
961 : int i;
962 :
963 138 : for (i = 0; i < nfields; i++)
964 : {
965 101 : if (PQgetisnull(res, row, i))
966 0 : values[i] = NULL;
967 : else
968 101 : values[i] = PQgetvalue(res, row, i);
969 : }
970 : }
971 : else
972 : {
973 0 : values[0] = PQcmdStatus(res);
974 : }
975 :
976 : /* build the tuple and put it into the tuplestore. */
977 37 : tuple = BuildTupleFromCStrings(attinmeta, values);
978 36 : tuplestore_puttuple(tupstore, tuple);
979 : }
980 :
981 : /* clean up GUC settings, if we changed any */
982 12 : restoreLocalGucs(nestlevel);
983 : }
984 :
985 12 : PQclear(res);
986 12 : }
987 :
988 : /*
989 : * Execute the given SQL command and store its results into a tuplestore
990 : * to be returned as the result of the current function.
991 : *
992 : * This is equivalent to PQexec followed by materializeResult, but we make
993 : * use of libpq's single-row mode to avoid accumulating the whole result
994 : * inside libpq before it gets transferred to the tuplestore.
995 : */
996 : static void
997 23 : materializeQueryResult(FunctionCallInfo fcinfo,
998 : PGconn *conn,
999 : const char *conname,
1000 : const char *sql,
1001 : bool fail)
1002 : {
1003 23 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1004 :
1005 : /* prepTuplestoreResult must have been called previously */
1006 : Assert(rsinfo->returnMode == SFRM_Materialize);
1007 :
1008 : /* Use a PG_TRY block to ensure we pump libpq dry of results */
1009 23 : PG_TRY();
1010 : {
1011 23 : storeInfo sinfo = {0};
1012 : PGresult *res;
1013 :
1014 23 : sinfo.fcinfo = fcinfo;
1015 : /* Create short-lived memory context for data conversions */
1016 23 : sinfo.tmpcontext = AllocSetContextCreate(CurrentMemoryContext,
1017 : "dblink temporary context",
1018 : ALLOCSET_DEFAULT_SIZES);
1019 :
1020 : /* execute query, collecting any tuples into the tuplestore */
1021 23 : res = storeQueryResult(&sinfo, conn, sql);
1022 :
1023 23 : if (!res ||
1024 23 : (PQresultStatus(res) != PGRES_COMMAND_OK &&
1025 23 : PQresultStatus(res) != PGRES_TUPLES_OK))
1026 : {
1027 2 : dblink_res_error(conn, conname, res, fail,
1028 : "while executing query");
1029 : /* if fail isn't set, we'll return an empty query result */
1030 : }
1031 21 : else if (PQresultStatus(res) == PGRES_COMMAND_OK)
1032 : {
1033 : /*
1034 : * storeRow didn't get called, so we need to convert the command
1035 : * status string to a tuple manually
1036 : */
1037 : TupleDesc tupdesc;
1038 : AttInMetadata *attinmeta;
1039 : Tuplestorestate *tupstore;
1040 : HeapTuple tuple;
1041 : char *values[1];
1042 : MemoryContext oldcontext;
1043 :
1044 : /*
1045 : * need a tuple descriptor representing one TEXT column to return
1046 : * the command status string as our result tuple
1047 : */
1048 0 : tupdesc = CreateTemplateTupleDesc(1);
1049 0 : TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
1050 : TEXTOID, -1, 0);
1051 0 : TupleDescFinalize(tupdesc);
1052 0 : attinmeta = TupleDescGetAttInMetadata(tupdesc);
1053 :
1054 0 : oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
1055 0 : tupstore = tuplestore_begin_heap(true, false, work_mem);
1056 0 : rsinfo->setResult = tupstore;
1057 0 : rsinfo->setDesc = tupdesc;
1058 0 : MemoryContextSwitchTo(oldcontext);
1059 :
1060 0 : values[0] = PQcmdStatus(res);
1061 :
1062 : /* build the tuple and put it into the tuplestore. */
1063 0 : tuple = BuildTupleFromCStrings(attinmeta, values);
1064 0 : tuplestore_puttuple(tupstore, tuple);
1065 :
1066 0 : PQclear(res);
1067 : }
1068 : else
1069 : {
1070 : Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
1071 : /* storeRow should have created a tuplestore */
1072 : Assert(rsinfo->setResult != NULL);
1073 :
1074 21 : PQclear(res);
1075 : }
1076 :
1077 : /* clean up data conversion short-lived memory context */
1078 23 : if (sinfo.tmpcontext != NULL)
1079 23 : MemoryContextDelete(sinfo.tmpcontext);
1080 :
1081 23 : PQclear(sinfo.last_res);
1082 23 : PQclear(sinfo.cur_res);
1083 : }
1084 0 : PG_CATCH();
1085 : {
1086 : PGresult *res;
1087 :
1088 : /* be sure to clear out any pending data in libpq */
1089 0 : while ((res = libpqsrv_get_result(conn, dblink_we_get_result)) !=
1090 : NULL)
1091 0 : PQclear(res);
1092 0 : PG_RE_THROW();
1093 : }
1094 23 : PG_END_TRY();
1095 23 : }
1096 :
1097 : /*
1098 : * Execute query, and send any result rows to sinfo->tuplestore.
1099 : */
1100 : static PGresult *
1101 23 : storeQueryResult(storeInfo *sinfo, PGconn *conn, const char *sql)
1102 : {
1103 23 : bool first = true;
1104 23 : int nestlevel = -1;
1105 : PGresult *res;
1106 :
1107 23 : if (!PQsendQuery(conn, sql))
1108 0 : elog(ERROR, "could not send query: %s", pchomp(PQerrorMessage(conn)));
1109 :
1110 23 : if (!PQsetSingleRowMode(conn)) /* shouldn't fail */
1111 0 : elog(ERROR, "failed to set single-row mode for dblink query");
1112 :
1113 : for (;;)
1114 : {
1115 198 : CHECK_FOR_INTERRUPTS();
1116 :
1117 198 : sinfo->cur_res = libpqsrv_get_result(conn, dblink_we_get_result);
1118 198 : if (!sinfo->cur_res)
1119 23 : break;
1120 :
1121 175 : if (PQresultStatus(sinfo->cur_res) == PGRES_SINGLE_TUPLE)
1122 : {
1123 : /* got one row from possibly-bigger resultset */
1124 :
1125 : /*
1126 : * Set GUCs to ensure we read GUC-sensitive data types correctly.
1127 : * We shouldn't do this until we have a row in hand, to ensure
1128 : * libpq has seen any earlier ParameterStatus protocol messages.
1129 : */
1130 152 : if (first && nestlevel < 0)
1131 21 : nestlevel = applyRemoteGucs(conn);
1132 :
1133 152 : storeRow(sinfo, sinfo->cur_res, first);
1134 :
1135 152 : PQclear(sinfo->cur_res);
1136 152 : sinfo->cur_res = NULL;
1137 152 : first = false;
1138 : }
1139 : else
1140 : {
1141 : /* if empty resultset, fill tuplestore header */
1142 23 : if (first && PQresultStatus(sinfo->cur_res) == PGRES_TUPLES_OK)
1143 0 : storeRow(sinfo, sinfo->cur_res, first);
1144 :
1145 : /* store completed result at last_res */
1146 23 : PQclear(sinfo->last_res);
1147 23 : sinfo->last_res = sinfo->cur_res;
1148 23 : sinfo->cur_res = NULL;
1149 23 : first = true;
1150 : }
1151 : }
1152 :
1153 : /* clean up GUC settings, if we changed any */
1154 23 : restoreLocalGucs(nestlevel);
1155 :
1156 : /* return last_res */
1157 23 : res = sinfo->last_res;
1158 23 : sinfo->last_res = NULL;
1159 23 : return res;
1160 : }
1161 :
1162 : /*
1163 : * Send single row to sinfo->tuplestore.
1164 : *
1165 : * If "first" is true, create the tuplestore using PGresult's metadata
1166 : * (in this case the PGresult might contain either zero or one row).
1167 : */
1168 : static void
1169 152 : storeRow(storeInfo *sinfo, PGresult *res, bool first)
1170 : {
1171 152 : int nfields = PQnfields(res);
1172 : HeapTuple tuple;
1173 : int i;
1174 : MemoryContext oldcontext;
1175 :
1176 152 : if (first)
1177 : {
1178 : /* Prepare for new result set */
1179 21 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) sinfo->fcinfo->resultinfo;
1180 : TupleDesc tupdesc;
1181 :
1182 : /*
1183 : * It's possible to get more than one result set if the query string
1184 : * contained multiple SQL commands. In that case, we follow PQexec's
1185 : * traditional behavior of throwing away all but the last result.
1186 : */
1187 21 : if (sinfo->tuplestore)
1188 0 : tuplestore_end(sinfo->tuplestore);
1189 21 : sinfo->tuplestore = NULL;
1190 :
1191 : /* get a tuple descriptor for our result type */
1192 21 : switch (get_call_result_type(sinfo->fcinfo, NULL, &tupdesc))
1193 : {
1194 21 : case TYPEFUNC_COMPOSITE:
1195 : /* success */
1196 21 : break;
1197 0 : case TYPEFUNC_RECORD:
1198 : /* failed to determine actual type of RECORD */
1199 0 : ereport(ERROR,
1200 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1201 : errmsg("function returning record called in context "
1202 : "that cannot accept type record")));
1203 : break;
1204 0 : default:
1205 : /* result type isn't composite */
1206 0 : elog(ERROR, "return type must be a row type");
1207 : break;
1208 : }
1209 :
1210 : /* make sure we have a persistent copy of the tupdesc */
1211 21 : tupdesc = CreateTupleDescCopy(tupdesc);
1212 :
1213 : /* check result and tuple descriptor have the same number of columns */
1214 21 : if (nfields != tupdesc->natts)
1215 0 : ereport(ERROR,
1216 : (errcode(ERRCODE_DATATYPE_MISMATCH),
1217 : errmsg("remote query result rowtype does not match "
1218 : "the specified FROM clause rowtype")));
1219 :
1220 : /* Prepare attinmeta for later data conversions */
1221 21 : sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
1222 :
1223 : /* Create a new, empty tuplestore */
1224 21 : oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
1225 21 : sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
1226 21 : rsinfo->setResult = sinfo->tuplestore;
1227 21 : rsinfo->setDesc = tupdesc;
1228 21 : MemoryContextSwitchTo(oldcontext);
1229 :
1230 : /* Done if empty resultset */
1231 21 : if (PQntuples(res) == 0)
1232 0 : return;
1233 :
1234 : /*
1235 : * Set up sufficiently-wide string pointers array; this won't change
1236 : * in size so it's easy to preallocate.
1237 : */
1238 21 : if (sinfo->cstrs)
1239 0 : pfree(sinfo->cstrs);
1240 21 : sinfo->cstrs = palloc_array(char *, nfields);
1241 : }
1242 :
1243 : /* Should have a single-row result if we get here */
1244 : Assert(PQntuples(res) == 1);
1245 :
1246 : /*
1247 : * Do the following work in a temp context that we reset after each tuple.
1248 : * This cleans up not only the data we have direct access to, but any
1249 : * cruft the I/O functions might leak.
1250 : */
1251 152 : oldcontext = MemoryContextSwitchTo(sinfo->tmpcontext);
1252 :
1253 : /*
1254 : * Fill cstrs with null-terminated strings of column values.
1255 : */
1256 570 : for (i = 0; i < nfields; i++)
1257 : {
1258 418 : if (PQgetisnull(res, 0, i))
1259 0 : sinfo->cstrs[i] = NULL;
1260 : else
1261 418 : sinfo->cstrs[i] = PQgetvalue(res, 0, i);
1262 : }
1263 :
1264 : /* Convert row to a tuple, and add it to the tuplestore */
1265 152 : tuple = BuildTupleFromCStrings(sinfo->attinmeta, sinfo->cstrs);
1266 :
1267 152 : tuplestore_puttuple(sinfo->tuplestore, tuple);
1268 :
1269 : /* Clean up */
1270 152 : MemoryContextSwitchTo(oldcontext);
1271 152 : MemoryContextReset(sinfo->tmpcontext);
1272 : }
1273 :
1274 : /*
1275 : * List all open dblink connections by name.
1276 : * Returns an array of all connection names.
1277 : * Takes no params
1278 : */
1279 3 : PG_FUNCTION_INFO_V1(dblink_get_connections);
1280 : Datum
1281 1 : dblink_get_connections(PG_FUNCTION_ARGS)
1282 : {
1283 : HASH_SEQ_STATUS status;
1284 : remoteConnHashEnt *hentry;
1285 1 : ArrayBuildState *astate = NULL;
1286 :
1287 1 : if (remoteConnHash)
1288 : {
1289 1 : hash_seq_init(&status, remoteConnHash);
1290 4 : while ((hentry = (remoteConnHashEnt *) hash_seq_search(&status)) != NULL)
1291 : {
1292 : /* ignore it if it's not an open connection */
1293 3 : if (hentry->rconn.conn == NULL)
1294 0 : continue;
1295 : /* stash away current value */
1296 3 : astate = accumArrayResult(astate,
1297 3 : CStringGetTextDatum(hentry->name),
1298 : false, TEXTOID, CurrentMemoryContext);
1299 : }
1300 : }
1301 :
1302 1 : if (astate)
1303 1 : PG_RETURN_DATUM(makeArrayResult(astate,
1304 : CurrentMemoryContext));
1305 : else
1306 0 : PG_RETURN_NULL();
1307 : }
1308 :
1309 : /*
1310 : * Checks if a given remote connection is busy
1311 : *
1312 : * Returns 1 if the connection is busy, 0 otherwise
1313 : * Params:
1314 : * text connection_name - name of the connection to check
1315 : *
1316 : */
1317 3 : PG_FUNCTION_INFO_V1(dblink_is_busy);
1318 : Datum
1319 1 : dblink_is_busy(PG_FUNCTION_ARGS)
1320 : {
1321 : PGconn *conn;
1322 :
1323 1 : dblink_init();
1324 1 : conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
1325 :
1326 1 : PQconsumeInput(conn);
1327 1 : PG_RETURN_INT32(PQisBusy(conn));
1328 : }
1329 :
1330 : /*
1331 : * Cancels a running request on a connection
1332 : *
1333 : * Returns text:
1334 : * "OK" if the cancel request has been sent correctly,
1335 : * an error message otherwise
1336 : *
1337 : * Params:
1338 : * text connection_name - name of the connection to check
1339 : *
1340 : */
1341 3 : PG_FUNCTION_INFO_V1(dblink_cancel_query);
1342 : Datum
1343 1 : dblink_cancel_query(PG_FUNCTION_ARGS)
1344 : {
1345 : PGconn *conn;
1346 : const char *msg;
1347 : TimestampTz endtime;
1348 :
1349 1 : dblink_init();
1350 1 : conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
1351 1 : endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
1352 : 30000);
1353 1 : msg = libpqsrv_cancel(conn, endtime);
1354 1 : if (msg == NULL)
1355 1 : msg = "OK";
1356 :
1357 1 : PG_RETURN_TEXT_P(cstring_to_text(msg));
1358 : }
1359 :
1360 :
1361 : /*
1362 : * Get error message from a connection
1363 : *
1364 : * Returns text:
1365 : * "OK" if no error, an error message otherwise
1366 : *
1367 : * Params:
1368 : * text connection_name - name of the connection to check
1369 : *
1370 : */
1371 3 : PG_FUNCTION_INFO_V1(dblink_error_message);
1372 : Datum
1373 1 : dblink_error_message(PG_FUNCTION_ARGS)
1374 : {
1375 : char *msg;
1376 : PGconn *conn;
1377 :
1378 1 : dblink_init();
1379 1 : conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
1380 :
1381 1 : msg = PQerrorMessage(conn);
1382 1 : if (msg == NULL || msg[0] == '\0')
1383 1 : PG_RETURN_TEXT_P(cstring_to_text("OK"));
1384 : else
1385 0 : PG_RETURN_TEXT_P(cstring_to_text(pchomp(msg)));
1386 : }
1387 :
1388 : /*
1389 : * Execute an SQL non-SELECT command
1390 : */
1391 13 : PG_FUNCTION_INFO_V1(dblink_exec);
1392 : Datum
1393 26 : dblink_exec(PG_FUNCTION_ARGS)
1394 : {
1395 26 : text *volatile sql_cmd_status = NULL;
1396 26 : PGconn *volatile conn = NULL;
1397 26 : volatile bool freeconn = false;
1398 :
1399 26 : dblink_init();
1400 :
1401 26 : PG_TRY();
1402 : {
1403 26 : PGresult *res = NULL;
1404 26 : char *sql = NULL;
1405 26 : char *conname = NULL;
1406 26 : bool fail = true; /* default to backward compatible behavior */
1407 :
1408 26 : if (PG_NARGS() == 3)
1409 : {
1410 : /* must be text,text,bool */
1411 0 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
1412 0 : sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
1413 0 : fail = PG_GETARG_BOOL(2);
1414 0 : dblink_get_conn(conname, &conn, &conname, &freeconn);
1415 : }
1416 26 : else if (PG_NARGS() == 2)
1417 : {
1418 : /* might be text,text or text,bool */
1419 17 : if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
1420 : {
1421 1 : sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
1422 1 : fail = PG_GETARG_BOOL(1);
1423 1 : conn = pconn->conn;
1424 : }
1425 : else
1426 : {
1427 16 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
1428 16 : sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
1429 16 : dblink_get_conn(conname, &conn, &conname, &freeconn);
1430 : }
1431 : }
1432 9 : else if (PG_NARGS() == 1)
1433 : {
1434 : /* must be single text argument */
1435 9 : conn = pconn->conn;
1436 9 : sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
1437 : }
1438 : else
1439 : /* shouldn't happen */
1440 0 : elog(ERROR, "wrong number of arguments");
1441 :
1442 26 : if (!conn)
1443 0 : dblink_conn_not_avail(conname);
1444 :
1445 26 : res = libpqsrv_exec(conn, sql, dblink_we_get_result);
1446 26 : if (!res ||
1447 26 : (PQresultStatus(res) != PGRES_COMMAND_OK &&
1448 2 : PQresultStatus(res) != PGRES_TUPLES_OK))
1449 : {
1450 2 : dblink_res_error(conn, conname, res, fail,
1451 : "while executing command");
1452 :
1453 : /*
1454 : * and save a copy of the command status string to return as our
1455 : * result tuple
1456 : */
1457 1 : sql_cmd_status = cstring_to_text("ERROR");
1458 : }
1459 24 : else if (PQresultStatus(res) == PGRES_COMMAND_OK)
1460 : {
1461 : /*
1462 : * and save a copy of the command status string to return as our
1463 : * result tuple
1464 : */
1465 24 : sql_cmd_status = cstring_to_text(PQcmdStatus(res));
1466 24 : PQclear(res);
1467 : }
1468 : else
1469 : {
1470 0 : PQclear(res);
1471 0 : ereport(ERROR,
1472 : (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
1473 : errmsg("statement returning results not allowed")));
1474 : }
1475 : }
1476 1 : PG_FINALLY();
1477 : {
1478 : /* if needed, close the connection to the database */
1479 26 : if (freeconn)
1480 1 : libpqsrv_disconnect(conn);
1481 : }
1482 26 : PG_END_TRY();
1483 :
1484 25 : PG_RETURN_TEXT_P(sql_cmd_status);
1485 : }
1486 :
1487 :
1488 : /*
1489 : * dblink_get_pkey
1490 : *
1491 : * Return list of primary key fields for the supplied relation,
1492 : * or NULL if none exists.
1493 : */
1494 3 : PG_FUNCTION_INFO_V1(dblink_get_pkey);
1495 : Datum
1496 9 : dblink_get_pkey(PG_FUNCTION_ARGS)
1497 : {
1498 : int16 indnkeyatts;
1499 : char **results;
1500 : FuncCallContext *funcctx;
1501 : int32 call_cntr;
1502 : int32 max_calls;
1503 : AttInMetadata *attinmeta;
1504 : MemoryContext oldcontext;
1505 :
1506 : /* stuff done only on the first call of the function */
1507 9 : if (SRF_IS_FIRSTCALL())
1508 : {
1509 : Relation rel;
1510 : TupleDesc tupdesc;
1511 :
1512 : /* create a function context for cross-call persistence */
1513 3 : funcctx = SRF_FIRSTCALL_INIT();
1514 :
1515 : /*
1516 : * switch to memory context appropriate for multiple function calls
1517 : */
1518 3 : oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1519 :
1520 : /* open target relation */
1521 3 : rel = get_rel_from_relname(PG_GETARG_TEXT_PP(0), AccessShareLock, ACL_SELECT);
1522 :
1523 : /* get the array of attnums */
1524 3 : results = get_pkey_attnames(rel, &indnkeyatts);
1525 :
1526 3 : relation_close(rel, AccessShareLock);
1527 :
1528 : /*
1529 : * need a tuple descriptor representing one INT and one TEXT column
1530 : */
1531 3 : tupdesc = CreateTemplateTupleDesc(2);
1532 3 : TupleDescInitEntry(tupdesc, (AttrNumber) 1, "position",
1533 : INT4OID, -1, 0);
1534 3 : TupleDescInitEntry(tupdesc, (AttrNumber) 2, "colname",
1535 : TEXTOID, -1, 0);
1536 :
1537 3 : TupleDescFinalize(tupdesc);
1538 :
1539 : /*
1540 : * Generate attribute metadata needed later to produce tuples from raw
1541 : * C strings
1542 : */
1543 3 : attinmeta = TupleDescGetAttInMetadata(tupdesc);
1544 3 : funcctx->attinmeta = attinmeta;
1545 :
1546 3 : if ((results != NULL) && (indnkeyatts > 0))
1547 : {
1548 3 : funcctx->max_calls = indnkeyatts;
1549 :
1550 : /* got results, keep track of them */
1551 3 : funcctx->user_fctx = results;
1552 : }
1553 : else
1554 : {
1555 : /* fast track when no results */
1556 0 : MemoryContextSwitchTo(oldcontext);
1557 0 : SRF_RETURN_DONE(funcctx);
1558 : }
1559 :
1560 3 : MemoryContextSwitchTo(oldcontext);
1561 : }
1562 :
1563 : /* stuff done on every call of the function */
1564 9 : funcctx = SRF_PERCALL_SETUP();
1565 :
1566 : /*
1567 : * initialize per-call variables
1568 : */
1569 9 : call_cntr = funcctx->call_cntr;
1570 9 : max_calls = funcctx->max_calls;
1571 :
1572 9 : results = (char **) funcctx->user_fctx;
1573 9 : attinmeta = funcctx->attinmeta;
1574 :
1575 9 : if (call_cntr < max_calls) /* do when there is more left to send */
1576 : {
1577 : char **values;
1578 : HeapTuple tuple;
1579 : Datum result;
1580 :
1581 6 : values = palloc_array(char *, 2);
1582 6 : values[0] = psprintf("%d", call_cntr + 1);
1583 6 : values[1] = results[call_cntr];
1584 :
1585 : /* build the tuple */
1586 6 : tuple = BuildTupleFromCStrings(attinmeta, values);
1587 :
1588 : /* make the tuple into a datum */
1589 6 : result = HeapTupleGetDatum(tuple);
1590 :
1591 6 : SRF_RETURN_NEXT(funcctx, result);
1592 : }
1593 : else
1594 : {
1595 : /* do when there is no more left */
1596 3 : SRF_RETURN_DONE(funcctx);
1597 : }
1598 : }
1599 :
1600 :
1601 : /*
1602 : * dblink_build_sql_insert
1603 : *
1604 : * Used to generate an SQL insert statement
1605 : * based on an existing tuple in a local relation.
1606 : * This is useful for selectively replicating data
1607 : * to another server via dblink.
1608 : *
1609 : * API:
1610 : * <relname> - name of local table of interest
1611 : * <pkattnums> - an int2vector of attnums which will be used
1612 : * to identify the local tuple of interest
1613 : * <pknumatts> - number of attnums in pkattnums
1614 : * <src_pkattvals_arry> - text array of key values which will be used
1615 : * to identify the local tuple of interest
1616 : * <tgt_pkattvals_arry> - text array of key values which will be used
1617 : * to build the string for execution remotely. These are substituted
1618 : * for their counterparts in src_pkattvals_arry
1619 : */
1620 4 : PG_FUNCTION_INFO_V1(dblink_build_sql_insert);
1621 : Datum
1622 6 : dblink_build_sql_insert(PG_FUNCTION_ARGS)
1623 : {
1624 6 : text *relname_text = PG_GETARG_TEXT_PP(0);
1625 6 : int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
1626 6 : int32 pknumatts_arg = PG_GETARG_INT32(2);
1627 6 : ArrayType *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1628 6 : ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
1629 : Relation rel;
1630 : int *pkattnums;
1631 : int pknumatts;
1632 : char **src_pkattvals;
1633 : char **tgt_pkattvals;
1634 : int src_nitems;
1635 : int tgt_nitems;
1636 : char *sql;
1637 :
1638 : /*
1639 : * Open target relation.
1640 : */
1641 6 : rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
1642 :
1643 : /*
1644 : * Process pkattnums argument.
1645 : */
1646 6 : validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
1647 : &pkattnums, &pknumatts);
1648 :
1649 : /*
1650 : * Source array is made up of key values that will be used to locate the
1651 : * tuple of interest from the local system.
1652 : */
1653 4 : src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
1654 :
1655 : /*
1656 : * There should be one source array key value for each key attnum
1657 : */
1658 4 : if (src_nitems != pknumatts)
1659 0 : ereport(ERROR,
1660 : (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1661 : errmsg("source key array length must match number of key attributes")));
1662 :
1663 : /*
1664 : * Target array is made up of key values that will be used to build the
1665 : * SQL string for use on the remote system.
1666 : */
1667 4 : tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1668 :
1669 : /*
1670 : * There should be one target array key value for each key attnum
1671 : */
1672 4 : if (tgt_nitems != pknumatts)
1673 0 : ereport(ERROR,
1674 : (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1675 : errmsg("target key array length must match number of key attributes")));
1676 :
1677 : /*
1678 : * Prep work is finally done. Go get the SQL string.
1679 : */
1680 4 : sql = get_sql_insert(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
1681 :
1682 : /*
1683 : * Now we can close the relation.
1684 : */
1685 4 : relation_close(rel, AccessShareLock);
1686 :
1687 : /*
1688 : * And send it
1689 : */
1690 4 : PG_RETURN_TEXT_P(cstring_to_text(sql));
1691 : }
1692 :
1693 :
1694 : /*
1695 : * dblink_build_sql_delete
1696 : *
1697 : * Used to generate an SQL delete statement.
1698 : * This is useful for selectively replicating a
1699 : * delete to another server via dblink.
1700 : *
1701 : * API:
1702 : * <relname> - name of remote table of interest
1703 : * <pkattnums> - an int2vector of attnums which will be used
1704 : * to identify the remote tuple of interest
1705 : * <pknumatts> - number of attnums in pkattnums
1706 : * <tgt_pkattvals_arry> - text array of key values which will be used
1707 : * to build the string for execution remotely.
1708 : */
1709 4 : PG_FUNCTION_INFO_V1(dblink_build_sql_delete);
1710 : Datum
1711 6 : dblink_build_sql_delete(PG_FUNCTION_ARGS)
1712 : {
1713 6 : text *relname_text = PG_GETARG_TEXT_PP(0);
1714 6 : int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
1715 6 : int32 pknumatts_arg = PG_GETARG_INT32(2);
1716 6 : ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1717 : Relation rel;
1718 : int *pkattnums;
1719 : int pknumatts;
1720 : char **tgt_pkattvals;
1721 : int tgt_nitems;
1722 : char *sql;
1723 :
1724 : /*
1725 : * Open target relation.
1726 : */
1727 6 : rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
1728 :
1729 : /*
1730 : * Process pkattnums argument.
1731 : */
1732 6 : validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
1733 : &pkattnums, &pknumatts);
1734 :
1735 : /*
1736 : * Target array is made up of key values that will be used to build the
1737 : * SQL string for use on the remote system.
1738 : */
1739 4 : tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1740 :
1741 : /*
1742 : * There should be one target array key value for each key attnum
1743 : */
1744 4 : if (tgt_nitems != pknumatts)
1745 0 : ereport(ERROR,
1746 : (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1747 : errmsg("target key array length must match number of key attributes")));
1748 :
1749 : /*
1750 : * Prep work is finally done. Go get the SQL string.
1751 : */
1752 4 : sql = get_sql_delete(rel, pkattnums, pknumatts, tgt_pkattvals);
1753 :
1754 : /*
1755 : * Now we can close the relation.
1756 : */
1757 4 : relation_close(rel, AccessShareLock);
1758 :
1759 : /*
1760 : * And send it
1761 : */
1762 4 : PG_RETURN_TEXT_P(cstring_to_text(sql));
1763 : }
1764 :
1765 :
1766 : /*
1767 : * dblink_build_sql_update
1768 : *
1769 : * Used to generate an SQL update statement
1770 : * based on an existing tuple in a local relation.
1771 : * This is useful for selectively replicating data
1772 : * to another server via dblink.
1773 : *
1774 : * API:
1775 : * <relname> - name of local table of interest
1776 : * <pkattnums> - an int2vector of attnums which will be used
1777 : * to identify the local tuple of interest
1778 : * <pknumatts> - number of attnums in pkattnums
1779 : * <src_pkattvals_arry> - text array of key values which will be used
1780 : * to identify the local tuple of interest
1781 : * <tgt_pkattvals_arry> - text array of key values which will be used
1782 : * to build the string for execution remotely. These are substituted
1783 : * for their counterparts in src_pkattvals_arry
1784 : */
1785 4 : PG_FUNCTION_INFO_V1(dblink_build_sql_update);
1786 : Datum
1787 6 : dblink_build_sql_update(PG_FUNCTION_ARGS)
1788 : {
1789 6 : text *relname_text = PG_GETARG_TEXT_PP(0);
1790 6 : int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
1791 6 : int32 pknumatts_arg = PG_GETARG_INT32(2);
1792 6 : ArrayType *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1793 6 : ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
1794 : Relation rel;
1795 : int *pkattnums;
1796 : int pknumatts;
1797 : char **src_pkattvals;
1798 : char **tgt_pkattvals;
1799 : int src_nitems;
1800 : int tgt_nitems;
1801 : char *sql;
1802 :
1803 : /*
1804 : * Open target relation.
1805 : */
1806 6 : rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
1807 :
1808 : /*
1809 : * Process pkattnums argument.
1810 : */
1811 6 : validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
1812 : &pkattnums, &pknumatts);
1813 :
1814 : /*
1815 : * Source array is made up of key values that will be used to locate the
1816 : * tuple of interest from the local system.
1817 : */
1818 4 : src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
1819 :
1820 : /*
1821 : * There should be one source array key value for each key attnum
1822 : */
1823 4 : if (src_nitems != pknumatts)
1824 0 : ereport(ERROR,
1825 : (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1826 : errmsg("source key array length must match number of key attributes")));
1827 :
1828 : /*
1829 : * Target array is made up of key values that will be used to build the
1830 : * SQL string for use on the remote system.
1831 : */
1832 4 : tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1833 :
1834 : /*
1835 : * There should be one target array key value for each key attnum
1836 : */
1837 4 : if (tgt_nitems != pknumatts)
1838 0 : ereport(ERROR,
1839 : (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1840 : errmsg("target key array length must match number of key attributes")));
1841 :
1842 : /*
1843 : * Prep work is finally done. Go get the SQL string.
1844 : */
1845 4 : sql = get_sql_update(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
1846 :
1847 : /*
1848 : * Now we can close the relation.
1849 : */
1850 4 : relation_close(rel, AccessShareLock);
1851 :
1852 : /*
1853 : * And send it
1854 : */
1855 4 : PG_RETURN_TEXT_P(cstring_to_text(sql));
1856 : }
1857 :
1858 : /*
1859 : * dblink_current_query
1860 : * return the current query string
1861 : * to allow its use in (among other things)
1862 : * rewrite rules
1863 : */
1864 2 : PG_FUNCTION_INFO_V1(dblink_current_query);
1865 : Datum
1866 0 : dblink_current_query(PG_FUNCTION_ARGS)
1867 : {
1868 : /* This is now just an alias for the built-in function current_query() */
1869 0 : PG_RETURN_DATUM(current_query(fcinfo));
1870 : }
1871 :
1872 : /*
1873 : * Retrieve async notifications for a connection.
1874 : *
1875 : * Returns a setof record of notifications, or an empty set if none received.
1876 : * Can optionally take a named connection as parameter, but uses the unnamed
1877 : * connection per default.
1878 : *
1879 : */
1880 : #define DBLINK_NOTIFY_COLS 3
1881 :
1882 5 : PG_FUNCTION_INFO_V1(dblink_get_notify);
1883 : Datum
1884 2 : dblink_get_notify(PG_FUNCTION_ARGS)
1885 : {
1886 : PGconn *conn;
1887 : PGnotify *notify;
1888 2 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1889 :
1890 2 : dblink_init();
1891 2 : if (PG_NARGS() == 1)
1892 0 : conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
1893 : else
1894 2 : conn = pconn->conn;
1895 :
1896 2 : InitMaterializedSRF(fcinfo, 0);
1897 :
1898 2 : PQconsumeInput(conn);
1899 4 : while ((notify = PQnotifies(conn)) != NULL)
1900 : {
1901 : Datum values[DBLINK_NOTIFY_COLS];
1902 : bool nulls[DBLINK_NOTIFY_COLS];
1903 :
1904 2 : memset(values, 0, sizeof(values));
1905 2 : memset(nulls, 0, sizeof(nulls));
1906 :
1907 2 : if (notify->relname != NULL)
1908 2 : values[0] = CStringGetTextDatum(notify->relname);
1909 : else
1910 0 : nulls[0] = true;
1911 :
1912 2 : values[1] = Int32GetDatum(notify->be_pid);
1913 :
1914 2 : if (notify->extra != NULL)
1915 2 : values[2] = CStringGetTextDatum(notify->extra);
1916 : else
1917 0 : nulls[2] = true;
1918 :
1919 2 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
1920 :
1921 2 : PQfreemem(notify);
1922 2 : PQconsumeInput(conn);
1923 : }
1924 :
1925 2 : return (Datum) 0;
1926 : }
1927 :
1928 : /*
1929 : * Validate the options given to a dblink foreign server or user mapping.
1930 : * Raise an error if any option is invalid.
1931 : *
1932 : * We just check the names of options here, so semantic errors in options,
1933 : * such as invalid numeric format, will be detected at the attempt to connect.
1934 : */
1935 14 : PG_FUNCTION_INFO_V1(dblink_fdw_validator);
1936 : Datum
1937 19 : dblink_fdw_validator(PG_FUNCTION_ARGS)
1938 : {
1939 19 : List *options_list = untransformRelOptions(PG_GETARG_DATUM(0));
1940 19 : Oid context = PG_GETARG_OID(1);
1941 : ListCell *cell;
1942 :
1943 : static const PQconninfoOption *options = NULL;
1944 :
1945 : /*
1946 : * Get list of valid libpq options.
1947 : *
1948 : * To avoid unnecessary work, we get the list once and use it throughout
1949 : * the lifetime of this backend process. We don't need to care about
1950 : * memory context issues, because PQconndefaults allocates with malloc.
1951 : */
1952 19 : if (!options)
1953 : {
1954 12 : options = PQconndefaults();
1955 12 : if (!options) /* assume reason for failure is OOM */
1956 0 : ereport(ERROR,
1957 : (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
1958 : errmsg("out of memory"),
1959 : errdetail("Could not get libpq's default connection options.")));
1960 : }
1961 :
1962 : /* Validate each supplied option. */
1963 50 : foreach(cell, options_list)
1964 : {
1965 39 : DefElem *def = (DefElem *) lfirst(cell);
1966 :
1967 39 : if (!is_valid_dblink_fdw_option(options, def->defname, context))
1968 : {
1969 : /*
1970 : * Unknown option, or invalid option for the context specified, so
1971 : * complain about it. Provide a hint with a valid option that
1972 : * looks similar, if there is one.
1973 : */
1974 : const PQconninfoOption *opt;
1975 : const char *closest_match;
1976 : ClosestMatchState match_state;
1977 8 : bool has_valid_options = false;
1978 :
1979 8 : initClosestMatch(&match_state, def->defname, 4);
1980 424 : for (opt = options; opt->keyword; opt++)
1981 : {
1982 416 : if (is_valid_dblink_option(options, opt->keyword, context))
1983 : {
1984 93 : has_valid_options = true;
1985 93 : updateClosestMatch(&match_state, opt->keyword);
1986 : }
1987 : }
1988 :
1989 8 : closest_match = getClosestMatch(&match_state);
1990 8 : ereport(ERROR,
1991 : (errcode(ERRCODE_FDW_OPTION_NAME_NOT_FOUND),
1992 : errmsg("invalid option \"%s\"", def->defname),
1993 : has_valid_options ? closest_match ?
1994 : errhint("Perhaps you meant the option \"%s\".",
1995 : closest_match) : 0 :
1996 : errhint("There are no valid options in this context.")));
1997 : }
1998 : }
1999 :
2000 11 : PG_RETURN_VOID();
2001 : }
2002 :
2003 :
2004 : /*************************************************************
2005 : * internal functions
2006 : */
2007 :
2008 :
2009 : /*
2010 : * get_pkey_attnames
2011 : *
2012 : * Get the primary key attnames for the given relation.
2013 : * Return NULL, and set indnkeyatts = 0, if no primary key exists.
2014 : */
2015 : static char **
2016 3 : get_pkey_attnames(Relation rel, int16 *indnkeyatts)
2017 : {
2018 : Relation indexRelation;
2019 : ScanKeyData skey;
2020 : SysScanDesc scan;
2021 : HeapTuple indexTuple;
2022 : int i;
2023 3 : char **result = NULL;
2024 : TupleDesc tupdesc;
2025 :
2026 : /* initialize indnkeyatts to 0 in case no primary key exists */
2027 3 : *indnkeyatts = 0;
2028 :
2029 3 : tupdesc = rel->rd_att;
2030 :
2031 : /* Prepare to scan pg_index for entries having indrelid = this rel. */
2032 3 : indexRelation = table_open(IndexRelationId, AccessShareLock);
2033 3 : ScanKeyInit(&skey,
2034 : Anum_pg_index_indrelid,
2035 : BTEqualStrategyNumber, F_OIDEQ,
2036 : ObjectIdGetDatum(RelationGetRelid(rel)));
2037 :
2038 3 : scan = systable_beginscan(indexRelation, IndexIndrelidIndexId, true,
2039 : NULL, 1, &skey);
2040 :
2041 3 : while (HeapTupleIsValid(indexTuple = systable_getnext(scan)))
2042 : {
2043 3 : Form_pg_index index = (Form_pg_index) GETSTRUCT(indexTuple);
2044 :
2045 : /* we're only interested if it is the primary key */
2046 3 : if (index->indisprimary)
2047 : {
2048 3 : *indnkeyatts = index->indnkeyatts;
2049 3 : if (*indnkeyatts > 0)
2050 : {
2051 3 : result = palloc_array(char *, *indnkeyatts);
2052 :
2053 9 : for (i = 0; i < *indnkeyatts; i++)
2054 6 : result[i] = SPI_fname(tupdesc, index->indkey.values[i]);
2055 : }
2056 3 : break;
2057 : }
2058 : }
2059 :
2060 3 : systable_endscan(scan);
2061 3 : table_close(indexRelation, AccessShareLock);
2062 :
2063 3 : return result;
2064 : }
2065 :
2066 : /*
2067 : * Deconstruct a text[] into C-strings (note any NULL elements will be
2068 : * returned as NULL pointers)
2069 : */
2070 : static char **
2071 20 : get_text_array_contents(ArrayType *array, int *numitems)
2072 : {
2073 20 : int ndim = ARR_NDIM(array);
2074 20 : int *dims = ARR_DIMS(array);
2075 : int nitems;
2076 : int16 typlen;
2077 : bool typbyval;
2078 : char typalign;
2079 : uint8 typalignby;
2080 : char **values;
2081 : char *ptr;
2082 : uint8 *bitmap;
2083 : int bitmask;
2084 : int i;
2085 :
2086 : Assert(ARR_ELEMTYPE(array) == TEXTOID);
2087 :
2088 20 : *numitems = nitems = ArrayGetNItems(ndim, dims);
2089 :
2090 20 : get_typlenbyvalalign(ARR_ELEMTYPE(array),
2091 : &typlen, &typbyval, &typalign);
2092 20 : typalignby = typalign_to_alignby(typalign);
2093 :
2094 20 : values = palloc_array(char *, nitems);
2095 :
2096 20 : ptr = ARR_DATA_PTR(array);
2097 20 : bitmap = ARR_NULLBITMAP(array);
2098 20 : bitmask = 1;
2099 :
2100 55 : for (i = 0; i < nitems; i++)
2101 : {
2102 35 : if (bitmap && (*bitmap & bitmask) == 0)
2103 : {
2104 0 : values[i] = NULL;
2105 : }
2106 : else
2107 : {
2108 35 : values[i] = TextDatumGetCString(PointerGetDatum(ptr));
2109 35 : ptr = att_addlength_pointer(ptr, typlen, ptr);
2110 35 : ptr = (char *) att_nominal_alignby(ptr, typalignby);
2111 : }
2112 :
2113 : /* advance bitmap pointer if any */
2114 35 : if (bitmap)
2115 : {
2116 0 : bitmask <<= 1;
2117 0 : if (bitmask == 0x100)
2118 : {
2119 0 : bitmap++;
2120 0 : bitmask = 1;
2121 : }
2122 : }
2123 : }
2124 :
2125 20 : return values;
2126 : }
2127 :
2128 : static char *
2129 4 : get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
2130 : {
2131 : char *relname;
2132 : HeapTuple tuple;
2133 : TupleDesc tupdesc;
2134 : int natts;
2135 : StringInfoData buf;
2136 : char *val;
2137 : int key;
2138 : int i;
2139 : bool needComma;
2140 :
2141 4 : initStringInfo(&buf);
2142 :
2143 : /* get relation name including any needed schema prefix and quoting */
2144 4 : relname = generate_relation_name(rel);
2145 :
2146 4 : tupdesc = rel->rd_att;
2147 4 : natts = tupdesc->natts;
2148 :
2149 4 : tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
2150 4 : if (!tuple)
2151 0 : ereport(ERROR,
2152 : (errcode(ERRCODE_CARDINALITY_VIOLATION),
2153 : errmsg("source row not found")));
2154 :
2155 4 : appendStringInfo(&buf, "INSERT INTO %s(", relname);
2156 :
2157 4 : needComma = false;
2158 19 : for (i = 0; i < natts; i++)
2159 : {
2160 15 : Form_pg_attribute att = TupleDescAttr(tupdesc, i);
2161 :
2162 15 : if (att->attisdropped)
2163 2 : continue;
2164 :
2165 13 : if (needComma)
2166 9 : appendStringInfoChar(&buf, ',');
2167 :
2168 13 : appendStringInfoString(&buf,
2169 13 : quote_ident_cstr(NameStr(att->attname)));
2170 13 : needComma = true;
2171 : }
2172 :
2173 4 : appendStringInfoString(&buf, ") VALUES(");
2174 :
2175 : /*
2176 : * Note: i is physical column number (counting from 0).
2177 : */
2178 4 : needComma = false;
2179 19 : for (i = 0; i < natts; i++)
2180 : {
2181 15 : if (TupleDescAttr(tupdesc, i)->attisdropped)
2182 2 : continue;
2183 :
2184 13 : if (needComma)
2185 9 : appendStringInfoChar(&buf, ',');
2186 :
2187 13 : key = get_attnum_pk_pos(pkattnums, pknumatts, i);
2188 :
2189 13 : if (key >= 0)
2190 7 : val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
2191 : else
2192 6 : val = SPI_getvalue(tuple, tupdesc, i + 1);
2193 :
2194 13 : if (val != NULL)
2195 : {
2196 13 : appendStringInfoString(&buf, quote_literal_cstr(val));
2197 13 : pfree(val);
2198 : }
2199 : else
2200 0 : appendStringInfoString(&buf, "NULL");
2201 13 : needComma = true;
2202 : }
2203 4 : appendStringInfoChar(&buf, ')');
2204 :
2205 4 : return buf.data;
2206 : }
2207 :
2208 : static char *
2209 4 : get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals)
2210 : {
2211 : char *relname;
2212 : TupleDesc tupdesc;
2213 : StringInfoData buf;
2214 : int i;
2215 :
2216 4 : initStringInfo(&buf);
2217 :
2218 : /* get relation name including any needed schema prefix and quoting */
2219 4 : relname = generate_relation_name(rel);
2220 :
2221 4 : tupdesc = rel->rd_att;
2222 :
2223 4 : appendStringInfo(&buf, "DELETE FROM %s WHERE ", relname);
2224 11 : for (i = 0; i < pknumatts; i++)
2225 : {
2226 7 : int pkattnum = pkattnums[i];
2227 7 : Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
2228 :
2229 7 : if (i > 0)
2230 3 : appendStringInfoString(&buf, " AND ");
2231 :
2232 7 : appendStringInfoString(&buf,
2233 7 : quote_ident_cstr(NameStr(attr->attname)));
2234 :
2235 7 : if (tgt_pkattvals[i] != NULL)
2236 7 : appendStringInfo(&buf, " = %s",
2237 7 : quote_literal_cstr(tgt_pkattvals[i]));
2238 : else
2239 0 : appendStringInfoString(&buf, " IS NULL");
2240 : }
2241 :
2242 4 : return buf.data;
2243 : }
2244 :
2245 : static char *
2246 4 : get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
2247 : {
2248 : char *relname;
2249 : HeapTuple tuple;
2250 : TupleDesc tupdesc;
2251 : int natts;
2252 : StringInfoData buf;
2253 : char *val;
2254 : int key;
2255 : int i;
2256 : bool needComma;
2257 :
2258 4 : initStringInfo(&buf);
2259 :
2260 : /* get relation name including any needed schema prefix and quoting */
2261 4 : relname = generate_relation_name(rel);
2262 :
2263 4 : tupdesc = rel->rd_att;
2264 4 : natts = tupdesc->natts;
2265 :
2266 4 : tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
2267 4 : if (!tuple)
2268 0 : ereport(ERROR,
2269 : (errcode(ERRCODE_CARDINALITY_VIOLATION),
2270 : errmsg("source row not found")));
2271 :
2272 4 : appendStringInfo(&buf, "UPDATE %s SET ", relname);
2273 :
2274 : /*
2275 : * Note: i is physical column number (counting from 0).
2276 : */
2277 4 : needComma = false;
2278 19 : for (i = 0; i < natts; i++)
2279 : {
2280 15 : Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
2281 :
2282 15 : if (attr->attisdropped)
2283 2 : continue;
2284 :
2285 13 : if (needComma)
2286 9 : appendStringInfoString(&buf, ", ");
2287 :
2288 13 : appendStringInfo(&buf, "%s = ",
2289 13 : quote_ident_cstr(NameStr(attr->attname)));
2290 :
2291 13 : key = get_attnum_pk_pos(pkattnums, pknumatts, i);
2292 :
2293 13 : if (key >= 0)
2294 7 : val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
2295 : else
2296 6 : val = SPI_getvalue(tuple, tupdesc, i + 1);
2297 :
2298 13 : if (val != NULL)
2299 : {
2300 13 : appendStringInfoString(&buf, quote_literal_cstr(val));
2301 13 : pfree(val);
2302 : }
2303 : else
2304 0 : appendStringInfoString(&buf, "NULL");
2305 13 : needComma = true;
2306 : }
2307 :
2308 4 : appendStringInfoString(&buf, " WHERE ");
2309 :
2310 11 : for (i = 0; i < pknumatts; i++)
2311 : {
2312 7 : int pkattnum = pkattnums[i];
2313 7 : Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
2314 :
2315 7 : if (i > 0)
2316 3 : appendStringInfoString(&buf, " AND ");
2317 :
2318 7 : appendStringInfoString(&buf,
2319 7 : quote_ident_cstr(NameStr(attr->attname)));
2320 :
2321 7 : val = tgt_pkattvals[i];
2322 :
2323 7 : if (val != NULL)
2324 7 : appendStringInfo(&buf, " = %s", quote_literal_cstr(val));
2325 : else
2326 0 : appendStringInfoString(&buf, " IS NULL");
2327 : }
2328 :
2329 4 : return buf.data;
2330 : }
2331 :
2332 : /*
2333 : * Return a properly quoted identifier.
2334 : * Uses quote_ident in quote.c
2335 : */
2336 : static char *
2337 80 : quote_ident_cstr(char *rawstr)
2338 : {
2339 : text *rawstr_text;
2340 : text *result_text;
2341 : char *result;
2342 :
2343 80 : rawstr_text = cstring_to_text(rawstr);
2344 80 : result_text = DatumGetTextPP(DirectFunctionCall1(quote_ident,
2345 : PointerGetDatum(rawstr_text)));
2346 80 : result = text_to_cstring(result_text);
2347 :
2348 80 : return result;
2349 : }
2350 :
2351 : static int
2352 26 : get_attnum_pk_pos(int *pkattnums, int pknumatts, int key)
2353 : {
2354 : int i;
2355 :
2356 : /*
2357 : * Not likely a long list anyway, so just scan for the value
2358 : */
2359 50 : for (i = 0; i < pknumatts; i++)
2360 38 : if (key == pkattnums[i])
2361 14 : return i;
2362 :
2363 12 : return -1;
2364 : }
2365 :
2366 : static HeapTuple
2367 8 : get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals)
2368 : {
2369 : char *relname;
2370 : TupleDesc tupdesc;
2371 : int natts;
2372 : StringInfoData buf;
2373 : int ret;
2374 : HeapTuple tuple;
2375 : int i;
2376 :
2377 : /*
2378 : * Connect to SPI manager
2379 : */
2380 8 : SPI_connect();
2381 :
2382 8 : initStringInfo(&buf);
2383 :
2384 : /* get relation name including any needed schema prefix and quoting */
2385 8 : relname = generate_relation_name(rel);
2386 :
2387 8 : tupdesc = rel->rd_att;
2388 8 : natts = tupdesc->natts;
2389 :
2390 : /*
2391 : * Build sql statement to look up tuple of interest, ie, the one matching
2392 : * src_pkattvals. We used to use "SELECT *" here, but it's simpler to
2393 : * generate a result tuple that matches the table's physical structure,
2394 : * with NULLs for any dropped columns. Otherwise we have to deal with two
2395 : * different tupdescs and everything's very confusing.
2396 : */
2397 8 : appendStringInfoString(&buf, "SELECT ");
2398 :
2399 38 : for (i = 0; i < natts; i++)
2400 : {
2401 30 : Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
2402 :
2403 30 : if (i > 0)
2404 22 : appendStringInfoString(&buf, ", ");
2405 :
2406 30 : if (attr->attisdropped)
2407 4 : appendStringInfoString(&buf, "NULL");
2408 : else
2409 26 : appendStringInfoString(&buf,
2410 26 : quote_ident_cstr(NameStr(attr->attname)));
2411 : }
2412 :
2413 8 : appendStringInfo(&buf, " FROM %s WHERE ", relname);
2414 :
2415 22 : for (i = 0; i < pknumatts; i++)
2416 : {
2417 14 : int pkattnum = pkattnums[i];
2418 14 : Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
2419 :
2420 14 : if (i > 0)
2421 6 : appendStringInfoString(&buf, " AND ");
2422 :
2423 14 : appendStringInfoString(&buf,
2424 14 : quote_ident_cstr(NameStr(attr->attname)));
2425 :
2426 14 : if (src_pkattvals[i] != NULL)
2427 14 : appendStringInfo(&buf, " = %s",
2428 14 : quote_literal_cstr(src_pkattvals[i]));
2429 : else
2430 0 : appendStringInfoString(&buf, " IS NULL");
2431 : }
2432 :
2433 : /*
2434 : * Retrieve the desired tuple
2435 : */
2436 8 : ret = SPI_exec(buf.data, 0);
2437 8 : pfree(buf.data);
2438 :
2439 : /*
2440 : * Only allow one qualifying tuple
2441 : */
2442 8 : if ((ret == SPI_OK_SELECT) && (SPI_processed > 1))
2443 0 : ereport(ERROR,
2444 : (errcode(ERRCODE_CARDINALITY_VIOLATION),
2445 : errmsg("source criteria matched more than one record")));
2446 :
2447 8 : else if (ret == SPI_OK_SELECT && SPI_processed == 1)
2448 : {
2449 8 : SPITupleTable *tuptable = SPI_tuptable;
2450 :
2451 8 : tuple = SPI_copytuple(tuptable->vals[0]);
2452 8 : SPI_finish();
2453 :
2454 8 : return tuple;
2455 : }
2456 : else
2457 : {
2458 : /*
2459 : * no qualifying tuples
2460 : */
2461 0 : SPI_finish();
2462 :
2463 0 : return NULL;
2464 : }
2465 :
2466 : /*
2467 : * never reached, but keep compiler quiet
2468 : */
2469 : return NULL;
2470 : }
2471 :
2472 : static void
2473 21 : RangeVarCallbackForDblink(const RangeVar *relation,
2474 : Oid relId, Oid oldRelId, void *arg)
2475 : {
2476 : AclResult aclresult;
2477 :
2478 21 : if (!OidIsValid(relId))
2479 0 : return;
2480 :
2481 21 : aclresult = pg_class_aclcheck(relId, GetUserId(), *((AclMode *) arg));
2482 21 : if (aclresult != ACLCHECK_OK)
2483 0 : aclcheck_error(aclresult, get_relkind_objtype(get_rel_relkind(relId)),
2484 0 : relation->relname);
2485 : }
2486 :
2487 : /*
2488 : * Open the relation named by relname_text, acquire specified type of lock,
2489 : * verify we have specified permissions.
2490 : * Caller must close rel when done with it.
2491 : */
2492 : static Relation
2493 21 : get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode)
2494 : {
2495 : RangeVar *relvar;
2496 : Oid relid;
2497 :
2498 21 : relvar = makeRangeVarFromNameList(textToQualifiedNameList(relname_text));
2499 21 : relid = RangeVarGetRelidExtended(relvar, lockmode, 0,
2500 : RangeVarCallbackForDblink, &aclmode);
2501 :
2502 21 : return table_open(relid, NoLock);
2503 : }
2504 :
2505 : /*
2506 : * generate_relation_name - copied from ruleutils.c
2507 : * Compute the name to display for a relation
2508 : *
2509 : * The result includes all necessary quoting and schema-prefixing.
2510 : */
2511 : static char *
2512 20 : generate_relation_name(Relation rel)
2513 : {
2514 : char *nspname;
2515 : char *result;
2516 :
2517 : /* Qualify the name if not visible in search path */
2518 20 : if (RelationIsVisible(RelationGetRelid(rel)))
2519 15 : nspname = NULL;
2520 : else
2521 5 : nspname = get_namespace_name(rel->rd_rel->relnamespace);
2522 :
2523 20 : result = quote_qualified_identifier(nspname, RelationGetRelationName(rel));
2524 :
2525 20 : return result;
2526 : }
2527 :
2528 :
2529 : static remoteConn *
2530 79 : getConnectionByName(const char *name)
2531 : {
2532 : remoteConnHashEnt *hentry;
2533 : char *key;
2534 :
2535 79 : if (!remoteConnHash)
2536 6 : remoteConnHash = createConnHash();
2537 :
2538 79 : key = pstrdup(name);
2539 79 : truncate_identifier(key, strlen(key), false);
2540 79 : hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
2541 : key, HASH_FIND, NULL);
2542 :
2543 79 : if (hentry && hentry->rconn.conn != NULL)
2544 68 : return &hentry->rconn;
2545 :
2546 11 : return NULL;
2547 : }
2548 :
2549 : static HTAB *
2550 7 : createConnHash(void)
2551 : {
2552 : HASHCTL ctl;
2553 :
2554 7 : ctl.keysize = NAMEDATALEN;
2555 7 : ctl.entrysize = sizeof(remoteConnHashEnt);
2556 :
2557 7 : return hash_create("Remote Con hash", NUMCONN, &ctl,
2558 : HASH_ELEM | HASH_STRINGS);
2559 : }
2560 :
2561 : static remoteConn *
2562 10 : createNewConnection(const char *name)
2563 : {
2564 : remoteConnHashEnt *hentry;
2565 : bool found;
2566 : char *key;
2567 :
2568 10 : if (!remoteConnHash)
2569 1 : remoteConnHash = createConnHash();
2570 :
2571 10 : key = pstrdup(name);
2572 10 : truncate_identifier(key, strlen(key), true);
2573 10 : hentry = (remoteConnHashEnt *) hash_search(remoteConnHash, key,
2574 : HASH_ENTER, &found);
2575 :
2576 10 : if (found && hentry->rconn.conn != NULL)
2577 1 : ereport(ERROR,
2578 : (errcode(ERRCODE_DUPLICATE_OBJECT),
2579 : errmsg("duplicate connection name")));
2580 :
2581 : /* New, or reusable, so initialize the rconn struct to zeroes */
2582 9 : memset(&hentry->rconn, 0, sizeof(remoteConn));
2583 :
2584 9 : return &hentry->rconn;
2585 : }
2586 :
2587 : static void
2588 8 : deleteConnection(const char *name)
2589 : {
2590 : remoteConnHashEnt *hentry;
2591 : bool found;
2592 : char *key;
2593 :
2594 8 : if (!remoteConnHash)
2595 0 : remoteConnHash = createConnHash();
2596 :
2597 8 : key = pstrdup(name);
2598 8 : truncate_identifier(key, strlen(key), false);
2599 8 : hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
2600 : key, HASH_REMOVE, &found);
2601 :
2602 8 : if (!hentry)
2603 0 : ereport(ERROR,
2604 : (errcode(ERRCODE_UNDEFINED_OBJECT),
2605 : errmsg("undefined connection name")));
2606 8 : }
2607 :
2608 : /*
2609 : * Ensure that require_auth and SCRAM keys are correctly set on connstr.
2610 : * SCRAM keys used to pass-through are coming from the initial connection
2611 : * from the client with the server.
2612 : *
2613 : * All required SCRAM options are set by dblink, so we just need to ensure
2614 : * that these options are not overwritten by the user.
2615 : *
2616 : * See appendSCRAMKeysInfo and its usage for more.
2617 : */
2618 : bool
2619 6 : dblink_connstr_has_required_scram_options(const char *connstr)
2620 : {
2621 : PQconninfoOption *options;
2622 6 : bool has_scram_server_key = false;
2623 6 : bool has_scram_client_key = false;
2624 6 : bool has_require_auth = false;
2625 6 : bool has_scram_keys = false;
2626 :
2627 6 : options = PQconninfoParse(connstr, NULL);
2628 6 : if (options)
2629 : {
2630 : /*
2631 : * Continue iterating even if we found the keys that we need to
2632 : * validate to make sure that there is no other declaration of these
2633 : * keys that can overwrite the first.
2634 : */
2635 318 : for (PQconninfoOption *option = options; option->keyword != NULL; option++)
2636 : {
2637 312 : if (strcmp(option->keyword, "require_auth") == 0)
2638 : {
2639 6 : if (option->val != NULL && strcmp(option->val, "scram-sha-256") == 0)
2640 5 : has_require_auth = true;
2641 : else
2642 1 : has_require_auth = false;
2643 : }
2644 :
2645 312 : if (strcmp(option->keyword, "scram_client_key") == 0)
2646 : {
2647 6 : if (option->val != NULL && option->val[0] != '\0')
2648 6 : has_scram_client_key = true;
2649 : else
2650 0 : has_scram_client_key = false;
2651 : }
2652 :
2653 312 : if (strcmp(option->keyword, "scram_server_key") == 0)
2654 : {
2655 6 : if (option->val != NULL && option->val[0] != '\0')
2656 6 : has_scram_server_key = true;
2657 : else
2658 0 : has_scram_server_key = false;
2659 : }
2660 : }
2661 6 : PQconninfoFree(options);
2662 : }
2663 :
2664 6 : has_scram_keys = has_scram_client_key && has_scram_server_key && MyProcPort != NULL && MyProcPort->has_scram_keys;
2665 :
2666 6 : return (has_scram_keys && has_require_auth);
2667 : }
2668 :
2669 : /*
2670 : * We need to make sure that the connection made used credentials
2671 : * which were provided by the user, so check what credentials were
2672 : * used to connect and then make sure that they came from the user.
2673 : *
2674 : * On failure, we close "conn" and also delete the hashtable entry
2675 : * identified by "connname" (if that's not NULL).
2676 : */
2677 : static void
2678 20 : dblink_security_check(PGconn *conn, const char *connname, const char *connstr)
2679 : {
2680 : /* Superuser bypasses security check */
2681 20 : if (superuser())
2682 18 : return;
2683 :
2684 : /* If password was used to connect, make sure it was one provided */
2685 2 : if (PQconnectionUsedPassword(conn) && dblink_connstr_has_pw(connstr))
2686 0 : return;
2687 :
2688 : /*
2689 : * Password was not used to connect, check if SCRAM pass-through is in
2690 : * use.
2691 : *
2692 : * If dblink_connstr_has_required_scram_options is true we assume that
2693 : * UseScramPassthrough is also true because the required SCRAM keys are
2694 : * only added if UseScramPassthrough is set, and the user is not allowed
2695 : * to add the SCRAM keys on fdw and user mapping options.
2696 : */
2697 2 : if (MyProcPort != NULL && MyProcPort->has_scram_keys && dblink_connstr_has_required_scram_options(connstr))
2698 2 : return;
2699 :
2700 : #ifdef ENABLE_GSS
2701 : /* If GSSAPI creds used to connect, make sure it was one delegated */
2702 : if (PQconnectionUsedGSSAPI(conn) && be_gssapi_get_delegation(MyProcPort))
2703 : return;
2704 : #endif
2705 :
2706 : /* Otherwise, fail out */
2707 0 : libpqsrv_disconnect(conn);
2708 0 : if (connname)
2709 0 : deleteConnection(connname);
2710 :
2711 0 : ereport(ERROR,
2712 : (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
2713 : errmsg("password or GSSAPI delegated credentials required"),
2714 : errdetail("Non-superusers may only connect using credentials they provide, eg: password in connection string or delegated GSSAPI credentials"),
2715 : errhint("Ensure provided credentials match target server's authentication method.")));
2716 : }
2717 :
2718 : /*
2719 : * Function to check if the connection string includes an explicit
2720 : * password, needed to ensure that non-superuser password-based auth
2721 : * is using a provided password and not one picked up from the
2722 : * environment.
2723 : */
2724 : static bool
2725 7 : dblink_connstr_has_pw(const char *connstr)
2726 : {
2727 : PQconninfoOption *options;
2728 : PQconninfoOption *option;
2729 7 : bool connstr_gives_password = false;
2730 :
2731 7 : options = PQconninfoParse(connstr, NULL);
2732 7 : if (options)
2733 : {
2734 371 : for (option = options; option->keyword != NULL; option++)
2735 : {
2736 364 : if (strcmp(option->keyword, "password") == 0)
2737 : {
2738 7 : if (option->val != NULL && option->val[0] != '\0')
2739 : {
2740 0 : connstr_gives_password = true;
2741 0 : break;
2742 : }
2743 : }
2744 : }
2745 7 : PQconninfoFree(options);
2746 : }
2747 :
2748 7 : return connstr_gives_password;
2749 : }
2750 :
2751 : /*
2752 : * For non-superusers, insist that the connstr specify a password, except if
2753 : * GSSAPI credentials have been delegated (and we check that they are used for
2754 : * the connection in dblink_security_check later) or if SCRAM pass-through is
2755 : * being used. This prevents a password or GSSAPI credentials from being
2756 : * picked up from .pgpass, a service file, the environment, etc. We don't want
2757 : * the postgres user's passwords or Kerberos credentials to be accessible to
2758 : * non-superusers. In case of SCRAM pass-through insist that the connstr
2759 : * has the required SCRAM pass-through options.
2760 : */
2761 : static void
2762 26 : dblink_connstr_check(const char *connstr)
2763 : {
2764 26 : if (superuser())
2765 21 : return;
2766 :
2767 5 : if (dblink_connstr_has_pw(connstr))
2768 0 : return;
2769 :
2770 5 : if (MyProcPort != NULL && MyProcPort->has_scram_keys && dblink_connstr_has_required_scram_options(connstr))
2771 3 : return;
2772 :
2773 : #ifdef ENABLE_GSS
2774 : if (be_gssapi_get_delegation(MyProcPort))
2775 : return;
2776 : #endif
2777 :
2778 2 : ereport(ERROR,
2779 : (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
2780 : errmsg("password or GSSAPI delegated credentials required"),
2781 : errdetail("Non-superusers must provide a password in the connection string or send delegated GSSAPI credentials.")));
2782 : }
2783 :
2784 : /*
2785 : * Report an error received from the remote server
2786 : *
2787 : * res: the received error result
2788 : * fail: true for ERROR ereport, false for NOTICE
2789 : * fmt and following args: sprintf-style format and values for errcontext;
2790 : * the resulting string should be worded like "while <some action>"
2791 : *
2792 : * If "res" is not NULL, it'll be PQclear'ed here (unless we throw error,
2793 : * in which case memory context cleanup will clear it eventually).
2794 : */
2795 : static void
2796 12 : dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
2797 : bool fail, const char *fmt, ...)
2798 : {
2799 : int level;
2800 12 : char *pg_diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
2801 12 : char *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
2802 12 : char *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
2803 12 : char *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
2804 12 : char *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
2805 : int sqlstate;
2806 : va_list ap;
2807 : char dblink_context_msg[512];
2808 :
2809 12 : if (fail)
2810 3 : level = ERROR;
2811 : else
2812 9 : level = NOTICE;
2813 :
2814 12 : if (pg_diag_sqlstate)
2815 12 : sqlstate = MAKE_SQLSTATE(pg_diag_sqlstate[0],
2816 : pg_diag_sqlstate[1],
2817 : pg_diag_sqlstate[2],
2818 : pg_diag_sqlstate[3],
2819 : pg_diag_sqlstate[4]);
2820 : else
2821 0 : sqlstate = ERRCODE_CONNECTION_FAILURE;
2822 :
2823 : /*
2824 : * If we don't get a message from the PGresult, try the PGconn. This is
2825 : * needed because for connection-level failures, PQgetResult may just
2826 : * return NULL, not a PGresult at all.
2827 : */
2828 12 : if (message_primary == NULL)
2829 0 : message_primary = pchomp(PQerrorMessage(conn));
2830 :
2831 : /*
2832 : * Format the basic errcontext string. Below, we'll add on something
2833 : * about the connection name. That's a violation of the translatability
2834 : * guidelines about constructing error messages out of parts, but since
2835 : * there's no translation support for dblink, there's no need to worry
2836 : * about that (yet).
2837 : */
2838 12 : va_start(ap, fmt);
2839 12 : vsnprintf(dblink_context_msg, sizeof(dblink_context_msg), fmt, ap);
2840 12 : va_end(ap);
2841 :
2842 12 : ereport(level,
2843 : (errcode(sqlstate),
2844 : (message_primary != NULL && message_primary[0] != '\0') ?
2845 : errmsg_internal("%s", message_primary) :
2846 : errmsg("could not obtain message string for remote error"),
2847 : message_detail ? errdetail_internal("%s", message_detail) : 0,
2848 : message_hint ? errhint("%s", message_hint) : 0,
2849 : message_context ? (errcontext("%s", message_context)) : 0,
2850 : conname ?
2851 : (errcontext("%s on dblink connection named \"%s\"",
2852 : dblink_context_msg, conname)) :
2853 : (errcontext("%s on unnamed dblink connection",
2854 : dblink_context_msg))));
2855 9 : PQclear(res);
2856 9 : }
2857 :
2858 : /*
2859 : * Obtain connection string for a foreign server
2860 : */
2861 : static char *
2862 26 : get_connect_string(const char *servername)
2863 : {
2864 26 : ForeignServer *foreign_server = NULL;
2865 : UserMapping *user_mapping;
2866 : ListCell *cell;
2867 : StringInfoData buf;
2868 : ForeignDataWrapper *fdw;
2869 : AclResult aclresult;
2870 : char *srvname;
2871 :
2872 : static const PQconninfoOption *options = NULL;
2873 :
2874 26 : initStringInfo(&buf);
2875 :
2876 : /*
2877 : * Get list of valid libpq options.
2878 : *
2879 : * To avoid unnecessary work, we get the list once and use it throughout
2880 : * the lifetime of this backend process. We don't need to care about
2881 : * memory context issues, because PQconndefaults allocates with malloc.
2882 : */
2883 26 : if (!options)
2884 : {
2885 7 : options = PQconndefaults();
2886 7 : if (!options) /* assume reason for failure is OOM */
2887 0 : ereport(ERROR,
2888 : (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
2889 : errmsg("out of memory"),
2890 : errdetail("Could not get libpq's default connection options.")));
2891 : }
2892 :
2893 : /* first gather the server connstr options */
2894 26 : srvname = pstrdup(servername);
2895 26 : truncate_identifier(srvname, strlen(srvname), false);
2896 26 : foreign_server = GetForeignServerByName(srvname, true);
2897 :
2898 26 : if (foreign_server)
2899 : {
2900 6 : Oid serverid = foreign_server->serverid;
2901 6 : Oid fdwid = foreign_server->fdwid;
2902 6 : Oid userid = GetUserId();
2903 :
2904 6 : user_mapping = GetUserMapping(userid, serverid);
2905 6 : fdw = GetForeignDataWrapper(fdwid);
2906 :
2907 : /* Check permissions, user must have usage on the server. */
2908 6 : aclresult = object_aclcheck(ForeignServerRelationId, serverid, userid, ACL_USAGE);
2909 6 : if (aclresult != ACLCHECK_OK)
2910 0 : aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER, foreign_server->servername);
2911 :
2912 : /*
2913 : * First append hardcoded options needed for SCRAM pass-through, so if
2914 : * the user overwrites these options we can ereport on
2915 : * dblink_connstr_check and dblink_security_check.
2916 : */
2917 6 : if (MyProcPort != NULL && MyProcPort->has_scram_keys && UseScramPassthrough(foreign_server, user_mapping))
2918 4 : appendSCRAMKeysInfo(&buf);
2919 :
2920 6 : foreach(cell, fdw->options)
2921 : {
2922 0 : DefElem *def = lfirst(cell);
2923 :
2924 0 : if (is_valid_dblink_option(options, def->defname, ForeignDataWrapperRelationId))
2925 0 : appendStringInfo(&buf, "%s='%s' ", def->defname,
2926 0 : escape_param_str(strVal(def->arg)));
2927 : }
2928 :
2929 27 : foreach(cell, foreign_server->options)
2930 : {
2931 21 : DefElem *def = lfirst(cell);
2932 :
2933 21 : if (is_valid_dblink_option(options, def->defname, ForeignServerRelationId))
2934 17 : appendStringInfo(&buf, "%s='%s' ", def->defname,
2935 17 : escape_param_str(strVal(def->arg)));
2936 : }
2937 :
2938 12 : foreach(cell, user_mapping->options)
2939 : {
2940 :
2941 6 : DefElem *def = lfirst(cell);
2942 :
2943 6 : if (is_valid_dblink_option(options, def->defname, UserMappingRelationId))
2944 6 : appendStringInfo(&buf, "%s='%s' ", def->defname,
2945 6 : escape_param_str(strVal(def->arg)));
2946 : }
2947 :
2948 6 : return buf.data;
2949 : }
2950 : else
2951 20 : return NULL;
2952 : }
2953 :
2954 : /*
2955 : * Escaping libpq connect parameter strings.
2956 : *
2957 : * Replaces "'" with "\'" and "\" with "\\".
2958 : */
2959 : static char *
2960 23 : escape_param_str(const char *str)
2961 : {
2962 : const char *cp;
2963 : StringInfoData buf;
2964 :
2965 23 : initStringInfo(&buf);
2966 :
2967 205 : for (cp = str; *cp; cp++)
2968 : {
2969 182 : if (*cp == '\\' || *cp == '\'')
2970 0 : appendStringInfoChar(&buf, '\\');
2971 182 : appendStringInfoChar(&buf, *cp);
2972 : }
2973 :
2974 23 : return buf.data;
2975 : }
2976 :
2977 : /*
2978 : * Validate the PK-attnums argument for dblink_build_sql_insert() and related
2979 : * functions, and translate to the internal representation.
2980 : *
2981 : * The user supplies an int2vector of 1-based logical attnums, plus a count
2982 : * argument (the need for the separate count argument is historical, but we
2983 : * still check it). We check that each attnum corresponds to a valid,
2984 : * non-dropped attribute of the rel. We do *not* prevent attnums from being
2985 : * listed twice, though the actual use-case for such things is dubious.
2986 : * Note that before Postgres 9.0, the user's attnums were interpreted as
2987 : * physical not logical column numbers; this was changed for future-proofing.
2988 : *
2989 : * The internal representation is a palloc'd int array of 0-based physical
2990 : * attnums.
2991 : */
2992 : static void
2993 18 : validate_pkattnums(Relation rel,
2994 : int2vector *pkattnums_arg, int32 pknumatts_arg,
2995 : int **pkattnums, int *pknumatts)
2996 : {
2997 18 : TupleDesc tupdesc = rel->rd_att;
2998 18 : int natts = tupdesc->natts;
2999 : int i;
3000 :
3001 : /* Don't take more array elements than there are */
3002 18 : pknumatts_arg = Min(pknumatts_arg, pkattnums_arg->dim1);
3003 :
3004 : /* Must have at least one pk attnum selected */
3005 18 : if (pknumatts_arg <= 0)
3006 0 : ereport(ERROR,
3007 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3008 : errmsg("number of key attributes must be > 0")));
3009 :
3010 : /* Allocate output array */
3011 18 : *pkattnums = palloc_array(int, pknumatts_arg);
3012 18 : *pknumatts = pknumatts_arg;
3013 :
3014 : /* Validate attnums and convert to internal form */
3015 57 : for (i = 0; i < pknumatts_arg; i++)
3016 : {
3017 45 : int pkattnum = pkattnums_arg->values[i];
3018 : int lnum;
3019 : int j;
3020 :
3021 : /* Can throw error immediately if out of range */
3022 45 : if (pkattnum <= 0 || pkattnum > natts)
3023 6 : ereport(ERROR,
3024 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3025 : errmsg("invalid attribute number %d", pkattnum)));
3026 :
3027 : /* Identify which physical column has this logical number */
3028 39 : lnum = 0;
3029 69 : for (j = 0; j < natts; j++)
3030 : {
3031 : /* dropped columns don't count */
3032 69 : if (TupleDescCompactAttr(tupdesc, j)->attisdropped)
3033 3 : continue;
3034 :
3035 66 : if (++lnum == pkattnum)
3036 39 : break;
3037 : }
3038 :
3039 39 : if (j < natts)
3040 39 : (*pkattnums)[i] = j;
3041 : else
3042 0 : ereport(ERROR,
3043 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3044 : errmsg("invalid attribute number %d", pkattnum)));
3045 : }
3046 12 : }
3047 :
3048 : /*
3049 : * Check if the specified connection option is valid.
3050 : *
3051 : * We basically allow whatever libpq thinks is an option, with these
3052 : * restrictions:
3053 : * debug options: disallowed
3054 : * "client_encoding": disallowed
3055 : * "user": valid only in USER MAPPING options
3056 : * secure options (eg password): valid only in USER MAPPING options
3057 : * others: valid only in FOREIGN SERVER options
3058 : *
3059 : * We disallow client_encoding because it would be overridden anyway via
3060 : * PQclientEncoding; allowing it to be specified would merely promote
3061 : * confusion.
3062 : */
3063 : static bool
3064 478 : is_valid_dblink_option(const PQconninfoOption *options, const char *option,
3065 : Oid context)
3066 : {
3067 : const PQconninfoOption *opt;
3068 :
3069 : /* Look up the option in libpq result */
3070 12050 : for (opt = options; opt->keyword; opt++)
3071 : {
3072 12044 : if (strcmp(opt->keyword, option) == 0)
3073 472 : break;
3074 : }
3075 478 : if (opt->keyword == NULL)
3076 6 : return false;
3077 :
3078 : /* Disallow debug options (particularly "replication") */
3079 472 : if (strchr(opt->dispchar, 'D'))
3080 34 : return false;
3081 :
3082 : /* Disallow "client_encoding" */
3083 438 : if (strcmp(opt->keyword, "client_encoding") == 0)
3084 8 : return false;
3085 :
3086 : /*
3087 : * Disallow OAuth options for now, since the builtin flow communicates on
3088 : * stderr by default and can't cache tokens yet.
3089 : */
3090 430 : if (strncmp(opt->keyword, "oauth_", strlen("oauth_")) == 0)
3091 44 : return false;
3092 :
3093 : /*
3094 : * If the option is "user" or marked secure, it should be specified only
3095 : * in USER MAPPING. Others should be specified only in SERVER.
3096 : */
3097 386 : if (strcmp(opt->keyword, "user") == 0 || strchr(opt->dispchar, '*'))
3098 : {
3099 38 : if (context != UserMappingRelationId)
3100 9 : return false;
3101 : }
3102 : else
3103 : {
3104 348 : if (context != ForeignServerRelationId)
3105 234 : return false;
3106 : }
3107 :
3108 143 : return true;
3109 : }
3110 :
3111 : /*
3112 : * Same as is_valid_dblink_option but also check for only dblink_fdw specific
3113 : * options.
3114 : */
3115 : static bool
3116 39 : is_valid_dblink_fdw_option(const PQconninfoOption *options, const char *option,
3117 : Oid context)
3118 : {
3119 39 : if (strcmp(option, "use_scram_passthrough") == 0)
3120 4 : return true;
3121 :
3122 35 : return is_valid_dblink_option(options, option, context);
3123 : }
3124 :
3125 : /*
3126 : * Copy the remote session's values of GUCs that affect datatype I/O
3127 : * and apply them locally in a new GUC nesting level. Returns the new
3128 : * nestlevel (which is needed by restoreLocalGucs to undo the settings),
3129 : * or -1 if no new nestlevel was needed.
3130 : *
3131 : * We use the equivalent of a function SET option to allow the settings to
3132 : * persist only until the caller calls restoreLocalGucs. If an error is
3133 : * thrown in between, guc.c will take care of undoing the settings.
3134 : */
3135 : static int
3136 34 : applyRemoteGucs(PGconn *conn)
3137 : {
3138 : static const char *const GUCsAffectingIO[] = {
3139 : "DateStyle",
3140 : "IntervalStyle"
3141 : };
3142 :
3143 34 : int nestlevel = -1;
3144 : int i;
3145 :
3146 102 : for (i = 0; i < lengthof(GUCsAffectingIO); i++)
3147 : {
3148 68 : const char *gucName = GUCsAffectingIO[i];
3149 68 : const char *remoteVal = PQparameterStatus(conn, gucName);
3150 : const char *localVal;
3151 :
3152 : /*
3153 : * If the remote server is pre-8.4, it won't have IntervalStyle, but
3154 : * that's okay because its output format won't be ambiguous. So just
3155 : * skip the GUC if we don't get a value for it. (We might eventually
3156 : * need more complicated logic with remote-version checks here.)
3157 : */
3158 68 : if (remoteVal == NULL)
3159 0 : continue;
3160 :
3161 : /*
3162 : * Avoid GUC-setting overhead if the remote and local GUCs already
3163 : * have the same value.
3164 : */
3165 68 : localVal = GetConfigOption(gucName, false, false);
3166 : Assert(localVal != NULL);
3167 :
3168 68 : if (strcmp(remoteVal, localVal) == 0)
3169 51 : continue;
3170 :
3171 : /* Create new GUC nest level if we didn't already */
3172 17 : if (nestlevel < 0)
3173 9 : nestlevel = NewGUCNestLevel();
3174 :
3175 : /* Apply the option (this will throw error on failure) */
3176 17 : (void) set_config_option(gucName, remoteVal,
3177 : PGC_USERSET, PGC_S_SESSION,
3178 : GUC_ACTION_SAVE, true, 0, false);
3179 : }
3180 :
3181 34 : return nestlevel;
3182 : }
3183 :
3184 : /*
3185 : * Restore local GUCs after they have been overlaid with remote settings.
3186 : */
3187 : static void
3188 35 : restoreLocalGucs(int nestlevel)
3189 : {
3190 : /* Do nothing if no new nestlevel was created */
3191 35 : if (nestlevel > 0)
3192 8 : AtEOXact_GUC(true, nestlevel);
3193 35 : }
3194 :
3195 : /*
3196 : * Append SCRAM client key and server key information from the global
3197 : * MyProcPort into the given StringInfo buffer.
3198 : */
3199 : static void
3200 4 : appendSCRAMKeysInfo(StringInfo buf)
3201 : {
3202 : int len;
3203 : int encoded_len;
3204 : char *client_key;
3205 : char *server_key;
3206 :
3207 4 : len = pg_b64_enc_len(sizeof(MyProcPort->scram_ClientKey));
3208 : /* don't forget the zero-terminator */
3209 4 : client_key = palloc0(len + 1);
3210 4 : encoded_len = pg_b64_encode(MyProcPort->scram_ClientKey,
3211 : sizeof(MyProcPort->scram_ClientKey),
3212 : client_key, len);
3213 4 : if (encoded_len < 0)
3214 0 : elog(ERROR, "could not encode SCRAM client key");
3215 :
3216 4 : len = pg_b64_enc_len(sizeof(MyProcPort->scram_ServerKey));
3217 : /* don't forget the zero-terminator */
3218 4 : server_key = palloc0(len + 1);
3219 4 : encoded_len = pg_b64_encode(MyProcPort->scram_ServerKey,
3220 : sizeof(MyProcPort->scram_ServerKey),
3221 : server_key, len);
3222 4 : if (encoded_len < 0)
3223 0 : elog(ERROR, "could not encode SCRAM server key");
3224 :
3225 4 : appendStringInfo(buf, "scram_client_key='%s' ", client_key);
3226 4 : appendStringInfo(buf, "scram_server_key='%s' ", server_key);
3227 4 : appendStringInfoString(buf, "require_auth='scram-sha-256' ");
3228 :
3229 4 : pfree(client_key);
3230 4 : pfree(server_key);
3231 4 : }
3232 :
3233 :
3234 : static bool
3235 4 : UseScramPassthrough(ForeignServer *foreign_server, UserMapping *user)
3236 : {
3237 : ListCell *cell;
3238 :
3239 16 : foreach(cell, foreign_server->options)
3240 : {
3241 16 : DefElem *def = lfirst(cell);
3242 :
3243 16 : if (strcmp(def->defname, "use_scram_passthrough") == 0)
3244 4 : return defGetBoolean(def);
3245 : }
3246 :
3247 0 : foreach(cell, user->options)
3248 : {
3249 0 : DefElem *def = (DefElem *) lfirst(cell);
3250 :
3251 0 : if (strcmp(def->defname, "use_scram_passthrough") == 0)
3252 0 : return defGetBoolean(def);
3253 : }
3254 :
3255 0 : return false;
3256 : }
|