Line data Source code
1 : /*
2 : * dblink.c
3 : *
4 : * Functions returning results from a remote database
5 : *
6 : * Joe Conway <mail@joeconway.com>
7 : * And contributors:
8 : * Darko Prenosil <Darko.Prenosil@finteh.hr>
9 : * Shridhar Daithankar <shridhar_daithankar@persistent.co.in>
10 : *
11 : * contrib/dblink/dblink.c
12 : * Copyright (c) 2001-2025, PostgreSQL Global Development Group
13 : * ALL RIGHTS RESERVED;
14 : *
15 : * Permission to use, copy, modify, and distribute this software and its
16 : * documentation for any purpose, without fee, and without a written agreement
17 : * is hereby granted, provided that the above copyright notice and this
18 : * paragraph and the following two paragraphs appear in all copies.
19 : *
20 : * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
21 : * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
22 : * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
23 : * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
24 : * POSSIBILITY OF SUCH DAMAGE.
25 : *
26 : * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
27 : * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
28 : * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
29 : * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
30 : * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
31 : *
32 : */
33 : #include "postgres.h"
34 :
35 : #include <limits.h>
36 :
37 : #include "access/htup_details.h"
38 : #include "access/relation.h"
39 : #include "access/reloptions.h"
40 : #include "access/table.h"
41 : #include "catalog/namespace.h"
42 : #include "catalog/pg_foreign_data_wrapper.h"
43 : #include "catalog/pg_foreign_server.h"
44 : #include "catalog/pg_type.h"
45 : #include "catalog/pg_user_mapping.h"
46 : #include "commands/defrem.h"
47 : #include "common/base64.h"
48 : #include "executor/spi.h"
49 : #include "foreign/foreign.h"
50 : #include "funcapi.h"
51 : #include "lib/stringinfo.h"
52 : #include "libpq-fe.h"
53 : #include "libpq/libpq-be.h"
54 : #include "libpq/libpq-be-fe-helpers.h"
55 : #include "mb/pg_wchar.h"
56 : #include "miscadmin.h"
57 : #include "parser/scansup.h"
58 : #include "utils/acl.h"
59 : #include "utils/builtins.h"
60 : #include "utils/fmgroids.h"
61 : #include "utils/guc.h"
62 : #include "utils/lsyscache.h"
63 : #include "utils/memutils.h"
64 : #include "utils/rel.h"
65 : #include "utils/varlena.h"
66 : #include "utils/wait_event.h"
67 :
68 32 : PG_MODULE_MAGIC_EXT(
69 : .name = "dblink",
70 : .version = PG_VERSION
71 : );
72 :
73 : typedef struct remoteConn
74 : {
75 : PGconn *conn; /* Hold the remote connection */
76 : int openCursorCount; /* The number of open cursors */
77 : bool newXactForCursor; /* Opened a transaction for a cursor */
78 : } remoteConn;
79 :
80 : typedef struct storeInfo
81 : {
82 : FunctionCallInfo fcinfo;
83 : Tuplestorestate *tuplestore;
84 : AttInMetadata *attinmeta;
85 : MemoryContext tmpcontext;
86 : char **cstrs;
87 : /* temp storage for results to avoid leaks on exception */
88 : PGresult *last_res;
89 : PGresult *cur_res;
90 : } storeInfo;
91 :
92 : /*
93 : * Internal declarations
94 : */
95 : static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
96 : static void prepTuplestoreResult(FunctionCallInfo fcinfo);
97 : static void materializeResult(FunctionCallInfo fcinfo, PGconn *conn,
98 : PGresult *res);
99 : static void materializeQueryResult(FunctionCallInfo fcinfo,
100 : PGconn *conn,
101 : const char *conname,
102 : const char *sql,
103 : bool fail);
104 : static PGresult *storeQueryResult(storeInfo *sinfo, PGconn *conn, const char *sql);
105 : static void storeRow(storeInfo *sinfo, PGresult *res, bool first);
106 : static remoteConn *getConnectionByName(const char *name);
107 : static HTAB *createConnHash(void);
108 : static remoteConn *createNewConnection(const char *name);
109 : static void deleteConnection(const char *name);
110 : static char **get_pkey_attnames(Relation rel, int16 *indnkeyatts);
111 : static char **get_text_array_contents(ArrayType *array, int *numitems);
112 : static char *get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
113 : static char *get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals);
114 : static char *get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
115 : static char *quote_ident_cstr(char *rawstr);
116 : static int get_attnum_pk_pos(int *pkattnums, int pknumatts, int key);
117 : static HeapTuple get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals);
118 : static Relation get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode);
119 : static char *generate_relation_name(Relation rel);
120 : static void dblink_connstr_check(const char *connstr);
121 : static bool dblink_connstr_has_pw(const char *connstr);
122 : static void dblink_security_check(PGconn *conn, const char *connname,
123 : const char *connstr);
124 : static void dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
125 : bool fail, const char *fmt,...) pg_attribute_printf(5, 6);
126 : static char *get_connect_string(const char *servername);
127 : static char *escape_param_str(const char *str);
128 : static void validate_pkattnums(Relation rel,
129 : int2vector *pkattnums_arg, int32 pknumatts_arg,
130 : int **pkattnums, int *pknumatts);
131 : static bool is_valid_dblink_option(const PQconninfoOption *options,
132 : const char *option, Oid context);
133 : static int applyRemoteGucs(PGconn *conn);
134 : static void restoreLocalGucs(int nestlevel);
135 : static bool UseScramPassthrough(ForeignServer *foreign_server, UserMapping *user);
136 : static void appendSCRAMKeysInfo(StringInfo buf);
137 : static bool is_valid_dblink_fdw_option(const PQconninfoOption *options, const char *option,
138 : Oid context);
139 : static bool dblink_connstr_has_required_scram_options(const char *connstr);
140 :
141 : /* Global */
142 : static remoteConn *pconn = NULL;
143 : static HTAB *remoteConnHash = NULL;
144 :
145 : /* custom wait event values, retrieved from shared memory */
146 : static uint32 dblink_we_connect = 0;
147 : static uint32 dblink_we_get_conn = 0;
148 : static uint32 dblink_we_get_result = 0;
149 :
150 : /*
151 : * Following is hash that holds multiple remote connections.
152 : * Calling convention of each dblink function changes to accept
153 : * connection name as the first parameter. The connection hash is
154 : * much like ecpg e.g. a mapping between a name and a PGconn object.
155 : *
156 : * To avoid potentially leaking a PGconn object in case of out-of-memory
157 : * errors, we first create the hash entry, then open the PGconn.
158 : * Hence, a hash entry whose rconn.conn pointer is NULL must be
159 : * understood as a leftover from a failed create; it should be ignored
160 : * by lookup operations, and silently replaced by create operations.
161 : */
162 :
163 : typedef struct remoteConnHashEnt
164 : {
165 : char name[NAMEDATALEN];
166 : remoteConn rconn;
167 : } remoteConnHashEnt;
168 :
169 : /* initial number of connection hashes */
170 : #define NUMCONN 16
171 :
172 : pg_noreturn static void
173 0 : dblink_res_internalerror(PGconn *conn, PGresult *res, const char *p2)
174 : {
175 0 : char *msg = pchomp(PQerrorMessage(conn));
176 :
177 0 : PQclear(res);
178 0 : elog(ERROR, "%s: %s", p2, msg);
179 : }
180 :
181 : pg_noreturn static void
182 6 : dblink_conn_not_avail(const char *conname)
183 : {
184 6 : if (conname)
185 2 : ereport(ERROR,
186 : (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
187 : errmsg("connection \"%s\" not available", conname)));
188 : else
189 4 : ereport(ERROR,
190 : (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
191 : errmsg("connection not available")));
192 : }
193 :
194 : static void
195 72 : dblink_get_conn(char *conname_or_str,
196 : PGconn *volatile *conn_p, char **conname_p, volatile bool *freeconn_p)
197 : {
198 72 : remoteConn *rconn = getConnectionByName(conname_or_str);
199 : PGconn *conn;
200 : char *conname;
201 : bool freeconn;
202 :
203 72 : if (rconn)
204 : {
205 54 : conn = rconn->conn;
206 54 : conname = conname_or_str;
207 54 : freeconn = false;
208 : }
209 : else
210 : {
211 : const char *connstr;
212 :
213 18 : connstr = get_connect_string(conname_or_str);
214 18 : if (connstr == NULL)
215 12 : connstr = conname_or_str;
216 18 : dblink_connstr_check(connstr);
217 :
218 : /* first time, allocate or get the custom wait event */
219 16 : if (dblink_we_get_conn == 0)
220 8 : dblink_we_get_conn = WaitEventExtensionNew("DblinkGetConnect");
221 :
222 : /* OK to make connection */
223 16 : conn = libpqsrv_connect(connstr, dblink_we_get_conn);
224 :
225 16 : if (PQstatus(conn) == CONNECTION_BAD)
226 : {
227 4 : char *msg = pchomp(PQerrorMessage(conn));
228 :
229 4 : libpqsrv_disconnect(conn);
230 4 : ereport(ERROR,
231 : (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
232 : errmsg("could not establish connection"),
233 : errdetail_internal("%s", msg)));
234 : }
235 :
236 12 : PQsetNoticeReceiver(conn, libpqsrv_notice_receiver,
237 : "received message via remote connection");
238 :
239 12 : dblink_security_check(conn, NULL, connstr);
240 12 : if (PQclientEncoding(conn) != GetDatabaseEncoding())
241 0 : PQsetClientEncoding(conn, GetDatabaseEncodingName());
242 12 : freeconn = true;
243 12 : conname = NULL;
244 : }
245 :
246 66 : *conn_p = conn;
247 66 : *conname_p = conname;
248 66 : *freeconn_p = freeconn;
249 66 : }
250 :
251 : static PGconn *
252 34 : dblink_get_named_conn(const char *conname)
253 : {
254 34 : remoteConn *rconn = getConnectionByName(conname);
255 :
256 34 : if (rconn)
257 34 : return rconn->conn;
258 :
259 0 : dblink_conn_not_avail(conname);
260 : return NULL; /* keep compiler quiet */
261 : }
262 :
263 : static void
264 246 : dblink_init(void)
265 : {
266 246 : if (!pconn)
267 : {
268 12 : if (dblink_we_get_result == 0)
269 12 : dblink_we_get_result = WaitEventExtensionNew("DblinkGetResult");
270 :
271 12 : pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn));
272 12 : pconn->conn = NULL;
273 12 : pconn->openCursorCount = 0;
274 12 : pconn->newXactForCursor = false;
275 : }
276 246 : }
277 :
278 : /*
279 : * Create a persistent connection to another database
280 : */
281 26 : PG_FUNCTION_INFO_V1(dblink_connect);
282 : Datum
283 32 : dblink_connect(PG_FUNCTION_ARGS)
284 : {
285 32 : char *conname_or_str = NULL;
286 32 : char *connstr = NULL;
287 32 : char *connname = NULL;
288 : char *msg;
289 32 : PGconn *conn = NULL;
290 32 : remoteConn *rconn = NULL;
291 :
292 32 : dblink_init();
293 :
294 32 : if (PG_NARGS() == 2)
295 : {
296 22 : conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(1));
297 22 : connname = text_to_cstring(PG_GETARG_TEXT_PP(0));
298 : }
299 10 : else if (PG_NARGS() == 1)
300 10 : conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0));
301 :
302 : /* first check for valid foreign data server */
303 32 : connstr = get_connect_string(conname_or_str);
304 32 : if (connstr == NULL)
305 28 : connstr = conname_or_str;
306 :
307 : /* check password in connection string if not superuser */
308 32 : dblink_connstr_check(connstr);
309 :
310 : /* first time, allocate or get the custom wait event */
311 30 : if (dblink_we_connect == 0)
312 4 : dblink_we_connect = WaitEventExtensionNew("DblinkConnect");
313 :
314 : /* if we need a hashtable entry, make that first, since it might fail */
315 30 : if (connname)
316 : {
317 20 : rconn = createNewConnection(connname);
318 : Assert(rconn->conn == NULL);
319 : }
320 :
321 : /* OK to make connection */
322 28 : conn = libpqsrv_connect(connstr, dblink_we_connect);
323 :
324 28 : if (PQstatus(conn) == CONNECTION_BAD)
325 : {
326 0 : msg = pchomp(PQerrorMessage(conn));
327 0 : libpqsrv_disconnect(conn);
328 0 : if (connname)
329 0 : deleteConnection(connname);
330 :
331 0 : ereport(ERROR,
332 : (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
333 : errmsg("could not establish connection"),
334 : errdetail_internal("%s", msg)));
335 : }
336 :
337 28 : PQsetNoticeReceiver(conn, libpqsrv_notice_receiver,
338 : "received message via remote connection");
339 :
340 : /* check password actually used if not superuser */
341 28 : dblink_security_check(conn, connname, connstr);
342 :
343 : /* attempt to set client encoding to match server encoding, if needed */
344 28 : if (PQclientEncoding(conn) != GetDatabaseEncoding())
345 0 : PQsetClientEncoding(conn, GetDatabaseEncodingName());
346 :
347 : /* all OK, save away the conn */
348 28 : if (connname)
349 : {
350 18 : rconn->conn = conn;
351 : }
352 : else
353 : {
354 10 : if (pconn->conn)
355 2 : libpqsrv_disconnect(pconn->conn);
356 10 : pconn->conn = conn;
357 : }
358 :
359 28 : PG_RETURN_TEXT_P(cstring_to_text("OK"));
360 : }
361 :
362 : /*
363 : * Clear a persistent connection to another database
364 : */
365 16 : PG_FUNCTION_INFO_V1(dblink_disconnect);
366 : Datum
367 26 : dblink_disconnect(PG_FUNCTION_ARGS)
368 : {
369 26 : char *conname = NULL;
370 26 : remoteConn *rconn = NULL;
371 26 : PGconn *conn = NULL;
372 :
373 26 : dblink_init();
374 :
375 26 : if (PG_NARGS() == 1)
376 : {
377 18 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
378 18 : rconn = getConnectionByName(conname);
379 18 : if (rconn)
380 16 : conn = rconn->conn;
381 : }
382 : else
383 8 : conn = pconn->conn;
384 :
385 26 : if (!conn)
386 2 : dblink_conn_not_avail(conname);
387 :
388 24 : libpqsrv_disconnect(conn);
389 24 : if (rconn)
390 16 : deleteConnection(conname);
391 : else
392 8 : pconn->conn = NULL;
393 :
394 24 : PG_RETURN_TEXT_P(cstring_to_text("OK"));
395 : }
396 :
397 : /*
398 : * opens a cursor using a persistent connection
399 : */
400 26 : PG_FUNCTION_INFO_V1(dblink_open);
401 : Datum
402 18 : dblink_open(PG_FUNCTION_ARGS)
403 : {
404 18 : PGresult *res = NULL;
405 : PGconn *conn;
406 18 : char *curname = NULL;
407 18 : char *sql = NULL;
408 18 : char *conname = NULL;
409 : StringInfoData buf;
410 18 : remoteConn *rconn = NULL;
411 18 : bool fail = true; /* default to backward compatible behavior */
412 :
413 18 : dblink_init();
414 18 : initStringInfo(&buf);
415 :
416 18 : if (PG_NARGS() == 2)
417 : {
418 : /* text,text */
419 4 : curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
420 4 : sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
421 4 : rconn = pconn;
422 : }
423 14 : else if (PG_NARGS() == 3)
424 : {
425 : /* might be text,text,text or text,text,bool */
426 12 : if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
427 : {
428 2 : curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
429 2 : sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
430 2 : fail = PG_GETARG_BOOL(2);
431 2 : rconn = pconn;
432 : }
433 : else
434 : {
435 10 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
436 10 : curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
437 10 : sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
438 10 : rconn = getConnectionByName(conname);
439 : }
440 : }
441 2 : else if (PG_NARGS() == 4)
442 : {
443 : /* text,text,text,bool */
444 2 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
445 2 : curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
446 2 : sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
447 2 : fail = PG_GETARG_BOOL(3);
448 2 : rconn = getConnectionByName(conname);
449 : }
450 :
451 18 : if (!rconn || !rconn->conn)
452 0 : dblink_conn_not_avail(conname);
453 :
454 18 : conn = rconn->conn;
455 :
456 : /* If we are not in a transaction, start one */
457 18 : if (PQtransactionStatus(conn) == PQTRANS_IDLE)
458 : {
459 14 : res = libpqsrv_exec(conn, "BEGIN", dblink_we_get_result);
460 14 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
461 0 : dblink_res_internalerror(conn, res, "begin error");
462 14 : PQclear(res);
463 14 : rconn->newXactForCursor = true;
464 :
465 : /*
466 : * Since transaction state was IDLE, we force cursor count to
467 : * initially be 0. This is needed as a previous ABORT might have wiped
468 : * out our transaction without maintaining the cursor count for us.
469 : */
470 14 : rconn->openCursorCount = 0;
471 : }
472 :
473 : /* if we started a transaction, increment cursor count */
474 18 : if (rconn->newXactForCursor)
475 18 : (rconn->openCursorCount)++;
476 :
477 18 : appendStringInfo(&buf, "DECLARE %s CURSOR FOR %s", curname, sql);
478 18 : res = libpqsrv_exec(conn, buf.data, dblink_we_get_result);
479 18 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
480 : {
481 4 : dblink_res_error(conn, conname, res, fail,
482 : "while opening cursor \"%s\"", curname);
483 4 : PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
484 : }
485 :
486 14 : PQclear(res);
487 14 : PG_RETURN_TEXT_P(cstring_to_text("OK"));
488 : }
489 :
490 : /*
491 : * closes a cursor
492 : */
493 20 : PG_FUNCTION_INFO_V1(dblink_close);
494 : Datum
495 10 : dblink_close(PG_FUNCTION_ARGS)
496 : {
497 : PGconn *conn;
498 10 : PGresult *res = NULL;
499 10 : char *curname = NULL;
500 10 : char *conname = NULL;
501 : StringInfoData buf;
502 10 : remoteConn *rconn = NULL;
503 10 : bool fail = true; /* default to backward compatible behavior */
504 :
505 10 : dblink_init();
506 10 : initStringInfo(&buf);
507 :
508 10 : if (PG_NARGS() == 1)
509 : {
510 : /* text */
511 0 : curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
512 0 : rconn = pconn;
513 : }
514 10 : else if (PG_NARGS() == 2)
515 : {
516 : /* might be text,text or text,bool */
517 10 : if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
518 : {
519 4 : curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
520 4 : fail = PG_GETARG_BOOL(1);
521 4 : rconn = pconn;
522 : }
523 : else
524 : {
525 6 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
526 6 : curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
527 6 : rconn = getConnectionByName(conname);
528 : }
529 : }
530 10 : if (PG_NARGS() == 3)
531 : {
532 : /* text,text,bool */
533 0 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
534 0 : curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
535 0 : fail = PG_GETARG_BOOL(2);
536 0 : rconn = getConnectionByName(conname);
537 : }
538 :
539 10 : if (!rconn || !rconn->conn)
540 0 : dblink_conn_not_avail(conname);
541 :
542 10 : conn = rconn->conn;
543 :
544 10 : appendStringInfo(&buf, "CLOSE %s", curname);
545 :
546 : /* close the cursor */
547 10 : res = libpqsrv_exec(conn, buf.data, dblink_we_get_result);
548 10 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
549 : {
550 2 : dblink_res_error(conn, conname, res, fail,
551 : "while closing cursor \"%s\"", curname);
552 2 : PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
553 : }
554 :
555 8 : PQclear(res);
556 :
557 : /* if we started a transaction, decrement cursor count */
558 8 : if (rconn->newXactForCursor)
559 : {
560 8 : (rconn->openCursorCount)--;
561 :
562 : /* if count is zero, commit the transaction */
563 8 : if (rconn->openCursorCount == 0)
564 : {
565 4 : rconn->newXactForCursor = false;
566 :
567 4 : res = libpqsrv_exec(conn, "COMMIT", dblink_we_get_result);
568 4 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
569 0 : dblink_res_internalerror(conn, res, "commit error");
570 4 : PQclear(res);
571 : }
572 : }
573 :
574 8 : PG_RETURN_TEXT_P(cstring_to_text("OK"));
575 : }
576 :
577 : /*
578 : * Fetch results from an open cursor
579 : */
580 26 : PG_FUNCTION_INFO_V1(dblink_fetch);
581 : Datum
582 26 : dblink_fetch(PG_FUNCTION_ARGS)
583 : {
584 26 : PGresult *res = NULL;
585 26 : char *conname = NULL;
586 26 : remoteConn *rconn = NULL;
587 26 : PGconn *conn = NULL;
588 : StringInfoData buf;
589 26 : char *curname = NULL;
590 26 : int howmany = 0;
591 26 : bool fail = true; /* default to backward compatible */
592 :
593 26 : prepTuplestoreResult(fcinfo);
594 :
595 26 : dblink_init();
596 :
597 26 : if (PG_NARGS() == 4)
598 : {
599 : /* text,text,int,bool */
600 2 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
601 2 : curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
602 2 : howmany = PG_GETARG_INT32(2);
603 2 : fail = PG_GETARG_BOOL(3);
604 :
605 2 : rconn = getConnectionByName(conname);
606 2 : if (rconn)
607 2 : conn = rconn->conn;
608 : }
609 24 : else if (PG_NARGS() == 3)
610 : {
611 : /* text,text,int or text,int,bool */
612 16 : if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
613 : {
614 4 : curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
615 4 : howmany = PG_GETARG_INT32(1);
616 4 : fail = PG_GETARG_BOOL(2);
617 4 : conn = pconn->conn;
618 : }
619 : else
620 : {
621 12 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
622 12 : curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
623 12 : howmany = PG_GETARG_INT32(2);
624 :
625 12 : rconn = getConnectionByName(conname);
626 12 : if (rconn)
627 12 : conn = rconn->conn;
628 : }
629 : }
630 8 : else if (PG_NARGS() == 2)
631 : {
632 : /* text,int */
633 8 : curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
634 8 : howmany = PG_GETARG_INT32(1);
635 8 : conn = pconn->conn;
636 : }
637 :
638 26 : if (!conn)
639 0 : dblink_conn_not_avail(conname);
640 :
641 26 : initStringInfo(&buf);
642 26 : appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
643 :
644 : /*
645 : * Try to execute the query. Note that since libpq uses malloc, the
646 : * PGresult will be long-lived even though we are still in a short-lived
647 : * memory context.
648 : */
649 26 : res = libpqsrv_exec(conn, buf.data, dblink_we_get_result);
650 52 : if (!res ||
651 52 : (PQresultStatus(res) != PGRES_COMMAND_OK &&
652 26 : PQresultStatus(res) != PGRES_TUPLES_OK))
653 : {
654 10 : dblink_res_error(conn, conname, res, fail,
655 : "while fetching from cursor \"%s\"", curname);
656 6 : return (Datum) 0;
657 : }
658 16 : else if (PQresultStatus(res) == PGRES_COMMAND_OK)
659 : {
660 : /* cursor does not exist - closed already or bad name */
661 0 : PQclear(res);
662 0 : ereport(ERROR,
663 : (errcode(ERRCODE_INVALID_CURSOR_NAME),
664 : errmsg("cursor \"%s\" does not exist", curname)));
665 : }
666 :
667 16 : materializeResult(fcinfo, conn, res);
668 14 : return (Datum) 0;
669 : }
670 :
671 : /*
672 : * Note: this is the new preferred version of dblink
673 : */
674 36 : PG_FUNCTION_INFO_V1(dblink_record);
675 : Datum
676 56 : dblink_record(PG_FUNCTION_ARGS)
677 : {
678 56 : return dblink_record_internal(fcinfo, false);
679 : }
680 :
681 8 : PG_FUNCTION_INFO_V1(dblink_send_query);
682 : Datum
683 12 : dblink_send_query(PG_FUNCTION_ARGS)
684 : {
685 : PGconn *conn;
686 : char *sql;
687 : int retval;
688 :
689 12 : if (PG_NARGS() == 2)
690 : {
691 12 : conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
692 12 : sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
693 : }
694 : else
695 : /* shouldn't happen */
696 0 : elog(ERROR, "wrong number of arguments");
697 :
698 : /* async query send */
699 12 : retval = PQsendQuery(conn, sql);
700 12 : if (retval != 1)
701 0 : elog(NOTICE, "could not send query: %s", pchomp(PQerrorMessage(conn)));
702 :
703 12 : PG_RETURN_INT32(retval);
704 : }
705 :
706 12 : PG_FUNCTION_INFO_V1(dblink_get_result);
707 : Datum
708 16 : dblink_get_result(PG_FUNCTION_ARGS)
709 : {
710 16 : return dblink_record_internal(fcinfo, true);
711 : }
712 :
713 : static Datum
714 72 : dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
715 : {
716 72 : PGconn *volatile conn = NULL;
717 72 : volatile bool freeconn = false;
718 :
719 72 : prepTuplestoreResult(fcinfo);
720 :
721 72 : dblink_init();
722 :
723 72 : PG_TRY();
724 : {
725 72 : char *sql = NULL;
726 72 : char *conname = NULL;
727 72 : bool fail = true; /* default to backward compatible */
728 :
729 72 : if (!is_async)
730 : {
731 56 : if (PG_NARGS() == 3)
732 : {
733 : /* text,text,bool */
734 2 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
735 2 : sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
736 2 : fail = PG_GETARG_BOOL(2);
737 2 : dblink_get_conn(conname, &conn, &conname, &freeconn);
738 : }
739 54 : else if (PG_NARGS() == 2)
740 : {
741 : /* text,text or text,bool */
742 40 : if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
743 : {
744 2 : sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
745 2 : fail = PG_GETARG_BOOL(1);
746 2 : conn = pconn->conn;
747 : }
748 : else
749 : {
750 38 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
751 38 : sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
752 38 : dblink_get_conn(conname, &conn, &conname, &freeconn);
753 : }
754 : }
755 14 : else if (PG_NARGS() == 1)
756 : {
757 : /* text */
758 14 : conn = pconn->conn;
759 14 : sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
760 : }
761 : else
762 : /* shouldn't happen */
763 0 : elog(ERROR, "wrong number of arguments");
764 : }
765 : else /* is_async */
766 : {
767 : /* get async result */
768 16 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
769 :
770 16 : if (PG_NARGS() == 2)
771 : {
772 : /* text,bool */
773 0 : fail = PG_GETARG_BOOL(1);
774 0 : conn = dblink_get_named_conn(conname);
775 : }
776 16 : else if (PG_NARGS() == 1)
777 : {
778 : /* text */
779 16 : conn = dblink_get_named_conn(conname);
780 : }
781 : else
782 : /* shouldn't happen */
783 0 : elog(ERROR, "wrong number of arguments");
784 : }
785 :
786 66 : if (!conn)
787 4 : dblink_conn_not_avail(conname);
788 :
789 62 : if (!is_async)
790 : {
791 : /* synchronous query, use efficient tuple collection method */
792 46 : materializeQueryResult(fcinfo, conn, conname, sql, fail);
793 : }
794 : else
795 : {
796 : /* async result retrieval, do it the old way */
797 16 : PGresult *res = libpqsrv_get_result(conn, dblink_we_get_result);
798 :
799 : /* NULL means we're all done with the async results */
800 16 : if (res)
801 : {
802 10 : if (PQresultStatus(res) != PGRES_COMMAND_OK &&
803 10 : PQresultStatus(res) != PGRES_TUPLES_OK)
804 : {
805 0 : dblink_res_error(conn, conname, res, fail,
806 : "while executing query");
807 : /* if fail isn't set, we'll return an empty query result */
808 : }
809 : else
810 : {
811 10 : materializeResult(fcinfo, conn, res);
812 : }
813 : }
814 : }
815 : }
816 10 : PG_FINALLY();
817 : {
818 : /* if needed, close the connection to the database */
819 72 : if (freeconn)
820 10 : libpqsrv_disconnect(conn);
821 : }
822 72 : PG_END_TRY();
823 :
824 62 : return (Datum) 0;
825 : }
826 :
827 : /*
828 : * Verify function caller can handle a tuplestore result, and set up for that.
829 : *
830 : * Note: if the caller returns without actually creating a tuplestore, the
831 : * executor will treat the function result as an empty set.
832 : */
833 : static void
834 98 : prepTuplestoreResult(FunctionCallInfo fcinfo)
835 : {
836 98 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
837 :
838 : /* check to see if query supports us returning a tuplestore */
839 98 : if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
840 0 : ereport(ERROR,
841 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
842 : errmsg("set-valued function called in context that cannot accept a set")));
843 98 : if (!(rsinfo->allowedModes & SFRM_Materialize))
844 0 : ereport(ERROR,
845 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
846 : errmsg("materialize mode required, but it is not allowed in this context")));
847 :
848 : /* let the executor know we're sending back a tuplestore */
849 98 : rsinfo->returnMode = SFRM_Materialize;
850 :
851 : /* caller must fill these to return a non-empty result */
852 98 : rsinfo->setResult = NULL;
853 98 : rsinfo->setDesc = NULL;
854 98 : }
855 :
856 : /*
857 : * Copy the contents of the PGresult into a tuplestore to be returned
858 : * as the result of the current function.
859 : * The PGresult will be released in this function.
860 : */
861 : static void
862 26 : materializeResult(FunctionCallInfo fcinfo, PGconn *conn, PGresult *res)
863 : {
864 26 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
865 : TupleDesc tupdesc;
866 : bool is_sql_cmd;
867 : int ntuples;
868 : int nfields;
869 :
870 : /* prepTuplestoreResult must have been called previously */
871 : Assert(rsinfo->returnMode == SFRM_Materialize);
872 :
873 26 : if (PQresultStatus(res) == PGRES_COMMAND_OK)
874 : {
875 0 : is_sql_cmd = true;
876 :
877 : /*
878 : * need a tuple descriptor representing one TEXT column to return the
879 : * command status string as our result tuple
880 : */
881 0 : tupdesc = CreateTemplateTupleDesc(1);
882 0 : TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
883 : TEXTOID, -1, 0);
884 0 : ntuples = 1;
885 0 : nfields = 1;
886 : }
887 : else
888 : {
889 : Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
890 :
891 26 : is_sql_cmd = false;
892 :
893 : /* get a tuple descriptor for our result type */
894 26 : switch (get_call_result_type(fcinfo, NULL, &tupdesc))
895 : {
896 26 : case TYPEFUNC_COMPOSITE:
897 : /* success */
898 26 : break;
899 0 : case TYPEFUNC_RECORD:
900 : /* failed to determine actual type of RECORD */
901 0 : ereport(ERROR,
902 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
903 : errmsg("function returning record called in context "
904 : "that cannot accept type record")));
905 : break;
906 0 : default:
907 : /* result type isn't composite */
908 0 : elog(ERROR, "return type must be a row type");
909 : break;
910 : }
911 :
912 : /* make sure we have a persistent copy of the tupdesc */
913 26 : tupdesc = CreateTupleDescCopy(tupdesc);
914 26 : ntuples = PQntuples(res);
915 26 : nfields = PQnfields(res);
916 : }
917 :
918 : /*
919 : * check result and tuple descriptor have the same number of columns
920 : */
921 26 : if (nfields != tupdesc->natts)
922 0 : ereport(ERROR,
923 : (errcode(ERRCODE_DATATYPE_MISMATCH),
924 : errmsg("remote query result rowtype does not match "
925 : "the specified FROM clause rowtype")));
926 :
927 26 : if (ntuples > 0)
928 : {
929 : AttInMetadata *attinmeta;
930 26 : int nestlevel = -1;
931 : Tuplestorestate *tupstore;
932 : MemoryContext oldcontext;
933 : int row;
934 : char **values;
935 :
936 26 : attinmeta = TupleDescGetAttInMetadata(tupdesc);
937 :
938 : /* Set GUCs to ensure we read GUC-sensitive data types correctly */
939 26 : if (!is_sql_cmd)
940 26 : nestlevel = applyRemoteGucs(conn);
941 :
942 26 : oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
943 26 : tupstore = tuplestore_begin_heap(true, false, work_mem);
944 26 : rsinfo->setResult = tupstore;
945 26 : rsinfo->setDesc = tupdesc;
946 26 : MemoryContextSwitchTo(oldcontext);
947 :
948 26 : values = palloc_array(char *, nfields);
949 :
950 : /* put all tuples into the tuplestore */
951 98 : for (row = 0; row < ntuples; row++)
952 : {
953 : HeapTuple tuple;
954 :
955 74 : if (!is_sql_cmd)
956 : {
957 : int i;
958 :
959 276 : for (i = 0; i < nfields; i++)
960 : {
961 202 : if (PQgetisnull(res, row, i))
962 0 : values[i] = NULL;
963 : else
964 202 : values[i] = PQgetvalue(res, row, i);
965 : }
966 : }
967 : else
968 : {
969 0 : values[0] = PQcmdStatus(res);
970 : }
971 :
972 : /* build the tuple and put it into the tuplestore. */
973 74 : tuple = BuildTupleFromCStrings(attinmeta, values);
974 72 : tuplestore_puttuple(tupstore, tuple);
975 : }
976 :
977 : /* clean up GUC settings, if we changed any */
978 24 : restoreLocalGucs(nestlevel);
979 : }
980 :
981 24 : PQclear(res);
982 24 : }
983 :
984 : /*
985 : * Execute the given SQL command and store its results into a tuplestore
986 : * to be returned as the result of the current function.
987 : *
988 : * This is equivalent to PQexec followed by materializeResult, but we make
989 : * use of libpq's single-row mode to avoid accumulating the whole result
990 : * inside libpq before it gets transferred to the tuplestore.
991 : */
992 : static void
993 46 : materializeQueryResult(FunctionCallInfo fcinfo,
994 : PGconn *conn,
995 : const char *conname,
996 : const char *sql,
997 : bool fail)
998 : {
999 46 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1000 :
1001 : /* prepTuplestoreResult must have been called previously */
1002 : Assert(rsinfo->returnMode == SFRM_Materialize);
1003 :
1004 : /* Use a PG_TRY block to ensure we pump libpq dry of results */
1005 46 : PG_TRY();
1006 : {
1007 46 : storeInfo sinfo = {0};
1008 : PGresult *res;
1009 :
1010 46 : sinfo.fcinfo = fcinfo;
1011 : /* Create short-lived memory context for data conversions */
1012 46 : sinfo.tmpcontext = AllocSetContextCreate(CurrentMemoryContext,
1013 : "dblink temporary context",
1014 : ALLOCSET_DEFAULT_SIZES);
1015 :
1016 : /* execute query, collecting any tuples into the tuplestore */
1017 46 : res = storeQueryResult(&sinfo, conn, sql);
1018 :
1019 46 : if (!res ||
1020 46 : (PQresultStatus(res) != PGRES_COMMAND_OK &&
1021 46 : PQresultStatus(res) != PGRES_TUPLES_OK))
1022 : {
1023 4 : dblink_res_error(conn, conname, res, fail,
1024 : "while executing query");
1025 : /* if fail isn't set, we'll return an empty query result */
1026 : }
1027 42 : else if (PQresultStatus(res) == PGRES_COMMAND_OK)
1028 : {
1029 : /*
1030 : * storeRow didn't get called, so we need to convert the command
1031 : * status string to a tuple manually
1032 : */
1033 : TupleDesc tupdesc;
1034 : AttInMetadata *attinmeta;
1035 : Tuplestorestate *tupstore;
1036 : HeapTuple tuple;
1037 : char *values[1];
1038 : MemoryContext oldcontext;
1039 :
1040 : /*
1041 : * need a tuple descriptor representing one TEXT column to return
1042 : * the command status string as our result tuple
1043 : */
1044 0 : tupdesc = CreateTemplateTupleDesc(1);
1045 0 : TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
1046 : TEXTOID, -1, 0);
1047 0 : attinmeta = TupleDescGetAttInMetadata(tupdesc);
1048 :
1049 0 : oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
1050 0 : tupstore = tuplestore_begin_heap(true, false, work_mem);
1051 0 : rsinfo->setResult = tupstore;
1052 0 : rsinfo->setDesc = tupdesc;
1053 0 : MemoryContextSwitchTo(oldcontext);
1054 :
1055 0 : values[0] = PQcmdStatus(res);
1056 :
1057 : /* build the tuple and put it into the tuplestore. */
1058 0 : tuple = BuildTupleFromCStrings(attinmeta, values);
1059 0 : tuplestore_puttuple(tupstore, tuple);
1060 :
1061 0 : PQclear(res);
1062 : }
1063 : else
1064 : {
1065 : Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
1066 : /* storeRow should have created a tuplestore */
1067 : Assert(rsinfo->setResult != NULL);
1068 :
1069 42 : PQclear(res);
1070 : }
1071 :
1072 : /* clean up data conversion short-lived memory context */
1073 46 : if (sinfo.tmpcontext != NULL)
1074 46 : MemoryContextDelete(sinfo.tmpcontext);
1075 :
1076 46 : PQclear(sinfo.last_res);
1077 46 : PQclear(sinfo.cur_res);
1078 : }
1079 0 : PG_CATCH();
1080 : {
1081 : PGresult *res;
1082 :
1083 : /* be sure to clear out any pending data in libpq */
1084 0 : while ((res = libpqsrv_get_result(conn, dblink_we_get_result)) !=
1085 : NULL)
1086 0 : PQclear(res);
1087 0 : PG_RE_THROW();
1088 : }
1089 46 : PG_END_TRY();
1090 46 : }
1091 :
1092 : /*
1093 : * Execute query, and send any result rows to sinfo->tuplestore.
1094 : */
1095 : static PGresult *
1096 46 : storeQueryResult(storeInfo *sinfo, PGconn *conn, const char *sql)
1097 : {
1098 46 : bool first = true;
1099 46 : int nestlevel = -1;
1100 : PGresult *res;
1101 :
1102 46 : if (!PQsendQuery(conn, sql))
1103 0 : elog(ERROR, "could not send query: %s", pchomp(PQerrorMessage(conn)));
1104 :
1105 46 : if (!PQsetSingleRowMode(conn)) /* shouldn't fail */
1106 0 : elog(ERROR, "failed to set single-row mode for dblink query");
1107 :
1108 : for (;;)
1109 : {
1110 396 : CHECK_FOR_INTERRUPTS();
1111 :
1112 396 : sinfo->cur_res = libpqsrv_get_result(conn, dblink_we_get_result);
1113 396 : if (!sinfo->cur_res)
1114 46 : break;
1115 :
1116 350 : if (PQresultStatus(sinfo->cur_res) == PGRES_SINGLE_TUPLE)
1117 : {
1118 : /* got one row from possibly-bigger resultset */
1119 :
1120 : /*
1121 : * Set GUCs to ensure we read GUC-sensitive data types correctly.
1122 : * We shouldn't do this until we have a row in hand, to ensure
1123 : * libpq has seen any earlier ParameterStatus protocol messages.
1124 : */
1125 304 : if (first && nestlevel < 0)
1126 42 : nestlevel = applyRemoteGucs(conn);
1127 :
1128 304 : storeRow(sinfo, sinfo->cur_res, first);
1129 :
1130 304 : PQclear(sinfo->cur_res);
1131 304 : sinfo->cur_res = NULL;
1132 304 : first = false;
1133 : }
1134 : else
1135 : {
1136 : /* if empty resultset, fill tuplestore header */
1137 46 : if (first && PQresultStatus(sinfo->cur_res) == PGRES_TUPLES_OK)
1138 0 : storeRow(sinfo, sinfo->cur_res, first);
1139 :
1140 : /* store completed result at last_res */
1141 46 : PQclear(sinfo->last_res);
1142 46 : sinfo->last_res = sinfo->cur_res;
1143 46 : sinfo->cur_res = NULL;
1144 46 : first = true;
1145 : }
1146 : }
1147 :
1148 : /* clean up GUC settings, if we changed any */
1149 46 : restoreLocalGucs(nestlevel);
1150 :
1151 : /* return last_res */
1152 46 : res = sinfo->last_res;
1153 46 : sinfo->last_res = NULL;
1154 46 : return res;
1155 : }
1156 :
1157 : /*
1158 : * Send single row to sinfo->tuplestore.
1159 : *
1160 : * If "first" is true, create the tuplestore using PGresult's metadata
1161 : * (in this case the PGresult might contain either zero or one row).
1162 : */
1163 : static void
1164 304 : storeRow(storeInfo *sinfo, PGresult *res, bool first)
1165 : {
1166 304 : int nfields = PQnfields(res);
1167 : HeapTuple tuple;
1168 : int i;
1169 : MemoryContext oldcontext;
1170 :
1171 304 : if (first)
1172 : {
1173 : /* Prepare for new result set */
1174 42 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) sinfo->fcinfo->resultinfo;
1175 : TupleDesc tupdesc;
1176 :
1177 : /*
1178 : * It's possible to get more than one result set if the query string
1179 : * contained multiple SQL commands. In that case, we follow PQexec's
1180 : * traditional behavior of throwing away all but the last result.
1181 : */
1182 42 : if (sinfo->tuplestore)
1183 0 : tuplestore_end(sinfo->tuplestore);
1184 42 : sinfo->tuplestore = NULL;
1185 :
1186 : /* get a tuple descriptor for our result type */
1187 42 : switch (get_call_result_type(sinfo->fcinfo, NULL, &tupdesc))
1188 : {
1189 42 : case TYPEFUNC_COMPOSITE:
1190 : /* success */
1191 42 : break;
1192 0 : case TYPEFUNC_RECORD:
1193 : /* failed to determine actual type of RECORD */
1194 0 : ereport(ERROR,
1195 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1196 : errmsg("function returning record called in context "
1197 : "that cannot accept type record")));
1198 : break;
1199 0 : default:
1200 : /* result type isn't composite */
1201 0 : elog(ERROR, "return type must be a row type");
1202 : break;
1203 : }
1204 :
1205 : /* make sure we have a persistent copy of the tupdesc */
1206 42 : tupdesc = CreateTupleDescCopy(tupdesc);
1207 :
1208 : /* check result and tuple descriptor have the same number of columns */
1209 42 : if (nfields != tupdesc->natts)
1210 0 : ereport(ERROR,
1211 : (errcode(ERRCODE_DATATYPE_MISMATCH),
1212 : errmsg("remote query result rowtype does not match "
1213 : "the specified FROM clause rowtype")));
1214 :
1215 : /* Prepare attinmeta for later data conversions */
1216 42 : sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
1217 :
1218 : /* Create a new, empty tuplestore */
1219 42 : oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
1220 42 : sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
1221 42 : rsinfo->setResult = sinfo->tuplestore;
1222 42 : rsinfo->setDesc = tupdesc;
1223 42 : MemoryContextSwitchTo(oldcontext);
1224 :
1225 : /* Done if empty resultset */
1226 42 : if (PQntuples(res) == 0)
1227 0 : return;
1228 :
1229 : /*
1230 : * Set up sufficiently-wide string pointers array; this won't change
1231 : * in size so it's easy to preallocate.
1232 : */
1233 42 : if (sinfo->cstrs)
1234 0 : pfree(sinfo->cstrs);
1235 42 : sinfo->cstrs = palloc_array(char *, nfields);
1236 : }
1237 :
1238 : /* Should have a single-row result if we get here */
1239 : Assert(PQntuples(res) == 1);
1240 :
1241 : /*
1242 : * Do the following work in a temp context that we reset after each tuple.
1243 : * This cleans up not only the data we have direct access to, but any
1244 : * cruft the I/O functions might leak.
1245 : */
1246 304 : oldcontext = MemoryContextSwitchTo(sinfo->tmpcontext);
1247 :
1248 : /*
1249 : * Fill cstrs with null-terminated strings of column values.
1250 : */
1251 1140 : for (i = 0; i < nfields; i++)
1252 : {
1253 836 : if (PQgetisnull(res, 0, i))
1254 0 : sinfo->cstrs[i] = NULL;
1255 : else
1256 836 : sinfo->cstrs[i] = PQgetvalue(res, 0, i);
1257 : }
1258 :
1259 : /* Convert row to a tuple, and add it to the tuplestore */
1260 304 : tuple = BuildTupleFromCStrings(sinfo->attinmeta, sinfo->cstrs);
1261 :
1262 304 : tuplestore_puttuple(sinfo->tuplestore, tuple);
1263 :
1264 : /* Clean up */
1265 304 : MemoryContextSwitchTo(oldcontext);
1266 304 : MemoryContextReset(sinfo->tmpcontext);
1267 : }
1268 :
1269 : /*
1270 : * List all open dblink connections by name.
1271 : * Returns an array of all connection names.
1272 : * Takes no params
1273 : */
1274 6 : PG_FUNCTION_INFO_V1(dblink_get_connections);
1275 : Datum
1276 2 : dblink_get_connections(PG_FUNCTION_ARGS)
1277 : {
1278 : HASH_SEQ_STATUS status;
1279 : remoteConnHashEnt *hentry;
1280 2 : ArrayBuildState *astate = NULL;
1281 :
1282 2 : if (remoteConnHash)
1283 : {
1284 2 : hash_seq_init(&status, remoteConnHash);
1285 8 : while ((hentry = (remoteConnHashEnt *) hash_seq_search(&status)) != NULL)
1286 : {
1287 : /* ignore it if it's not an open connection */
1288 6 : if (hentry->rconn.conn == NULL)
1289 0 : continue;
1290 : /* stash away current value */
1291 6 : astate = accumArrayResult(astate,
1292 6 : CStringGetTextDatum(hentry->name),
1293 : false, TEXTOID, CurrentMemoryContext);
1294 : }
1295 : }
1296 :
1297 2 : if (astate)
1298 2 : PG_RETURN_DATUM(makeArrayResult(astate,
1299 : CurrentMemoryContext));
1300 : else
1301 0 : PG_RETURN_NULL();
1302 : }
1303 :
1304 : /*
1305 : * Checks if a given remote connection is busy
1306 : *
1307 : * Returns 1 if the connection is busy, 0 otherwise
1308 : * Params:
1309 : * text connection_name - name of the connection to check
1310 : *
1311 : */
1312 6 : PG_FUNCTION_INFO_V1(dblink_is_busy);
1313 : Datum
1314 2 : dblink_is_busy(PG_FUNCTION_ARGS)
1315 : {
1316 : PGconn *conn;
1317 :
1318 2 : dblink_init();
1319 2 : conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
1320 :
1321 2 : PQconsumeInput(conn);
1322 2 : PG_RETURN_INT32(PQisBusy(conn));
1323 : }
1324 :
1325 : /*
1326 : * Cancels a running request on a connection
1327 : *
1328 : * Returns text:
1329 : * "OK" if the cancel request has been sent correctly,
1330 : * an error message otherwise
1331 : *
1332 : * Params:
1333 : * text connection_name - name of the connection to check
1334 : *
1335 : */
1336 6 : PG_FUNCTION_INFO_V1(dblink_cancel_query);
1337 : Datum
1338 2 : dblink_cancel_query(PG_FUNCTION_ARGS)
1339 : {
1340 : PGconn *conn;
1341 : const char *msg;
1342 : TimestampTz endtime;
1343 :
1344 2 : dblink_init();
1345 2 : conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
1346 2 : endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
1347 : 30000);
1348 2 : msg = libpqsrv_cancel(conn, endtime);
1349 2 : if (msg == NULL)
1350 2 : msg = "OK";
1351 :
1352 2 : PG_RETURN_TEXT_P(cstring_to_text(msg));
1353 : }
1354 :
1355 :
1356 : /*
1357 : * Get error message from a connection
1358 : *
1359 : * Returns text:
1360 : * "OK" if no error, an error message otherwise
1361 : *
1362 : * Params:
1363 : * text connection_name - name of the connection to check
1364 : *
1365 : */
1366 6 : PG_FUNCTION_INFO_V1(dblink_error_message);
1367 : Datum
1368 2 : dblink_error_message(PG_FUNCTION_ARGS)
1369 : {
1370 : char *msg;
1371 : PGconn *conn;
1372 :
1373 2 : dblink_init();
1374 2 : conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
1375 :
1376 2 : msg = PQerrorMessage(conn);
1377 2 : if (msg == NULL || msg[0] == '\0')
1378 2 : PG_RETURN_TEXT_P(cstring_to_text("OK"));
1379 : else
1380 0 : PG_RETURN_TEXT_P(cstring_to_text(pchomp(msg)));
1381 : }
1382 :
1383 : /*
1384 : * Execute an SQL non-SELECT command
1385 : */
1386 26 : PG_FUNCTION_INFO_V1(dblink_exec);
1387 : Datum
1388 52 : dblink_exec(PG_FUNCTION_ARGS)
1389 : {
1390 52 : text *volatile sql_cmd_status = NULL;
1391 52 : PGconn *volatile conn = NULL;
1392 52 : volatile bool freeconn = false;
1393 :
1394 52 : dblink_init();
1395 :
1396 52 : PG_TRY();
1397 : {
1398 52 : PGresult *res = NULL;
1399 52 : char *sql = NULL;
1400 52 : char *conname = NULL;
1401 52 : bool fail = true; /* default to backward compatible behavior */
1402 :
1403 52 : if (PG_NARGS() == 3)
1404 : {
1405 : /* must be text,text,bool */
1406 0 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
1407 0 : sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
1408 0 : fail = PG_GETARG_BOOL(2);
1409 0 : dblink_get_conn(conname, &conn, &conname, &freeconn);
1410 : }
1411 52 : else if (PG_NARGS() == 2)
1412 : {
1413 : /* might be text,text or text,bool */
1414 34 : if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
1415 : {
1416 2 : sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
1417 2 : fail = PG_GETARG_BOOL(1);
1418 2 : conn = pconn->conn;
1419 : }
1420 : else
1421 : {
1422 32 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
1423 32 : sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
1424 32 : dblink_get_conn(conname, &conn, &conname, &freeconn);
1425 : }
1426 : }
1427 18 : else if (PG_NARGS() == 1)
1428 : {
1429 : /* must be single text argument */
1430 18 : conn = pconn->conn;
1431 18 : sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
1432 : }
1433 : else
1434 : /* shouldn't happen */
1435 0 : elog(ERROR, "wrong number of arguments");
1436 :
1437 52 : if (!conn)
1438 0 : dblink_conn_not_avail(conname);
1439 :
1440 52 : res = libpqsrv_exec(conn, sql, dblink_we_get_result);
1441 52 : if (!res ||
1442 52 : (PQresultStatus(res) != PGRES_COMMAND_OK &&
1443 4 : PQresultStatus(res) != PGRES_TUPLES_OK))
1444 : {
1445 4 : dblink_res_error(conn, conname, res, fail,
1446 : "while executing command");
1447 :
1448 : /*
1449 : * and save a copy of the command status string to return as our
1450 : * result tuple
1451 : */
1452 2 : sql_cmd_status = cstring_to_text("ERROR");
1453 : }
1454 48 : else if (PQresultStatus(res) == PGRES_COMMAND_OK)
1455 : {
1456 : /*
1457 : * and save a copy of the command status string to return as our
1458 : * result tuple
1459 : */
1460 48 : sql_cmd_status = cstring_to_text(PQcmdStatus(res));
1461 48 : PQclear(res);
1462 : }
1463 : else
1464 : {
1465 0 : PQclear(res);
1466 0 : ereport(ERROR,
1467 : (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
1468 : errmsg("statement returning results not allowed")));
1469 : }
1470 : }
1471 2 : PG_FINALLY();
1472 : {
1473 : /* if needed, close the connection to the database */
1474 52 : if (freeconn)
1475 2 : libpqsrv_disconnect(conn);
1476 : }
1477 52 : PG_END_TRY();
1478 :
1479 50 : PG_RETURN_TEXT_P(sql_cmd_status);
1480 : }
1481 :
1482 :
1483 : /*
1484 : * dblink_get_pkey
1485 : *
1486 : * Return list of primary key fields for the supplied relation,
1487 : * or NULL if none exists.
1488 : */
1489 6 : PG_FUNCTION_INFO_V1(dblink_get_pkey);
1490 : Datum
1491 18 : dblink_get_pkey(PG_FUNCTION_ARGS)
1492 : {
1493 : int16 indnkeyatts;
1494 : char **results;
1495 : FuncCallContext *funcctx;
1496 : int32 call_cntr;
1497 : int32 max_calls;
1498 : AttInMetadata *attinmeta;
1499 : MemoryContext oldcontext;
1500 :
1501 : /* stuff done only on the first call of the function */
1502 18 : if (SRF_IS_FIRSTCALL())
1503 : {
1504 : Relation rel;
1505 : TupleDesc tupdesc;
1506 :
1507 : /* create a function context for cross-call persistence */
1508 6 : funcctx = SRF_FIRSTCALL_INIT();
1509 :
1510 : /*
1511 : * switch to memory context appropriate for multiple function calls
1512 : */
1513 6 : oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1514 :
1515 : /* open target relation */
1516 6 : rel = get_rel_from_relname(PG_GETARG_TEXT_PP(0), AccessShareLock, ACL_SELECT);
1517 :
1518 : /* get the array of attnums */
1519 6 : results = get_pkey_attnames(rel, &indnkeyatts);
1520 :
1521 6 : relation_close(rel, AccessShareLock);
1522 :
1523 : /*
1524 : * need a tuple descriptor representing one INT and one TEXT column
1525 : */
1526 6 : tupdesc = CreateTemplateTupleDesc(2);
1527 6 : TupleDescInitEntry(tupdesc, (AttrNumber) 1, "position",
1528 : INT4OID, -1, 0);
1529 6 : TupleDescInitEntry(tupdesc, (AttrNumber) 2, "colname",
1530 : TEXTOID, -1, 0);
1531 :
1532 : /*
1533 : * Generate attribute metadata needed later to produce tuples from raw
1534 : * C strings
1535 : */
1536 6 : attinmeta = TupleDescGetAttInMetadata(tupdesc);
1537 6 : funcctx->attinmeta = attinmeta;
1538 :
1539 6 : if ((results != NULL) && (indnkeyatts > 0))
1540 : {
1541 6 : funcctx->max_calls = indnkeyatts;
1542 :
1543 : /* got results, keep track of them */
1544 6 : funcctx->user_fctx = results;
1545 : }
1546 : else
1547 : {
1548 : /* fast track when no results */
1549 0 : MemoryContextSwitchTo(oldcontext);
1550 0 : SRF_RETURN_DONE(funcctx);
1551 : }
1552 :
1553 6 : MemoryContextSwitchTo(oldcontext);
1554 : }
1555 :
1556 : /* stuff done on every call of the function */
1557 18 : funcctx = SRF_PERCALL_SETUP();
1558 :
1559 : /*
1560 : * initialize per-call variables
1561 : */
1562 18 : call_cntr = funcctx->call_cntr;
1563 18 : max_calls = funcctx->max_calls;
1564 :
1565 18 : results = (char **) funcctx->user_fctx;
1566 18 : attinmeta = funcctx->attinmeta;
1567 :
1568 18 : if (call_cntr < max_calls) /* do when there is more left to send */
1569 : {
1570 : char **values;
1571 : HeapTuple tuple;
1572 : Datum result;
1573 :
1574 12 : values = palloc_array(char *, 2);
1575 12 : values[0] = psprintf("%d", call_cntr + 1);
1576 12 : values[1] = results[call_cntr];
1577 :
1578 : /* build the tuple */
1579 12 : tuple = BuildTupleFromCStrings(attinmeta, values);
1580 :
1581 : /* make the tuple into a datum */
1582 12 : result = HeapTupleGetDatum(tuple);
1583 :
1584 12 : SRF_RETURN_NEXT(funcctx, result);
1585 : }
1586 : else
1587 : {
1588 : /* do when there is no more left */
1589 6 : SRF_RETURN_DONE(funcctx);
1590 : }
1591 : }
1592 :
1593 :
1594 : /*
1595 : * dblink_build_sql_insert
1596 : *
1597 : * Used to generate an SQL insert statement
1598 : * based on an existing tuple in a local relation.
1599 : * This is useful for selectively replicating data
1600 : * to another server via dblink.
1601 : *
1602 : * API:
1603 : * <relname> - name of local table of interest
1604 : * <pkattnums> - an int2vector of attnums which will be used
1605 : * to identify the local tuple of interest
1606 : * <pknumatts> - number of attnums in pkattnums
1607 : * <src_pkattvals_arry> - text array of key values which will be used
1608 : * to identify the local tuple of interest
1609 : * <tgt_pkattvals_arry> - text array of key values which will be used
1610 : * to build the string for execution remotely. These are substituted
1611 : * for their counterparts in src_pkattvals_arry
1612 : */
1613 8 : PG_FUNCTION_INFO_V1(dblink_build_sql_insert);
1614 : Datum
1615 12 : dblink_build_sql_insert(PG_FUNCTION_ARGS)
1616 : {
1617 12 : text *relname_text = PG_GETARG_TEXT_PP(0);
1618 12 : int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
1619 12 : int32 pknumatts_arg = PG_GETARG_INT32(2);
1620 12 : ArrayType *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1621 12 : ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
1622 : Relation rel;
1623 : int *pkattnums;
1624 : int pknumatts;
1625 : char **src_pkattvals;
1626 : char **tgt_pkattvals;
1627 : int src_nitems;
1628 : int tgt_nitems;
1629 : char *sql;
1630 :
1631 : /*
1632 : * Open target relation.
1633 : */
1634 12 : rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
1635 :
1636 : /*
1637 : * Process pkattnums argument.
1638 : */
1639 12 : validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
1640 : &pkattnums, &pknumatts);
1641 :
1642 : /*
1643 : * Source array is made up of key values that will be used to locate the
1644 : * tuple of interest from the local system.
1645 : */
1646 8 : src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
1647 :
1648 : /*
1649 : * There should be one source array key value for each key attnum
1650 : */
1651 8 : if (src_nitems != pknumatts)
1652 0 : ereport(ERROR,
1653 : (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1654 : errmsg("source key array length must match number of key attributes")));
1655 :
1656 : /*
1657 : * Target array is made up of key values that will be used to build the
1658 : * SQL string for use on the remote system.
1659 : */
1660 8 : tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1661 :
1662 : /*
1663 : * There should be one target array key value for each key attnum
1664 : */
1665 8 : if (tgt_nitems != pknumatts)
1666 0 : ereport(ERROR,
1667 : (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1668 : errmsg("target key array length must match number of key attributes")));
1669 :
1670 : /*
1671 : * Prep work is finally done. Go get the SQL string.
1672 : */
1673 8 : sql = get_sql_insert(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
1674 :
1675 : /*
1676 : * Now we can close the relation.
1677 : */
1678 8 : relation_close(rel, AccessShareLock);
1679 :
1680 : /*
1681 : * And send it
1682 : */
1683 8 : PG_RETURN_TEXT_P(cstring_to_text(sql));
1684 : }
1685 :
1686 :
1687 : /*
1688 : * dblink_build_sql_delete
1689 : *
1690 : * Used to generate an SQL delete statement.
1691 : * This is useful for selectively replicating a
1692 : * delete to another server via dblink.
1693 : *
1694 : * API:
1695 : * <relname> - name of remote table of interest
1696 : * <pkattnums> - an int2vector of attnums which will be used
1697 : * to identify the remote tuple of interest
1698 : * <pknumatts> - number of attnums in pkattnums
1699 : * <tgt_pkattvals_arry> - text array of key values which will be used
1700 : * to build the string for execution remotely.
1701 : */
1702 8 : PG_FUNCTION_INFO_V1(dblink_build_sql_delete);
1703 : Datum
1704 12 : dblink_build_sql_delete(PG_FUNCTION_ARGS)
1705 : {
1706 12 : text *relname_text = PG_GETARG_TEXT_PP(0);
1707 12 : int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
1708 12 : int32 pknumatts_arg = PG_GETARG_INT32(2);
1709 12 : ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1710 : Relation rel;
1711 : int *pkattnums;
1712 : int pknumatts;
1713 : char **tgt_pkattvals;
1714 : int tgt_nitems;
1715 : char *sql;
1716 :
1717 : /*
1718 : * Open target relation.
1719 : */
1720 12 : rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
1721 :
1722 : /*
1723 : * Process pkattnums argument.
1724 : */
1725 12 : validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
1726 : &pkattnums, &pknumatts);
1727 :
1728 : /*
1729 : * Target array is made up of key values that will be used to build the
1730 : * SQL string for use on the remote system.
1731 : */
1732 8 : tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1733 :
1734 : /*
1735 : * There should be one target array key value for each key attnum
1736 : */
1737 8 : if (tgt_nitems != pknumatts)
1738 0 : ereport(ERROR,
1739 : (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1740 : errmsg("target key array length must match number of key attributes")));
1741 :
1742 : /*
1743 : * Prep work is finally done. Go get the SQL string.
1744 : */
1745 8 : sql = get_sql_delete(rel, pkattnums, pknumatts, tgt_pkattvals);
1746 :
1747 : /*
1748 : * Now we can close the relation.
1749 : */
1750 8 : relation_close(rel, AccessShareLock);
1751 :
1752 : /*
1753 : * And send it
1754 : */
1755 8 : PG_RETURN_TEXT_P(cstring_to_text(sql));
1756 : }
1757 :
1758 :
1759 : /*
1760 : * dblink_build_sql_update
1761 : *
1762 : * Used to generate an SQL update statement
1763 : * based on an existing tuple in a local relation.
1764 : * This is useful for selectively replicating data
1765 : * to another server via dblink.
1766 : *
1767 : * API:
1768 : * <relname> - name of local table of interest
1769 : * <pkattnums> - an int2vector of attnums which will be used
1770 : * to identify the local tuple of interest
1771 : * <pknumatts> - number of attnums in pkattnums
1772 : * <src_pkattvals_arry> - text array of key values which will be used
1773 : * to identify the local tuple of interest
1774 : * <tgt_pkattvals_arry> - text array of key values which will be used
1775 : * to build the string for execution remotely. These are substituted
1776 : * for their counterparts in src_pkattvals_arry
1777 : */
1778 8 : PG_FUNCTION_INFO_V1(dblink_build_sql_update);
1779 : Datum
1780 12 : dblink_build_sql_update(PG_FUNCTION_ARGS)
1781 : {
1782 12 : text *relname_text = PG_GETARG_TEXT_PP(0);
1783 12 : int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
1784 12 : int32 pknumatts_arg = PG_GETARG_INT32(2);
1785 12 : ArrayType *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1786 12 : ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
1787 : Relation rel;
1788 : int *pkattnums;
1789 : int pknumatts;
1790 : char **src_pkattvals;
1791 : char **tgt_pkattvals;
1792 : int src_nitems;
1793 : int tgt_nitems;
1794 : char *sql;
1795 :
1796 : /*
1797 : * Open target relation.
1798 : */
1799 12 : rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
1800 :
1801 : /*
1802 : * Process pkattnums argument.
1803 : */
1804 12 : validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
1805 : &pkattnums, &pknumatts);
1806 :
1807 : /*
1808 : * Source array is made up of key values that will be used to locate the
1809 : * tuple of interest from the local system.
1810 : */
1811 8 : src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
1812 :
1813 : /*
1814 : * There should be one source array key value for each key attnum
1815 : */
1816 8 : if (src_nitems != pknumatts)
1817 0 : ereport(ERROR,
1818 : (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1819 : errmsg("source key array length must match number of key attributes")));
1820 :
1821 : /*
1822 : * Target array is made up of key values that will be used to build the
1823 : * SQL string for use on the remote system.
1824 : */
1825 8 : tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1826 :
1827 : /*
1828 : * There should be one target array key value for each key attnum
1829 : */
1830 8 : if (tgt_nitems != pknumatts)
1831 0 : ereport(ERROR,
1832 : (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1833 : errmsg("target key array length must match number of key attributes")));
1834 :
1835 : /*
1836 : * Prep work is finally done. Go get the SQL string.
1837 : */
1838 8 : sql = get_sql_update(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
1839 :
1840 : /*
1841 : * Now we can close the relation.
1842 : */
1843 8 : relation_close(rel, AccessShareLock);
1844 :
1845 : /*
1846 : * And send it
1847 : */
1848 8 : PG_RETURN_TEXT_P(cstring_to_text(sql));
1849 : }
1850 :
1851 : /*
1852 : * dblink_current_query
1853 : * return the current query string
1854 : * to allow its use in (among other things)
1855 : * rewrite rules
1856 : */
1857 4 : PG_FUNCTION_INFO_V1(dblink_current_query);
1858 : Datum
1859 0 : dblink_current_query(PG_FUNCTION_ARGS)
1860 : {
1861 : /* This is now just an alias for the built-in function current_query() */
1862 0 : PG_RETURN_DATUM(current_query(fcinfo));
1863 : }
1864 :
1865 : /*
1866 : * Retrieve async notifications for a connection.
1867 : *
1868 : * Returns a setof record of notifications, or an empty set if none received.
1869 : * Can optionally take a named connection as parameter, but uses the unnamed
1870 : * connection per default.
1871 : *
1872 : */
1873 : #define DBLINK_NOTIFY_COLS 3
1874 :
1875 10 : PG_FUNCTION_INFO_V1(dblink_get_notify);
1876 : Datum
1877 4 : dblink_get_notify(PG_FUNCTION_ARGS)
1878 : {
1879 : PGconn *conn;
1880 : PGnotify *notify;
1881 4 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1882 :
1883 4 : dblink_init();
1884 4 : if (PG_NARGS() == 1)
1885 0 : conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
1886 : else
1887 4 : conn = pconn->conn;
1888 :
1889 4 : InitMaterializedSRF(fcinfo, 0);
1890 :
1891 4 : PQconsumeInput(conn);
1892 8 : while ((notify = PQnotifies(conn)) != NULL)
1893 : {
1894 : Datum values[DBLINK_NOTIFY_COLS];
1895 : bool nulls[DBLINK_NOTIFY_COLS];
1896 :
1897 4 : memset(values, 0, sizeof(values));
1898 4 : memset(nulls, 0, sizeof(nulls));
1899 :
1900 4 : if (notify->relname != NULL)
1901 4 : values[0] = CStringGetTextDatum(notify->relname);
1902 : else
1903 0 : nulls[0] = true;
1904 :
1905 4 : values[1] = Int32GetDatum(notify->be_pid);
1906 :
1907 4 : if (notify->extra != NULL)
1908 4 : values[2] = CStringGetTextDatum(notify->extra);
1909 : else
1910 0 : nulls[2] = true;
1911 :
1912 4 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
1913 :
1914 4 : PQfreemem(notify);
1915 4 : PQconsumeInput(conn);
1916 : }
1917 :
1918 4 : return (Datum) 0;
1919 : }
1920 :
1921 : /*
1922 : * Validate the options given to a dblink foreign server or user mapping.
1923 : * Raise an error if any option is invalid.
1924 : *
1925 : * We just check the names of options here, so semantic errors in options,
1926 : * such as invalid numeric format, will be detected at the attempt to connect.
1927 : */
1928 28 : PG_FUNCTION_INFO_V1(dblink_fdw_validator);
1929 : Datum
1930 38 : dblink_fdw_validator(PG_FUNCTION_ARGS)
1931 : {
1932 38 : List *options_list = untransformRelOptions(PG_GETARG_DATUM(0));
1933 38 : Oid context = PG_GETARG_OID(1);
1934 : ListCell *cell;
1935 :
1936 : static const PQconninfoOption *options = NULL;
1937 :
1938 : /*
1939 : * Get list of valid libpq options.
1940 : *
1941 : * To avoid unnecessary work, we get the list once and use it throughout
1942 : * the lifetime of this backend process. We don't need to care about
1943 : * memory context issues, because PQconndefaults allocates with malloc.
1944 : */
1945 38 : if (!options)
1946 : {
1947 24 : options = PQconndefaults();
1948 24 : if (!options) /* assume reason for failure is OOM */
1949 0 : ereport(ERROR,
1950 : (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
1951 : errmsg("out of memory"),
1952 : errdetail("Could not get libpq's default connection options.")));
1953 : }
1954 :
1955 : /* Validate each supplied option. */
1956 100 : foreach(cell, options_list)
1957 : {
1958 78 : DefElem *def = (DefElem *) lfirst(cell);
1959 :
1960 78 : if (!is_valid_dblink_fdw_option(options, def->defname, context))
1961 : {
1962 : /*
1963 : * Unknown option, or invalid option for the context specified, so
1964 : * complain about it. Provide a hint with a valid option that
1965 : * looks similar, if there is one.
1966 : */
1967 : const PQconninfoOption *opt;
1968 : const char *closest_match;
1969 : ClosestMatchState match_state;
1970 16 : bool has_valid_options = false;
1971 :
1972 16 : initClosestMatch(&match_state, def->defname, 4);
1973 832 : for (opt = options; opt->keyword; opt++)
1974 : {
1975 816 : if (is_valid_dblink_option(options, opt->keyword, context))
1976 : {
1977 186 : has_valid_options = true;
1978 186 : updateClosestMatch(&match_state, opt->keyword);
1979 : }
1980 : }
1981 :
1982 16 : closest_match = getClosestMatch(&match_state);
1983 16 : ereport(ERROR,
1984 : (errcode(ERRCODE_FDW_OPTION_NAME_NOT_FOUND),
1985 : errmsg("invalid option \"%s\"", def->defname),
1986 : has_valid_options ? closest_match ?
1987 : errhint("Perhaps you meant the option \"%s\".",
1988 : closest_match) : 0 :
1989 : errhint("There are no valid options in this context.")));
1990 : }
1991 : }
1992 :
1993 22 : PG_RETURN_VOID();
1994 : }
1995 :
1996 :
1997 : /*************************************************************
1998 : * internal functions
1999 : */
2000 :
2001 :
2002 : /*
2003 : * get_pkey_attnames
2004 : *
2005 : * Get the primary key attnames for the given relation.
2006 : * Return NULL, and set indnkeyatts = 0, if no primary key exists.
2007 : */
2008 : static char **
2009 6 : get_pkey_attnames(Relation rel, int16 *indnkeyatts)
2010 : {
2011 : Relation indexRelation;
2012 : ScanKeyData skey;
2013 : SysScanDesc scan;
2014 : HeapTuple indexTuple;
2015 : int i;
2016 6 : char **result = NULL;
2017 : TupleDesc tupdesc;
2018 :
2019 : /* initialize indnkeyatts to 0 in case no primary key exists */
2020 6 : *indnkeyatts = 0;
2021 :
2022 6 : tupdesc = rel->rd_att;
2023 :
2024 : /* Prepare to scan pg_index for entries having indrelid = this rel. */
2025 6 : indexRelation = table_open(IndexRelationId, AccessShareLock);
2026 6 : ScanKeyInit(&skey,
2027 : Anum_pg_index_indrelid,
2028 : BTEqualStrategyNumber, F_OIDEQ,
2029 : ObjectIdGetDatum(RelationGetRelid(rel)));
2030 :
2031 6 : scan = systable_beginscan(indexRelation, IndexIndrelidIndexId, true,
2032 : NULL, 1, &skey);
2033 :
2034 6 : while (HeapTupleIsValid(indexTuple = systable_getnext(scan)))
2035 : {
2036 6 : Form_pg_index index = (Form_pg_index) GETSTRUCT(indexTuple);
2037 :
2038 : /* we're only interested if it is the primary key */
2039 6 : if (index->indisprimary)
2040 : {
2041 6 : *indnkeyatts = index->indnkeyatts;
2042 6 : if (*indnkeyatts > 0)
2043 : {
2044 6 : result = palloc_array(char *, *indnkeyatts);
2045 :
2046 18 : for (i = 0; i < *indnkeyatts; i++)
2047 12 : result[i] = SPI_fname(tupdesc, index->indkey.values[i]);
2048 : }
2049 6 : break;
2050 : }
2051 : }
2052 :
2053 6 : systable_endscan(scan);
2054 6 : table_close(indexRelation, AccessShareLock);
2055 :
2056 6 : return result;
2057 : }
2058 :
2059 : /*
2060 : * Deconstruct a text[] into C-strings (note any NULL elements will be
2061 : * returned as NULL pointers)
2062 : */
2063 : static char **
2064 40 : get_text_array_contents(ArrayType *array, int *numitems)
2065 : {
2066 40 : int ndim = ARR_NDIM(array);
2067 40 : int *dims = ARR_DIMS(array);
2068 : int nitems;
2069 : int16 typlen;
2070 : bool typbyval;
2071 : char typalign;
2072 : char **values;
2073 : char *ptr;
2074 : bits8 *bitmap;
2075 : int bitmask;
2076 : int i;
2077 :
2078 : Assert(ARR_ELEMTYPE(array) == TEXTOID);
2079 :
2080 40 : *numitems = nitems = ArrayGetNItems(ndim, dims);
2081 :
2082 40 : get_typlenbyvalalign(ARR_ELEMTYPE(array),
2083 : &typlen, &typbyval, &typalign);
2084 :
2085 40 : values = palloc_array(char *, nitems);
2086 :
2087 40 : ptr = ARR_DATA_PTR(array);
2088 40 : bitmap = ARR_NULLBITMAP(array);
2089 40 : bitmask = 1;
2090 :
2091 110 : for (i = 0; i < nitems; i++)
2092 : {
2093 70 : if (bitmap && (*bitmap & bitmask) == 0)
2094 : {
2095 0 : values[i] = NULL;
2096 : }
2097 : else
2098 : {
2099 70 : values[i] = TextDatumGetCString(PointerGetDatum(ptr));
2100 70 : ptr = att_addlength_pointer(ptr, typlen, ptr);
2101 70 : ptr = (char *) att_align_nominal(ptr, typalign);
2102 : }
2103 :
2104 : /* advance bitmap pointer if any */
2105 70 : if (bitmap)
2106 : {
2107 0 : bitmask <<= 1;
2108 0 : if (bitmask == 0x100)
2109 : {
2110 0 : bitmap++;
2111 0 : bitmask = 1;
2112 : }
2113 : }
2114 : }
2115 :
2116 40 : return values;
2117 : }
2118 :
2119 : static char *
2120 8 : get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
2121 : {
2122 : char *relname;
2123 : HeapTuple tuple;
2124 : TupleDesc tupdesc;
2125 : int natts;
2126 : StringInfoData buf;
2127 : char *val;
2128 : int key;
2129 : int i;
2130 : bool needComma;
2131 :
2132 8 : initStringInfo(&buf);
2133 :
2134 : /* get relation name including any needed schema prefix and quoting */
2135 8 : relname = generate_relation_name(rel);
2136 :
2137 8 : tupdesc = rel->rd_att;
2138 8 : natts = tupdesc->natts;
2139 :
2140 8 : tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
2141 8 : if (!tuple)
2142 0 : ereport(ERROR,
2143 : (errcode(ERRCODE_CARDINALITY_VIOLATION),
2144 : errmsg("source row not found")));
2145 :
2146 8 : appendStringInfo(&buf, "INSERT INTO %s(", relname);
2147 :
2148 8 : needComma = false;
2149 38 : for (i = 0; i < natts; i++)
2150 : {
2151 30 : Form_pg_attribute att = TupleDescAttr(tupdesc, i);
2152 :
2153 30 : if (att->attisdropped)
2154 4 : continue;
2155 :
2156 26 : if (needComma)
2157 18 : appendStringInfoChar(&buf, ',');
2158 :
2159 26 : appendStringInfoString(&buf,
2160 26 : quote_ident_cstr(NameStr(att->attname)));
2161 26 : needComma = true;
2162 : }
2163 :
2164 8 : appendStringInfoString(&buf, ") VALUES(");
2165 :
2166 : /*
2167 : * Note: i is physical column number (counting from 0).
2168 : */
2169 8 : needComma = false;
2170 38 : for (i = 0; i < natts; i++)
2171 : {
2172 30 : if (TupleDescAttr(tupdesc, i)->attisdropped)
2173 4 : continue;
2174 :
2175 26 : if (needComma)
2176 18 : appendStringInfoChar(&buf, ',');
2177 :
2178 26 : key = get_attnum_pk_pos(pkattnums, pknumatts, i);
2179 :
2180 26 : if (key >= 0)
2181 14 : val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
2182 : else
2183 12 : val = SPI_getvalue(tuple, tupdesc, i + 1);
2184 :
2185 26 : if (val != NULL)
2186 : {
2187 26 : appendStringInfoString(&buf, quote_literal_cstr(val));
2188 26 : pfree(val);
2189 : }
2190 : else
2191 0 : appendStringInfoString(&buf, "NULL");
2192 26 : needComma = true;
2193 : }
2194 8 : appendStringInfoChar(&buf, ')');
2195 :
2196 8 : return buf.data;
2197 : }
2198 :
2199 : static char *
2200 8 : get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals)
2201 : {
2202 : char *relname;
2203 : TupleDesc tupdesc;
2204 : StringInfoData buf;
2205 : int i;
2206 :
2207 8 : initStringInfo(&buf);
2208 :
2209 : /* get relation name including any needed schema prefix and quoting */
2210 8 : relname = generate_relation_name(rel);
2211 :
2212 8 : tupdesc = rel->rd_att;
2213 :
2214 8 : appendStringInfo(&buf, "DELETE FROM %s WHERE ", relname);
2215 22 : for (i = 0; i < pknumatts; i++)
2216 : {
2217 14 : int pkattnum = pkattnums[i];
2218 14 : Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
2219 :
2220 14 : if (i > 0)
2221 6 : appendStringInfoString(&buf, " AND ");
2222 :
2223 14 : appendStringInfoString(&buf,
2224 14 : quote_ident_cstr(NameStr(attr->attname)));
2225 :
2226 14 : if (tgt_pkattvals[i] != NULL)
2227 14 : appendStringInfo(&buf, " = %s",
2228 14 : quote_literal_cstr(tgt_pkattvals[i]));
2229 : else
2230 0 : appendStringInfoString(&buf, " IS NULL");
2231 : }
2232 :
2233 8 : return buf.data;
2234 : }
2235 :
2236 : static char *
2237 8 : get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
2238 : {
2239 : char *relname;
2240 : HeapTuple tuple;
2241 : TupleDesc tupdesc;
2242 : int natts;
2243 : StringInfoData buf;
2244 : char *val;
2245 : int key;
2246 : int i;
2247 : bool needComma;
2248 :
2249 8 : initStringInfo(&buf);
2250 :
2251 : /* get relation name including any needed schema prefix and quoting */
2252 8 : relname = generate_relation_name(rel);
2253 :
2254 8 : tupdesc = rel->rd_att;
2255 8 : natts = tupdesc->natts;
2256 :
2257 8 : tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
2258 8 : if (!tuple)
2259 0 : ereport(ERROR,
2260 : (errcode(ERRCODE_CARDINALITY_VIOLATION),
2261 : errmsg("source row not found")));
2262 :
2263 8 : appendStringInfo(&buf, "UPDATE %s SET ", relname);
2264 :
2265 : /*
2266 : * Note: i is physical column number (counting from 0).
2267 : */
2268 8 : needComma = false;
2269 38 : for (i = 0; i < natts; i++)
2270 : {
2271 30 : Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
2272 :
2273 30 : if (attr->attisdropped)
2274 4 : continue;
2275 :
2276 26 : if (needComma)
2277 18 : appendStringInfoString(&buf, ", ");
2278 :
2279 26 : appendStringInfo(&buf, "%s = ",
2280 26 : quote_ident_cstr(NameStr(attr->attname)));
2281 :
2282 26 : key = get_attnum_pk_pos(pkattnums, pknumatts, i);
2283 :
2284 26 : if (key >= 0)
2285 14 : val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
2286 : else
2287 12 : val = SPI_getvalue(tuple, tupdesc, i + 1);
2288 :
2289 26 : if (val != NULL)
2290 : {
2291 26 : appendStringInfoString(&buf, quote_literal_cstr(val));
2292 26 : pfree(val);
2293 : }
2294 : else
2295 0 : appendStringInfoString(&buf, "NULL");
2296 26 : needComma = true;
2297 : }
2298 :
2299 8 : appendStringInfoString(&buf, " WHERE ");
2300 :
2301 22 : for (i = 0; i < pknumatts; i++)
2302 : {
2303 14 : int pkattnum = pkattnums[i];
2304 14 : Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
2305 :
2306 14 : if (i > 0)
2307 6 : appendStringInfoString(&buf, " AND ");
2308 :
2309 14 : appendStringInfoString(&buf,
2310 14 : quote_ident_cstr(NameStr(attr->attname)));
2311 :
2312 14 : val = tgt_pkattvals[i];
2313 :
2314 14 : if (val != NULL)
2315 14 : appendStringInfo(&buf, " = %s", quote_literal_cstr(val));
2316 : else
2317 0 : appendStringInfoString(&buf, " IS NULL");
2318 : }
2319 :
2320 8 : return buf.data;
2321 : }
2322 :
2323 : /*
2324 : * Return a properly quoted identifier.
2325 : * Uses quote_ident in quote.c
2326 : */
2327 : static char *
2328 160 : quote_ident_cstr(char *rawstr)
2329 : {
2330 : text *rawstr_text;
2331 : text *result_text;
2332 : char *result;
2333 :
2334 160 : rawstr_text = cstring_to_text(rawstr);
2335 160 : result_text = DatumGetTextPP(DirectFunctionCall1(quote_ident,
2336 : PointerGetDatum(rawstr_text)));
2337 160 : result = text_to_cstring(result_text);
2338 :
2339 160 : return result;
2340 : }
2341 :
/*
 * Return the index of the first entry in pkattnums[0 .. pknumatts-1] that
 * equals "key", or -1 if there is none.
 */
static int
get_attnum_pk_pos(int *pkattnums, int pknumatts, int key)
{
	int			pos = 0;

	/* The list is expected to be short, so a linear search is fine. */
	while (pos < pknumatts)
	{
		if (pkattnums[pos] == key)
			return pos;
		pos++;
	}

	return -1;
}
2356 :
2357 : static HeapTuple
2358 16 : get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals)
2359 : {
2360 : char *relname;
2361 : TupleDesc tupdesc;
2362 : int natts;
2363 : StringInfoData buf;
2364 : int ret;
2365 : HeapTuple tuple;
2366 : int i;
2367 :
2368 : /*
2369 : * Connect to SPI manager
2370 : */
2371 16 : SPI_connect();
2372 :
2373 16 : initStringInfo(&buf);
2374 :
2375 : /* get relation name including any needed schema prefix and quoting */
2376 16 : relname = generate_relation_name(rel);
2377 :
2378 16 : tupdesc = rel->rd_att;
2379 16 : natts = tupdesc->natts;
2380 :
2381 : /*
2382 : * Build sql statement to look up tuple of interest, ie, the one matching
2383 : * src_pkattvals. We used to use "SELECT *" here, but it's simpler to
2384 : * generate a result tuple that matches the table's physical structure,
2385 : * with NULLs for any dropped columns. Otherwise we have to deal with two
2386 : * different tupdescs and everything's very confusing.
2387 : */
2388 16 : appendStringInfoString(&buf, "SELECT ");
2389 :
2390 76 : for (i = 0; i < natts; i++)
2391 : {
2392 60 : Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
2393 :
2394 60 : if (i > 0)
2395 44 : appendStringInfoString(&buf, ", ");
2396 :
2397 60 : if (attr->attisdropped)
2398 8 : appendStringInfoString(&buf, "NULL");
2399 : else
2400 52 : appendStringInfoString(&buf,
2401 52 : quote_ident_cstr(NameStr(attr->attname)));
2402 : }
2403 :
2404 16 : appendStringInfo(&buf, " FROM %s WHERE ", relname);
2405 :
2406 44 : for (i = 0; i < pknumatts; i++)
2407 : {
2408 28 : int pkattnum = pkattnums[i];
2409 28 : Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
2410 :
2411 28 : if (i > 0)
2412 12 : appendStringInfoString(&buf, " AND ");
2413 :
2414 28 : appendStringInfoString(&buf,
2415 28 : quote_ident_cstr(NameStr(attr->attname)));
2416 :
2417 28 : if (src_pkattvals[i] != NULL)
2418 28 : appendStringInfo(&buf, " = %s",
2419 28 : quote_literal_cstr(src_pkattvals[i]));
2420 : else
2421 0 : appendStringInfoString(&buf, " IS NULL");
2422 : }
2423 :
2424 : /*
2425 : * Retrieve the desired tuple
2426 : */
2427 16 : ret = SPI_exec(buf.data, 0);
2428 16 : pfree(buf.data);
2429 :
2430 : /*
2431 : * Only allow one qualifying tuple
2432 : */
2433 16 : if ((ret == SPI_OK_SELECT) && (SPI_processed > 1))
2434 0 : ereport(ERROR,
2435 : (errcode(ERRCODE_CARDINALITY_VIOLATION),
2436 : errmsg("source criteria matched more than one record")));
2437 :
2438 16 : else if (ret == SPI_OK_SELECT && SPI_processed == 1)
2439 : {
2440 16 : SPITupleTable *tuptable = SPI_tuptable;
2441 :
2442 16 : tuple = SPI_copytuple(tuptable->vals[0]);
2443 16 : SPI_finish();
2444 :
2445 16 : return tuple;
2446 : }
2447 : else
2448 : {
2449 : /*
2450 : * no qualifying tuples
2451 : */
2452 0 : SPI_finish();
2453 :
2454 0 : return NULL;
2455 : }
2456 :
2457 : /*
2458 : * never reached, but keep compiler quiet
2459 : */
2460 : return NULL;
2461 : }
2462 :
2463 : /*
2464 : * Open the relation named by relname_text, acquire specified type of lock,
2465 : * verify we have specified permissions.
2466 : * Caller must close rel when done with it.
2467 : */
2468 : static Relation
2469 42 : get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode)
2470 : {
2471 : RangeVar *relvar;
2472 : Relation rel;
2473 : AclResult aclresult;
2474 :
2475 42 : relvar = makeRangeVarFromNameList(textToQualifiedNameList(relname_text));
2476 42 : rel = table_openrv(relvar, lockmode);
2477 :
2478 42 : aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
2479 : aclmode);
2480 42 : if (aclresult != ACLCHECK_OK)
2481 0 : aclcheck_error(aclresult, get_relkind_objtype(rel->rd_rel->relkind),
2482 0 : RelationGetRelationName(rel));
2483 :
2484 42 : return rel;
2485 : }
2486 :
2487 : /*
2488 : * generate_relation_name - copied from ruleutils.c
2489 : * Compute the name to display for a relation
2490 : *
2491 : * The result includes all necessary quoting and schema-prefixing.
2492 : */
2493 : static char *
2494 40 : generate_relation_name(Relation rel)
2495 : {
2496 : char *nspname;
2497 : char *result;
2498 :
2499 : /* Qualify the name if not visible in search path */
2500 40 : if (RelationIsVisible(RelationGetRelid(rel)))
2501 30 : nspname = NULL;
2502 : else
2503 10 : nspname = get_namespace_name(rel->rd_rel->relnamespace);
2504 :
2505 40 : result = quote_qualified_identifier(nspname, RelationGetRelationName(rel));
2506 :
2507 40 : return result;
2508 : }
2509 :
2510 :
2511 : static remoteConn *
2512 156 : getConnectionByName(const char *name)
2513 : {
2514 : remoteConnHashEnt *hentry;
2515 : char *key;
2516 :
2517 156 : if (!remoteConnHash)
2518 10 : remoteConnHash = createConnHash();
2519 :
2520 156 : key = pstrdup(name);
2521 156 : truncate_identifier(key, strlen(key), false);
2522 156 : hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
2523 : key, HASH_FIND, NULL);
2524 :
2525 156 : if (hentry && hentry->rconn.conn != NULL)
2526 136 : return &hentry->rconn;
2527 :
2528 20 : return NULL;
2529 : }
2530 :
2531 : static HTAB *
2532 12 : createConnHash(void)
2533 : {
2534 : HASHCTL ctl;
2535 :
2536 12 : ctl.keysize = NAMEDATALEN;
2537 12 : ctl.entrysize = sizeof(remoteConnHashEnt);
2538 :
2539 12 : return hash_create("Remote Con hash", NUMCONN, &ctl,
2540 : HASH_ELEM | HASH_STRINGS);
2541 : }
2542 :
2543 : static remoteConn *
2544 20 : createNewConnection(const char *name)
2545 : {
2546 : remoteConnHashEnt *hentry;
2547 : bool found;
2548 : char *key;
2549 :
2550 20 : if (!remoteConnHash)
2551 2 : remoteConnHash = createConnHash();
2552 :
2553 20 : key = pstrdup(name);
2554 20 : truncate_identifier(key, strlen(key), true);
2555 20 : hentry = (remoteConnHashEnt *) hash_search(remoteConnHash, key,
2556 : HASH_ENTER, &found);
2557 :
2558 20 : if (found && hentry->rconn.conn != NULL)
2559 2 : ereport(ERROR,
2560 : (errcode(ERRCODE_DUPLICATE_OBJECT),
2561 : errmsg("duplicate connection name")));
2562 :
2563 : /* New, or reusable, so initialize the rconn struct to zeroes */
2564 18 : memset(&hentry->rconn, 0, sizeof(remoteConn));
2565 :
2566 18 : return &hentry->rconn;
2567 : }
2568 :
2569 : static void
2570 16 : deleteConnection(const char *name)
2571 : {
2572 : remoteConnHashEnt *hentry;
2573 : bool found;
2574 : char *key;
2575 :
2576 16 : if (!remoteConnHash)
2577 0 : remoteConnHash = createConnHash();
2578 :
2579 16 : key = pstrdup(name);
2580 16 : truncate_identifier(key, strlen(key), false);
2581 16 : hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
2582 : key, HASH_REMOVE, &found);
2583 :
2584 16 : if (!hentry)
2585 0 : ereport(ERROR,
2586 : (errcode(ERRCODE_UNDEFINED_OBJECT),
2587 : errmsg("undefined connection name")));
2588 16 : }
2589 :
2590 : /*
2591 : * Ensure that require_auth and SCRAM keys are correctly set on connstr.
2592 : * SCRAM keys used to pass-through are coming from the initial connection
2593 : * from the client with the server.
2594 : *
2595 : * All required SCRAM options are set by dblink, so we just need to ensure
2596 : * that these options are not overwritten by the user.
2597 : *
2598 : * See appendSCRAMKeysInfo and its usage for more.
2599 : */
2600 : bool
2601 10 : dblink_connstr_has_required_scram_options(const char *connstr)
2602 : {
2603 : PQconninfoOption *options;
2604 10 : bool has_scram_server_key = false;
2605 10 : bool has_scram_client_key = false;
2606 10 : bool has_require_auth = false;
2607 10 : bool has_scram_keys = false;
2608 :
2609 10 : options = PQconninfoParse(connstr, NULL);
2610 10 : if (options)
2611 : {
2612 : /*
2613 : * Continue iterating even if we found the keys that we need to
2614 : * validate to make sure that there is no other declaration of these
2615 : * keys that can overwrite the first.
2616 : */
2617 520 : for (PQconninfoOption *option = options; option->keyword != NULL; option++)
2618 : {
2619 510 : if (strcmp(option->keyword, "require_auth") == 0)
2620 : {
2621 10 : if (option->val != NULL && strcmp(option->val, "scram-sha-256") == 0)
2622 8 : has_require_auth = true;
2623 : else
2624 2 : has_require_auth = false;
2625 : }
2626 :
2627 510 : if (strcmp(option->keyword, "scram_client_key") == 0)
2628 : {
2629 10 : if (option->val != NULL && option->val[0] != '\0')
2630 10 : has_scram_client_key = true;
2631 : else
2632 0 : has_scram_client_key = false;
2633 : }
2634 :
2635 510 : if (strcmp(option->keyword, "scram_server_key") == 0)
2636 : {
2637 10 : if (option->val != NULL && option->val[0] != '\0')
2638 10 : has_scram_server_key = true;
2639 : else
2640 0 : has_scram_server_key = false;
2641 : }
2642 : }
2643 10 : PQconninfoFree(options);
2644 : }
2645 :
2646 10 : has_scram_keys = has_scram_client_key && has_scram_server_key && MyProcPort->has_scram_keys;
2647 :
2648 10 : return (has_scram_keys && has_require_auth);
2649 : }
2650 :
2651 : /*
2652 : * We need to make sure that the connection made used credentials
2653 : * which were provided by the user, so check what credentials were
2654 : * used to connect and then make sure that they came from the user.
2655 : *
2656 : * On failure, we close "conn" and also delete the hashtable entry
2657 : * identified by "connname" (if that's not NULL).
2658 : */
2659 : static void
2660 40 : dblink_security_check(PGconn *conn, const char *connname, const char *connstr)
2661 : {
2662 : /* Superuser bypasses security check */
2663 40 : if (superuser())
2664 36 : return;
2665 :
2666 : /* If password was used to connect, make sure it was one provided */
2667 4 : if (PQconnectionUsedPassword(conn) && dblink_connstr_has_pw(connstr))
2668 0 : return;
2669 :
2670 : /*
2671 : * Password was not used to connect, check if SCRAM pass-through is in
2672 : * use.
2673 : *
2674 : * If dblink_connstr_has_required_scram_options is true we assume that
2675 : * UseScramPassthrough is also true because the required SCRAM keys are
2676 : * only added if UseScramPassthrough is set, and the user is not allowed
2677 : * to add the SCRAM keys on fdw and user mapping options.
2678 : */
2679 4 : if (MyProcPort->has_scram_keys && dblink_connstr_has_required_scram_options(connstr))
2680 4 : return;
2681 :
2682 : #ifdef ENABLE_GSS
2683 : /* If GSSAPI creds used to connect, make sure it was one delegated */
2684 : if (PQconnectionUsedGSSAPI(conn) && be_gssapi_get_delegation(MyProcPort))
2685 : return;
2686 : #endif
2687 :
2688 : /* Otherwise, fail out */
2689 0 : libpqsrv_disconnect(conn);
2690 0 : if (connname)
2691 0 : deleteConnection(connname);
2692 :
2693 0 : ereport(ERROR,
2694 : (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
2695 : errmsg("password or GSSAPI delegated credentials required"),
2696 : errdetail("Non-superusers may only connect using credentials they provide, eg: password in connection string or delegated GSSAPI credentials"),
2697 : errhint("Ensure provided credentials match target server's authentication method.")));
2698 : }
2699 :
2700 : /*
2701 : * Function to check if the connection string includes an explicit
2702 : * password, needed to ensure that non-superuser password-based auth
2703 : * is using a provided password and not one picked up from the
2704 : * environment.
2705 : */
2706 : static bool
2707 12 : dblink_connstr_has_pw(const char *connstr)
2708 : {
2709 : PQconninfoOption *options;
2710 : PQconninfoOption *option;
2711 12 : bool connstr_gives_password = false;
2712 :
2713 12 : options = PQconninfoParse(connstr, NULL);
2714 12 : if (options)
2715 : {
2716 624 : for (option = options; option->keyword != NULL; option++)
2717 : {
2718 612 : if (strcmp(option->keyword, "password") == 0)
2719 : {
2720 12 : if (option->val != NULL && option->val[0] != '\0')
2721 : {
2722 0 : connstr_gives_password = true;
2723 0 : break;
2724 : }
2725 : }
2726 : }
2727 12 : PQconninfoFree(options);
2728 : }
2729 :
2730 12 : return connstr_gives_password;
2731 : }
2732 :
2733 : /*
2734 : * For non-superusers, insist that the connstr specify a password, except if
2735 : * GSSAPI credentials have been delegated (and we check that they are used for
2736 : * the connection in dblink_security_check later) or if SCRAM pass-through is
2737 : * being used. This prevents a password or GSSAPI credentials from being
2738 : * picked up from .pgpass, a service file, the environment, etc. We don't want
2739 : * the postgres user's passwords or Kerberos credentials to be accessible to
2740 : * non-superusers. In case of SCRAM pass-through insist that the connstr
2741 : * has the required SCRAM pass-through options.
2742 : */
2743 : static void
2744 50 : dblink_connstr_check(const char *connstr)
2745 : {
2746 50 : if (superuser())
2747 42 : return;
2748 :
2749 8 : if (dblink_connstr_has_pw(connstr))
2750 0 : return;
2751 :
2752 8 : if (MyProcPort->has_scram_keys && dblink_connstr_has_required_scram_options(connstr))
2753 4 : return;
2754 :
2755 : #ifdef ENABLE_GSS
2756 : if (be_gssapi_get_delegation(MyProcPort))
2757 : return;
2758 : #endif
2759 :
2760 4 : ereport(ERROR,
2761 : (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
2762 : errmsg("password or GSSAPI delegated credentials required"),
2763 : errdetail("Non-superusers must provide a password in the connection string or send delegated GSSAPI credentials.")));
2764 : }
2765 :
2766 : /*
2767 : * Report an error received from the remote server
2768 : *
2769 : * res: the received error result
2770 : * fail: true for ERROR ereport, false for NOTICE
2771 : * fmt and following args: sprintf-style format and values for errcontext;
2772 : * the resulting string should be worded like "while <some action>"
2773 : *
2774 : * If "res" is not NULL, it'll be PQclear'ed here (unless we throw error,
2775 : * in which case memory context cleanup will clear it eventually).
2776 : */
2777 : static void
2778 24 : dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
2779 : bool fail, const char *fmt,...)
2780 : {
2781 : int level;
2782 24 : char *pg_diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
2783 24 : char *message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
2784 24 : char *message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
2785 24 : char *message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
2786 24 : char *message_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
2787 : int sqlstate;
2788 : va_list ap;
2789 : char dblink_context_msg[512];
2790 :
2791 24 : if (fail)
2792 6 : level = ERROR;
2793 : else
2794 18 : level = NOTICE;
2795 :
2796 24 : if (pg_diag_sqlstate)
2797 24 : sqlstate = MAKE_SQLSTATE(pg_diag_sqlstate[0],
2798 : pg_diag_sqlstate[1],
2799 : pg_diag_sqlstate[2],
2800 : pg_diag_sqlstate[3],
2801 : pg_diag_sqlstate[4]);
2802 : else
2803 0 : sqlstate = ERRCODE_CONNECTION_FAILURE;
2804 :
2805 : /*
2806 : * If we don't get a message from the PGresult, try the PGconn. This is
2807 : * needed because for connection-level failures, PQgetResult may just
2808 : * return NULL, not a PGresult at all.
2809 : */
2810 24 : if (message_primary == NULL)
2811 0 : message_primary = pchomp(PQerrorMessage(conn));
2812 :
2813 : /*
2814 : * Format the basic errcontext string. Below, we'll add on something
2815 : * about the connection name. That's a violation of the translatability
2816 : * guidelines about constructing error messages out of parts, but since
2817 : * there's no translation support for dblink, there's no need to worry
2818 : * about that (yet).
2819 : */
2820 24 : va_start(ap, fmt);
2821 24 : vsnprintf(dblink_context_msg, sizeof(dblink_context_msg), fmt, ap);
2822 24 : va_end(ap);
2823 :
2824 24 : ereport(level,
2825 : (errcode(sqlstate),
2826 : (message_primary != NULL && message_primary[0] != '\0') ?
2827 : errmsg_internal("%s", message_primary) :
2828 : errmsg("could not obtain message string for remote error"),
2829 : message_detail ? errdetail_internal("%s", message_detail) : 0,
2830 : message_hint ? errhint("%s", message_hint) : 0,
2831 : message_context ? (errcontext("%s", message_context)) : 0,
2832 : conname ?
2833 : (errcontext("%s on dblink connection named \"%s\"",
2834 : dblink_context_msg, conname)) :
2835 : (errcontext("%s on unnamed dblink connection",
2836 : dblink_context_msg))));
2837 18 : PQclear(res);
2838 18 : }
2839 :
2840 : /*
2841 : * Obtain connection string for a foreign server
2842 : */
2843 : static char *
2844 50 : get_connect_string(const char *servername)
2845 : {
2846 50 : ForeignServer *foreign_server = NULL;
2847 : UserMapping *user_mapping;
2848 : ListCell *cell;
2849 : StringInfoData buf;
2850 : ForeignDataWrapper *fdw;
2851 : AclResult aclresult;
2852 : char *srvname;
2853 :
2854 : static const PQconninfoOption *options = NULL;
2855 :
2856 50 : initStringInfo(&buf);
2857 :
2858 : /*
2859 : * Get list of valid libpq options.
2860 : *
2861 : * To avoid unnecessary work, we get the list once and use it throughout
2862 : * the lifetime of this backend process. We don't need to care about
2863 : * memory context issues, because PQconndefaults allocates with malloc.
2864 : */
2865 50 : if (!options)
2866 : {
2867 12 : options = PQconndefaults();
2868 12 : if (!options) /* assume reason for failure is OOM */
2869 0 : ereport(ERROR,
2870 : (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
2871 : errmsg("out of memory"),
2872 : errdetail("Could not get libpq's default connection options.")));
2873 : }
2874 :
2875 : /* first gather the server connstr options */
2876 50 : srvname = pstrdup(servername);
2877 50 : truncate_identifier(srvname, strlen(srvname), false);
2878 50 : foreign_server = GetForeignServerByName(srvname, true);
2879 :
2880 50 : if (foreign_server)
2881 : {
2882 10 : Oid serverid = foreign_server->serverid;
2883 10 : Oid fdwid = foreign_server->fdwid;
2884 10 : Oid userid = GetUserId();
2885 :
2886 10 : user_mapping = GetUserMapping(userid, serverid);
2887 10 : fdw = GetForeignDataWrapper(fdwid);
2888 :
2889 : /* Check permissions, user must have usage on the server. */
2890 10 : aclresult = object_aclcheck(ForeignServerRelationId, serverid, userid, ACL_USAGE);
2891 10 : if (aclresult != ACLCHECK_OK)
2892 0 : aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER, foreign_server->servername);
2893 :
2894 : /*
2895 : * First append hardcoded options needed for SCRAM pass-through, so if
2896 : * the user overwrites these options we can ereport on
2897 : * dblink_connstr_check and dblink_security_check.
2898 : */
2899 10 : if (MyProcPort->has_scram_keys && UseScramPassthrough(foreign_server, user_mapping))
2900 6 : appendSCRAMKeysInfo(&buf);
2901 :
2902 10 : foreach(cell, fdw->options)
2903 : {
2904 0 : DefElem *def = lfirst(cell);
2905 :
2906 0 : if (is_valid_dblink_option(options, def->defname, ForeignDataWrapperRelationId))
2907 0 : appendStringInfo(&buf, "%s='%s' ", def->defname,
2908 0 : escape_param_str(strVal(def->arg)));
2909 : }
2910 :
2911 44 : foreach(cell, foreign_server->options)
2912 : {
2913 34 : DefElem *def = lfirst(cell);
2914 :
2915 34 : if (is_valid_dblink_option(options, def->defname, ForeignServerRelationId))
2916 28 : appendStringInfo(&buf, "%s='%s' ", def->defname,
2917 28 : escape_param_str(strVal(def->arg)));
2918 : }
2919 :
2920 20 : foreach(cell, user_mapping->options)
2921 : {
2922 :
2923 10 : DefElem *def = lfirst(cell);
2924 :
2925 10 : if (is_valid_dblink_option(options, def->defname, UserMappingRelationId))
2926 10 : appendStringInfo(&buf, "%s='%s' ", def->defname,
2927 10 : escape_param_str(strVal(def->arg)));
2928 : }
2929 :
2930 10 : return buf.data;
2931 : }
2932 : else
2933 40 : return NULL;
2934 : }
2935 :
2936 : /*
2937 : * Escaping libpq connect parameter strings.
2938 : *
2939 : * Replaces "'" with "\'" and "\" with "\\".
2940 : */
2941 : static char *
2942 38 : escape_param_str(const char *str)
2943 : {
2944 : const char *cp;
2945 : StringInfoData buf;
2946 :
2947 38 : initStringInfo(&buf);
2948 :
2949 344 : for (cp = str; *cp; cp++)
2950 : {
2951 306 : if (*cp == '\\' || *cp == '\'')
2952 0 : appendStringInfoChar(&buf, '\\');
2953 306 : appendStringInfoChar(&buf, *cp);
2954 : }
2955 :
2956 38 : return buf.data;
2957 : }
2958 :
2959 : /*
2960 : * Validate the PK-attnums argument for dblink_build_sql_insert() and related
2961 : * functions, and translate to the internal representation.
2962 : *
2963 : * The user supplies an int2vector of 1-based logical attnums, plus a count
2964 : * argument (the need for the separate count argument is historical, but we
2965 : * still check it). We check that each attnum corresponds to a valid,
2966 : * non-dropped attribute of the rel. We do *not* prevent attnums from being
2967 : * listed twice, though the actual use-case for such things is dubious.
2968 : * Note that before Postgres 9.0, the user's attnums were interpreted as
2969 : * physical not logical column numbers; this was changed for future-proofing.
2970 : *
2971 : * The internal representation is a palloc'd int array of 0-based physical
2972 : * attnums.
2973 : */
2974 : static void
2975 36 : validate_pkattnums(Relation rel,
2976 : int2vector *pkattnums_arg, int32 pknumatts_arg,
2977 : int **pkattnums, int *pknumatts)
2978 : {
2979 36 : TupleDesc tupdesc = rel->rd_att;
2980 36 : int natts = tupdesc->natts;
2981 : int i;
2982 :
2983 : /* Don't take more array elements than there are */
2984 36 : pknumatts_arg = Min(pknumatts_arg, pkattnums_arg->dim1);
2985 :
2986 : /* Must have at least one pk attnum selected */
2987 36 : if (pknumatts_arg <= 0)
2988 0 : ereport(ERROR,
2989 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
2990 : errmsg("number of key attributes must be > 0")));
2991 :
2992 : /* Allocate output array */
2993 36 : *pkattnums = palloc_array(int, pknumatts_arg);
2994 36 : *pknumatts = pknumatts_arg;
2995 :
2996 : /* Validate attnums and convert to internal form */
2997 114 : for (i = 0; i < pknumatts_arg; i++)
2998 : {
2999 90 : int pkattnum = pkattnums_arg->values[i];
3000 : int lnum;
3001 : int j;
3002 :
3003 : /* Can throw error immediately if out of range */
3004 90 : if (pkattnum <= 0 || pkattnum > natts)
3005 12 : ereport(ERROR,
3006 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3007 : errmsg("invalid attribute number %d", pkattnum)));
3008 :
3009 : /* Identify which physical column has this logical number */
3010 78 : lnum = 0;
3011 138 : for (j = 0; j < natts; j++)
3012 : {
3013 : /* dropped columns don't count */
3014 138 : if (TupleDescAttr(tupdesc, j)->attisdropped)
3015 6 : continue;
3016 :
3017 132 : if (++lnum == pkattnum)
3018 78 : break;
3019 : }
3020 :
3021 78 : if (j < natts)
3022 78 : (*pkattnums)[i] = j;
3023 : else
3024 0 : ereport(ERROR,
3025 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3026 : errmsg("invalid attribute number %d", pkattnum)));
3027 : }
3028 24 : }
3029 :
3030 : /*
3031 : * Check if the specified connection option is valid.
3032 : *
3033 : * We basically allow whatever libpq thinks is an option, with these
3034 : * restrictions:
3035 : * debug options: disallowed
3036 : * "client_encoding": disallowed
3037 : * "user": valid only in USER MAPPING options
3038 : * secure options (eg password): valid only in USER MAPPING options
3039 : * others: valid only in FOREIGN SERVER options
3040 : *
3041 : * We disallow client_encoding because it would be overridden anyway via
3042 : * PQclientEncoding; allowing it to be specified would merely promote
3043 : * confusion.
3044 : */
3045 : static bool
3046 930 : is_valid_dblink_option(const PQconninfoOption *options, const char *option,
3047 : Oid context)
3048 : {
3049 : const PQconninfoOption *opt;
3050 :
3051 : /* Look up the option in libpq result */
3052 23090 : for (opt = options; opt->keyword; opt++)
3053 : {
3054 23080 : if (strcmp(opt->keyword, option) == 0)
3055 920 : break;
3056 : }
3057 930 : if (opt->keyword == NULL)
3058 10 : return false;
3059 :
3060 : /* Disallow debug options (particularly "replication") */
3061 920 : if (strchr(opt->dispchar, 'D'))
3062 68 : return false;
3063 :
3064 : /* Disallow "client_encoding" */
3065 852 : if (strcmp(opt->keyword, "client_encoding") == 0)
3066 16 : return false;
3067 :
3068 : /*
3069 : * Disallow OAuth options for now, since the builtin flow communicates on
3070 : * stderr by default and can't cache tokens yet.
3071 : */
3072 836 : if (strncmp(opt->keyword, "oauth_", strlen("oauth_")) == 0)
3073 72 : return false;
3074 :
3075 : /*
3076 : * If the option is "user" or marked secure, it should be specified only
3077 : * in USER MAPPING. Others should be specified only in SERVER.
3078 : */
3079 764 : if (strcmp(opt->keyword, "user") == 0 || strchr(opt->dispchar, '*'))
3080 : {
3081 74 : if (context != UserMappingRelationId)
3082 18 : return false;
3083 : }
3084 : else
3085 : {
3086 690 : if (context != ForeignServerRelationId)
3087 468 : return false;
3088 : }
3089 :
3090 278 : return true;
3091 : }
3092 :
3093 : /*
3094 : * Same as is_valid_dblink_option but also check for only dblink_fdw specific
3095 : * options.
3096 : */
3097 : static bool
3098 78 : is_valid_dblink_fdw_option(const PQconninfoOption *options, const char *option,
3099 : Oid context)
3100 : {
3101 78 : if (strcmp(option, "use_scram_passthrough") == 0)
3102 8 : return true;
3103 :
3104 70 : return is_valid_dblink_option(options, option, context);
3105 : }
3106 :
3107 : /*
3108 : * Copy the remote session's values of GUCs that affect datatype I/O
3109 : * and apply them locally in a new GUC nesting level. Returns the new
3110 : * nestlevel (which is needed by restoreLocalGucs to undo the settings),
3111 : * or -1 if no new nestlevel was needed.
3112 : *
3113 : * We use the equivalent of a function SET option to allow the settings to
3114 : * persist only until the caller calls restoreLocalGucs. If an error is
3115 : * thrown in between, guc.c will take care of undoing the settings.
3116 : */
3117 : static int
3118 68 : applyRemoteGucs(PGconn *conn)
3119 : {
3120 : static const char *const GUCsAffectingIO[] = {
3121 : "DateStyle",
3122 : "IntervalStyle"
3123 : };
3124 :
3125 68 : int nestlevel = -1;
3126 : int i;
3127 :
3128 204 : for (i = 0; i < lengthof(GUCsAffectingIO); i++)
3129 : {
3130 136 : const char *gucName = GUCsAffectingIO[i];
3131 136 : const char *remoteVal = PQparameterStatus(conn, gucName);
3132 : const char *localVal;
3133 :
3134 : /*
3135 : * If the remote server is pre-8.4, it won't have IntervalStyle, but
3136 : * that's okay because its output format won't be ambiguous. So just
3137 : * skip the GUC if we don't get a value for it. (We might eventually
3138 : * need more complicated logic with remote-version checks here.)
3139 : */
3140 136 : if (remoteVal == NULL)
3141 0 : continue;
3142 :
3143 : /*
3144 : * Avoid GUC-setting overhead if the remote and local GUCs already
3145 : * have the same value.
3146 : */
3147 136 : localVal = GetConfigOption(gucName, false, false);
3148 : Assert(localVal != NULL);
3149 :
3150 136 : if (strcmp(remoteVal, localVal) == 0)
3151 102 : continue;
3152 :
3153 : /* Create new GUC nest level if we didn't already */
3154 34 : if (nestlevel < 0)
3155 18 : nestlevel = NewGUCNestLevel();
3156 :
3157 : /* Apply the option (this will throw error on failure) */
3158 34 : (void) set_config_option(gucName, remoteVal,
3159 : PGC_USERSET, PGC_S_SESSION,
3160 : GUC_ACTION_SAVE, true, 0, false);
3161 : }
3162 :
3163 68 : return nestlevel;
3164 : }
3165 :
3166 : /*
3167 : * Restore local GUCs after they have been overlaid with remote settings.
3168 : */
3169 : static void
3170 70 : restoreLocalGucs(int nestlevel)
3171 : {
3172 : /* Do nothing if no new nestlevel was created */
3173 70 : if (nestlevel > 0)
3174 16 : AtEOXact_GUC(true, nestlevel);
3175 70 : }
3176 :
3177 : /*
3178 : * Append SCRAM client key and server key information from the global
3179 : * MyProcPort into the given StringInfo buffer.
3180 : */
3181 : static void
3182 6 : appendSCRAMKeysInfo(StringInfo buf)
3183 : {
3184 : int len;
3185 : int encoded_len;
3186 : char *client_key;
3187 : char *server_key;
3188 :
3189 6 : len = pg_b64_enc_len(sizeof(MyProcPort->scram_ClientKey));
3190 : /* don't forget the zero-terminator */
3191 6 : client_key = palloc0(len + 1);
3192 6 : encoded_len = pg_b64_encode(MyProcPort->scram_ClientKey,
3193 : sizeof(MyProcPort->scram_ClientKey),
3194 : client_key, len);
3195 6 : if (encoded_len < 0)
3196 0 : elog(ERROR, "could not encode SCRAM client key");
3197 :
3198 6 : len = pg_b64_enc_len(sizeof(MyProcPort->scram_ServerKey));
3199 : /* don't forget the zero-terminator */
3200 6 : server_key = palloc0(len + 1);
3201 6 : encoded_len = pg_b64_encode(MyProcPort->scram_ServerKey,
3202 : sizeof(MyProcPort->scram_ServerKey),
3203 : server_key, len);
3204 6 : if (encoded_len < 0)
3205 0 : elog(ERROR, "could not encode SCRAM server key");
3206 :
3207 6 : appendStringInfo(buf, "scram_client_key='%s' ", client_key);
3208 6 : appendStringInfo(buf, "scram_server_key='%s' ", server_key);
3209 6 : appendStringInfoString(buf, "require_auth='scram-sha-256' ");
3210 :
3211 6 : pfree(client_key);
3212 6 : pfree(server_key);
3213 6 : }
3214 :
3215 :
3216 : static bool
3217 6 : UseScramPassthrough(ForeignServer *foreign_server, UserMapping *user)
3218 : {
3219 : ListCell *cell;
3220 :
3221 24 : foreach(cell, foreign_server->options)
3222 : {
3223 24 : DefElem *def = lfirst(cell);
3224 :
3225 24 : if (strcmp(def->defname, "use_scram_passthrough") == 0)
3226 6 : return defGetBoolean(def);
3227 : }
3228 :
3229 0 : foreach(cell, user->options)
3230 : {
3231 0 : DefElem *def = (DefElem *) lfirst(cell);
3232 :
3233 0 : if (strcmp(def->defname, "use_scram_passthrough") == 0)
3234 0 : return defGetBoolean(def);
3235 : }
3236 :
3237 0 : return false;
3238 : }
|