Line data Source code
1 : /*
2 : * dblink.c
3 : *
4 : * Functions returning results from a remote database
5 : *
6 : * Joe Conway <mail@joeconway.com>
7 : * And contributors:
8 : * Darko Prenosil <Darko.Prenosil@finteh.hr>
9 : * Shridhar Daithankar <shridhar_daithankar@persistent.co.in>
10 : *
11 : * contrib/dblink/dblink.c
12 : * Copyright (c) 2001-2025, PostgreSQL Global Development Group
13 : * ALL RIGHTS RESERVED;
14 : *
15 : * Permission to use, copy, modify, and distribute this software and its
16 : * documentation for any purpose, without fee, and without a written agreement
17 : * is hereby granted, provided that the above copyright notice and this
18 : * paragraph and the following two paragraphs appear in all copies.
19 : *
20 : * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
21 : * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
22 : * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
23 : * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
24 : * POSSIBILITY OF SUCH DAMAGE.
25 : *
26 : * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
27 : * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
28 : * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
29 : * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
30 : * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
31 : *
32 : */
33 : #include "postgres.h"
34 :
35 : #include <limits.h>
36 :
37 : #include "access/htup_details.h"
38 : #include "access/relation.h"
39 : #include "access/reloptions.h"
40 : #include "access/table.h"
41 : #include "catalog/namespace.h"
42 : #include "catalog/pg_foreign_data_wrapper.h"
43 : #include "catalog/pg_foreign_server.h"
44 : #include "catalog/pg_type.h"
45 : #include "catalog/pg_user_mapping.h"
46 : #include "commands/defrem.h"
47 : #include "common/base64.h"
48 : #include "executor/spi.h"
49 : #include "foreign/foreign.h"
50 : #include "funcapi.h"
51 : #include "lib/stringinfo.h"
52 : #include "libpq-fe.h"
53 : #include "libpq/libpq-be.h"
54 : #include "libpq/libpq-be-fe-helpers.h"
55 : #include "mb/pg_wchar.h"
56 : #include "miscadmin.h"
57 : #include "parser/scansup.h"
58 : #include "utils/acl.h"
59 : #include "utils/builtins.h"
60 : #include "utils/fmgroids.h"
61 : #include "utils/guc.h"
62 : #include "utils/lsyscache.h"
63 : #include "utils/memutils.h"
64 : #include "utils/rel.h"
65 : #include "utils/varlena.h"
66 : #include "utils/wait_event.h"
67 :
68 34 : PG_MODULE_MAGIC_EXT(
69 : .name = "dblink",
70 : .version = PG_VERSION
71 : );
72 :
73 : typedef struct remoteConn
74 : {
75 : PGconn *conn; /* Hold the remote connection */
76 : int openCursorCount; /* The number of open cursors */
77 : bool newXactForCursor; /* Opened a transaction for a cursor */
78 : } remoteConn;
79 :
80 : typedef struct storeInfo
81 : {
82 : FunctionCallInfo fcinfo;
83 : Tuplestorestate *tuplestore;
84 : AttInMetadata *attinmeta;
85 : MemoryContext tmpcontext;
86 : char **cstrs;
87 : /* temp storage for results to avoid leaks on exception */
88 : PGresult *last_res;
89 : PGresult *cur_res;
90 : } storeInfo;
91 :
92 : /*
93 : * Internal declarations
94 : */
95 : static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
96 : static void prepTuplestoreResult(FunctionCallInfo fcinfo);
97 : static void materializeResult(FunctionCallInfo fcinfo, PGconn *conn,
98 : PGresult *res);
99 : static void materializeQueryResult(FunctionCallInfo fcinfo,
100 : PGconn *conn,
101 : const char *conname,
102 : const char *sql,
103 : bool fail);
104 : static PGresult *storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql);
105 : static void storeRow(volatile storeInfo *sinfo, PGresult *res, bool first);
106 : static remoteConn *getConnectionByName(const char *name);
107 : static HTAB *createConnHash(void);
108 : static void createNewConnection(const char *name, remoteConn *rconn);
109 : static void deleteConnection(const char *name);
110 : static char **get_pkey_attnames(Relation rel, int16 *indnkeyatts);
111 : static char **get_text_array_contents(ArrayType *array, int *numitems);
112 : static char *get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
113 : static char *get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals);
114 : static char *get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
115 : static char *quote_ident_cstr(char *rawstr);
116 : static int get_attnum_pk_pos(int *pkattnums, int pknumatts, int key);
117 : static HeapTuple get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals);
118 : static Relation get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode);
119 : static char *generate_relation_name(Relation rel);
120 : static void dblink_connstr_check(const char *connstr);
121 : static bool dblink_connstr_has_pw(const char *connstr);
122 : static void dblink_security_check(PGconn *conn, remoteConn *rconn, const char *connstr);
123 : static void dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
124 : bool fail, const char *fmt,...) pg_attribute_printf(5, 6);
125 : static char *get_connect_string(const char *servername);
126 : static char *escape_param_str(const char *str);
127 : static void validate_pkattnums(Relation rel,
128 : int2vector *pkattnums_arg, int32 pknumatts_arg,
129 : int **pkattnums, int *pknumatts);
130 : static bool is_valid_dblink_option(const PQconninfoOption *options,
131 : const char *option, Oid context);
132 : static int applyRemoteGucs(PGconn *conn);
133 : static void restoreLocalGucs(int nestlevel);
134 : static bool UseScramPassthrough(ForeignServer *foreign_server, UserMapping *user);
135 : static void appendSCRAMKeysInfo(StringInfo buf);
136 : static bool is_valid_dblink_fdw_option(const PQconninfoOption *options, const char *option,
137 : Oid context);
138 : static bool dblink_connstr_has_required_scram_options(const char *connstr);
139 :
140 : /* Global */
141 : static remoteConn *pconn = NULL;
142 : static HTAB *remoteConnHash = NULL;
143 :
144 : /* custom wait event values, retrieved from shared memory */
145 : static uint32 dblink_we_connect = 0;
146 : static uint32 dblink_we_get_conn = 0;
147 : static uint32 dblink_we_get_result = 0;
148 :
149 : /*
150 : * Following is list that holds multiple remote connections.
151 : * Calling convention of each dblink function changes to accept
152 : * connection name as the first parameter. The connection list is
153 : * much like ecpg e.g. a mapping between a name and a PGconn object.
154 : */
155 :
156 : typedef struct remoteConnHashEnt
157 : {
158 : char name[NAMEDATALEN];
159 : remoteConn *rconn;
160 : } remoteConnHashEnt;
161 :
162 : /* initial number of connection hashes */
163 : #define NUMCONN 16
164 :
165 : static char *
166 96 : xpstrdup(const char *in)
167 : {
168 96 : if (in == NULL)
169 72 : return NULL;
170 24 : return pstrdup(in);
171 : }
172 :
173 : pg_noreturn static void
174 0 : dblink_res_internalerror(PGconn *conn, PGresult *res, const char *p2)
175 : {
176 0 : char *msg = pchomp(PQerrorMessage(conn));
177 :
178 0 : PQclear(res);
179 0 : elog(ERROR, "%s: %s", p2, msg);
180 : }
181 :
182 : pg_noreturn static void
183 6 : dblink_conn_not_avail(const char *conname)
184 : {
185 6 : if (conname)
186 2 : ereport(ERROR,
187 : (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
188 : errmsg("connection \"%s\" not available", conname)));
189 : else
190 4 : ereport(ERROR,
191 : (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
192 : errmsg("connection not available")));
193 : }
194 :
195 : static void
196 74 : dblink_get_conn(char *conname_or_str,
197 : PGconn *volatile *conn_p, char **conname_p, volatile bool *freeconn_p)
198 : {
199 74 : remoteConn *rconn = getConnectionByName(conname_or_str);
200 : PGconn *conn;
201 : char *conname;
202 : bool freeconn;
203 :
204 74 : if (rconn)
205 : {
206 54 : conn = rconn->conn;
207 54 : conname = conname_or_str;
208 54 : freeconn = false;
209 : }
210 : else
211 : {
212 : const char *connstr;
213 :
214 20 : connstr = get_connect_string(conname_or_str);
215 20 : if (connstr == NULL)
216 12 : connstr = conname_or_str;
217 20 : dblink_connstr_check(connstr);
218 :
219 : /* first time, allocate or get the custom wait event */
220 18 : if (dblink_we_get_conn == 0)
221 10 : dblink_we_get_conn = WaitEventExtensionNew("DblinkGetConnect");
222 :
223 : /* OK to make connection */
224 18 : conn = libpqsrv_connect(connstr, dblink_we_get_conn);
225 :
226 18 : if (PQstatus(conn) == CONNECTION_BAD)
227 : {
228 6 : char *msg = pchomp(PQerrorMessage(conn));
229 :
230 6 : libpqsrv_disconnect(conn);
231 6 : ereport(ERROR,
232 : (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
233 : errmsg("could not establish connection"),
234 : errdetail_internal("%s", msg)));
235 : }
236 12 : dblink_security_check(conn, rconn, connstr);
237 12 : if (PQclientEncoding(conn) != GetDatabaseEncoding())
238 0 : PQsetClientEncoding(conn, GetDatabaseEncodingName());
239 12 : freeconn = true;
240 12 : conname = NULL;
241 : }
242 :
243 66 : *conn_p = conn;
244 66 : *conname_p = conname;
245 66 : *freeconn_p = freeconn;
246 66 : }
247 :
248 : static PGconn *
249 34 : dblink_get_named_conn(const char *conname)
250 : {
251 34 : remoteConn *rconn = getConnectionByName(conname);
252 :
253 34 : if (rconn)
254 34 : return rconn->conn;
255 :
256 0 : dblink_conn_not_avail(conname);
257 : return NULL; /* keep compiler quiet */
258 : }
259 :
260 : static void
261 248 : dblink_init(void)
262 : {
263 248 : if (!pconn)
264 : {
265 14 : if (dblink_we_get_result == 0)
266 14 : dblink_we_get_result = WaitEventExtensionNew("DblinkGetResult");
267 :
268 14 : pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn));
269 14 : pconn->conn = NULL;
270 14 : pconn->openCursorCount = 0;
271 14 : pconn->newXactForCursor = false;
272 : }
273 248 : }
274 :
275 : /*
276 : * Create a persistent connection to another database
277 : */
278 26 : PG_FUNCTION_INFO_V1(dblink_connect);
279 : Datum
280 32 : dblink_connect(PG_FUNCTION_ARGS)
281 : {
282 32 : char *conname_or_str = NULL;
283 32 : char *connstr = NULL;
284 32 : char *connname = NULL;
285 : char *msg;
286 32 : PGconn *conn = NULL;
287 32 : remoteConn *rconn = NULL;
288 :
289 32 : dblink_init();
290 :
291 32 : if (PG_NARGS() == 2)
292 : {
293 22 : conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(1));
294 22 : connname = text_to_cstring(PG_GETARG_TEXT_PP(0));
295 : }
296 10 : else if (PG_NARGS() == 1)
297 10 : conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0));
298 :
299 32 : if (connname)
300 : {
301 22 : rconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext,
302 : sizeof(remoteConn));
303 22 : rconn->conn = NULL;
304 22 : rconn->openCursorCount = 0;
305 22 : rconn->newXactForCursor = false;
306 : }
307 :
308 : /* first check for valid foreign data server */
309 32 : connstr = get_connect_string(conname_or_str);
310 32 : if (connstr == NULL)
311 28 : connstr = conname_or_str;
312 :
313 : /* check password in connection string if not superuser */
314 32 : dblink_connstr_check(connstr);
315 :
316 : /* first time, allocate or get the custom wait event */
317 30 : if (dblink_we_connect == 0)
318 4 : dblink_we_connect = WaitEventExtensionNew("DblinkConnect");
319 :
320 : /* OK to make connection */
321 30 : conn = libpqsrv_connect(connstr, dblink_we_connect);
322 :
323 30 : if (PQstatus(conn) == CONNECTION_BAD)
324 : {
325 0 : msg = pchomp(PQerrorMessage(conn));
326 0 : libpqsrv_disconnect(conn);
327 0 : if (rconn)
328 0 : pfree(rconn);
329 :
330 0 : ereport(ERROR,
331 : (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
332 : errmsg("could not establish connection"),
333 : errdetail_internal("%s", msg)));
334 : }
335 :
336 : /* check password actually used if not superuser */
337 30 : dblink_security_check(conn, rconn, connstr);
338 :
339 : /* attempt to set client encoding to match server encoding, if needed */
340 30 : if (PQclientEncoding(conn) != GetDatabaseEncoding())
341 0 : PQsetClientEncoding(conn, GetDatabaseEncodingName());
342 :
343 30 : if (connname)
344 : {
345 20 : rconn->conn = conn;
346 20 : createNewConnection(connname, rconn);
347 : }
348 : else
349 : {
350 10 : if (pconn->conn)
351 2 : libpqsrv_disconnect(pconn->conn);
352 10 : pconn->conn = conn;
353 : }
354 :
355 28 : PG_RETURN_TEXT_P(cstring_to_text("OK"));
356 : }
357 :
358 : /*
359 : * Clear a persistent connection to another database
360 : */
361 16 : PG_FUNCTION_INFO_V1(dblink_disconnect);
362 : Datum
363 26 : dblink_disconnect(PG_FUNCTION_ARGS)
364 : {
365 26 : char *conname = NULL;
366 26 : remoteConn *rconn = NULL;
367 26 : PGconn *conn = NULL;
368 :
369 26 : dblink_init();
370 :
371 26 : if (PG_NARGS() == 1)
372 : {
373 18 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
374 18 : rconn = getConnectionByName(conname);
375 18 : if (rconn)
376 16 : conn = rconn->conn;
377 : }
378 : else
379 8 : conn = pconn->conn;
380 :
381 26 : if (!conn)
382 2 : dblink_conn_not_avail(conname);
383 :
384 24 : libpqsrv_disconnect(conn);
385 24 : if (rconn)
386 : {
387 16 : deleteConnection(conname);
388 16 : pfree(rconn);
389 : }
390 : else
391 8 : pconn->conn = NULL;
392 :
393 24 : PG_RETURN_TEXT_P(cstring_to_text("OK"));
394 : }
395 :
396 : /*
397 : * opens a cursor using a persistent connection
398 : */
399 26 : PG_FUNCTION_INFO_V1(dblink_open);
400 : Datum
401 18 : dblink_open(PG_FUNCTION_ARGS)
402 : {
403 18 : PGresult *res = NULL;
404 : PGconn *conn;
405 18 : char *curname = NULL;
406 18 : char *sql = NULL;
407 18 : char *conname = NULL;
408 : StringInfoData buf;
409 18 : remoteConn *rconn = NULL;
410 18 : bool fail = true; /* default to backward compatible behavior */
411 :
412 18 : dblink_init();
413 18 : initStringInfo(&buf);
414 :
415 18 : if (PG_NARGS() == 2)
416 : {
417 : /* text,text */
418 4 : curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
419 4 : sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
420 4 : rconn = pconn;
421 : }
422 14 : else if (PG_NARGS() == 3)
423 : {
424 : /* might be text,text,text or text,text,bool */
425 12 : if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
426 : {
427 2 : curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
428 2 : sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
429 2 : fail = PG_GETARG_BOOL(2);
430 2 : rconn = pconn;
431 : }
432 : else
433 : {
434 10 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
435 10 : curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
436 10 : sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
437 10 : rconn = getConnectionByName(conname);
438 : }
439 : }
440 2 : else if (PG_NARGS() == 4)
441 : {
442 : /* text,text,text,bool */
443 2 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
444 2 : curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
445 2 : sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
446 2 : fail = PG_GETARG_BOOL(3);
447 2 : rconn = getConnectionByName(conname);
448 : }
449 :
450 18 : if (!rconn || !rconn->conn)
451 0 : dblink_conn_not_avail(conname);
452 :
453 18 : conn = rconn->conn;
454 :
455 : /* If we are not in a transaction, start one */
456 18 : if (PQtransactionStatus(conn) == PQTRANS_IDLE)
457 : {
458 14 : res = libpqsrv_exec(conn, "BEGIN", dblink_we_get_result);
459 14 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
460 0 : dblink_res_internalerror(conn, res, "begin error");
461 14 : PQclear(res);
462 14 : rconn->newXactForCursor = true;
463 :
464 : /*
465 : * Since transaction state was IDLE, we force cursor count to
466 : * initially be 0. This is needed as a previous ABORT might have wiped
467 : * out our transaction without maintaining the cursor count for us.
468 : */
469 14 : rconn->openCursorCount = 0;
470 : }
471 :
472 : /* if we started a transaction, increment cursor count */
473 18 : if (rconn->newXactForCursor)
474 18 : (rconn->openCursorCount)++;
475 :
476 18 : appendStringInfo(&buf, "DECLARE %s CURSOR FOR %s", curname, sql);
477 18 : res = libpqsrv_exec(conn, buf.data, dblink_we_get_result);
478 18 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
479 : {
480 4 : dblink_res_error(conn, conname, res, fail,
481 : "while opening cursor \"%s\"", curname);
482 4 : PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
483 : }
484 :
485 14 : PQclear(res);
486 14 : PG_RETURN_TEXT_P(cstring_to_text("OK"));
487 : }
488 :
489 : /*
490 : * closes a cursor
491 : */
492 20 : PG_FUNCTION_INFO_V1(dblink_close);
493 : Datum
494 10 : dblink_close(PG_FUNCTION_ARGS)
495 : {
496 : PGconn *conn;
497 10 : PGresult *res = NULL;
498 10 : char *curname = NULL;
499 10 : char *conname = NULL;
500 : StringInfoData buf;
501 10 : remoteConn *rconn = NULL;
502 10 : bool fail = true; /* default to backward compatible behavior */
503 :
504 10 : dblink_init();
505 10 : initStringInfo(&buf);
506 :
507 10 : if (PG_NARGS() == 1)
508 : {
509 : /* text */
510 0 : curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
511 0 : rconn = pconn;
512 : }
513 10 : else if (PG_NARGS() == 2)
514 : {
515 : /* might be text,text or text,bool */
516 10 : if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
517 : {
518 4 : curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
519 4 : fail = PG_GETARG_BOOL(1);
520 4 : rconn = pconn;
521 : }
522 : else
523 : {
524 6 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
525 6 : curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
526 6 : rconn = getConnectionByName(conname);
527 : }
528 : }
529 10 : if (PG_NARGS() == 3)
530 : {
531 : /* text,text,bool */
532 0 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
533 0 : curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
534 0 : fail = PG_GETARG_BOOL(2);
535 0 : rconn = getConnectionByName(conname);
536 : }
537 :
538 10 : if (!rconn || !rconn->conn)
539 0 : dblink_conn_not_avail(conname);
540 :
541 10 : conn = rconn->conn;
542 :
543 10 : appendStringInfo(&buf, "CLOSE %s", curname);
544 :
545 : /* close the cursor */
546 10 : res = libpqsrv_exec(conn, buf.data, dblink_we_get_result);
547 10 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
548 : {
549 2 : dblink_res_error(conn, conname, res, fail,
550 : "while closing cursor \"%s\"", curname);
551 2 : PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
552 : }
553 :
554 8 : PQclear(res);
555 :
556 : /* if we started a transaction, decrement cursor count */
557 8 : if (rconn->newXactForCursor)
558 : {
559 8 : (rconn->openCursorCount)--;
560 :
561 : /* if count is zero, commit the transaction */
562 8 : if (rconn->openCursorCount == 0)
563 : {
564 4 : rconn->newXactForCursor = false;
565 :
566 4 : res = libpqsrv_exec(conn, "COMMIT", dblink_we_get_result);
567 4 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
568 0 : dblink_res_internalerror(conn, res, "commit error");
569 4 : PQclear(res);
570 : }
571 : }
572 :
573 8 : PG_RETURN_TEXT_P(cstring_to_text("OK"));
574 : }
575 :
576 : /*
577 : * Fetch results from an open cursor
578 : */
579 26 : PG_FUNCTION_INFO_V1(dblink_fetch);
580 : Datum
581 26 : dblink_fetch(PG_FUNCTION_ARGS)
582 : {
583 26 : PGresult *res = NULL;
584 26 : char *conname = NULL;
585 26 : remoteConn *rconn = NULL;
586 26 : PGconn *conn = NULL;
587 : StringInfoData buf;
588 26 : char *curname = NULL;
589 26 : int howmany = 0;
590 26 : bool fail = true; /* default to backward compatible */
591 :
592 26 : prepTuplestoreResult(fcinfo);
593 :
594 26 : dblink_init();
595 :
596 26 : if (PG_NARGS() == 4)
597 : {
598 : /* text,text,int,bool */
599 2 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
600 2 : curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
601 2 : howmany = PG_GETARG_INT32(2);
602 2 : fail = PG_GETARG_BOOL(3);
603 :
604 2 : rconn = getConnectionByName(conname);
605 2 : if (rconn)
606 2 : conn = rconn->conn;
607 : }
608 24 : else if (PG_NARGS() == 3)
609 : {
610 : /* text,text,int or text,int,bool */
611 16 : if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
612 : {
613 4 : curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
614 4 : howmany = PG_GETARG_INT32(1);
615 4 : fail = PG_GETARG_BOOL(2);
616 4 : conn = pconn->conn;
617 : }
618 : else
619 : {
620 12 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
621 12 : curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
622 12 : howmany = PG_GETARG_INT32(2);
623 :
624 12 : rconn = getConnectionByName(conname);
625 12 : if (rconn)
626 12 : conn = rconn->conn;
627 : }
628 : }
629 8 : else if (PG_NARGS() == 2)
630 : {
631 : /* text,int */
632 8 : curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
633 8 : howmany = PG_GETARG_INT32(1);
634 8 : conn = pconn->conn;
635 : }
636 :
637 26 : if (!conn)
638 0 : dblink_conn_not_avail(conname);
639 :
640 26 : initStringInfo(&buf);
641 26 : appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
642 :
643 : /*
644 : * Try to execute the query. Note that since libpq uses malloc, the
645 : * PGresult will be long-lived even though we are still in a short-lived
646 : * memory context.
647 : */
648 26 : res = libpqsrv_exec(conn, buf.data, dblink_we_get_result);
649 52 : if (!res ||
650 52 : (PQresultStatus(res) != PGRES_COMMAND_OK &&
651 26 : PQresultStatus(res) != PGRES_TUPLES_OK))
652 : {
653 10 : dblink_res_error(conn, conname, res, fail,
654 : "while fetching from cursor \"%s\"", curname);
655 6 : return (Datum) 0;
656 : }
657 16 : else if (PQresultStatus(res) == PGRES_COMMAND_OK)
658 : {
659 : /* cursor does not exist - closed already or bad name */
660 0 : PQclear(res);
661 0 : ereport(ERROR,
662 : (errcode(ERRCODE_INVALID_CURSOR_NAME),
663 : errmsg("cursor \"%s\" does not exist", curname)));
664 : }
665 :
666 16 : materializeResult(fcinfo, conn, res);
667 14 : return (Datum) 0;
668 : }
669 :
670 : /*
671 : * Note: this is the new preferred version of dblink
672 : */
673 38 : PG_FUNCTION_INFO_V1(dblink_record);
674 : Datum
675 58 : dblink_record(PG_FUNCTION_ARGS)
676 : {
677 58 : return dblink_record_internal(fcinfo, false);
678 : }
679 :
680 8 : PG_FUNCTION_INFO_V1(dblink_send_query);
681 : Datum
682 12 : dblink_send_query(PG_FUNCTION_ARGS)
683 : {
684 : PGconn *conn;
685 : char *sql;
686 : int retval;
687 :
688 12 : if (PG_NARGS() == 2)
689 : {
690 12 : conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
691 12 : sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
692 : }
693 : else
694 : /* shouldn't happen */
695 0 : elog(ERROR, "wrong number of arguments");
696 :
697 : /* async query send */
698 12 : retval = PQsendQuery(conn, sql);
699 12 : if (retval != 1)
700 0 : elog(NOTICE, "could not send query: %s", pchomp(PQerrorMessage(conn)));
701 :
702 12 : PG_RETURN_INT32(retval);
703 : }
704 :
705 12 : PG_FUNCTION_INFO_V1(dblink_get_result);
706 : Datum
707 16 : dblink_get_result(PG_FUNCTION_ARGS)
708 : {
709 16 : return dblink_record_internal(fcinfo, true);
710 : }
711 :
712 : static Datum
713 74 : dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
714 : {
715 74 : PGconn *volatile conn = NULL;
716 74 : volatile bool freeconn = false;
717 :
718 74 : prepTuplestoreResult(fcinfo);
719 :
720 74 : dblink_init();
721 :
722 74 : PG_TRY();
723 : {
724 74 : char *sql = NULL;
725 74 : char *conname = NULL;
726 74 : bool fail = true; /* default to backward compatible */
727 :
728 74 : if (!is_async)
729 : {
730 58 : if (PG_NARGS() == 3)
731 : {
732 : /* text,text,bool */
733 2 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
734 2 : sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
735 2 : fail = PG_GETARG_BOOL(2);
736 2 : dblink_get_conn(conname, &conn, &conname, &freeconn);
737 : }
738 56 : else if (PG_NARGS() == 2)
739 : {
740 : /* text,text or text,bool */
741 42 : if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
742 : {
743 2 : sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
744 2 : fail = PG_GETARG_BOOL(1);
745 2 : conn = pconn->conn;
746 : }
747 : else
748 : {
749 40 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
750 40 : sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
751 40 : dblink_get_conn(conname, &conn, &conname, &freeconn);
752 : }
753 : }
754 14 : else if (PG_NARGS() == 1)
755 : {
756 : /* text */
757 14 : conn = pconn->conn;
758 14 : sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
759 : }
760 : else
761 : /* shouldn't happen */
762 0 : elog(ERROR, "wrong number of arguments");
763 : }
764 : else /* is_async */
765 : {
766 : /* get async result */
767 16 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
768 :
769 16 : if (PG_NARGS() == 2)
770 : {
771 : /* text,bool */
772 0 : fail = PG_GETARG_BOOL(1);
773 0 : conn = dblink_get_named_conn(conname);
774 : }
775 16 : else if (PG_NARGS() == 1)
776 : {
777 : /* text */
778 16 : conn = dblink_get_named_conn(conname);
779 : }
780 : else
781 : /* shouldn't happen */
782 0 : elog(ERROR, "wrong number of arguments");
783 : }
784 :
785 66 : if (!conn)
786 4 : dblink_conn_not_avail(conname);
787 :
788 62 : if (!is_async)
789 : {
790 : /* synchronous query, use efficient tuple collection method */
791 46 : materializeQueryResult(fcinfo, conn, conname, sql, fail);
792 : }
793 : else
794 : {
795 : /* async result retrieval, do it the old way */
796 16 : PGresult *res = libpqsrv_get_result(conn, dblink_we_get_result);
797 :
798 : /* NULL means we're all done with the async results */
799 16 : if (res)
800 : {
801 10 : if (PQresultStatus(res) != PGRES_COMMAND_OK &&
802 10 : PQresultStatus(res) != PGRES_TUPLES_OK)
803 : {
804 0 : dblink_res_error(conn, conname, res, fail,
805 : "while executing query");
806 : /* if fail isn't set, we'll return an empty query result */
807 : }
808 : else
809 : {
810 10 : materializeResult(fcinfo, conn, res);
811 : }
812 : }
813 : }
814 : }
815 12 : PG_FINALLY();
816 : {
817 : /* if needed, close the connection to the database */
818 74 : if (freeconn)
819 10 : libpqsrv_disconnect(conn);
820 : }
821 74 : PG_END_TRY();
822 :
823 62 : return (Datum) 0;
824 : }
825 :
826 : /*
827 : * Verify function caller can handle a tuplestore result, and set up for that.
828 : *
829 : * Note: if the caller returns without actually creating a tuplestore, the
830 : * executor will treat the function result as an empty set.
831 : */
832 : static void
833 100 : prepTuplestoreResult(FunctionCallInfo fcinfo)
834 : {
835 100 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
836 :
837 : /* check to see if query supports us returning a tuplestore */
838 100 : if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
839 0 : ereport(ERROR,
840 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
841 : errmsg("set-valued function called in context that cannot accept a set")));
842 100 : if (!(rsinfo->allowedModes & SFRM_Materialize))
843 0 : ereport(ERROR,
844 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
845 : errmsg("materialize mode required, but it is not allowed in this context")));
846 :
847 : /* let the executor know we're sending back a tuplestore */
848 100 : rsinfo->returnMode = SFRM_Materialize;
849 :
850 : /* caller must fill these to return a non-empty result */
851 100 : rsinfo->setResult = NULL;
852 100 : rsinfo->setDesc = NULL;
853 100 : }
854 :
855 : /*
856 : * Copy the contents of the PGresult into a tuplestore to be returned
857 : * as the result of the current function.
858 : * The PGresult will be released in this function.
859 : */
860 : static void
861 26 : materializeResult(FunctionCallInfo fcinfo, PGconn *conn, PGresult *res)
862 : {
863 26 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
864 :
865 : /* prepTuplestoreResult must have been called previously */
866 : Assert(rsinfo->returnMode == SFRM_Materialize);
867 :
868 26 : PG_TRY();
869 : {
870 : TupleDesc tupdesc;
871 : bool is_sql_cmd;
872 : int ntuples;
873 : int nfields;
874 :
875 26 : if (PQresultStatus(res) == PGRES_COMMAND_OK)
876 : {
877 0 : is_sql_cmd = true;
878 :
879 : /*
880 : * need a tuple descriptor representing one TEXT column to return
881 : * the command status string as our result tuple
882 : */
883 0 : tupdesc = CreateTemplateTupleDesc(1);
884 0 : TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
885 : TEXTOID, -1, 0);
886 0 : ntuples = 1;
887 0 : nfields = 1;
888 : }
889 : else
890 : {
891 : Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
892 :
893 26 : is_sql_cmd = false;
894 :
895 : /* get a tuple descriptor for our result type */
896 26 : switch (get_call_result_type(fcinfo, NULL, &tupdesc))
897 : {
898 26 : case TYPEFUNC_COMPOSITE:
899 : /* success */
900 26 : break;
901 0 : case TYPEFUNC_RECORD:
902 : /* failed to determine actual type of RECORD */
903 0 : ereport(ERROR,
904 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
905 : errmsg("function returning record called in context "
906 : "that cannot accept type record")));
907 : break;
908 0 : default:
909 : /* result type isn't composite */
910 0 : elog(ERROR, "return type must be a row type");
911 : break;
912 : }
913 :
914 : /* make sure we have a persistent copy of the tupdesc */
915 26 : tupdesc = CreateTupleDescCopy(tupdesc);
916 26 : ntuples = PQntuples(res);
917 26 : nfields = PQnfields(res);
918 : }
919 :
920 : /*
921 : * check result and tuple descriptor have the same number of columns
922 : */
923 26 : if (nfields != tupdesc->natts)
924 0 : ereport(ERROR,
925 : (errcode(ERRCODE_DATATYPE_MISMATCH),
926 : errmsg("remote query result rowtype does not match "
927 : "the specified FROM clause rowtype")));
928 :
929 26 : if (ntuples > 0)
930 : {
931 : AttInMetadata *attinmeta;
932 26 : int nestlevel = -1;
933 : Tuplestorestate *tupstore;
934 : MemoryContext oldcontext;
935 : int row;
936 : char **values;
937 :
938 26 : attinmeta = TupleDescGetAttInMetadata(tupdesc);
939 :
940 : /* Set GUCs to ensure we read GUC-sensitive data types correctly */
941 26 : if (!is_sql_cmd)
942 26 : nestlevel = applyRemoteGucs(conn);
943 :
944 26 : oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
945 26 : tupstore = tuplestore_begin_heap(true, false, work_mem);
946 26 : rsinfo->setResult = tupstore;
947 26 : rsinfo->setDesc = tupdesc;
948 26 : MemoryContextSwitchTo(oldcontext);
949 :
950 26 : values = palloc_array(char *, nfields);
951 :
952 : /* put all tuples into the tuplestore */
953 98 : for (row = 0; row < ntuples; row++)
954 : {
955 : HeapTuple tuple;
956 :
957 74 : if (!is_sql_cmd)
958 : {
959 : int i;
960 :
961 276 : for (i = 0; i < nfields; i++)
962 : {
963 202 : if (PQgetisnull(res, row, i))
964 0 : values[i] = NULL;
965 : else
966 202 : values[i] = PQgetvalue(res, row, i);
967 : }
968 : }
969 : else
970 : {
971 0 : values[0] = PQcmdStatus(res);
972 : }
973 :
974 : /* build the tuple and put it into the tuplestore. */
975 74 : tuple = BuildTupleFromCStrings(attinmeta, values);
976 72 : tuplestore_puttuple(tupstore, tuple);
977 : }
978 :
979 : /* clean up GUC settings, if we changed any */
980 24 : restoreLocalGucs(nestlevel);
981 : }
982 : }
983 2 : PG_FINALLY();
984 : {
985 : /* be sure to release the libpq result */
986 26 : PQclear(res);
987 : }
988 26 : PG_END_TRY();
989 24 : }
990 :
991 : /*
992 : * Execute the given SQL command and store its results into a tuplestore
993 : * to be returned as the result of the current function.
994 : *
995 : * This is equivalent to PQexec followed by materializeResult, but we make
996 : * use of libpq's single-row mode to avoid accumulating the whole result
997 : * inside libpq before it gets transferred to the tuplestore.
998 : */
999 : static void
1000 46 : materializeQueryResult(FunctionCallInfo fcinfo,
1001 : PGconn *conn,
1002 : const char *conname,
1003 : const char *sql,
1004 : bool fail)
1005 : {
1006 46 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1007 46 : PGresult *volatile res = NULL;
1008 46 : volatile storeInfo sinfo = {0};
1009 :
1010 : /* prepTuplestoreResult must have been called previously */
1011 : Assert(rsinfo->returnMode == SFRM_Materialize);
1012 :
1013 46 : sinfo.fcinfo = fcinfo;
1014 :
1015 46 : PG_TRY();
1016 : {
1017 : /* Create short-lived memory context for data conversions */
1018 46 : sinfo.tmpcontext = AllocSetContextCreate(CurrentMemoryContext,
1019 : "dblink temporary context",
1020 : ALLOCSET_DEFAULT_SIZES);
1021 :
1022 : /* execute query, collecting any tuples into the tuplestore */
1023 46 : res = storeQueryResult(&sinfo, conn, sql);
1024 :
1025 46 : if (!res ||
1026 46 : (PQresultStatus(res) != PGRES_COMMAND_OK &&
1027 46 : PQresultStatus(res) != PGRES_TUPLES_OK))
1028 4 : {
1029 : /*
1030 : * dblink_res_error will clear the passed PGresult, so we need
1031 : * this ugly dance to avoid doing so twice during error exit
1032 : */
1033 4 : PGresult *res1 = res;
1034 :
1035 4 : res = NULL;
1036 4 : dblink_res_error(conn, conname, res1, fail,
1037 : "while executing query");
1038 : /* if fail isn't set, we'll return an empty query result */
1039 : }
1040 42 : else if (PQresultStatus(res) == PGRES_COMMAND_OK)
1041 : {
1042 : /*
1043 : * storeRow didn't get called, so we need to convert the command
1044 : * status string to a tuple manually
1045 : */
1046 : TupleDesc tupdesc;
1047 : AttInMetadata *attinmeta;
1048 : Tuplestorestate *tupstore;
1049 : HeapTuple tuple;
1050 : char *values[1];
1051 : MemoryContext oldcontext;
1052 :
1053 : /*
1054 : * need a tuple descriptor representing one TEXT column to return
1055 : * the command status string as our result tuple
1056 : */
1057 0 : tupdesc = CreateTemplateTupleDesc(1);
1058 0 : TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
1059 : TEXTOID, -1, 0);
1060 0 : attinmeta = TupleDescGetAttInMetadata(tupdesc);
1061 :
1062 0 : oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
1063 0 : tupstore = tuplestore_begin_heap(true, false, work_mem);
1064 0 : rsinfo->setResult = tupstore;
1065 0 : rsinfo->setDesc = tupdesc;
1066 0 : MemoryContextSwitchTo(oldcontext);
1067 :
1068 0 : values[0] = PQcmdStatus(res);
1069 :
1070 : /* build the tuple and put it into the tuplestore. */
1071 0 : tuple = BuildTupleFromCStrings(attinmeta, values);
1072 0 : tuplestore_puttuple(tupstore, tuple);
1073 :
1074 0 : PQclear(res);
1075 0 : res = NULL;
1076 : }
1077 : else
1078 : {
1079 : Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
1080 : /* storeRow should have created a tuplestore */
1081 : Assert(rsinfo->setResult != NULL);
1082 :
1083 42 : PQclear(res);
1084 42 : res = NULL;
1085 : }
1086 :
1087 : /* clean up data conversion short-lived memory context */
1088 46 : if (sinfo.tmpcontext != NULL)
1089 46 : MemoryContextDelete(sinfo.tmpcontext);
1090 46 : sinfo.tmpcontext = NULL;
1091 :
1092 46 : PQclear(sinfo.last_res);
1093 46 : sinfo.last_res = NULL;
1094 46 : PQclear(sinfo.cur_res);
1095 46 : sinfo.cur_res = NULL;
1096 : }
1097 0 : PG_CATCH();
1098 : {
1099 : /* be sure to release any libpq result we collected */
1100 0 : PQclear(res);
1101 0 : PQclear(sinfo.last_res);
1102 0 : PQclear(sinfo.cur_res);
1103 : /* and clear out any pending data in libpq */
1104 0 : while ((res = libpqsrv_get_result(conn, dblink_we_get_result)) !=
1105 : NULL)
1106 0 : PQclear(res);
1107 0 : PG_RE_THROW();
1108 : }
1109 46 : PG_END_TRY();
1110 46 : }
1111 :
1112 : /*
1113 : * Execute query, and send any result rows to sinfo->tuplestore.
1114 : */
1115 : static PGresult *
1116 46 : storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql)
1117 : {
1118 46 : bool first = true;
1119 46 : int nestlevel = -1;
1120 : PGresult *res;
1121 :
1122 46 : if (!PQsendQuery(conn, sql))
1123 0 : elog(ERROR, "could not send query: %s", pchomp(PQerrorMessage(conn)));
1124 :
1125 46 : if (!PQsetSingleRowMode(conn)) /* shouldn't fail */
1126 0 : elog(ERROR, "failed to set single-row mode for dblink query");
1127 :
1128 : for (;;)
1129 : {
1130 396 : CHECK_FOR_INTERRUPTS();
1131 :
1132 396 : sinfo->cur_res = libpqsrv_get_result(conn, dblink_we_get_result);
1133 396 : if (!sinfo->cur_res)
1134 46 : break;
1135 :
1136 350 : if (PQresultStatus(sinfo->cur_res) == PGRES_SINGLE_TUPLE)
1137 : {
1138 : /* got one row from possibly-bigger resultset */
1139 :
1140 : /*
1141 : * Set GUCs to ensure we read GUC-sensitive data types correctly.
1142 : * We shouldn't do this until we have a row in hand, to ensure
1143 : * libpq has seen any earlier ParameterStatus protocol messages.
1144 : */
1145 304 : if (first && nestlevel < 0)
1146 42 : nestlevel = applyRemoteGucs(conn);
1147 :
1148 304 : storeRow(sinfo, sinfo->cur_res, first);
1149 :
1150 304 : PQclear(sinfo->cur_res);
1151 304 : sinfo->cur_res = NULL;
1152 304 : first = false;
1153 : }
1154 : else
1155 : {
1156 : /* if empty resultset, fill tuplestore header */
1157 46 : if (first && PQresultStatus(sinfo->cur_res) == PGRES_TUPLES_OK)
1158 0 : storeRow(sinfo, sinfo->cur_res, first);
1159 :
1160 : /* store completed result at last_res */
1161 46 : PQclear(sinfo->last_res);
1162 46 : sinfo->last_res = sinfo->cur_res;
1163 46 : sinfo->cur_res = NULL;
1164 46 : first = true;
1165 : }
1166 : }
1167 :
1168 : /* clean up GUC settings, if we changed any */
1169 46 : restoreLocalGucs(nestlevel);
1170 :
1171 : /* return last_res */
1172 46 : res = sinfo->last_res;
1173 46 : sinfo->last_res = NULL;
1174 46 : return res;
1175 : }
1176 :
1177 : /*
1178 : * Send single row to sinfo->tuplestore.
1179 : *
1180 : * If "first" is true, create the tuplestore using PGresult's metadata
1181 : * (in this case the PGresult might contain either zero or one row).
1182 : */
1183 : static void
1184 304 : storeRow(volatile storeInfo *sinfo, PGresult *res, bool first)
1185 : {
1186 304 : int nfields = PQnfields(res);
1187 : HeapTuple tuple;
1188 : int i;
1189 : MemoryContext oldcontext;
1190 :
1191 304 : if (first)
1192 : {
1193 : /* Prepare for new result set */
1194 42 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) sinfo->fcinfo->resultinfo;
1195 : TupleDesc tupdesc;
1196 :
1197 : /*
1198 : * It's possible to get more than one result set if the query string
1199 : * contained multiple SQL commands. In that case, we follow PQexec's
1200 : * traditional behavior of throwing away all but the last result.
1201 : */
1202 42 : if (sinfo->tuplestore)
1203 0 : tuplestore_end(sinfo->tuplestore);
1204 42 : sinfo->tuplestore = NULL;
1205 :
1206 : /* get a tuple descriptor for our result type */
1207 42 : switch (get_call_result_type(sinfo->fcinfo, NULL, &tupdesc))
1208 : {
1209 42 : case TYPEFUNC_COMPOSITE:
1210 : /* success */
1211 42 : break;
1212 0 : case TYPEFUNC_RECORD:
1213 : /* failed to determine actual type of RECORD */
1214 0 : ereport(ERROR,
1215 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1216 : errmsg("function returning record called in context "
1217 : "that cannot accept type record")));
1218 : break;
1219 0 : default:
1220 : /* result type isn't composite */
1221 0 : elog(ERROR, "return type must be a row type");
1222 : break;
1223 : }
1224 :
1225 : /* make sure we have a persistent copy of the tupdesc */
1226 42 : tupdesc = CreateTupleDescCopy(tupdesc);
1227 :
1228 : /* check result and tuple descriptor have the same number of columns */
1229 42 : if (nfields != tupdesc->natts)
1230 0 : ereport(ERROR,
1231 : (errcode(ERRCODE_DATATYPE_MISMATCH),
1232 : errmsg("remote query result rowtype does not match "
1233 : "the specified FROM clause rowtype")));
1234 :
1235 : /* Prepare attinmeta for later data conversions */
1236 42 : sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
1237 :
1238 : /* Create a new, empty tuplestore */
1239 42 : oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
1240 42 : sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
1241 42 : rsinfo->setResult = sinfo->tuplestore;
1242 42 : rsinfo->setDesc = tupdesc;
1243 42 : MemoryContextSwitchTo(oldcontext);
1244 :
1245 : /* Done if empty resultset */
1246 42 : if (PQntuples(res) == 0)
1247 0 : return;
1248 :
1249 : /*
1250 : * Set up sufficiently-wide string pointers array; this won't change
1251 : * in size so it's easy to preallocate.
1252 : */
1253 42 : if (sinfo->cstrs)
1254 0 : pfree(sinfo->cstrs);
1255 42 : sinfo->cstrs = palloc_array(char *, nfields);
1256 : }
1257 :
1258 : /* Should have a single-row result if we get here */
1259 : Assert(PQntuples(res) == 1);
1260 :
1261 : /*
1262 : * Do the following work in a temp context that we reset after each tuple.
1263 : * This cleans up not only the data we have direct access to, but any
1264 : * cruft the I/O functions might leak.
1265 : */
1266 304 : oldcontext = MemoryContextSwitchTo(sinfo->tmpcontext);
1267 :
1268 : /*
1269 : * Fill cstrs with null-terminated strings of column values.
1270 : */
1271 1140 : for (i = 0; i < nfields; i++)
1272 : {
1273 836 : if (PQgetisnull(res, 0, i))
1274 0 : sinfo->cstrs[i] = NULL;
1275 : else
1276 836 : sinfo->cstrs[i] = PQgetvalue(res, 0, i);
1277 : }
1278 :
1279 : /* Convert row to a tuple, and add it to the tuplestore */
1280 304 : tuple = BuildTupleFromCStrings(sinfo->attinmeta, sinfo->cstrs);
1281 :
1282 304 : tuplestore_puttuple(sinfo->tuplestore, tuple);
1283 :
1284 : /* Clean up */
1285 304 : MemoryContextSwitchTo(oldcontext);
1286 304 : MemoryContextReset(sinfo->tmpcontext);
1287 : }
1288 :
1289 : /*
1290 : * List all open dblink connections by name.
1291 : * Returns an array of all connection names.
1292 : * Takes no params
1293 : */
1294 6 : PG_FUNCTION_INFO_V1(dblink_get_connections);
1295 : Datum
1296 2 : dblink_get_connections(PG_FUNCTION_ARGS)
1297 : {
1298 : HASH_SEQ_STATUS status;
1299 : remoteConnHashEnt *hentry;
1300 2 : ArrayBuildState *astate = NULL;
1301 :
1302 2 : if (remoteConnHash)
1303 : {
1304 2 : hash_seq_init(&status, remoteConnHash);
1305 8 : while ((hentry = (remoteConnHashEnt *) hash_seq_search(&status)) != NULL)
1306 : {
1307 : /* stash away current value */
1308 6 : astate = accumArrayResult(astate,
1309 6 : CStringGetTextDatum(hentry->name),
1310 : false, TEXTOID, CurrentMemoryContext);
1311 : }
1312 : }
1313 :
1314 2 : if (astate)
1315 2 : PG_RETURN_DATUM(makeArrayResult(astate,
1316 : CurrentMemoryContext));
1317 : else
1318 0 : PG_RETURN_NULL();
1319 : }
1320 :
1321 : /*
1322 : * Checks if a given remote connection is busy
1323 : *
1324 : * Returns 1 if the connection is busy, 0 otherwise
1325 : * Params:
1326 : * text connection_name - name of the connection to check
1327 : *
1328 : */
1329 6 : PG_FUNCTION_INFO_V1(dblink_is_busy);
1330 : Datum
1331 2 : dblink_is_busy(PG_FUNCTION_ARGS)
1332 : {
1333 : PGconn *conn;
1334 :
1335 2 : dblink_init();
1336 2 : conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
1337 :
1338 2 : PQconsumeInput(conn);
1339 2 : PG_RETURN_INT32(PQisBusy(conn));
1340 : }
1341 :
1342 : /*
1343 : * Cancels a running request on a connection
1344 : *
1345 : * Returns text:
1346 : * "OK" if the cancel request has been sent correctly,
1347 : * an error message otherwise
1348 : *
1349 : * Params:
1350 : * text connection_name - name of the connection to check
1351 : *
1352 : */
1353 6 : PG_FUNCTION_INFO_V1(dblink_cancel_query);
1354 : Datum
1355 2 : dblink_cancel_query(PG_FUNCTION_ARGS)
1356 : {
1357 : PGconn *conn;
1358 : const char *msg;
1359 : TimestampTz endtime;
1360 :
1361 2 : dblink_init();
1362 2 : conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
1363 2 : endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
1364 : 30000);
1365 2 : msg = libpqsrv_cancel(conn, endtime);
1366 2 : if (msg == NULL)
1367 2 : msg = "OK";
1368 :
1369 2 : PG_RETURN_TEXT_P(cstring_to_text(msg));
1370 : }
1371 :
1372 :
1373 : /*
1374 : * Get error message from a connection
1375 : *
1376 : * Returns text:
1377 : * "OK" if no error, an error message otherwise
1378 : *
1379 : * Params:
1380 : * text connection_name - name of the connection to check
1381 : *
1382 : */
1383 6 : PG_FUNCTION_INFO_V1(dblink_error_message);
1384 : Datum
1385 2 : dblink_error_message(PG_FUNCTION_ARGS)
1386 : {
1387 : char *msg;
1388 : PGconn *conn;
1389 :
1390 2 : dblink_init();
1391 2 : conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
1392 :
1393 2 : msg = PQerrorMessage(conn);
1394 2 : if (msg == NULL || msg[0] == '\0')
1395 2 : PG_RETURN_TEXT_P(cstring_to_text("OK"));
1396 : else
1397 0 : PG_RETURN_TEXT_P(cstring_to_text(pchomp(msg)));
1398 : }
1399 :
1400 : /*
1401 : * Execute an SQL non-SELECT command
1402 : */
1403 26 : PG_FUNCTION_INFO_V1(dblink_exec);
1404 : Datum
1405 52 : dblink_exec(PG_FUNCTION_ARGS)
1406 : {
1407 52 : text *volatile sql_cmd_status = NULL;
1408 52 : PGconn *volatile conn = NULL;
1409 52 : volatile bool freeconn = false;
1410 :
1411 52 : dblink_init();
1412 :
1413 52 : PG_TRY();
1414 : {
1415 52 : PGresult *res = NULL;
1416 52 : char *sql = NULL;
1417 52 : char *conname = NULL;
1418 52 : bool fail = true; /* default to backward compatible behavior */
1419 :
1420 52 : if (PG_NARGS() == 3)
1421 : {
1422 : /* must be text,text,bool */
1423 0 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
1424 0 : sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
1425 0 : fail = PG_GETARG_BOOL(2);
1426 0 : dblink_get_conn(conname, &conn, &conname, &freeconn);
1427 : }
1428 52 : else if (PG_NARGS() == 2)
1429 : {
1430 : /* might be text,text or text,bool */
1431 34 : if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
1432 : {
1433 2 : sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
1434 2 : fail = PG_GETARG_BOOL(1);
1435 2 : conn = pconn->conn;
1436 : }
1437 : else
1438 : {
1439 32 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
1440 32 : sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
1441 32 : dblink_get_conn(conname, &conn, &conname, &freeconn);
1442 : }
1443 : }
1444 18 : else if (PG_NARGS() == 1)
1445 : {
1446 : /* must be single text argument */
1447 18 : conn = pconn->conn;
1448 18 : sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
1449 : }
1450 : else
1451 : /* shouldn't happen */
1452 0 : elog(ERROR, "wrong number of arguments");
1453 :
1454 52 : if (!conn)
1455 0 : dblink_conn_not_avail(conname);
1456 :
1457 52 : res = libpqsrv_exec(conn, sql, dblink_we_get_result);
1458 52 : if (!res ||
1459 52 : (PQresultStatus(res) != PGRES_COMMAND_OK &&
1460 4 : PQresultStatus(res) != PGRES_TUPLES_OK))
1461 : {
1462 4 : dblink_res_error(conn, conname, res, fail,
1463 : "while executing command");
1464 :
1465 : /*
1466 : * and save a copy of the command status string to return as our
1467 : * result tuple
1468 : */
1469 2 : sql_cmd_status = cstring_to_text("ERROR");
1470 : }
1471 48 : else if (PQresultStatus(res) == PGRES_COMMAND_OK)
1472 : {
1473 : /*
1474 : * and save a copy of the command status string to return as our
1475 : * result tuple
1476 : */
1477 48 : sql_cmd_status = cstring_to_text(PQcmdStatus(res));
1478 48 : PQclear(res);
1479 : }
1480 : else
1481 : {
1482 0 : PQclear(res);
1483 0 : ereport(ERROR,
1484 : (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
1485 : errmsg("statement returning results not allowed")));
1486 : }
1487 : }
1488 2 : PG_FINALLY();
1489 : {
1490 : /* if needed, close the connection to the database */
1491 52 : if (freeconn)
1492 2 : libpqsrv_disconnect(conn);
1493 : }
1494 52 : PG_END_TRY();
1495 :
1496 50 : PG_RETURN_TEXT_P(sql_cmd_status);
1497 : }
1498 :
1499 :
1500 : /*
1501 : * dblink_get_pkey
1502 : *
1503 : * Return list of primary key fields for the supplied relation,
1504 : * or NULL if none exists.
1505 : */
1506 6 : PG_FUNCTION_INFO_V1(dblink_get_pkey);
1507 : Datum
1508 18 : dblink_get_pkey(PG_FUNCTION_ARGS)
1509 : {
1510 : int16 indnkeyatts;
1511 : char **results;
1512 : FuncCallContext *funcctx;
1513 : int32 call_cntr;
1514 : int32 max_calls;
1515 : AttInMetadata *attinmeta;
1516 : MemoryContext oldcontext;
1517 :
1518 : /* stuff done only on the first call of the function */
1519 18 : if (SRF_IS_FIRSTCALL())
1520 : {
1521 : Relation rel;
1522 : TupleDesc tupdesc;
1523 :
1524 : /* create a function context for cross-call persistence */
1525 6 : funcctx = SRF_FIRSTCALL_INIT();
1526 :
1527 : /*
1528 : * switch to memory context appropriate for multiple function calls
1529 : */
1530 6 : oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1531 :
1532 : /* open target relation */
1533 6 : rel = get_rel_from_relname(PG_GETARG_TEXT_PP(0), AccessShareLock, ACL_SELECT);
1534 :
1535 : /* get the array of attnums */
1536 6 : results = get_pkey_attnames(rel, &indnkeyatts);
1537 :
1538 6 : relation_close(rel, AccessShareLock);
1539 :
1540 : /*
1541 : * need a tuple descriptor representing one INT and one TEXT column
1542 : */
1543 6 : tupdesc = CreateTemplateTupleDesc(2);
1544 6 : TupleDescInitEntry(tupdesc, (AttrNumber) 1, "position",
1545 : INT4OID, -1, 0);
1546 6 : TupleDescInitEntry(tupdesc, (AttrNumber) 2, "colname",
1547 : TEXTOID, -1, 0);
1548 :
1549 : /*
1550 : * Generate attribute metadata needed later to produce tuples from raw
1551 : * C strings
1552 : */
1553 6 : attinmeta = TupleDescGetAttInMetadata(tupdesc);
1554 6 : funcctx->attinmeta = attinmeta;
1555 :
1556 6 : if ((results != NULL) && (indnkeyatts > 0))
1557 : {
1558 6 : funcctx->max_calls = indnkeyatts;
1559 :
1560 : /* got results, keep track of them */
1561 6 : funcctx->user_fctx = results;
1562 : }
1563 : else
1564 : {
1565 : /* fast track when no results */
1566 0 : MemoryContextSwitchTo(oldcontext);
1567 0 : SRF_RETURN_DONE(funcctx);
1568 : }
1569 :
1570 6 : MemoryContextSwitchTo(oldcontext);
1571 : }
1572 :
1573 : /* stuff done on every call of the function */
1574 18 : funcctx = SRF_PERCALL_SETUP();
1575 :
1576 : /*
1577 : * initialize per-call variables
1578 : */
1579 18 : call_cntr = funcctx->call_cntr;
1580 18 : max_calls = funcctx->max_calls;
1581 :
1582 18 : results = (char **) funcctx->user_fctx;
1583 18 : attinmeta = funcctx->attinmeta;
1584 :
1585 18 : if (call_cntr < max_calls) /* do when there is more left to send */
1586 : {
1587 : char **values;
1588 : HeapTuple tuple;
1589 : Datum result;
1590 :
1591 12 : values = palloc_array(char *, 2);
1592 12 : values[0] = psprintf("%d", call_cntr + 1);
1593 12 : values[1] = results[call_cntr];
1594 :
1595 : /* build the tuple */
1596 12 : tuple = BuildTupleFromCStrings(attinmeta, values);
1597 :
1598 : /* make the tuple into a datum */
1599 12 : result = HeapTupleGetDatum(tuple);
1600 :
1601 12 : SRF_RETURN_NEXT(funcctx, result);
1602 : }
1603 : else
1604 : {
1605 : /* do when there is no more left */
1606 6 : SRF_RETURN_DONE(funcctx);
1607 : }
1608 : }
1609 :
1610 :
1611 : /*
1612 : * dblink_build_sql_insert
1613 : *
1614 : * Used to generate an SQL insert statement
1615 : * based on an existing tuple in a local relation.
1616 : * This is useful for selectively replicating data
1617 : * to another server via dblink.
1618 : *
1619 : * API:
1620 : * <relname> - name of local table of interest
1621 : * <pkattnums> - an int2vector of attnums which will be used
1622 : * to identify the local tuple of interest
1623 : * <pknumatts> - number of attnums in pkattnums
1624 : * <src_pkattvals_arry> - text array of key values which will be used
1625 : * to identify the local tuple of interest
1626 : * <tgt_pkattvals_arry> - text array of key values which will be used
1627 : * to build the string for execution remotely. These are substituted
1628 : * for their counterparts in src_pkattvals_arry
1629 : */
1630 8 : PG_FUNCTION_INFO_V1(dblink_build_sql_insert);
1631 : Datum
1632 12 : dblink_build_sql_insert(PG_FUNCTION_ARGS)
1633 : {
1634 12 : text *relname_text = PG_GETARG_TEXT_PP(0);
1635 12 : int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
1636 12 : int32 pknumatts_arg = PG_GETARG_INT32(2);
1637 12 : ArrayType *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1638 12 : ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
1639 : Relation rel;
1640 : int *pkattnums;
1641 : int pknumatts;
1642 : char **src_pkattvals;
1643 : char **tgt_pkattvals;
1644 : int src_nitems;
1645 : int tgt_nitems;
1646 : char *sql;
1647 :
1648 : /*
1649 : * Open target relation.
1650 : */
1651 12 : rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
1652 :
1653 : /*
1654 : * Process pkattnums argument.
1655 : */
1656 12 : validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
1657 : &pkattnums, &pknumatts);
1658 :
1659 : /*
1660 : * Source array is made up of key values that will be used to locate the
1661 : * tuple of interest from the local system.
1662 : */
1663 8 : src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
1664 :
1665 : /*
1666 : * There should be one source array key value for each key attnum
1667 : */
1668 8 : if (src_nitems != pknumatts)
1669 0 : ereport(ERROR,
1670 : (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1671 : errmsg("source key array length must match number of key attributes")));
1672 :
1673 : /*
1674 : * Target array is made up of key values that will be used to build the
1675 : * SQL string for use on the remote system.
1676 : */
1677 8 : tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1678 :
1679 : /*
1680 : * There should be one target array key value for each key attnum
1681 : */
1682 8 : if (tgt_nitems != pknumatts)
1683 0 : ereport(ERROR,
1684 : (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1685 : errmsg("target key array length must match number of key attributes")));
1686 :
1687 : /*
1688 : * Prep work is finally done. Go get the SQL string.
1689 : */
1690 8 : sql = get_sql_insert(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
1691 :
1692 : /*
1693 : * Now we can close the relation.
1694 : */
1695 8 : relation_close(rel, AccessShareLock);
1696 :
1697 : /*
1698 : * And send it
1699 : */
1700 8 : PG_RETURN_TEXT_P(cstring_to_text(sql));
1701 : }
1702 :
1703 :
1704 : /*
1705 : * dblink_build_sql_delete
1706 : *
1707 : * Used to generate an SQL delete statement.
1708 : * This is useful for selectively replicating a
1709 : * delete to another server via dblink.
1710 : *
1711 : * API:
1712 : * <relname> - name of remote table of interest
1713 : * <pkattnums> - an int2vector of attnums which will be used
1714 : * to identify the remote tuple of interest
1715 : * <pknumatts> - number of attnums in pkattnums
1716 : * <tgt_pkattvals_arry> - text array of key values which will be used
1717 : * to build the string for execution remotely.
1718 : */
1719 8 : PG_FUNCTION_INFO_V1(dblink_build_sql_delete);
1720 : Datum
1721 12 : dblink_build_sql_delete(PG_FUNCTION_ARGS)
1722 : {
1723 12 : text *relname_text = PG_GETARG_TEXT_PP(0);
1724 12 : int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
1725 12 : int32 pknumatts_arg = PG_GETARG_INT32(2);
1726 12 : ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1727 : Relation rel;
1728 : int *pkattnums;
1729 : int pknumatts;
1730 : char **tgt_pkattvals;
1731 : int tgt_nitems;
1732 : char *sql;
1733 :
1734 : /*
1735 : * Open target relation.
1736 : */
1737 12 : rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
1738 :
1739 : /*
1740 : * Process pkattnums argument.
1741 : */
1742 12 : validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
1743 : &pkattnums, &pknumatts);
1744 :
1745 : /*
1746 : * Target array is made up of key values that will be used to build the
1747 : * SQL string for use on the remote system.
1748 : */
1749 8 : tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1750 :
1751 : /*
1752 : * There should be one target array key value for each key attnum
1753 : */
1754 8 : if (tgt_nitems != pknumatts)
1755 0 : ereport(ERROR,
1756 : (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1757 : errmsg("target key array length must match number of key attributes")));
1758 :
1759 : /*
1760 : * Prep work is finally done. Go get the SQL string.
1761 : */
1762 8 : sql = get_sql_delete(rel, pkattnums, pknumatts, tgt_pkattvals);
1763 :
1764 : /*
1765 : * Now we can close the relation.
1766 : */
1767 8 : relation_close(rel, AccessShareLock);
1768 :
1769 : /*
1770 : * And send it
1771 : */
1772 8 : PG_RETURN_TEXT_P(cstring_to_text(sql));
1773 : }
1774 :
1775 :
1776 : /*
1777 : * dblink_build_sql_update
1778 : *
1779 : * Used to generate an SQL update statement
1780 : * based on an existing tuple in a local relation.
1781 : * This is useful for selectively replicating data
1782 : * to another server via dblink.
1783 : *
1784 : * API:
1785 : * <relname> - name of local table of interest
1786 : * <pkattnums> - an int2vector of attnums which will be used
1787 : * to identify the local tuple of interest
1788 : * <pknumatts> - number of attnums in pkattnums
1789 : * <src_pkattvals_arry> - text array of key values which will be used
1790 : * to identify the local tuple of interest
1791 : * <tgt_pkattvals_arry> - text array of key values which will be used
1792 : * to build the string for execution remotely. These are substituted
1793 : * for their counterparts in src_pkattvals_arry
1794 : */
1795 8 : PG_FUNCTION_INFO_V1(dblink_build_sql_update);
1796 : Datum
1797 12 : dblink_build_sql_update(PG_FUNCTION_ARGS)
1798 : {
1799 12 : text *relname_text = PG_GETARG_TEXT_PP(0);
1800 12 : int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
1801 12 : int32 pknumatts_arg = PG_GETARG_INT32(2);
1802 12 : ArrayType *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1803 12 : ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
1804 : Relation rel;
1805 : int *pkattnums;
1806 : int pknumatts;
1807 : char **src_pkattvals;
1808 : char **tgt_pkattvals;
1809 : int src_nitems;
1810 : int tgt_nitems;
1811 : char *sql;
1812 :
1813 : /*
1814 : * Open target relation.
1815 : */
1816 12 : rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
1817 :
1818 : /*
1819 : * Process pkattnums argument.
1820 : */
1821 12 : validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
1822 : &pkattnums, &pknumatts);
1823 :
1824 : /*
1825 : * Source array is made up of key values that will be used to locate the
1826 : * tuple of interest from the local system.
1827 : */
1828 8 : src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
1829 :
1830 : /*
1831 : * There should be one source array key value for each key attnum
1832 : */
1833 8 : if (src_nitems != pknumatts)
1834 0 : ereport(ERROR,
1835 : (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1836 : errmsg("source key array length must match number of key attributes")));
1837 :
1838 : /*
1839 : * Target array is made up of key values that will be used to build the
1840 : * SQL string for use on the remote system.
1841 : */
1842 8 : tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1843 :
1844 : /*
1845 : * There should be one target array key value for each key attnum
1846 : */
1847 8 : if (tgt_nitems != pknumatts)
1848 0 : ereport(ERROR,
1849 : (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1850 : errmsg("target key array length must match number of key attributes")));
1851 :
1852 : /*
1853 : * Prep work is finally done. Go get the SQL string.
1854 : */
1855 8 : sql = get_sql_update(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
1856 :
1857 : /*
1858 : * Now we can close the relation.
1859 : */
1860 8 : relation_close(rel, AccessShareLock);
1861 :
1862 : /*
1863 : * And send it
1864 : */
1865 8 : PG_RETURN_TEXT_P(cstring_to_text(sql));
1866 : }
1867 :
1868 : /*
1869 : * dblink_current_query
1870 : * return the current query string
1871 : * to allow its use in (among other things)
1872 : * rewrite rules
1873 : */
1874 4 : PG_FUNCTION_INFO_V1(dblink_current_query);
1875 : Datum
1876 0 : dblink_current_query(PG_FUNCTION_ARGS)
1877 : {
1878 : /* This is now just an alias for the built-in function current_query() */
1879 0 : PG_RETURN_DATUM(current_query(fcinfo));
1880 : }
1881 :
1882 : /*
1883 : * Retrieve async notifications for a connection.
1884 : *
1885 : * Returns a setof record of notifications, or an empty set if none received.
1886 : * Can optionally take a named connection as parameter, but uses the unnamed
1887 : * connection per default.
1888 : *
1889 : */
1890 : #define DBLINK_NOTIFY_COLS 3
1891 :
1892 10 : PG_FUNCTION_INFO_V1(dblink_get_notify);
1893 : Datum
1894 4 : dblink_get_notify(PG_FUNCTION_ARGS)
1895 : {
1896 : PGconn *conn;
1897 : PGnotify *notify;
1898 4 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1899 :
1900 4 : dblink_init();
1901 4 : if (PG_NARGS() == 1)
1902 0 : conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
1903 : else
1904 4 : conn = pconn->conn;
1905 :
1906 4 : InitMaterializedSRF(fcinfo, 0);
1907 :
1908 4 : PQconsumeInput(conn);
1909 8 : while ((notify = PQnotifies(conn)) != NULL)
1910 : {
1911 : Datum values[DBLINK_NOTIFY_COLS];
1912 : bool nulls[DBLINK_NOTIFY_COLS];
1913 :
1914 4 : memset(values, 0, sizeof(values));
1915 4 : memset(nulls, 0, sizeof(nulls));
1916 :
1917 4 : if (notify->relname != NULL)
1918 4 : values[0] = CStringGetTextDatum(notify->relname);
1919 : else
1920 0 : nulls[0] = true;
1921 :
1922 4 : values[1] = Int32GetDatum(notify->be_pid);
1923 :
1924 4 : if (notify->extra != NULL)
1925 4 : values[2] = CStringGetTextDatum(notify->extra);
1926 : else
1927 0 : nulls[2] = true;
1928 :
1929 4 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
1930 :
1931 4 : PQfreemem(notify);
1932 4 : PQconsumeInput(conn);
1933 : }
1934 :
1935 4 : return (Datum) 0;
1936 : }
1937 :
1938 : /*
1939 : * Validate the options given to a dblink foreign server or user mapping.
1940 : * Raise an error if any option is invalid.
1941 : *
1942 : * We just check the names of options here, so semantic errors in options,
1943 : * such as invalid numeric format, will be detected at the attempt to connect.
1944 : */
1945 28 : PG_FUNCTION_INFO_V1(dblink_fdw_validator);
1946 : Datum
1947 30 : dblink_fdw_validator(PG_FUNCTION_ARGS)
1948 : {
1949 30 : List *options_list = untransformRelOptions(PG_GETARG_DATUM(0));
1950 30 : Oid context = PG_GETARG_OID(1);
1951 : ListCell *cell;
1952 :
1953 : static const PQconninfoOption *options = NULL;
1954 :
1955 : /*
1956 : * Get list of valid libpq options.
1957 : *
1958 : * To avoid unnecessary work, we get the list once and use it throughout
1959 : * the lifetime of this backend process. We don't need to care about
1960 : * memory context issues, because PQconndefaults allocates with malloc.
1961 : */
1962 30 : if (!options)
1963 : {
1964 24 : options = PQconndefaults();
1965 24 : if (!options) /* assume reason for failure is OOM */
1966 0 : ereport(ERROR,
1967 : (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
1968 : errmsg("out of memory"),
1969 : errdetail("Could not get libpq's default connection options.")));
1970 : }
1971 :
1972 : /* Validate each supplied option. */
1973 80 : foreach(cell, options_list)
1974 : {
1975 58 : DefElem *def = (DefElem *) lfirst(cell);
1976 :
1977 58 : if (!is_valid_dblink_fdw_option(options, def->defname, context))
1978 : {
1979 : /*
1980 : * Unknown option, or invalid option for the context specified, so
1981 : * complain about it. Provide a hint with a valid option that
1982 : * looks similar, if there is one.
1983 : */
1984 : const PQconninfoOption *opt;
1985 : const char *closest_match;
1986 : ClosestMatchState match_state;
1987 8 : bool has_valid_options = false;
1988 :
1989 8 : initClosestMatch(&match_state, def->defname, 4);
1990 408 : for (opt = options; opt->keyword; opt++)
1991 : {
1992 400 : if (is_valid_dblink_option(options, opt->keyword, context))
1993 : {
1994 18 : has_valid_options = true;
1995 18 : updateClosestMatch(&match_state, opt->keyword);
1996 : }
1997 : }
1998 :
1999 8 : closest_match = getClosestMatch(&match_state);
2000 8 : ereport(ERROR,
2001 : (errcode(ERRCODE_FDW_OPTION_NAME_NOT_FOUND),
2002 : errmsg("invalid option \"%s\"", def->defname),
2003 : has_valid_options ? closest_match ?
2004 : errhint("Perhaps you meant the option \"%s\".",
2005 : closest_match) : 0 :
2006 : errhint("There are no valid options in this context.")));
2007 : }
2008 : }
2009 :
2010 22 : PG_RETURN_VOID();
2011 : }
2012 :
2013 :
2014 : /*************************************************************
2015 : * internal functions
2016 : */
2017 :
2018 :
2019 : /*
2020 : * get_pkey_attnames
2021 : *
2022 : * Get the primary key attnames for the given relation.
2023 : * Return NULL, and set indnkeyatts = 0, if no primary key exists.
2024 : */
2025 : static char **
2026 6 : get_pkey_attnames(Relation rel, int16 *indnkeyatts)
2027 : {
2028 : Relation indexRelation;
2029 : ScanKeyData skey;
2030 : SysScanDesc scan;
2031 : HeapTuple indexTuple;
2032 : int i;
2033 6 : char **result = NULL;
2034 : TupleDesc tupdesc;
2035 :
2036 : /* initialize indnkeyatts to 0 in case no primary key exists */
2037 6 : *indnkeyatts = 0;
2038 :
2039 6 : tupdesc = rel->rd_att;
2040 :
2041 : /* Prepare to scan pg_index for entries having indrelid = this rel. */
2042 6 : indexRelation = table_open(IndexRelationId, AccessShareLock);
2043 6 : ScanKeyInit(&skey,
2044 : Anum_pg_index_indrelid,
2045 : BTEqualStrategyNumber, F_OIDEQ,
2046 : ObjectIdGetDatum(RelationGetRelid(rel)));
2047 :
2048 6 : scan = systable_beginscan(indexRelation, IndexIndrelidIndexId, true,
2049 : NULL, 1, &skey);
2050 :
2051 6 : while (HeapTupleIsValid(indexTuple = systable_getnext(scan)))
2052 : {
2053 6 : Form_pg_index index = (Form_pg_index) GETSTRUCT(indexTuple);
2054 :
2055 : /* we're only interested if it is the primary key */
2056 6 : if (index->indisprimary)
2057 : {
2058 6 : *indnkeyatts = index->indnkeyatts;
2059 6 : if (*indnkeyatts > 0)
2060 : {
2061 6 : result = palloc_array(char *, *indnkeyatts);
2062 :
2063 18 : for (i = 0; i < *indnkeyatts; i++)
2064 12 : result[i] = SPI_fname(tupdesc, index->indkey.values[i]);
2065 : }
2066 6 : break;
2067 : }
2068 : }
2069 :
2070 6 : systable_endscan(scan);
2071 6 : table_close(indexRelation, AccessShareLock);
2072 :
2073 6 : return result;
2074 : }
2075 :
2076 : /*
2077 : * Deconstruct a text[] into C-strings (note any NULL elements will be
2078 : * returned as NULL pointers)
2079 : */
2080 : static char **
2081 40 : get_text_array_contents(ArrayType *array, int *numitems)
2082 : {
2083 40 : int ndim = ARR_NDIM(array);
2084 40 : int *dims = ARR_DIMS(array);
2085 : int nitems;
2086 : int16 typlen;
2087 : bool typbyval;
2088 : char typalign;
2089 : char **values;
2090 : char *ptr;
2091 : bits8 *bitmap;
2092 : int bitmask;
2093 : int i;
2094 :
2095 : Assert(ARR_ELEMTYPE(array) == TEXTOID);
2096 :
2097 40 : *numitems = nitems = ArrayGetNItems(ndim, dims);
2098 :
2099 40 : get_typlenbyvalalign(ARR_ELEMTYPE(array),
2100 : &typlen, &typbyval, &typalign);
2101 :
2102 40 : values = palloc_array(char *, nitems);
2103 :
2104 40 : ptr = ARR_DATA_PTR(array);
2105 40 : bitmap = ARR_NULLBITMAP(array);
2106 40 : bitmask = 1;
2107 :
2108 110 : for (i = 0; i < nitems; i++)
2109 : {
2110 70 : if (bitmap && (*bitmap & bitmask) == 0)
2111 : {
2112 0 : values[i] = NULL;
2113 : }
2114 : else
2115 : {
2116 70 : values[i] = TextDatumGetCString(PointerGetDatum(ptr));
2117 70 : ptr = att_addlength_pointer(ptr, typlen, ptr);
2118 70 : ptr = (char *) att_align_nominal(ptr, typalign);
2119 : }
2120 :
2121 : /* advance bitmap pointer if any */
2122 70 : if (bitmap)
2123 : {
2124 0 : bitmask <<= 1;
2125 0 : if (bitmask == 0x100)
2126 : {
2127 0 : bitmap++;
2128 0 : bitmask = 1;
2129 : }
2130 : }
2131 : }
2132 :
2133 40 : return values;
2134 : }
2135 :
2136 : static char *
2137 8 : get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
2138 : {
2139 : char *relname;
2140 : HeapTuple tuple;
2141 : TupleDesc tupdesc;
2142 : int natts;
2143 : StringInfoData buf;
2144 : char *val;
2145 : int key;
2146 : int i;
2147 : bool needComma;
2148 :
2149 8 : initStringInfo(&buf);
2150 :
2151 : /* get relation name including any needed schema prefix and quoting */
2152 8 : relname = generate_relation_name(rel);
2153 :
2154 8 : tupdesc = rel->rd_att;
2155 8 : natts = tupdesc->natts;
2156 :
2157 8 : tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
2158 8 : if (!tuple)
2159 0 : ereport(ERROR,
2160 : (errcode(ERRCODE_CARDINALITY_VIOLATION),
2161 : errmsg("source row not found")));
2162 :
2163 8 : appendStringInfo(&buf, "INSERT INTO %s(", relname);
2164 :
2165 8 : needComma = false;
2166 38 : for (i = 0; i < natts; i++)
2167 : {
2168 30 : Form_pg_attribute att = TupleDescAttr(tupdesc, i);
2169 :
2170 30 : if (att->attisdropped)
2171 4 : continue;
2172 :
2173 26 : if (needComma)
2174 18 : appendStringInfoChar(&buf, ',');
2175 :
2176 26 : appendStringInfoString(&buf,
2177 26 : quote_ident_cstr(NameStr(att->attname)));
2178 26 : needComma = true;
2179 : }
2180 :
2181 8 : appendStringInfoString(&buf, ") VALUES(");
2182 :
2183 : /*
2184 : * Note: i is physical column number (counting from 0).
2185 : */
2186 8 : needComma = false;
2187 38 : for (i = 0; i < natts; i++)
2188 : {
2189 30 : if (TupleDescAttr(tupdesc, i)->attisdropped)
2190 4 : continue;
2191 :
2192 26 : if (needComma)
2193 18 : appendStringInfoChar(&buf, ',');
2194 :
2195 26 : key = get_attnum_pk_pos(pkattnums, pknumatts, i);
2196 :
2197 26 : if (key >= 0)
2198 14 : val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
2199 : else
2200 12 : val = SPI_getvalue(tuple, tupdesc, i + 1);
2201 :
2202 26 : if (val != NULL)
2203 : {
2204 26 : appendStringInfoString(&buf, quote_literal_cstr(val));
2205 26 : pfree(val);
2206 : }
2207 : else
2208 0 : appendStringInfoString(&buf, "NULL");
2209 26 : needComma = true;
2210 : }
2211 8 : appendStringInfoChar(&buf, ')');
2212 :
2213 8 : return buf.data;
2214 : }
2215 :
2216 : static char *
2217 8 : get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals)
2218 : {
2219 : char *relname;
2220 : TupleDesc tupdesc;
2221 : StringInfoData buf;
2222 : int i;
2223 :
2224 8 : initStringInfo(&buf);
2225 :
2226 : /* get relation name including any needed schema prefix and quoting */
2227 8 : relname = generate_relation_name(rel);
2228 :
2229 8 : tupdesc = rel->rd_att;
2230 :
2231 8 : appendStringInfo(&buf, "DELETE FROM %s WHERE ", relname);
2232 22 : for (i = 0; i < pknumatts; i++)
2233 : {
2234 14 : int pkattnum = pkattnums[i];
2235 14 : Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
2236 :
2237 14 : if (i > 0)
2238 6 : appendStringInfoString(&buf, " AND ");
2239 :
2240 14 : appendStringInfoString(&buf,
2241 14 : quote_ident_cstr(NameStr(attr->attname)));
2242 :
2243 14 : if (tgt_pkattvals[i] != NULL)
2244 14 : appendStringInfo(&buf, " = %s",
2245 14 : quote_literal_cstr(tgt_pkattvals[i]));
2246 : else
2247 0 : appendStringInfoString(&buf, " IS NULL");
2248 : }
2249 :
2250 8 : return buf.data;
2251 : }
2252 :
2253 : static char *
2254 8 : get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
2255 : {
2256 : char *relname;
2257 : HeapTuple tuple;
2258 : TupleDesc tupdesc;
2259 : int natts;
2260 : StringInfoData buf;
2261 : char *val;
2262 : int key;
2263 : int i;
2264 : bool needComma;
2265 :
2266 8 : initStringInfo(&buf);
2267 :
2268 : /* get relation name including any needed schema prefix and quoting */
2269 8 : relname = generate_relation_name(rel);
2270 :
2271 8 : tupdesc = rel->rd_att;
2272 8 : natts = tupdesc->natts;
2273 :
2274 8 : tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
2275 8 : if (!tuple)
2276 0 : ereport(ERROR,
2277 : (errcode(ERRCODE_CARDINALITY_VIOLATION),
2278 : errmsg("source row not found")));
2279 :
2280 8 : appendStringInfo(&buf, "UPDATE %s SET ", relname);
2281 :
2282 : /*
2283 : * Note: i is physical column number (counting from 0).
2284 : */
2285 8 : needComma = false;
2286 38 : for (i = 0; i < natts; i++)
2287 : {
2288 30 : Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
2289 :
2290 30 : if (attr->attisdropped)
2291 4 : continue;
2292 :
2293 26 : if (needComma)
2294 18 : appendStringInfoString(&buf, ", ");
2295 :
2296 26 : appendStringInfo(&buf, "%s = ",
2297 26 : quote_ident_cstr(NameStr(attr->attname)));
2298 :
2299 26 : key = get_attnum_pk_pos(pkattnums, pknumatts, i);
2300 :
2301 26 : if (key >= 0)
2302 14 : val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
2303 : else
2304 12 : val = SPI_getvalue(tuple, tupdesc, i + 1);
2305 :
2306 26 : if (val != NULL)
2307 : {
2308 26 : appendStringInfoString(&buf, quote_literal_cstr(val));
2309 26 : pfree(val);
2310 : }
2311 : else
2312 0 : appendStringInfoString(&buf, "NULL");
2313 26 : needComma = true;
2314 : }
2315 :
2316 8 : appendStringInfoString(&buf, " WHERE ");
2317 :
2318 22 : for (i = 0; i < pknumatts; i++)
2319 : {
2320 14 : int pkattnum = pkattnums[i];
2321 14 : Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
2322 :
2323 14 : if (i > 0)
2324 6 : appendStringInfoString(&buf, " AND ");
2325 :
2326 14 : appendStringInfoString(&buf,
2327 14 : quote_ident_cstr(NameStr(attr->attname)));
2328 :
2329 14 : val = tgt_pkattvals[i];
2330 :
2331 14 : if (val != NULL)
2332 14 : appendStringInfo(&buf, " = %s", quote_literal_cstr(val));
2333 : else
2334 0 : appendStringInfoString(&buf, " IS NULL");
2335 : }
2336 :
2337 8 : return buf.data;
2338 : }
2339 :
2340 : /*
2341 : * Return a properly quoted identifier.
2342 : * Uses quote_ident in quote.c
2343 : */
2344 : static char *
2345 160 : quote_ident_cstr(char *rawstr)
2346 : {
2347 : text *rawstr_text;
2348 : text *result_text;
2349 : char *result;
2350 :
2351 160 : rawstr_text = cstring_to_text(rawstr);
2352 160 : result_text = DatumGetTextPP(DirectFunctionCall1(quote_ident,
2353 : PointerGetDatum(rawstr_text)));
2354 160 : result = text_to_cstring(result_text);
2355 :
2356 160 : return result;
2357 : }
2358 :
2359 : static int
2360 52 : get_attnum_pk_pos(int *pkattnums, int pknumatts, int key)
2361 : {
2362 : int i;
2363 :
2364 : /*
2365 : * Not likely a long list anyway, so just scan for the value
2366 : */
2367 100 : for (i = 0; i < pknumatts; i++)
2368 76 : if (key == pkattnums[i])
2369 28 : return i;
2370 :
2371 24 : return -1;
2372 : }
2373 :
2374 : static HeapTuple
2375 16 : get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals)
2376 : {
2377 : char *relname;
2378 : TupleDesc tupdesc;
2379 : int natts;
2380 : StringInfoData buf;
2381 : int ret;
2382 : HeapTuple tuple;
2383 : int i;
2384 :
2385 : /*
2386 : * Connect to SPI manager
2387 : */
2388 16 : SPI_connect();
2389 :
2390 16 : initStringInfo(&buf);
2391 :
2392 : /* get relation name including any needed schema prefix and quoting */
2393 16 : relname = generate_relation_name(rel);
2394 :
2395 16 : tupdesc = rel->rd_att;
2396 16 : natts = tupdesc->natts;
2397 :
2398 : /*
2399 : * Build sql statement to look up tuple of interest, ie, the one matching
2400 : * src_pkattvals. We used to use "SELECT *" here, but it's simpler to
2401 : * generate a result tuple that matches the table's physical structure,
2402 : * with NULLs for any dropped columns. Otherwise we have to deal with two
2403 : * different tupdescs and everything's very confusing.
2404 : */
2405 16 : appendStringInfoString(&buf, "SELECT ");
2406 :
2407 76 : for (i = 0; i < natts; i++)
2408 : {
2409 60 : Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
2410 :
2411 60 : if (i > 0)
2412 44 : appendStringInfoString(&buf, ", ");
2413 :
2414 60 : if (attr->attisdropped)
2415 8 : appendStringInfoString(&buf, "NULL");
2416 : else
2417 52 : appendStringInfoString(&buf,
2418 52 : quote_ident_cstr(NameStr(attr->attname)));
2419 : }
2420 :
2421 16 : appendStringInfo(&buf, " FROM %s WHERE ", relname);
2422 :
2423 44 : for (i = 0; i < pknumatts; i++)
2424 : {
2425 28 : int pkattnum = pkattnums[i];
2426 28 : Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
2427 :
2428 28 : if (i > 0)
2429 12 : appendStringInfoString(&buf, " AND ");
2430 :
2431 28 : appendStringInfoString(&buf,
2432 28 : quote_ident_cstr(NameStr(attr->attname)));
2433 :
2434 28 : if (src_pkattvals[i] != NULL)
2435 28 : appendStringInfo(&buf, " = %s",
2436 28 : quote_literal_cstr(src_pkattvals[i]));
2437 : else
2438 0 : appendStringInfoString(&buf, " IS NULL");
2439 : }
2440 :
2441 : /*
2442 : * Retrieve the desired tuple
2443 : */
2444 16 : ret = SPI_exec(buf.data, 0);
2445 16 : pfree(buf.data);
2446 :
2447 : /*
2448 : * Only allow one qualifying tuple
2449 : */
2450 16 : if ((ret == SPI_OK_SELECT) && (SPI_processed > 1))
2451 0 : ereport(ERROR,
2452 : (errcode(ERRCODE_CARDINALITY_VIOLATION),
2453 : errmsg("source criteria matched more than one record")));
2454 :
2455 16 : else if (ret == SPI_OK_SELECT && SPI_processed == 1)
2456 : {
2457 16 : SPITupleTable *tuptable = SPI_tuptable;
2458 :
2459 16 : tuple = SPI_copytuple(tuptable->vals[0]);
2460 16 : SPI_finish();
2461 :
2462 16 : return tuple;
2463 : }
2464 : else
2465 : {
2466 : /*
2467 : * no qualifying tuples
2468 : */
2469 0 : SPI_finish();
2470 :
2471 0 : return NULL;
2472 : }
2473 :
2474 : /*
2475 : * never reached, but keep compiler quiet
2476 : */
2477 : return NULL;
2478 : }
2479 :
2480 : /*
2481 : * Open the relation named by relname_text, acquire specified type of lock,
2482 : * verify we have specified permissions.
2483 : * Caller must close rel when done with it.
2484 : */
2485 : static Relation
2486 42 : get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode)
2487 : {
2488 : RangeVar *relvar;
2489 : Relation rel;
2490 : AclResult aclresult;
2491 :
2492 42 : relvar = makeRangeVarFromNameList(textToQualifiedNameList(relname_text));
2493 42 : rel = table_openrv(relvar, lockmode);
2494 :
2495 42 : aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
2496 : aclmode);
2497 42 : if (aclresult != ACLCHECK_OK)
2498 0 : aclcheck_error(aclresult, get_relkind_objtype(rel->rd_rel->relkind),
2499 0 : RelationGetRelationName(rel));
2500 :
2501 42 : return rel;
2502 : }
2503 :
2504 : /*
2505 : * generate_relation_name - copied from ruleutils.c
2506 : * Compute the name to display for a relation
2507 : *
2508 : * The result includes all necessary quoting and schema-prefixing.
2509 : */
2510 : static char *
2511 40 : generate_relation_name(Relation rel)
2512 : {
2513 : char *nspname;
2514 : char *result;
2515 :
2516 : /* Qualify the name if not visible in search path */
2517 40 : if (RelationIsVisible(RelationGetRelid(rel)))
2518 30 : nspname = NULL;
2519 : else
2520 10 : nspname = get_namespace_name(rel->rd_rel->relnamespace);
2521 :
2522 40 : result = quote_qualified_identifier(nspname, RelationGetRelationName(rel));
2523 :
2524 40 : return result;
2525 : }
2526 :
2527 :
2528 : static remoteConn *
2529 158 : getConnectionByName(const char *name)
2530 : {
2531 : remoteConnHashEnt *hentry;
2532 : char *key;
2533 :
2534 158 : if (!remoteConnHash)
2535 12 : remoteConnHash = createConnHash();
2536 :
2537 158 : key = pstrdup(name);
2538 158 : truncate_identifier(key, strlen(key), false);
2539 158 : hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
2540 : key, HASH_FIND, NULL);
2541 :
2542 158 : if (hentry)
2543 136 : return hentry->rconn;
2544 :
2545 22 : return NULL;
2546 : }
2547 :
2548 : static HTAB *
2549 14 : createConnHash(void)
2550 : {
2551 : HASHCTL ctl;
2552 :
2553 14 : ctl.keysize = NAMEDATALEN;
2554 14 : ctl.entrysize = sizeof(remoteConnHashEnt);
2555 :
2556 14 : return hash_create("Remote Con hash", NUMCONN, &ctl,
2557 : HASH_ELEM | HASH_STRINGS);
2558 : }
2559 :
2560 : static void
2561 20 : createNewConnection(const char *name, remoteConn *rconn)
2562 : {
2563 : remoteConnHashEnt *hentry;
2564 : bool found;
2565 : char *key;
2566 :
2567 20 : if (!remoteConnHash)
2568 2 : remoteConnHash = createConnHash();
2569 :
2570 20 : key = pstrdup(name);
2571 20 : truncate_identifier(key, strlen(key), true);
2572 20 : hentry = (remoteConnHashEnt *) hash_search(remoteConnHash, key,
2573 : HASH_ENTER, &found);
2574 :
2575 20 : if (found)
2576 : {
2577 2 : libpqsrv_disconnect(rconn->conn);
2578 2 : pfree(rconn);
2579 :
2580 2 : ereport(ERROR,
2581 : (errcode(ERRCODE_DUPLICATE_OBJECT),
2582 : errmsg("duplicate connection name")));
2583 : }
2584 :
2585 18 : hentry->rconn = rconn;
2586 18 : }
2587 :
2588 : static void
2589 16 : deleteConnection(const char *name)
2590 : {
2591 : remoteConnHashEnt *hentry;
2592 : bool found;
2593 : char *key;
2594 :
2595 16 : if (!remoteConnHash)
2596 0 : remoteConnHash = createConnHash();
2597 :
2598 16 : key = pstrdup(name);
2599 16 : truncate_identifier(key, strlen(key), false);
2600 16 : hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
2601 : key, HASH_REMOVE, &found);
2602 :
2603 16 : if (!hentry)
2604 0 : ereport(ERROR,
2605 : (errcode(ERRCODE_UNDEFINED_OBJECT),
2606 : errmsg("undefined connection name")));
2607 16 : }
2608 :
2609 : /*
2610 : * Ensure that require_auth and SCRAM keys are correctly set on connstr.
2611 : * SCRAM keys used to pass-through are coming from the initial connection
2612 : * from the client with the server.
2613 : *
2614 : * All required SCRAM options are set by dblink, so we just need to ensure
2615 : * that these options are not overwritten by the user.
2616 : *
2617 : * See appendSCRAMKeysInfo and its usage for more.
2618 : */
2619 : bool
2620 12 : dblink_connstr_has_required_scram_options(const char *connstr)
2621 : {
2622 : PQconninfoOption *options;
2623 12 : bool has_scram_server_key = false;
2624 12 : bool has_scram_client_key = false;
2625 12 : bool has_require_auth = false;
2626 12 : bool has_scram_keys = false;
2627 :
2628 12 : options = PQconninfoParse(connstr, NULL);
2629 12 : if (options)
2630 : {
2631 : /*
2632 : * Continue iterating even if we found the keys that we need to
2633 : * validate to make sure that there is no other declaration of these
2634 : * keys that can overwrite the first.
2635 : */
2636 612 : for (PQconninfoOption *option = options; option->keyword != NULL; option++)
2637 : {
2638 600 : if (strcmp(option->keyword, "require_auth") == 0)
2639 : {
2640 12 : if (option->val != NULL && strcmp(option->val, "scram-sha-256") == 0)
2641 10 : has_require_auth = true;
2642 : else
2643 2 : has_require_auth = false;
2644 : }
2645 :
2646 600 : if (strcmp(option->keyword, "scram_client_key") == 0)
2647 : {
2648 12 : if (option->val != NULL && option->val[0] != '\0')
2649 12 : has_scram_client_key = true;
2650 : else
2651 0 : has_scram_client_key = false;
2652 : }
2653 :
2654 600 : if (strcmp(option->keyword, "scram_server_key") == 0)
2655 : {
2656 12 : if (option->val != NULL && option->val[0] != '\0')
2657 12 : has_scram_server_key = true;
2658 : else
2659 0 : has_scram_server_key = false;
2660 : }
2661 : }
2662 12 : PQconninfoFree(options);
2663 : }
2664 :
2665 12 : has_scram_keys = has_scram_client_key && has_scram_server_key && MyProcPort->has_scram_keys;
2666 :
2667 12 : return (has_scram_keys && has_require_auth);
2668 : }
2669 :
2670 : /*
2671 : * We need to make sure that the connection made used credentials
2672 : * which were provided by the user, so check what credentials were
2673 : * used to connect and then make sure that they came from the user.
2674 : */
2675 : static void
2676 42 : dblink_security_check(PGconn *conn, remoteConn *rconn, const char *connstr)
2677 : {
2678 : /* Superuser bypasses security check */
2679 42 : if (superuser())
2680 38 : return;
2681 :
2682 : /* If password was used to connect, make sure it was one provided */
2683 4 : if (PQconnectionUsedPassword(conn) && dblink_connstr_has_pw(connstr))
2684 0 : return;
2685 :
2686 : /*
2687 : * Password was not used to connect, check if SCRAM pass-through is in
2688 : * use.
2689 : *
2690 : * If dblink_connstr_has_required_scram_options is true we assume that
2691 : * UseScramPassthrough is also true because the required SCRAM keys are
2692 : * only added if UseScramPassthrough is set, and the user is not allowed
2693 : * to add the SCRAM keys on fdw and user mapping options.
2694 : */
2695 4 : if (MyProcPort->has_scram_keys && dblink_connstr_has_required_scram_options(connstr))
2696 4 : return;
2697 :
2698 : #ifdef ENABLE_GSS
2699 : /* If GSSAPI creds used to connect, make sure it was one delegated */
2700 : if (PQconnectionUsedGSSAPI(conn) && be_gssapi_get_delegation(MyProcPort))
2701 : return;
2702 : #endif
2703 :
2704 : /* Otherwise, fail out */
2705 0 : libpqsrv_disconnect(conn);
2706 0 : if (rconn)
2707 0 : pfree(rconn);
2708 :
2709 0 : ereport(ERROR,
2710 : (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
2711 : errmsg("password or GSSAPI delegated credentials required"),
2712 : errdetail("Non-superusers may only connect using credentials they provide, eg: password in connection string or delegated GSSAPI credentials"),
2713 : errhint("Ensure provided credentials match target server's authentication method.")));
2714 : }
2715 :
2716 : /*
2717 : * Function to check if the connection string includes an explicit
2718 : * password, needed to ensure that non-superuser password-based auth
2719 : * is using a provided password and not one picked up from the
2720 : * environment.
2721 : */
2722 : static bool
2723 14 : dblink_connstr_has_pw(const char *connstr)
2724 : {
2725 : PQconninfoOption *options;
2726 : PQconninfoOption *option;
2727 14 : bool connstr_gives_password = false;
2728 :
2729 14 : options = PQconninfoParse(connstr, NULL);
2730 14 : if (options)
2731 : {
2732 714 : for (option = options; option->keyword != NULL; option++)
2733 : {
2734 700 : if (strcmp(option->keyword, "password") == 0)
2735 : {
2736 14 : if (option->val != NULL && option->val[0] != '\0')
2737 : {
2738 0 : connstr_gives_password = true;
2739 0 : break;
2740 : }
2741 : }
2742 : }
2743 14 : PQconninfoFree(options);
2744 : }
2745 :
2746 14 : return connstr_gives_password;
2747 : }
2748 :
2749 : /*
2750 : * For non-superusers, insist that the connstr specify a password, except if
2751 : * GSSAPI credentials have been delegated (and we check that they are used for
2752 : * the connection in dblink_security_check later) or if SCRAM pass-through is
2753 : * being used. This prevents a password or GSSAPI credentials from being
2754 : * picked up from .pgpass, a service file, the environment, etc. We don't want
2755 : * the postgres user's passwords or Kerberos credentials to be accessible to
2756 : * non-superusers. In case of SCRAM pass-through insist that the connstr
2757 : * has the required SCRAM pass-through options.
2758 : */
2759 : static void
2760 52 : dblink_connstr_check(const char *connstr)
2761 : {
2762 52 : if (superuser())
2763 42 : return;
2764 :
2765 10 : if (dblink_connstr_has_pw(connstr))
2766 0 : return;
2767 :
2768 10 : if (MyProcPort->has_scram_keys && dblink_connstr_has_required_scram_options(connstr))
2769 6 : return;
2770 :
2771 : #ifdef ENABLE_GSS
2772 : if (be_gssapi_get_delegation(MyProcPort))
2773 : return;
2774 : #endif
2775 :
2776 4 : ereport(ERROR,
2777 : (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
2778 : errmsg("password or GSSAPI delegated credentials required"),
2779 : errdetail("Non-superusers must provide a password in the connection string or send delegated GSSAPI credentials.")));
2780 : }
2781 :
2782 : /*
2783 : * Report an error received from the remote server
2784 : *
2785 : * res: the received error result (will be freed)
2786 : * fail: true for ERROR ereport, false for NOTICE
2787 : * fmt and following args: sprintf-style format and values for errcontext;
2788 : * the resulting string should be worded like "while <some action>"
2789 : */
2790 : static void
2791 24 : dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
2792 : bool fail, const char *fmt,...)
2793 : {
2794 : int level;
2795 24 : char *pg_diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
2796 24 : char *pg_diag_message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
2797 24 : char *pg_diag_message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
2798 24 : char *pg_diag_message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
2799 24 : char *pg_diag_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
2800 : int sqlstate;
2801 : char *message_primary;
2802 : char *message_detail;
2803 : char *message_hint;
2804 : char *message_context;
2805 : va_list ap;
2806 : char dblink_context_msg[512];
2807 :
2808 24 : if (fail)
2809 6 : level = ERROR;
2810 : else
2811 18 : level = NOTICE;
2812 :
2813 24 : if (pg_diag_sqlstate)
2814 24 : sqlstate = MAKE_SQLSTATE(pg_diag_sqlstate[0],
2815 : pg_diag_sqlstate[1],
2816 : pg_diag_sqlstate[2],
2817 : pg_diag_sqlstate[3],
2818 : pg_diag_sqlstate[4]);
2819 : else
2820 0 : sqlstate = ERRCODE_CONNECTION_FAILURE;
2821 :
2822 24 : message_primary = xpstrdup(pg_diag_message_primary);
2823 24 : message_detail = xpstrdup(pg_diag_message_detail);
2824 24 : message_hint = xpstrdup(pg_diag_message_hint);
2825 24 : message_context = xpstrdup(pg_diag_context);
2826 :
2827 : /*
2828 : * If we don't get a message from the PGresult, try the PGconn. This is
2829 : * needed because for connection-level failures, PQgetResult may just
2830 : * return NULL, not a PGresult at all.
2831 : */
2832 24 : if (message_primary == NULL)
2833 0 : message_primary = pchomp(PQerrorMessage(conn));
2834 :
2835 : /*
2836 : * Now that we've copied all the data we need out of the PGresult, it's
2837 : * safe to free it. We must do this to avoid PGresult leakage. We're
2838 : * leaking all the strings too, but those are in palloc'd memory that will
2839 : * get cleaned up eventually.
2840 : */
2841 24 : PQclear(res);
2842 :
2843 : /*
2844 : * Format the basic errcontext string. Below, we'll add on something
2845 : * about the connection name. That's a violation of the translatability
2846 : * guidelines about constructing error messages out of parts, but since
2847 : * there's no translation support for dblink, there's no need to worry
2848 : * about that (yet).
2849 : */
2850 24 : va_start(ap, fmt);
2851 24 : vsnprintf(dblink_context_msg, sizeof(dblink_context_msg), fmt, ap);
2852 24 : va_end(ap);
2853 :
2854 24 : ereport(level,
2855 : (errcode(sqlstate),
2856 : (message_primary != NULL && message_primary[0] != '\0') ?
2857 : errmsg_internal("%s", message_primary) :
2858 : errmsg("could not obtain message string for remote error"),
2859 : message_detail ? errdetail_internal("%s", message_detail) : 0,
2860 : message_hint ? errhint("%s", message_hint) : 0,
2861 : message_context ? (errcontext("%s", message_context)) : 0,
2862 : conname ?
2863 : (errcontext("%s on dblink connection named \"%s\"",
2864 : dblink_context_msg, conname)) :
2865 : (errcontext("%s on unnamed dblink connection",
2866 : dblink_context_msg))));
2867 18 : }
2868 :
2869 : /*
2870 : * Obtain connection string for a foreign server
2871 : */
2872 : static char *
2873 52 : get_connect_string(const char *servername)
2874 : {
2875 52 : ForeignServer *foreign_server = NULL;
2876 : UserMapping *user_mapping;
2877 : ListCell *cell;
2878 : StringInfoData buf;
2879 : ForeignDataWrapper *fdw;
2880 : AclResult aclresult;
2881 : char *srvname;
2882 :
2883 : static const PQconninfoOption *options = NULL;
2884 :
2885 52 : initStringInfo(&buf);
2886 :
2887 : /*
2888 : * Get list of valid libpq options.
2889 : *
2890 : * To avoid unnecessary work, we get the list once and use it throughout
2891 : * the lifetime of this backend process. We don't need to care about
2892 : * memory context issues, because PQconndefaults allocates with malloc.
2893 : */
2894 52 : if (!options)
2895 : {
2896 14 : options = PQconndefaults();
2897 14 : if (!options) /* assume reason for failure is OOM */
2898 0 : ereport(ERROR,
2899 : (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
2900 : errmsg("out of memory"),
2901 : errdetail("Could not get libpq's default connection options.")));
2902 : }
2903 :
2904 : /* first gather the server connstr options */
2905 52 : srvname = pstrdup(servername);
2906 52 : truncate_identifier(srvname, strlen(srvname), false);
2907 52 : foreign_server = GetForeignServerByName(srvname, true);
2908 :
2909 52 : if (foreign_server)
2910 : {
2911 12 : Oid serverid = foreign_server->serverid;
2912 12 : Oid fdwid = foreign_server->fdwid;
2913 12 : Oid userid = GetUserId();
2914 :
2915 12 : user_mapping = GetUserMapping(userid, serverid);
2916 12 : fdw = GetForeignDataWrapper(fdwid);
2917 :
2918 : /* Check permissions, user must have usage on the server. */
2919 12 : aclresult = object_aclcheck(ForeignServerRelationId, serverid, userid, ACL_USAGE);
2920 12 : if (aclresult != ACLCHECK_OK)
2921 0 : aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER, foreign_server->servername);
2922 :
2923 : /*
2924 : * First append hardcoded options needed for SCRAM pass-through, so if
2925 : * the user overwrites these options we can ereport on
2926 : * dblink_connstr_check and dblink_security_check.
2927 : */
2928 12 : if (MyProcPort->has_scram_keys && UseScramPassthrough(foreign_server, user_mapping))
2929 8 : appendSCRAMKeysInfo(&buf);
2930 :
2931 12 : foreach(cell, fdw->options)
2932 : {
2933 0 : DefElem *def = lfirst(cell);
2934 :
2935 0 : if (is_valid_dblink_option(options, def->defname, ForeignDataWrapperRelationId))
2936 0 : appendStringInfo(&buf, "%s='%s' ", def->defname,
2937 0 : escape_param_str(strVal(def->arg)));
2938 : }
2939 :
2940 54 : foreach(cell, foreign_server->options)
2941 : {
2942 42 : DefElem *def = lfirst(cell);
2943 :
2944 42 : if (is_valid_dblink_option(options, def->defname, ForeignServerRelationId))
2945 34 : appendStringInfo(&buf, "%s='%s' ", def->defname,
2946 34 : escape_param_str(strVal(def->arg)));
2947 : }
2948 :
2949 24 : foreach(cell, user_mapping->options)
2950 : {
2951 :
2952 12 : DefElem *def = lfirst(cell);
2953 :
2954 12 : if (is_valid_dblink_option(options, def->defname, UserMappingRelationId))
2955 12 : appendStringInfo(&buf, "%s='%s' ", def->defname,
2956 12 : escape_param_str(strVal(def->arg)));
2957 : }
2958 :
2959 12 : return buf.data;
2960 : }
2961 : else
2962 40 : return NULL;
2963 : }
2964 :
2965 : /*
2966 : * Escaping libpq connect parameter strings.
2967 : *
2968 : * Replaces "'" with "\'" and "\" with "\\".
2969 : */
2970 : static char *
2971 46 : escape_param_str(const char *str)
2972 : {
2973 : const char *cp;
2974 : StringInfoData buf;
2975 :
2976 46 : initStringInfo(&buf);
2977 :
2978 410 : for (cp = str; *cp; cp++)
2979 : {
2980 364 : if (*cp == '\\' || *cp == '\'')
2981 0 : appendStringInfoChar(&buf, '\\');
2982 364 : appendStringInfoChar(&buf, *cp);
2983 : }
2984 :
2985 46 : return buf.data;
2986 : }
2987 :
2988 : /*
2989 : * Validate the PK-attnums argument for dblink_build_sql_insert() and related
2990 : * functions, and translate to the internal representation.
2991 : *
2992 : * The user supplies an int2vector of 1-based logical attnums, plus a count
2993 : * argument (the need for the separate count argument is historical, but we
2994 : * still check it). We check that each attnum corresponds to a valid,
2995 : * non-dropped attribute of the rel. We do *not* prevent attnums from being
2996 : * listed twice, though the actual use-case for such things is dubious.
2997 : * Note that before Postgres 9.0, the user's attnums were interpreted as
2998 : * physical not logical column numbers; this was changed for future-proofing.
2999 : *
3000 : * The internal representation is a palloc'd int array of 0-based physical
3001 : * attnums.
3002 : */
3003 : static void
3004 36 : validate_pkattnums(Relation rel,
3005 : int2vector *pkattnums_arg, int32 pknumatts_arg,
3006 : int **pkattnums, int *pknumatts)
3007 : {
3008 36 : TupleDesc tupdesc = rel->rd_att;
3009 36 : int natts = tupdesc->natts;
3010 : int i;
3011 :
3012 : /* Don't take more array elements than there are */
3013 36 : pknumatts_arg = Min(pknumatts_arg, pkattnums_arg->dim1);
3014 :
3015 : /* Must have at least one pk attnum selected */
3016 36 : if (pknumatts_arg <= 0)
3017 0 : ereport(ERROR,
3018 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3019 : errmsg("number of key attributes must be > 0")));
3020 :
3021 : /* Allocate output array */
3022 36 : *pkattnums = palloc_array(int, pknumatts_arg);
3023 36 : *pknumatts = pknumatts_arg;
3024 :
3025 : /* Validate attnums and convert to internal form */
3026 114 : for (i = 0; i < pknumatts_arg; i++)
3027 : {
3028 90 : int pkattnum = pkattnums_arg->values[i];
3029 : int lnum;
3030 : int j;
3031 :
3032 : /* Can throw error immediately if out of range */
3033 90 : if (pkattnum <= 0 || pkattnum > natts)
3034 12 : ereport(ERROR,
3035 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3036 : errmsg("invalid attribute number %d", pkattnum)));
3037 :
3038 : /* Identify which physical column has this logical number */
3039 78 : lnum = 0;
3040 138 : for (j = 0; j < natts; j++)
3041 : {
3042 : /* dropped columns don't count */
3043 138 : if (TupleDescAttr(tupdesc, j)->attisdropped)
3044 6 : continue;
3045 :
3046 132 : if (++lnum == pkattnum)
3047 78 : break;
3048 : }
3049 :
3050 78 : if (j < natts)
3051 78 : (*pkattnums)[i] = j;
3052 : else
3053 0 : ereport(ERROR,
3054 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3055 : errmsg("invalid attribute number %d", pkattnum)));
3056 : }
3057 24 : }
3058 :
3059 : /*
3060 : * Check if the specified connection option is valid.
3061 : *
3062 : * We basically allow whatever libpq thinks is an option, with these
3063 : * restrictions:
3064 : * debug options: disallowed
3065 : * "client_encoding": disallowed
3066 : * "user": valid only in USER MAPPING options
3067 : * secure options (eg password): valid only in USER MAPPING options
3068 : * others: valid only in FOREIGN SERVER options
3069 : *
3070 : * We disallow client_encoding because it would be overridden anyway via
3071 : * PQclientEncoding; allowing it to be specified would merely promote
3072 : * confusion.
3073 : */
3074 : static bool
3075 504 : is_valid_dblink_option(const PQconninfoOption *options, const char *option,
3076 : Oid context)
3077 : {
3078 : const PQconninfoOption *opt;
3079 :
3080 : /* Look up the option in libpq result */
3081 11668 : for (opt = options; opt->keyword; opt++)
3082 : {
3083 11656 : if (strcmp(opt->keyword, option) == 0)
3084 492 : break;
3085 : }
3086 504 : if (opt->keyword == NULL)
3087 12 : return false;
3088 :
3089 : /* Disallow debug options (particularly "replication") */
3090 492 : if (strchr(opt->dispchar, 'D'))
3091 36 : return false;
3092 :
3093 : /* Disallow "client_encoding" */
3094 456 : if (strcmp(opt->keyword, "client_encoding") == 0)
3095 8 : return false;
3096 :
3097 : /*
3098 : * If the option is "user" or marked secure, it should be specified only
3099 : * in USER MAPPING. Others should be specified only in SERVER.
3100 : */
3101 448 : if (strcmp(opt->keyword, "user") == 0 || strchr(opt->dispchar, '*'))
3102 : {
3103 48 : if (context != UserMappingRelationId)
3104 6 : return false;
3105 : }
3106 : else
3107 : {
3108 400 : if (context != ForeignServerRelationId)
3109 336 : return false;
3110 : }
3111 :
3112 106 : return true;
3113 : }
3114 :
3115 : /*
3116 : * Same as is_valid_dblink_option but also check for only dblink_fdw specific
3117 : * options.
3118 : */
3119 : static bool
3120 58 : is_valid_dblink_fdw_option(const PQconninfoOption *options, const char *option,
3121 : Oid context)
3122 : {
3123 58 : if (strcmp(option, "use_scram_passthrough") == 0)
3124 8 : return true;
3125 :
3126 50 : return is_valid_dblink_option(options, option, context);
3127 : }
3128 :
3129 : /*
3130 : * Copy the remote session's values of GUCs that affect datatype I/O
3131 : * and apply them locally in a new GUC nesting level. Returns the new
3132 : * nestlevel (which is needed by restoreLocalGucs to undo the settings),
3133 : * or -1 if no new nestlevel was needed.
3134 : *
3135 : * We use the equivalent of a function SET option to allow the settings to
3136 : * persist only until the caller calls restoreLocalGucs. If an error is
3137 : * thrown in between, guc.c will take care of undoing the settings.
3138 : */
3139 : static int
3140 68 : applyRemoteGucs(PGconn *conn)
3141 : {
3142 : static const char *const GUCsAffectingIO[] = {
3143 : "DateStyle",
3144 : "IntervalStyle"
3145 : };
3146 :
3147 68 : int nestlevel = -1;
3148 : int i;
3149 :
3150 204 : for (i = 0; i < lengthof(GUCsAffectingIO); i++)
3151 : {
3152 136 : const char *gucName = GUCsAffectingIO[i];
3153 136 : const char *remoteVal = PQparameterStatus(conn, gucName);
3154 : const char *localVal;
3155 :
3156 : /*
3157 : * If the remote server is pre-8.4, it won't have IntervalStyle, but
3158 : * that's okay because its output format won't be ambiguous. So just
3159 : * skip the GUC if we don't get a value for it. (We might eventually
3160 : * need more complicated logic with remote-version checks here.)
3161 : */
3162 136 : if (remoteVal == NULL)
3163 0 : continue;
3164 :
3165 : /*
3166 : * Avoid GUC-setting overhead if the remote and local GUCs already
3167 : * have the same value.
3168 : */
3169 136 : localVal = GetConfigOption(gucName, false, false);
3170 : Assert(localVal != NULL);
3171 :
3172 136 : if (strcmp(remoteVal, localVal) == 0)
3173 102 : continue;
3174 :
3175 : /* Create new GUC nest level if we didn't already */
3176 34 : if (nestlevel < 0)
3177 18 : nestlevel = NewGUCNestLevel();
3178 :
3179 : /* Apply the option (this will throw error on failure) */
3180 34 : (void) set_config_option(gucName, remoteVal,
3181 : PGC_USERSET, PGC_S_SESSION,
3182 : GUC_ACTION_SAVE, true, 0, false);
3183 : }
3184 :
3185 68 : return nestlevel;
3186 : }
3187 :
3188 : /*
3189 : * Restore local GUCs after they have been overlaid with remote settings.
3190 : */
3191 : static void
3192 70 : restoreLocalGucs(int nestlevel)
3193 : {
3194 : /* Do nothing if no new nestlevel was created */
3195 70 : if (nestlevel > 0)
3196 16 : AtEOXact_GUC(true, nestlevel);
3197 70 : }
3198 :
3199 : /*
3200 : * Append SCRAM client key and server key information from the global
3201 : * MyProcPort into the given StringInfo buffer.
3202 : */
3203 : static void
3204 8 : appendSCRAMKeysInfo(StringInfo buf)
3205 : {
3206 : int len;
3207 : int encoded_len;
3208 : char *client_key;
3209 : char *server_key;
3210 :
3211 8 : len = pg_b64_enc_len(sizeof(MyProcPort->scram_ClientKey));
3212 : /* don't forget the zero-terminator */
3213 8 : client_key = palloc0(len + 1);
3214 8 : encoded_len = pg_b64_encode((const char *) MyProcPort->scram_ClientKey,
3215 : sizeof(MyProcPort->scram_ClientKey),
3216 : client_key, len);
3217 8 : if (encoded_len < 0)
3218 0 : elog(ERROR, "could not encode SCRAM client key");
3219 :
3220 8 : len = pg_b64_enc_len(sizeof(MyProcPort->scram_ServerKey));
3221 : /* don't forget the zero-terminator */
3222 8 : server_key = palloc0(len + 1);
3223 8 : encoded_len = pg_b64_encode((const char *) MyProcPort->scram_ServerKey,
3224 : sizeof(MyProcPort->scram_ServerKey),
3225 : server_key, len);
3226 8 : if (encoded_len < 0)
3227 0 : elog(ERROR, "could not encode SCRAM server key");
3228 :
3229 8 : appendStringInfo(buf, "scram_client_key='%s' ", client_key);
3230 8 : appendStringInfo(buf, "scram_server_key='%s' ", server_key);
3231 8 : appendStringInfoString(buf, "require_auth='scram-sha-256' ");
3232 :
3233 8 : pfree(client_key);
3234 8 : pfree(server_key);
3235 8 : }
3236 :
3237 :
3238 : static bool
3239 8 : UseScramPassthrough(ForeignServer *foreign_server, UserMapping *user)
3240 : {
3241 : ListCell *cell;
3242 :
3243 32 : foreach(cell, foreign_server->options)
3244 : {
3245 32 : DefElem *def = lfirst(cell);
3246 :
3247 32 : if (strcmp(def->defname, "use_scram_passthrough") == 0)
3248 8 : return defGetBoolean(def);
3249 : }
3250 :
3251 0 : foreach(cell, user->options)
3252 : {
3253 0 : DefElem *def = (DefElem *) lfirst(cell);
3254 :
3255 0 : if (strcmp(def->defname, "use_scram_passthrough") == 0)
3256 0 : return defGetBoolean(def);
3257 : }
3258 :
3259 0 : return false;
3260 : }
|