Line data Source code
1 : /*
2 : * dblink.c
3 : *
4 : * Functions returning results from a remote database
5 : *
6 : * Joe Conway <mail@joeconway.com>
7 : * And contributors:
8 : * Darko Prenosil <Darko.Prenosil@finteh.hr>
9 : * Shridhar Daithankar <shridhar_daithankar@persistent.co.in>
10 : *
11 : * contrib/dblink/dblink.c
12 : * Copyright (c) 2001-2025, PostgreSQL Global Development Group
13 : * ALL RIGHTS RESERVED;
14 : *
15 : * Permission to use, copy, modify, and distribute this software and its
16 : * documentation for any purpose, without fee, and without a written agreement
17 : * is hereby granted, provided that the above copyright notice and this
18 : * paragraph and the following two paragraphs appear in all copies.
19 : *
20 : * IN NO EVENT SHALL THE AUTHOR OR DISTRIBUTORS BE LIABLE TO ANY PARTY FOR
21 : * DIRECT, INDIRECT, SPECIAL, INCIDENTAL, OR CONSEQUENTIAL DAMAGES, INCLUDING
22 : * LOST PROFITS, ARISING OUT OF THE USE OF THIS SOFTWARE AND ITS
23 : * DOCUMENTATION, EVEN IF THE AUTHOR OR DISTRIBUTORS HAVE BEEN ADVISED OF THE
24 : * POSSIBILITY OF SUCH DAMAGE.
25 : *
26 : * THE AUTHOR AND DISTRIBUTORS SPECIFICALLY DISCLAIMS ANY WARRANTIES,
27 : * INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY
28 : * AND FITNESS FOR A PARTICULAR PURPOSE. THE SOFTWARE PROVIDED HEREUNDER IS
29 : * ON AN "AS IS" BASIS, AND THE AUTHOR AND DISTRIBUTORS HAS NO OBLIGATIONS TO
30 : * PROVIDE MAINTENANCE, SUPPORT, UPDATES, ENHANCEMENTS, OR MODIFICATIONS.
31 : *
32 : */
33 : #include "postgres.h"
34 :
35 : #include <limits.h>
36 :
37 : #include "access/htup_details.h"
38 : #include "access/relation.h"
39 : #include "access/reloptions.h"
40 : #include "access/table.h"
41 : #include "catalog/namespace.h"
42 : #include "catalog/pg_foreign_data_wrapper.h"
43 : #include "catalog/pg_foreign_server.h"
44 : #include "catalog/pg_type.h"
45 : #include "catalog/pg_user_mapping.h"
46 : #include "commands/defrem.h"
47 : #include "common/base64.h"
48 : #include "executor/spi.h"
49 : #include "foreign/foreign.h"
50 : #include "funcapi.h"
51 : #include "lib/stringinfo.h"
52 : #include "libpq-fe.h"
53 : #include "libpq/libpq-be.h"
54 : #include "libpq/libpq-be-fe-helpers.h"
55 : #include "mb/pg_wchar.h"
56 : #include "miscadmin.h"
57 : #include "parser/scansup.h"
58 : #include "utils/acl.h"
59 : #include "utils/builtins.h"
60 : #include "utils/fmgroids.h"
61 : #include "utils/guc.h"
62 : #include "utils/lsyscache.h"
63 : #include "utils/memutils.h"
64 : #include "utils/rel.h"
65 : #include "utils/varlena.h"
66 : #include "utils/wait_event.h"
67 :
68 32 : PG_MODULE_MAGIC_EXT(
69 : .name = "dblink",
70 : .version = PG_VERSION
71 : );
72 :
73 : typedef struct remoteConn
74 : {
75 : PGconn *conn; /* Hold the remote connection */
76 : int openCursorCount; /* The number of open cursors */
77 : bool newXactForCursor; /* Opened a transaction for a cursor */
78 : } remoteConn;
79 :
80 : typedef struct storeInfo
81 : {
82 : FunctionCallInfo fcinfo;
83 : Tuplestorestate *tuplestore;
84 : AttInMetadata *attinmeta;
85 : MemoryContext tmpcontext;
86 : char **cstrs;
87 : /* temp storage for results to avoid leaks on exception */
88 : PGresult *last_res;
89 : PGresult *cur_res;
90 : } storeInfo;
91 :
92 : /*
93 : * Internal declarations
94 : */
95 : static Datum dblink_record_internal(FunctionCallInfo fcinfo, bool is_async);
96 : static void prepTuplestoreResult(FunctionCallInfo fcinfo);
97 : static void materializeResult(FunctionCallInfo fcinfo, PGconn *conn,
98 : PGresult *res);
99 : static void materializeQueryResult(FunctionCallInfo fcinfo,
100 : PGconn *conn,
101 : const char *conname,
102 : const char *sql,
103 : bool fail);
104 : static PGresult *storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql);
105 : static void storeRow(volatile storeInfo *sinfo, PGresult *res, bool first);
106 : static remoteConn *getConnectionByName(const char *name);
107 : static HTAB *createConnHash(void);
108 : static remoteConn *createNewConnection(const char *name);
109 : static void deleteConnection(const char *name);
110 : static char **get_pkey_attnames(Relation rel, int16 *indnkeyatts);
111 : static char **get_text_array_contents(ArrayType *array, int *numitems);
112 : static char *get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
113 : static char *get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals);
114 : static char *get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals);
115 : static char *quote_ident_cstr(char *rawstr);
116 : static int get_attnum_pk_pos(int *pkattnums, int pknumatts, int key);
117 : static HeapTuple get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals);
118 : static Relation get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode);
119 : static char *generate_relation_name(Relation rel);
120 : static void dblink_connstr_check(const char *connstr);
121 : static bool dblink_connstr_has_pw(const char *connstr);
122 : static void dblink_security_check(PGconn *conn, const char *connname,
123 : const char *connstr);
124 : static void dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
125 : bool fail, const char *fmt,...) pg_attribute_printf(5, 6);
126 : static char *get_connect_string(const char *servername);
127 : static char *escape_param_str(const char *str);
128 : static void validate_pkattnums(Relation rel,
129 : int2vector *pkattnums_arg, int32 pknumatts_arg,
130 : int **pkattnums, int *pknumatts);
131 : static bool is_valid_dblink_option(const PQconninfoOption *options,
132 : const char *option, Oid context);
133 : static int applyRemoteGucs(PGconn *conn);
134 : static void restoreLocalGucs(int nestlevel);
135 : static bool UseScramPassthrough(ForeignServer *foreign_server, UserMapping *user);
136 : static void appendSCRAMKeysInfo(StringInfo buf);
137 : static bool is_valid_dblink_fdw_option(const PQconninfoOption *options, const char *option,
138 : Oid context);
139 : static bool dblink_connstr_has_required_scram_options(const char *connstr);
140 :
141 : /* Global */
142 : static remoteConn *pconn = NULL;
143 : static HTAB *remoteConnHash = NULL;
144 :
145 : /* custom wait event values, retrieved from shared memory */
146 : static uint32 dblink_we_connect = 0;
147 : static uint32 dblink_we_get_conn = 0;
148 : static uint32 dblink_we_get_result = 0;
149 :
150 : /*
151 : * Following is hash that holds multiple remote connections.
152 : * Calling convention of each dblink function changes to accept
153 : * connection name as the first parameter. The connection hash is
154 : * much like ecpg e.g. a mapping between a name and a PGconn object.
155 : *
156 : * To avoid potentially leaking a PGconn object in case of out-of-memory
157 : * errors, we first create the hash entry, then open the PGconn.
158 : * Hence, a hash entry whose rconn.conn pointer is NULL must be
159 : * understood as a leftover from a failed create; it should be ignored
160 : * by lookup operations, and silently replaced by create operations.
161 : */
162 :
163 : typedef struct remoteConnHashEnt
164 : {
165 : char name[NAMEDATALEN];
166 : remoteConn rconn;
167 : } remoteConnHashEnt;
168 :
169 : /* initial number of connection hashes */
170 : #define NUMCONN 16
171 :
172 : static char *
173 96 : xpstrdup(const char *in)
174 : {
175 96 : if (in == NULL)
176 72 : return NULL;
177 24 : return pstrdup(in);
178 : }
179 :
180 : pg_noreturn static void
181 0 : dblink_res_internalerror(PGconn *conn, PGresult *res, const char *p2)
182 : {
183 0 : char *msg = pchomp(PQerrorMessage(conn));
184 :
185 0 : PQclear(res);
186 0 : elog(ERROR, "%s: %s", p2, msg);
187 : }
188 :
189 : pg_noreturn static void
190 6 : dblink_conn_not_avail(const char *conname)
191 : {
192 6 : if (conname)
193 2 : ereport(ERROR,
194 : (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
195 : errmsg("connection \"%s\" not available", conname)));
196 : else
197 4 : ereport(ERROR,
198 : (errcode(ERRCODE_CONNECTION_DOES_NOT_EXIST),
199 : errmsg("connection not available")));
200 : }
201 :
202 : static void
203 72 : dblink_get_conn(char *conname_or_str,
204 : PGconn *volatile *conn_p, char **conname_p, volatile bool *freeconn_p)
205 : {
206 72 : remoteConn *rconn = getConnectionByName(conname_or_str);
207 : PGconn *conn;
208 : char *conname;
209 : bool freeconn;
210 :
211 72 : if (rconn)
212 : {
213 54 : conn = rconn->conn;
214 54 : conname = conname_or_str;
215 54 : freeconn = false;
216 : }
217 : else
218 : {
219 : const char *connstr;
220 :
221 18 : connstr = get_connect_string(conname_or_str);
222 18 : if (connstr == NULL)
223 12 : connstr = conname_or_str;
224 18 : dblink_connstr_check(connstr);
225 :
226 : /* first time, allocate or get the custom wait event */
227 16 : if (dblink_we_get_conn == 0)
228 8 : dblink_we_get_conn = WaitEventExtensionNew("DblinkGetConnect");
229 :
230 : /* OK to make connection */
231 16 : conn = libpqsrv_connect(connstr, dblink_we_get_conn);
232 :
233 16 : if (PQstatus(conn) == CONNECTION_BAD)
234 : {
235 4 : char *msg = pchomp(PQerrorMessage(conn));
236 :
237 4 : libpqsrv_disconnect(conn);
238 4 : ereport(ERROR,
239 : (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
240 : errmsg("could not establish connection"),
241 : errdetail_internal("%s", msg)));
242 : }
243 12 : dblink_security_check(conn, NULL, connstr);
244 12 : if (PQclientEncoding(conn) != GetDatabaseEncoding())
245 0 : PQsetClientEncoding(conn, GetDatabaseEncodingName());
246 12 : freeconn = true;
247 12 : conname = NULL;
248 : }
249 :
250 66 : *conn_p = conn;
251 66 : *conname_p = conname;
252 66 : *freeconn_p = freeconn;
253 66 : }
254 :
255 : static PGconn *
256 34 : dblink_get_named_conn(const char *conname)
257 : {
258 34 : remoteConn *rconn = getConnectionByName(conname);
259 :
260 34 : if (rconn)
261 34 : return rconn->conn;
262 :
263 0 : dblink_conn_not_avail(conname);
264 : return NULL; /* keep compiler quiet */
265 : }
266 :
267 : static void
268 246 : dblink_init(void)
269 : {
270 246 : if (!pconn)
271 : {
272 12 : if (dblink_we_get_result == 0)
273 12 : dblink_we_get_result = WaitEventExtensionNew("DblinkGetResult");
274 :
275 12 : pconn = (remoteConn *) MemoryContextAlloc(TopMemoryContext, sizeof(remoteConn));
276 12 : pconn->conn = NULL;
277 12 : pconn->openCursorCount = 0;
278 12 : pconn->newXactForCursor = false;
279 : }
280 246 : }
281 :
282 : /*
283 : * Create a persistent connection to another database
284 : */
285 26 : PG_FUNCTION_INFO_V1(dblink_connect);
286 : Datum
287 32 : dblink_connect(PG_FUNCTION_ARGS)
288 : {
289 32 : char *conname_or_str = NULL;
290 32 : char *connstr = NULL;
291 32 : char *connname = NULL;
292 : char *msg;
293 32 : PGconn *conn = NULL;
294 32 : remoteConn *rconn = NULL;
295 :
296 32 : dblink_init();
297 :
298 32 : if (PG_NARGS() == 2)
299 : {
300 22 : conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(1));
301 22 : connname = text_to_cstring(PG_GETARG_TEXT_PP(0));
302 : }
303 10 : else if (PG_NARGS() == 1)
304 10 : conname_or_str = text_to_cstring(PG_GETARG_TEXT_PP(0));
305 :
306 : /* first check for valid foreign data server */
307 32 : connstr = get_connect_string(conname_or_str);
308 32 : if (connstr == NULL)
309 28 : connstr = conname_or_str;
310 :
311 : /* check password in connection string if not superuser */
312 32 : dblink_connstr_check(connstr);
313 :
314 : /* first time, allocate or get the custom wait event */
315 30 : if (dblink_we_connect == 0)
316 4 : dblink_we_connect = WaitEventExtensionNew("DblinkConnect");
317 :
318 : /* if we need a hashtable entry, make that first, since it might fail */
319 30 : if (connname)
320 : {
321 20 : rconn = createNewConnection(connname);
322 : Assert(rconn->conn == NULL);
323 : }
324 :
325 : /* OK to make connection */
326 28 : conn = libpqsrv_connect(connstr, dblink_we_connect);
327 :
328 28 : if (PQstatus(conn) == CONNECTION_BAD)
329 : {
330 0 : msg = pchomp(PQerrorMessage(conn));
331 0 : libpqsrv_disconnect(conn);
332 0 : if (connname)
333 0 : deleteConnection(connname);
334 :
335 0 : ereport(ERROR,
336 : (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
337 : errmsg("could not establish connection"),
338 : errdetail_internal("%s", msg)));
339 : }
340 :
341 : /* check password actually used if not superuser */
342 28 : dblink_security_check(conn, connname, connstr);
343 :
344 : /* attempt to set client encoding to match server encoding, if needed */
345 28 : if (PQclientEncoding(conn) != GetDatabaseEncoding())
346 0 : PQsetClientEncoding(conn, GetDatabaseEncodingName());
347 :
348 : /* all OK, save away the conn */
349 28 : if (connname)
350 : {
351 18 : rconn->conn = conn;
352 : }
353 : else
354 : {
355 10 : if (pconn->conn)
356 2 : libpqsrv_disconnect(pconn->conn);
357 10 : pconn->conn = conn;
358 : }
359 :
360 28 : PG_RETURN_TEXT_P(cstring_to_text("OK"));
361 : }
362 :
363 : /*
364 : * Clear a persistent connection to another database
365 : */
366 16 : PG_FUNCTION_INFO_V1(dblink_disconnect);
367 : Datum
368 26 : dblink_disconnect(PG_FUNCTION_ARGS)
369 : {
370 26 : char *conname = NULL;
371 26 : remoteConn *rconn = NULL;
372 26 : PGconn *conn = NULL;
373 :
374 26 : dblink_init();
375 :
376 26 : if (PG_NARGS() == 1)
377 : {
378 18 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
379 18 : rconn = getConnectionByName(conname);
380 18 : if (rconn)
381 16 : conn = rconn->conn;
382 : }
383 : else
384 8 : conn = pconn->conn;
385 :
386 26 : if (!conn)
387 2 : dblink_conn_not_avail(conname);
388 :
389 24 : libpqsrv_disconnect(conn);
390 24 : if (rconn)
391 16 : deleteConnection(conname);
392 : else
393 8 : pconn->conn = NULL;
394 :
395 24 : PG_RETURN_TEXT_P(cstring_to_text("OK"));
396 : }
397 :
398 : /*
399 : * opens a cursor using a persistent connection
400 : */
401 26 : PG_FUNCTION_INFO_V1(dblink_open);
402 : Datum
403 18 : dblink_open(PG_FUNCTION_ARGS)
404 : {
405 18 : PGresult *res = NULL;
406 : PGconn *conn;
407 18 : char *curname = NULL;
408 18 : char *sql = NULL;
409 18 : char *conname = NULL;
410 : StringInfoData buf;
411 18 : remoteConn *rconn = NULL;
412 18 : bool fail = true; /* default to backward compatible behavior */
413 :
414 18 : dblink_init();
415 18 : initStringInfo(&buf);
416 :
417 18 : if (PG_NARGS() == 2)
418 : {
419 : /* text,text */
420 4 : curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
421 4 : sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
422 4 : rconn = pconn;
423 : }
424 14 : else if (PG_NARGS() == 3)
425 : {
426 : /* might be text,text,text or text,text,bool */
427 12 : if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
428 : {
429 2 : curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
430 2 : sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
431 2 : fail = PG_GETARG_BOOL(2);
432 2 : rconn = pconn;
433 : }
434 : else
435 : {
436 10 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
437 10 : curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
438 10 : sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
439 10 : rconn = getConnectionByName(conname);
440 : }
441 : }
442 2 : else if (PG_NARGS() == 4)
443 : {
444 : /* text,text,text,bool */
445 2 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
446 2 : curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
447 2 : sql = text_to_cstring(PG_GETARG_TEXT_PP(2));
448 2 : fail = PG_GETARG_BOOL(3);
449 2 : rconn = getConnectionByName(conname);
450 : }
451 :
452 18 : if (!rconn || !rconn->conn)
453 0 : dblink_conn_not_avail(conname);
454 :
455 18 : conn = rconn->conn;
456 :
457 : /* If we are not in a transaction, start one */
458 18 : if (PQtransactionStatus(conn) == PQTRANS_IDLE)
459 : {
460 14 : res = libpqsrv_exec(conn, "BEGIN", dblink_we_get_result);
461 14 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
462 0 : dblink_res_internalerror(conn, res, "begin error");
463 14 : PQclear(res);
464 14 : rconn->newXactForCursor = true;
465 :
466 : /*
467 : * Since transaction state was IDLE, we force cursor count to
468 : * initially be 0. This is needed as a previous ABORT might have wiped
469 : * out our transaction without maintaining the cursor count for us.
470 : */
471 14 : rconn->openCursorCount = 0;
472 : }
473 :
474 : /* if we started a transaction, increment cursor count */
475 18 : if (rconn->newXactForCursor)
476 18 : (rconn->openCursorCount)++;
477 :
478 18 : appendStringInfo(&buf, "DECLARE %s CURSOR FOR %s", curname, sql);
479 18 : res = libpqsrv_exec(conn, buf.data, dblink_we_get_result);
480 18 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
481 : {
482 4 : dblink_res_error(conn, conname, res, fail,
483 : "while opening cursor \"%s\"", curname);
484 4 : PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
485 : }
486 :
487 14 : PQclear(res);
488 14 : PG_RETURN_TEXT_P(cstring_to_text("OK"));
489 : }
490 :
491 : /*
492 : * closes a cursor
493 : */
494 20 : PG_FUNCTION_INFO_V1(dblink_close);
495 : Datum
496 10 : dblink_close(PG_FUNCTION_ARGS)
497 : {
498 : PGconn *conn;
499 10 : PGresult *res = NULL;
500 10 : char *curname = NULL;
501 10 : char *conname = NULL;
502 : StringInfoData buf;
503 10 : remoteConn *rconn = NULL;
504 10 : bool fail = true; /* default to backward compatible behavior */
505 :
506 10 : dblink_init();
507 10 : initStringInfo(&buf);
508 :
509 10 : if (PG_NARGS() == 1)
510 : {
511 : /* text */
512 0 : curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
513 0 : rconn = pconn;
514 : }
515 10 : else if (PG_NARGS() == 2)
516 : {
517 : /* might be text,text or text,bool */
518 10 : if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
519 : {
520 4 : curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
521 4 : fail = PG_GETARG_BOOL(1);
522 4 : rconn = pconn;
523 : }
524 : else
525 : {
526 6 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
527 6 : curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
528 6 : rconn = getConnectionByName(conname);
529 : }
530 : }
531 10 : if (PG_NARGS() == 3)
532 : {
533 : /* text,text,bool */
534 0 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
535 0 : curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
536 0 : fail = PG_GETARG_BOOL(2);
537 0 : rconn = getConnectionByName(conname);
538 : }
539 :
540 10 : if (!rconn || !rconn->conn)
541 0 : dblink_conn_not_avail(conname);
542 :
543 10 : conn = rconn->conn;
544 :
545 10 : appendStringInfo(&buf, "CLOSE %s", curname);
546 :
547 : /* close the cursor */
548 10 : res = libpqsrv_exec(conn, buf.data, dblink_we_get_result);
549 10 : if (!res || PQresultStatus(res) != PGRES_COMMAND_OK)
550 : {
551 2 : dblink_res_error(conn, conname, res, fail,
552 : "while closing cursor \"%s\"", curname);
553 2 : PG_RETURN_TEXT_P(cstring_to_text("ERROR"));
554 : }
555 :
556 8 : PQclear(res);
557 :
558 : /* if we started a transaction, decrement cursor count */
559 8 : if (rconn->newXactForCursor)
560 : {
561 8 : (rconn->openCursorCount)--;
562 :
563 : /* if count is zero, commit the transaction */
564 8 : if (rconn->openCursorCount == 0)
565 : {
566 4 : rconn->newXactForCursor = false;
567 :
568 4 : res = libpqsrv_exec(conn, "COMMIT", dblink_we_get_result);
569 4 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
570 0 : dblink_res_internalerror(conn, res, "commit error");
571 4 : PQclear(res);
572 : }
573 : }
574 :
575 8 : PG_RETURN_TEXT_P(cstring_to_text("OK"));
576 : }
577 :
578 : /*
579 : * Fetch results from an open cursor
580 : */
581 26 : PG_FUNCTION_INFO_V1(dblink_fetch);
582 : Datum
583 26 : dblink_fetch(PG_FUNCTION_ARGS)
584 : {
585 26 : PGresult *res = NULL;
586 26 : char *conname = NULL;
587 26 : remoteConn *rconn = NULL;
588 26 : PGconn *conn = NULL;
589 : StringInfoData buf;
590 26 : char *curname = NULL;
591 26 : int howmany = 0;
592 26 : bool fail = true; /* default to backward compatible */
593 :
594 26 : prepTuplestoreResult(fcinfo);
595 :
596 26 : dblink_init();
597 :
598 26 : if (PG_NARGS() == 4)
599 : {
600 : /* text,text,int,bool */
601 2 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
602 2 : curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
603 2 : howmany = PG_GETARG_INT32(2);
604 2 : fail = PG_GETARG_BOOL(3);
605 :
606 2 : rconn = getConnectionByName(conname);
607 2 : if (rconn)
608 2 : conn = rconn->conn;
609 : }
610 24 : else if (PG_NARGS() == 3)
611 : {
612 : /* text,text,int or text,int,bool */
613 16 : if (get_fn_expr_argtype(fcinfo->flinfo, 2) == BOOLOID)
614 : {
615 4 : curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
616 4 : howmany = PG_GETARG_INT32(1);
617 4 : fail = PG_GETARG_BOOL(2);
618 4 : conn = pconn->conn;
619 : }
620 : else
621 : {
622 12 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
623 12 : curname = text_to_cstring(PG_GETARG_TEXT_PP(1));
624 12 : howmany = PG_GETARG_INT32(2);
625 :
626 12 : rconn = getConnectionByName(conname);
627 12 : if (rconn)
628 12 : conn = rconn->conn;
629 : }
630 : }
631 8 : else if (PG_NARGS() == 2)
632 : {
633 : /* text,int */
634 8 : curname = text_to_cstring(PG_GETARG_TEXT_PP(0));
635 8 : howmany = PG_GETARG_INT32(1);
636 8 : conn = pconn->conn;
637 : }
638 :
639 26 : if (!conn)
640 0 : dblink_conn_not_avail(conname);
641 :
642 26 : initStringInfo(&buf);
643 26 : appendStringInfo(&buf, "FETCH %d FROM %s", howmany, curname);
644 :
645 : /*
646 : * Try to execute the query. Note that since libpq uses malloc, the
647 : * PGresult will be long-lived even though we are still in a short-lived
648 : * memory context.
649 : */
650 26 : res = libpqsrv_exec(conn, buf.data, dblink_we_get_result);
651 52 : if (!res ||
652 52 : (PQresultStatus(res) != PGRES_COMMAND_OK &&
653 26 : PQresultStatus(res) != PGRES_TUPLES_OK))
654 : {
655 10 : dblink_res_error(conn, conname, res, fail,
656 : "while fetching from cursor \"%s\"", curname);
657 6 : return (Datum) 0;
658 : }
659 16 : else if (PQresultStatus(res) == PGRES_COMMAND_OK)
660 : {
661 : /* cursor does not exist - closed already or bad name */
662 0 : PQclear(res);
663 0 : ereport(ERROR,
664 : (errcode(ERRCODE_INVALID_CURSOR_NAME),
665 : errmsg("cursor \"%s\" does not exist", curname)));
666 : }
667 :
668 16 : materializeResult(fcinfo, conn, res);
669 14 : return (Datum) 0;
670 : }
671 :
672 : /*
673 : * Note: this is the new preferred version of dblink
674 : */
675 36 : PG_FUNCTION_INFO_V1(dblink_record);
676 : Datum
677 56 : dblink_record(PG_FUNCTION_ARGS)
678 : {
679 56 : return dblink_record_internal(fcinfo, false);
680 : }
681 :
682 8 : PG_FUNCTION_INFO_V1(dblink_send_query);
683 : Datum
684 12 : dblink_send_query(PG_FUNCTION_ARGS)
685 : {
686 : PGconn *conn;
687 : char *sql;
688 : int retval;
689 :
690 12 : if (PG_NARGS() == 2)
691 : {
692 12 : conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
693 12 : sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
694 : }
695 : else
696 : /* shouldn't happen */
697 0 : elog(ERROR, "wrong number of arguments");
698 :
699 : /* async query send */
700 12 : retval = PQsendQuery(conn, sql);
701 12 : if (retval != 1)
702 0 : elog(NOTICE, "could not send query: %s", pchomp(PQerrorMessage(conn)));
703 :
704 12 : PG_RETURN_INT32(retval);
705 : }
706 :
707 12 : PG_FUNCTION_INFO_V1(dblink_get_result);
708 : Datum
709 16 : dblink_get_result(PG_FUNCTION_ARGS)
710 : {
711 16 : return dblink_record_internal(fcinfo, true);
712 : }
713 :
714 : static Datum
715 72 : dblink_record_internal(FunctionCallInfo fcinfo, bool is_async)
716 : {
717 72 : PGconn *volatile conn = NULL;
718 72 : volatile bool freeconn = false;
719 :
720 72 : prepTuplestoreResult(fcinfo);
721 :
722 72 : dblink_init();
723 :
724 72 : PG_TRY();
725 : {
726 72 : char *sql = NULL;
727 72 : char *conname = NULL;
728 72 : bool fail = true; /* default to backward compatible */
729 :
730 72 : if (!is_async)
731 : {
732 56 : if (PG_NARGS() == 3)
733 : {
734 : /* text,text,bool */
735 2 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
736 2 : sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
737 2 : fail = PG_GETARG_BOOL(2);
738 2 : dblink_get_conn(conname, &conn, &conname, &freeconn);
739 : }
740 54 : else if (PG_NARGS() == 2)
741 : {
742 : /* text,text or text,bool */
743 40 : if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
744 : {
745 2 : sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
746 2 : fail = PG_GETARG_BOOL(1);
747 2 : conn = pconn->conn;
748 : }
749 : else
750 : {
751 38 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
752 38 : sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
753 38 : dblink_get_conn(conname, &conn, &conname, &freeconn);
754 : }
755 : }
756 14 : else if (PG_NARGS() == 1)
757 : {
758 : /* text */
759 14 : conn = pconn->conn;
760 14 : sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
761 : }
762 : else
763 : /* shouldn't happen */
764 0 : elog(ERROR, "wrong number of arguments");
765 : }
766 : else /* is_async */
767 : {
768 : /* get async result */
769 16 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
770 :
771 16 : if (PG_NARGS() == 2)
772 : {
773 : /* text,bool */
774 0 : fail = PG_GETARG_BOOL(1);
775 0 : conn = dblink_get_named_conn(conname);
776 : }
777 16 : else if (PG_NARGS() == 1)
778 : {
779 : /* text */
780 16 : conn = dblink_get_named_conn(conname);
781 : }
782 : else
783 : /* shouldn't happen */
784 0 : elog(ERROR, "wrong number of arguments");
785 : }
786 :
787 66 : if (!conn)
788 4 : dblink_conn_not_avail(conname);
789 :
790 62 : if (!is_async)
791 : {
792 : /* synchronous query, use efficient tuple collection method */
793 46 : materializeQueryResult(fcinfo, conn, conname, sql, fail);
794 : }
795 : else
796 : {
797 : /* async result retrieval, do it the old way */
798 16 : PGresult *res = libpqsrv_get_result(conn, dblink_we_get_result);
799 :
800 : /* NULL means we're all done with the async results */
801 16 : if (res)
802 : {
803 10 : if (PQresultStatus(res) != PGRES_COMMAND_OK &&
804 10 : PQresultStatus(res) != PGRES_TUPLES_OK)
805 : {
806 0 : dblink_res_error(conn, conname, res, fail,
807 : "while executing query");
808 : /* if fail isn't set, we'll return an empty query result */
809 : }
810 : else
811 : {
812 10 : materializeResult(fcinfo, conn, res);
813 : }
814 : }
815 : }
816 : }
817 10 : PG_FINALLY();
818 : {
819 : /* if needed, close the connection to the database */
820 72 : if (freeconn)
821 10 : libpqsrv_disconnect(conn);
822 : }
823 72 : PG_END_TRY();
824 :
825 62 : return (Datum) 0;
826 : }
827 :
828 : /*
829 : * Verify function caller can handle a tuplestore result, and set up for that.
830 : *
831 : * Note: if the caller returns without actually creating a tuplestore, the
832 : * executor will treat the function result as an empty set.
833 : */
834 : static void
835 98 : prepTuplestoreResult(FunctionCallInfo fcinfo)
836 : {
837 98 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
838 :
839 : /* check to see if query supports us returning a tuplestore */
840 98 : if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
841 0 : ereport(ERROR,
842 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
843 : errmsg("set-valued function called in context that cannot accept a set")));
844 98 : if (!(rsinfo->allowedModes & SFRM_Materialize))
845 0 : ereport(ERROR,
846 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
847 : errmsg("materialize mode required, but it is not allowed in this context")));
848 :
849 : /* let the executor know we're sending back a tuplestore */
850 98 : rsinfo->returnMode = SFRM_Materialize;
851 :
852 : /* caller must fill these to return a non-empty result */
853 98 : rsinfo->setResult = NULL;
854 98 : rsinfo->setDesc = NULL;
855 98 : }
856 :
857 : /*
858 : * Copy the contents of the PGresult into a tuplestore to be returned
859 : * as the result of the current function.
860 : * The PGresult will be released in this function.
861 : */
862 : static void
863 26 : materializeResult(FunctionCallInfo fcinfo, PGconn *conn, PGresult *res)
864 : {
865 26 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
866 :
867 : /* prepTuplestoreResult must have been called previously */
868 : Assert(rsinfo->returnMode == SFRM_Materialize);
869 :
870 26 : PG_TRY();
871 : {
872 : TupleDesc tupdesc;
873 : bool is_sql_cmd;
874 : int ntuples;
875 : int nfields;
876 :
877 26 : if (PQresultStatus(res) == PGRES_COMMAND_OK)
878 : {
879 0 : is_sql_cmd = true;
880 :
881 : /*
882 : * need a tuple descriptor representing one TEXT column to return
883 : * the command status string as our result tuple
884 : */
885 0 : tupdesc = CreateTemplateTupleDesc(1);
886 0 : TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
887 : TEXTOID, -1, 0);
888 0 : ntuples = 1;
889 0 : nfields = 1;
890 : }
891 : else
892 : {
893 : Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
894 :
895 26 : is_sql_cmd = false;
896 :
897 : /* get a tuple descriptor for our result type */
898 26 : switch (get_call_result_type(fcinfo, NULL, &tupdesc))
899 : {
900 26 : case TYPEFUNC_COMPOSITE:
901 : /* success */
902 26 : break;
903 0 : case TYPEFUNC_RECORD:
904 : /* failed to determine actual type of RECORD */
905 0 : ereport(ERROR,
906 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
907 : errmsg("function returning record called in context "
908 : "that cannot accept type record")));
909 : break;
910 0 : default:
911 : /* result type isn't composite */
912 0 : elog(ERROR, "return type must be a row type");
913 : break;
914 : }
915 :
916 : /* make sure we have a persistent copy of the tupdesc */
917 26 : tupdesc = CreateTupleDescCopy(tupdesc);
918 26 : ntuples = PQntuples(res);
919 26 : nfields = PQnfields(res);
920 : }
921 :
922 : /*
923 : * check result and tuple descriptor have the same number of columns
924 : */
925 26 : if (nfields != tupdesc->natts)
926 0 : ereport(ERROR,
927 : (errcode(ERRCODE_DATATYPE_MISMATCH),
928 : errmsg("remote query result rowtype does not match "
929 : "the specified FROM clause rowtype")));
930 :
931 26 : if (ntuples > 0)
932 : {
933 : AttInMetadata *attinmeta;
934 26 : int nestlevel = -1;
935 : Tuplestorestate *tupstore;
936 : MemoryContext oldcontext;
937 : int row;
938 : char **values;
939 :
940 26 : attinmeta = TupleDescGetAttInMetadata(tupdesc);
941 :
942 : /* Set GUCs to ensure we read GUC-sensitive data types correctly */
943 26 : if (!is_sql_cmd)
944 26 : nestlevel = applyRemoteGucs(conn);
945 :
946 26 : oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
947 26 : tupstore = tuplestore_begin_heap(true, false, work_mem);
948 26 : rsinfo->setResult = tupstore;
949 26 : rsinfo->setDesc = tupdesc;
950 26 : MemoryContextSwitchTo(oldcontext);
951 :
952 26 : values = palloc_array(char *, nfields);
953 :
954 : /* put all tuples into the tuplestore */
955 98 : for (row = 0; row < ntuples; row++)
956 : {
957 : HeapTuple tuple;
958 :
959 74 : if (!is_sql_cmd)
960 : {
961 : int i;
962 :
963 276 : for (i = 0; i < nfields; i++)
964 : {
965 202 : if (PQgetisnull(res, row, i))
966 0 : values[i] = NULL;
967 : else
968 202 : values[i] = PQgetvalue(res, row, i);
969 : }
970 : }
971 : else
972 : {
973 0 : values[0] = PQcmdStatus(res);
974 : }
975 :
976 : /* build the tuple and put it into the tuplestore. */
977 74 : tuple = BuildTupleFromCStrings(attinmeta, values);
978 72 : tuplestore_puttuple(tupstore, tuple);
979 : }
980 :
981 : /* clean up GUC settings, if we changed any */
982 24 : restoreLocalGucs(nestlevel);
983 : }
984 : }
985 2 : PG_FINALLY();
986 : {
987 : /* be sure to release the libpq result */
988 26 : PQclear(res);
989 : }
990 26 : PG_END_TRY();
991 24 : }
992 :
993 : /*
994 : * Execute the given SQL command and store its results into a tuplestore
995 : * to be returned as the result of the current function.
996 : *
997 : * This is equivalent to PQexec followed by materializeResult, but we make
998 : * use of libpq's single-row mode to avoid accumulating the whole result
999 : * inside libpq before it gets transferred to the tuplestore.
1000 : */
1001 : static void
1002 46 : materializeQueryResult(FunctionCallInfo fcinfo,
1003 : PGconn *conn,
1004 : const char *conname,
1005 : const char *sql,
1006 : bool fail)
1007 : {
1008 46 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1009 46 : PGresult *volatile res = NULL;
1010 46 : volatile storeInfo sinfo = {0};
1011 :
1012 : /* prepTuplestoreResult must have been called previously */
1013 : Assert(rsinfo->returnMode == SFRM_Materialize);
1014 :
1015 46 : sinfo.fcinfo = fcinfo;
1016 :
1017 46 : PG_TRY();
1018 : {
1019 : /* Create short-lived memory context for data conversions */
1020 46 : sinfo.tmpcontext = AllocSetContextCreate(CurrentMemoryContext,
1021 : "dblink temporary context",
1022 : ALLOCSET_DEFAULT_SIZES);
1023 :
1024 : /* execute query, collecting any tuples into the tuplestore */
1025 46 : res = storeQueryResult(&sinfo, conn, sql);
1026 :
1027 46 : if (!res ||
1028 46 : (PQresultStatus(res) != PGRES_COMMAND_OK &&
1029 46 : PQresultStatus(res) != PGRES_TUPLES_OK))
1030 4 : {
1031 : /*
1032 : * dblink_res_error will clear the passed PGresult, so we need
1033 : * this ugly dance to avoid doing so twice during error exit
1034 : */
1035 4 : PGresult *res1 = res;
1036 :
1037 4 : res = NULL;
1038 4 : dblink_res_error(conn, conname, res1, fail,
1039 : "while executing query");
1040 : /* if fail isn't set, we'll return an empty query result */
1041 : }
1042 42 : else if (PQresultStatus(res) == PGRES_COMMAND_OK)
1043 : {
1044 : /*
1045 : * storeRow didn't get called, so we need to convert the command
1046 : * status string to a tuple manually
1047 : */
1048 : TupleDesc tupdesc;
1049 : AttInMetadata *attinmeta;
1050 : Tuplestorestate *tupstore;
1051 : HeapTuple tuple;
1052 : char *values[1];
1053 : MemoryContext oldcontext;
1054 :
1055 : /*
1056 : * need a tuple descriptor representing one TEXT column to return
1057 : * the command status string as our result tuple
1058 : */
1059 0 : tupdesc = CreateTemplateTupleDesc(1);
1060 0 : TupleDescInitEntry(tupdesc, (AttrNumber) 1, "status",
1061 : TEXTOID, -1, 0);
1062 0 : attinmeta = TupleDescGetAttInMetadata(tupdesc);
1063 :
1064 0 : oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
1065 0 : tupstore = tuplestore_begin_heap(true, false, work_mem);
1066 0 : rsinfo->setResult = tupstore;
1067 0 : rsinfo->setDesc = tupdesc;
1068 0 : MemoryContextSwitchTo(oldcontext);
1069 :
1070 0 : values[0] = PQcmdStatus(res);
1071 :
1072 : /* build the tuple and put it into the tuplestore. */
1073 0 : tuple = BuildTupleFromCStrings(attinmeta, values);
1074 0 : tuplestore_puttuple(tupstore, tuple);
1075 :
1076 0 : PQclear(res);
1077 0 : res = NULL;
1078 : }
1079 : else
1080 : {
1081 : Assert(PQresultStatus(res) == PGRES_TUPLES_OK);
1082 : /* storeRow should have created a tuplestore */
1083 : Assert(rsinfo->setResult != NULL);
1084 :
1085 42 : PQclear(res);
1086 42 : res = NULL;
1087 : }
1088 :
1089 : /* clean up data conversion short-lived memory context */
1090 46 : if (sinfo.tmpcontext != NULL)
1091 46 : MemoryContextDelete(sinfo.tmpcontext);
1092 46 : sinfo.tmpcontext = NULL;
1093 :
1094 46 : PQclear(sinfo.last_res);
1095 46 : sinfo.last_res = NULL;
1096 46 : PQclear(sinfo.cur_res);
1097 46 : sinfo.cur_res = NULL;
1098 : }
1099 0 : PG_CATCH();
1100 : {
1101 : /* be sure to release any libpq result we collected */
1102 0 : PQclear(res);
1103 0 : PQclear(sinfo.last_res);
1104 0 : PQclear(sinfo.cur_res);
1105 : /* and clear out any pending data in libpq */
1106 0 : while ((res = libpqsrv_get_result(conn, dblink_we_get_result)) !=
1107 : NULL)
1108 0 : PQclear(res);
1109 0 : PG_RE_THROW();
1110 : }
1111 46 : PG_END_TRY();
1112 46 : }
1113 :
1114 : /*
1115 : * Execute query, and send any result rows to sinfo->tuplestore.
1116 : */
1117 : static PGresult *
1118 46 : storeQueryResult(volatile storeInfo *sinfo, PGconn *conn, const char *sql)
1119 : {
1120 46 : bool first = true;
1121 46 : int nestlevel = -1;
1122 : PGresult *res;
1123 :
1124 46 : if (!PQsendQuery(conn, sql))
1125 0 : elog(ERROR, "could not send query: %s", pchomp(PQerrorMessage(conn)));
1126 :
1127 46 : if (!PQsetSingleRowMode(conn)) /* shouldn't fail */
1128 0 : elog(ERROR, "failed to set single-row mode for dblink query");
1129 :
1130 : for (;;)
1131 : {
1132 396 : CHECK_FOR_INTERRUPTS();
1133 :
1134 396 : sinfo->cur_res = libpqsrv_get_result(conn, dblink_we_get_result);
1135 396 : if (!sinfo->cur_res)
1136 46 : break;
1137 :
1138 350 : if (PQresultStatus(sinfo->cur_res) == PGRES_SINGLE_TUPLE)
1139 : {
1140 : /* got one row from possibly-bigger resultset */
1141 :
1142 : /*
1143 : * Set GUCs to ensure we read GUC-sensitive data types correctly.
1144 : * We shouldn't do this until we have a row in hand, to ensure
1145 : * libpq has seen any earlier ParameterStatus protocol messages.
1146 : */
1147 304 : if (first && nestlevel < 0)
1148 42 : nestlevel = applyRemoteGucs(conn);
1149 :
1150 304 : storeRow(sinfo, sinfo->cur_res, first);
1151 :
1152 304 : PQclear(sinfo->cur_res);
1153 304 : sinfo->cur_res = NULL;
1154 304 : first = false;
1155 : }
1156 : else
1157 : {
1158 : /* if empty resultset, fill tuplestore header */
1159 46 : if (first && PQresultStatus(sinfo->cur_res) == PGRES_TUPLES_OK)
1160 0 : storeRow(sinfo, sinfo->cur_res, first);
1161 :
1162 : /* store completed result at last_res */
1163 46 : PQclear(sinfo->last_res);
1164 46 : sinfo->last_res = sinfo->cur_res;
1165 46 : sinfo->cur_res = NULL;
1166 46 : first = true;
1167 : }
1168 : }
1169 :
1170 : /* clean up GUC settings, if we changed any */
1171 46 : restoreLocalGucs(nestlevel);
1172 :
1173 : /* return last_res */
1174 46 : res = sinfo->last_res;
1175 46 : sinfo->last_res = NULL;
1176 46 : return res;
1177 : }
1178 :
1179 : /*
1180 : * Send single row to sinfo->tuplestore.
1181 : *
1182 : * If "first" is true, create the tuplestore using PGresult's metadata
1183 : * (in this case the PGresult might contain either zero or one row).
1184 : */
1185 : static void
1186 304 : storeRow(volatile storeInfo *sinfo, PGresult *res, bool first)
1187 : {
1188 304 : int nfields = PQnfields(res);
1189 : HeapTuple tuple;
1190 : int i;
1191 : MemoryContext oldcontext;
1192 :
1193 304 : if (first)
1194 : {
1195 : /* Prepare for new result set */
1196 42 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) sinfo->fcinfo->resultinfo;
1197 : TupleDesc tupdesc;
1198 :
1199 : /*
1200 : * It's possible to get more than one result set if the query string
1201 : * contained multiple SQL commands. In that case, we follow PQexec's
1202 : * traditional behavior of throwing away all but the last result.
1203 : */
1204 42 : if (sinfo->tuplestore)
1205 0 : tuplestore_end(sinfo->tuplestore);
1206 42 : sinfo->tuplestore = NULL;
1207 :
1208 : /* get a tuple descriptor for our result type */
1209 42 : switch (get_call_result_type(sinfo->fcinfo, NULL, &tupdesc))
1210 : {
1211 42 : case TYPEFUNC_COMPOSITE:
1212 : /* success */
1213 42 : break;
1214 0 : case TYPEFUNC_RECORD:
1215 : /* failed to determine actual type of RECORD */
1216 0 : ereport(ERROR,
1217 : (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
1218 : errmsg("function returning record called in context "
1219 : "that cannot accept type record")));
1220 : break;
1221 0 : default:
1222 : /* result type isn't composite */
1223 0 : elog(ERROR, "return type must be a row type");
1224 : break;
1225 : }
1226 :
1227 : /* make sure we have a persistent copy of the tupdesc */
1228 42 : tupdesc = CreateTupleDescCopy(tupdesc);
1229 :
1230 : /* check result and tuple descriptor have the same number of columns */
1231 42 : if (nfields != tupdesc->natts)
1232 0 : ereport(ERROR,
1233 : (errcode(ERRCODE_DATATYPE_MISMATCH),
1234 : errmsg("remote query result rowtype does not match "
1235 : "the specified FROM clause rowtype")));
1236 :
1237 : /* Prepare attinmeta for later data conversions */
1238 42 : sinfo->attinmeta = TupleDescGetAttInMetadata(tupdesc);
1239 :
1240 : /* Create a new, empty tuplestore */
1241 42 : oldcontext = MemoryContextSwitchTo(rsinfo->econtext->ecxt_per_query_memory);
1242 42 : sinfo->tuplestore = tuplestore_begin_heap(true, false, work_mem);
1243 42 : rsinfo->setResult = sinfo->tuplestore;
1244 42 : rsinfo->setDesc = tupdesc;
1245 42 : MemoryContextSwitchTo(oldcontext);
1246 :
1247 : /* Done if empty resultset */
1248 42 : if (PQntuples(res) == 0)
1249 0 : return;
1250 :
1251 : /*
1252 : * Set up sufficiently-wide string pointers array; this won't change
1253 : * in size so it's easy to preallocate.
1254 : */
1255 42 : if (sinfo->cstrs)
1256 0 : pfree(sinfo->cstrs);
1257 42 : sinfo->cstrs = palloc_array(char *, nfields);
1258 : }
1259 :
1260 : /* Should have a single-row result if we get here */
1261 : Assert(PQntuples(res) == 1);
1262 :
1263 : /*
1264 : * Do the following work in a temp context that we reset after each tuple.
1265 : * This cleans up not only the data we have direct access to, but any
1266 : * cruft the I/O functions might leak.
1267 : */
1268 304 : oldcontext = MemoryContextSwitchTo(sinfo->tmpcontext);
1269 :
1270 : /*
1271 : * Fill cstrs with null-terminated strings of column values.
1272 : */
1273 1140 : for (i = 0; i < nfields; i++)
1274 : {
1275 836 : if (PQgetisnull(res, 0, i))
1276 0 : sinfo->cstrs[i] = NULL;
1277 : else
1278 836 : sinfo->cstrs[i] = PQgetvalue(res, 0, i);
1279 : }
1280 :
1281 : /* Convert row to a tuple, and add it to the tuplestore */
1282 304 : tuple = BuildTupleFromCStrings(sinfo->attinmeta, sinfo->cstrs);
1283 :
1284 304 : tuplestore_puttuple(sinfo->tuplestore, tuple);
1285 :
1286 : /* Clean up */
1287 304 : MemoryContextSwitchTo(oldcontext);
1288 304 : MemoryContextReset(sinfo->tmpcontext);
1289 : }
1290 :
1291 : /*
1292 : * List all open dblink connections by name.
1293 : * Returns an array of all connection names.
1294 : * Takes no params
1295 : */
1296 6 : PG_FUNCTION_INFO_V1(dblink_get_connections);
1297 : Datum
1298 2 : dblink_get_connections(PG_FUNCTION_ARGS)
1299 : {
1300 : HASH_SEQ_STATUS status;
1301 : remoteConnHashEnt *hentry;
1302 2 : ArrayBuildState *astate = NULL;
1303 :
1304 2 : if (remoteConnHash)
1305 : {
1306 2 : hash_seq_init(&status, remoteConnHash);
1307 8 : while ((hentry = (remoteConnHashEnt *) hash_seq_search(&status)) != NULL)
1308 : {
1309 : /* ignore it if it's not an open connection */
1310 6 : if (hentry->rconn.conn == NULL)
1311 0 : continue;
1312 : /* stash away current value */
1313 6 : astate = accumArrayResult(astate,
1314 6 : CStringGetTextDatum(hentry->name),
1315 : false, TEXTOID, CurrentMemoryContext);
1316 : }
1317 : }
1318 :
1319 2 : if (astate)
1320 2 : PG_RETURN_DATUM(makeArrayResult(astate,
1321 : CurrentMemoryContext));
1322 : else
1323 0 : PG_RETURN_NULL();
1324 : }
1325 :
1326 : /*
1327 : * Checks if a given remote connection is busy
1328 : *
1329 : * Returns 1 if the connection is busy, 0 otherwise
1330 : * Params:
1331 : * text connection_name - name of the connection to check
1332 : *
1333 : */
1334 6 : PG_FUNCTION_INFO_V1(dblink_is_busy);
1335 : Datum
1336 2 : dblink_is_busy(PG_FUNCTION_ARGS)
1337 : {
1338 : PGconn *conn;
1339 :
1340 2 : dblink_init();
1341 2 : conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
1342 :
1343 2 : PQconsumeInput(conn);
1344 2 : PG_RETURN_INT32(PQisBusy(conn));
1345 : }
1346 :
1347 : /*
1348 : * Cancels a running request on a connection
1349 : *
1350 : * Returns text:
1351 : * "OK" if the cancel request has been sent correctly,
1352 : * an error message otherwise
1353 : *
1354 : * Params:
1355 : * text connection_name - name of the connection to check
1356 : *
1357 : */
1358 6 : PG_FUNCTION_INFO_V1(dblink_cancel_query);
1359 : Datum
1360 2 : dblink_cancel_query(PG_FUNCTION_ARGS)
1361 : {
1362 : PGconn *conn;
1363 : const char *msg;
1364 : TimestampTz endtime;
1365 :
1366 2 : dblink_init();
1367 2 : conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
1368 2 : endtime = TimestampTzPlusMilliseconds(GetCurrentTimestamp(),
1369 : 30000);
1370 2 : msg = libpqsrv_cancel(conn, endtime);
1371 2 : if (msg == NULL)
1372 2 : msg = "OK";
1373 :
1374 2 : PG_RETURN_TEXT_P(cstring_to_text(msg));
1375 : }
1376 :
1377 :
1378 : /*
1379 : * Get error message from a connection
1380 : *
1381 : * Returns text:
1382 : * "OK" if no error, an error message otherwise
1383 : *
1384 : * Params:
1385 : * text connection_name - name of the connection to check
1386 : *
1387 : */
1388 6 : PG_FUNCTION_INFO_V1(dblink_error_message);
1389 : Datum
1390 2 : dblink_error_message(PG_FUNCTION_ARGS)
1391 : {
1392 : char *msg;
1393 : PGconn *conn;
1394 :
1395 2 : dblink_init();
1396 2 : conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
1397 :
1398 2 : msg = PQerrorMessage(conn);
1399 2 : if (msg == NULL || msg[0] == '\0')
1400 2 : PG_RETURN_TEXT_P(cstring_to_text("OK"));
1401 : else
1402 0 : PG_RETURN_TEXT_P(cstring_to_text(pchomp(msg)));
1403 : }
1404 :
1405 : /*
1406 : * Execute an SQL non-SELECT command
1407 : */
1408 26 : PG_FUNCTION_INFO_V1(dblink_exec);
1409 : Datum
1410 52 : dblink_exec(PG_FUNCTION_ARGS)
1411 : {
1412 52 : text *volatile sql_cmd_status = NULL;
1413 52 : PGconn *volatile conn = NULL;
1414 52 : volatile bool freeconn = false;
1415 :
1416 52 : dblink_init();
1417 :
1418 52 : PG_TRY();
1419 : {
1420 52 : PGresult *res = NULL;
1421 52 : char *sql = NULL;
1422 52 : char *conname = NULL;
1423 52 : bool fail = true; /* default to backward compatible behavior */
1424 :
1425 52 : if (PG_NARGS() == 3)
1426 : {
1427 : /* must be text,text,bool */
1428 0 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
1429 0 : sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
1430 0 : fail = PG_GETARG_BOOL(2);
1431 0 : dblink_get_conn(conname, &conn, &conname, &freeconn);
1432 : }
1433 52 : else if (PG_NARGS() == 2)
1434 : {
1435 : /* might be text,text or text,bool */
1436 34 : if (get_fn_expr_argtype(fcinfo->flinfo, 1) == BOOLOID)
1437 : {
1438 2 : sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
1439 2 : fail = PG_GETARG_BOOL(1);
1440 2 : conn = pconn->conn;
1441 : }
1442 : else
1443 : {
1444 32 : conname = text_to_cstring(PG_GETARG_TEXT_PP(0));
1445 32 : sql = text_to_cstring(PG_GETARG_TEXT_PP(1));
1446 32 : dblink_get_conn(conname, &conn, &conname, &freeconn);
1447 : }
1448 : }
1449 18 : else if (PG_NARGS() == 1)
1450 : {
1451 : /* must be single text argument */
1452 18 : conn = pconn->conn;
1453 18 : sql = text_to_cstring(PG_GETARG_TEXT_PP(0));
1454 : }
1455 : else
1456 : /* shouldn't happen */
1457 0 : elog(ERROR, "wrong number of arguments");
1458 :
1459 52 : if (!conn)
1460 0 : dblink_conn_not_avail(conname);
1461 :
1462 52 : res = libpqsrv_exec(conn, sql, dblink_we_get_result);
1463 52 : if (!res ||
1464 52 : (PQresultStatus(res) != PGRES_COMMAND_OK &&
1465 4 : PQresultStatus(res) != PGRES_TUPLES_OK))
1466 : {
1467 4 : dblink_res_error(conn, conname, res, fail,
1468 : "while executing command");
1469 :
1470 : /*
1471 : * and save a copy of the command status string to return as our
1472 : * result tuple
1473 : */
1474 2 : sql_cmd_status = cstring_to_text("ERROR");
1475 : }
1476 48 : else if (PQresultStatus(res) == PGRES_COMMAND_OK)
1477 : {
1478 : /*
1479 : * and save a copy of the command status string to return as our
1480 : * result tuple
1481 : */
1482 48 : sql_cmd_status = cstring_to_text(PQcmdStatus(res));
1483 48 : PQclear(res);
1484 : }
1485 : else
1486 : {
1487 0 : PQclear(res);
1488 0 : ereport(ERROR,
1489 : (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
1490 : errmsg("statement returning results not allowed")));
1491 : }
1492 : }
1493 2 : PG_FINALLY();
1494 : {
1495 : /* if needed, close the connection to the database */
1496 52 : if (freeconn)
1497 2 : libpqsrv_disconnect(conn);
1498 : }
1499 52 : PG_END_TRY();
1500 :
1501 50 : PG_RETURN_TEXT_P(sql_cmd_status);
1502 : }
1503 :
1504 :
1505 : /*
1506 : * dblink_get_pkey
1507 : *
1508 : * Return list of primary key fields for the supplied relation,
1509 : * or NULL if none exists.
1510 : */
1511 6 : PG_FUNCTION_INFO_V1(dblink_get_pkey);
1512 : Datum
1513 18 : dblink_get_pkey(PG_FUNCTION_ARGS)
1514 : {
1515 : int16 indnkeyatts;
1516 : char **results;
1517 : FuncCallContext *funcctx;
1518 : int32 call_cntr;
1519 : int32 max_calls;
1520 : AttInMetadata *attinmeta;
1521 : MemoryContext oldcontext;
1522 :
1523 : /* stuff done only on the first call of the function */
1524 18 : if (SRF_IS_FIRSTCALL())
1525 : {
1526 : Relation rel;
1527 : TupleDesc tupdesc;
1528 :
1529 : /* create a function context for cross-call persistence */
1530 6 : funcctx = SRF_FIRSTCALL_INIT();
1531 :
1532 : /*
1533 : * switch to memory context appropriate for multiple function calls
1534 : */
1535 6 : oldcontext = MemoryContextSwitchTo(funcctx->multi_call_memory_ctx);
1536 :
1537 : /* open target relation */
1538 6 : rel = get_rel_from_relname(PG_GETARG_TEXT_PP(0), AccessShareLock, ACL_SELECT);
1539 :
1540 : /* get the array of attnums */
1541 6 : results = get_pkey_attnames(rel, &indnkeyatts);
1542 :
1543 6 : relation_close(rel, AccessShareLock);
1544 :
1545 : /*
1546 : * need a tuple descriptor representing one INT and one TEXT column
1547 : */
1548 6 : tupdesc = CreateTemplateTupleDesc(2);
1549 6 : TupleDescInitEntry(tupdesc, (AttrNumber) 1, "position",
1550 : INT4OID, -1, 0);
1551 6 : TupleDescInitEntry(tupdesc, (AttrNumber) 2, "colname",
1552 : TEXTOID, -1, 0);
1553 :
1554 : /*
1555 : * Generate attribute metadata needed later to produce tuples from raw
1556 : * C strings
1557 : */
1558 6 : attinmeta = TupleDescGetAttInMetadata(tupdesc);
1559 6 : funcctx->attinmeta = attinmeta;
1560 :
1561 6 : if ((results != NULL) && (indnkeyatts > 0))
1562 : {
1563 6 : funcctx->max_calls = indnkeyatts;
1564 :
1565 : /* got results, keep track of them */
1566 6 : funcctx->user_fctx = results;
1567 : }
1568 : else
1569 : {
1570 : /* fast track when no results */
1571 0 : MemoryContextSwitchTo(oldcontext);
1572 0 : SRF_RETURN_DONE(funcctx);
1573 : }
1574 :
1575 6 : MemoryContextSwitchTo(oldcontext);
1576 : }
1577 :
1578 : /* stuff done on every call of the function */
1579 18 : funcctx = SRF_PERCALL_SETUP();
1580 :
1581 : /*
1582 : * initialize per-call variables
1583 : */
1584 18 : call_cntr = funcctx->call_cntr;
1585 18 : max_calls = funcctx->max_calls;
1586 :
1587 18 : results = (char **) funcctx->user_fctx;
1588 18 : attinmeta = funcctx->attinmeta;
1589 :
1590 18 : if (call_cntr < max_calls) /* do when there is more left to send */
1591 : {
1592 : char **values;
1593 : HeapTuple tuple;
1594 : Datum result;
1595 :
1596 12 : values = palloc_array(char *, 2);
1597 12 : values[0] = psprintf("%d", call_cntr + 1);
1598 12 : values[1] = results[call_cntr];
1599 :
1600 : /* build the tuple */
1601 12 : tuple = BuildTupleFromCStrings(attinmeta, values);
1602 :
1603 : /* make the tuple into a datum */
1604 12 : result = HeapTupleGetDatum(tuple);
1605 :
1606 12 : SRF_RETURN_NEXT(funcctx, result);
1607 : }
1608 : else
1609 : {
1610 : /* do when there is no more left */
1611 6 : SRF_RETURN_DONE(funcctx);
1612 : }
1613 : }
1614 :
1615 :
1616 : /*
1617 : * dblink_build_sql_insert
1618 : *
1619 : * Used to generate an SQL insert statement
1620 : * based on an existing tuple in a local relation.
1621 : * This is useful for selectively replicating data
1622 : * to another server via dblink.
1623 : *
1624 : * API:
1625 : * <relname> - name of local table of interest
1626 : * <pkattnums> - an int2vector of attnums which will be used
1627 : * to identify the local tuple of interest
1628 : * <pknumatts> - number of attnums in pkattnums
1629 : * <src_pkattvals_arry> - text array of key values which will be used
1630 : * to identify the local tuple of interest
1631 : * <tgt_pkattvals_arry> - text array of key values which will be used
1632 : * to build the string for execution remotely. These are substituted
1633 : * for their counterparts in src_pkattvals_arry
1634 : */
1635 8 : PG_FUNCTION_INFO_V1(dblink_build_sql_insert);
1636 : Datum
1637 12 : dblink_build_sql_insert(PG_FUNCTION_ARGS)
1638 : {
1639 12 : text *relname_text = PG_GETARG_TEXT_PP(0);
1640 12 : int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
1641 12 : int32 pknumatts_arg = PG_GETARG_INT32(2);
1642 12 : ArrayType *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1643 12 : ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
1644 : Relation rel;
1645 : int *pkattnums;
1646 : int pknumatts;
1647 : char **src_pkattvals;
1648 : char **tgt_pkattvals;
1649 : int src_nitems;
1650 : int tgt_nitems;
1651 : char *sql;
1652 :
1653 : /*
1654 : * Open target relation.
1655 : */
1656 12 : rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
1657 :
1658 : /*
1659 : * Process pkattnums argument.
1660 : */
1661 12 : validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
1662 : &pkattnums, &pknumatts);
1663 :
1664 : /*
1665 : * Source array is made up of key values that will be used to locate the
1666 : * tuple of interest from the local system.
1667 : */
1668 8 : src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
1669 :
1670 : /*
1671 : * There should be one source array key value for each key attnum
1672 : */
1673 8 : if (src_nitems != pknumatts)
1674 0 : ereport(ERROR,
1675 : (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1676 : errmsg("source key array length must match number of key attributes")));
1677 :
1678 : /*
1679 : * Target array is made up of key values that will be used to build the
1680 : * SQL string for use on the remote system.
1681 : */
1682 8 : tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1683 :
1684 : /*
1685 : * There should be one target array key value for each key attnum
1686 : */
1687 8 : if (tgt_nitems != pknumatts)
1688 0 : ereport(ERROR,
1689 : (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1690 : errmsg("target key array length must match number of key attributes")));
1691 :
1692 : /*
1693 : * Prep work is finally done. Go get the SQL string.
1694 : */
1695 8 : sql = get_sql_insert(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
1696 :
1697 : /*
1698 : * Now we can close the relation.
1699 : */
1700 8 : relation_close(rel, AccessShareLock);
1701 :
1702 : /*
1703 : * And send it
1704 : */
1705 8 : PG_RETURN_TEXT_P(cstring_to_text(sql));
1706 : }
1707 :
1708 :
1709 : /*
1710 : * dblink_build_sql_delete
1711 : *
1712 : * Used to generate an SQL delete statement.
1713 : * This is useful for selectively replicating a
1714 : * delete to another server via dblink.
1715 : *
1716 : * API:
1717 : * <relname> - name of remote table of interest
1718 : * <pkattnums> - an int2vector of attnums which will be used
1719 : * to identify the remote tuple of interest
1720 : * <pknumatts> - number of attnums in pkattnums
1721 : * <tgt_pkattvals_arry> - text array of key values which will be used
1722 : * to build the string for execution remotely.
1723 : */
1724 8 : PG_FUNCTION_INFO_V1(dblink_build_sql_delete);
1725 : Datum
1726 12 : dblink_build_sql_delete(PG_FUNCTION_ARGS)
1727 : {
1728 12 : text *relname_text = PG_GETARG_TEXT_PP(0);
1729 12 : int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
1730 12 : int32 pknumatts_arg = PG_GETARG_INT32(2);
1731 12 : ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1732 : Relation rel;
1733 : int *pkattnums;
1734 : int pknumatts;
1735 : char **tgt_pkattvals;
1736 : int tgt_nitems;
1737 : char *sql;
1738 :
1739 : /*
1740 : * Open target relation.
1741 : */
1742 12 : rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
1743 :
1744 : /*
1745 : * Process pkattnums argument.
1746 : */
1747 12 : validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
1748 : &pkattnums, &pknumatts);
1749 :
1750 : /*
1751 : * Target array is made up of key values that will be used to build the
1752 : * SQL string for use on the remote system.
1753 : */
1754 8 : tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1755 :
1756 : /*
1757 : * There should be one target array key value for each key attnum
1758 : */
1759 8 : if (tgt_nitems != pknumatts)
1760 0 : ereport(ERROR,
1761 : (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1762 : errmsg("target key array length must match number of key attributes")));
1763 :
1764 : /*
1765 : * Prep work is finally done. Go get the SQL string.
1766 : */
1767 8 : sql = get_sql_delete(rel, pkattnums, pknumatts, tgt_pkattvals);
1768 :
1769 : /*
1770 : * Now we can close the relation.
1771 : */
1772 8 : relation_close(rel, AccessShareLock);
1773 :
1774 : /*
1775 : * And send it
1776 : */
1777 8 : PG_RETURN_TEXT_P(cstring_to_text(sql));
1778 : }
1779 :
1780 :
1781 : /*
1782 : * dblink_build_sql_update
1783 : *
1784 : * Used to generate an SQL update statement
1785 : * based on an existing tuple in a local relation.
1786 : * This is useful for selectively replicating data
1787 : * to another server via dblink.
1788 : *
1789 : * API:
1790 : * <relname> - name of local table of interest
1791 : * <pkattnums> - an int2vector of attnums which will be used
1792 : * to identify the local tuple of interest
1793 : * <pknumatts> - number of attnums in pkattnums
1794 : * <src_pkattvals_arry> - text array of key values which will be used
1795 : * to identify the local tuple of interest
1796 : * <tgt_pkattvals_arry> - text array of key values which will be used
1797 : * to build the string for execution remotely. These are substituted
1798 : * for their counterparts in src_pkattvals_arry
1799 : */
1800 8 : PG_FUNCTION_INFO_V1(dblink_build_sql_update);
1801 : Datum
1802 12 : dblink_build_sql_update(PG_FUNCTION_ARGS)
1803 : {
1804 12 : text *relname_text = PG_GETARG_TEXT_PP(0);
1805 12 : int2vector *pkattnums_arg = (int2vector *) PG_GETARG_POINTER(1);
1806 12 : int32 pknumatts_arg = PG_GETARG_INT32(2);
1807 12 : ArrayType *src_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(3);
1808 12 : ArrayType *tgt_pkattvals_arry = PG_GETARG_ARRAYTYPE_P(4);
1809 : Relation rel;
1810 : int *pkattnums;
1811 : int pknumatts;
1812 : char **src_pkattvals;
1813 : char **tgt_pkattvals;
1814 : int src_nitems;
1815 : int tgt_nitems;
1816 : char *sql;
1817 :
1818 : /*
1819 : * Open target relation.
1820 : */
1821 12 : rel = get_rel_from_relname(relname_text, AccessShareLock, ACL_SELECT);
1822 :
1823 : /*
1824 : * Process pkattnums argument.
1825 : */
1826 12 : validate_pkattnums(rel, pkattnums_arg, pknumatts_arg,
1827 : &pkattnums, &pknumatts);
1828 :
1829 : /*
1830 : * Source array is made up of key values that will be used to locate the
1831 : * tuple of interest from the local system.
1832 : */
1833 8 : src_pkattvals = get_text_array_contents(src_pkattvals_arry, &src_nitems);
1834 :
1835 : /*
1836 : * There should be one source array key value for each key attnum
1837 : */
1838 8 : if (src_nitems != pknumatts)
1839 0 : ereport(ERROR,
1840 : (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1841 : errmsg("source key array length must match number of key attributes")));
1842 :
1843 : /*
1844 : * Target array is made up of key values that will be used to build the
1845 : * SQL string for use on the remote system.
1846 : */
1847 8 : tgt_pkattvals = get_text_array_contents(tgt_pkattvals_arry, &tgt_nitems);
1848 :
1849 : /*
1850 : * There should be one target array key value for each key attnum
1851 : */
1852 8 : if (tgt_nitems != pknumatts)
1853 0 : ereport(ERROR,
1854 : (errcode(ERRCODE_ARRAY_SUBSCRIPT_ERROR),
1855 : errmsg("target key array length must match number of key attributes")));
1856 :
1857 : /*
1858 : * Prep work is finally done. Go get the SQL string.
1859 : */
1860 8 : sql = get_sql_update(rel, pkattnums, pknumatts, src_pkattvals, tgt_pkattvals);
1861 :
1862 : /*
1863 : * Now we can close the relation.
1864 : */
1865 8 : relation_close(rel, AccessShareLock);
1866 :
1867 : /*
1868 : * And send it
1869 : */
1870 8 : PG_RETURN_TEXT_P(cstring_to_text(sql));
1871 : }
1872 :
1873 : /*
1874 : * dblink_current_query
1875 : * return the current query string
1876 : * to allow its use in (among other things)
1877 : * rewrite rules
1878 : */
1879 4 : PG_FUNCTION_INFO_V1(dblink_current_query);
1880 : Datum
1881 0 : dblink_current_query(PG_FUNCTION_ARGS)
1882 : {
1883 : /* This is now just an alias for the built-in function current_query() */
1884 0 : PG_RETURN_DATUM(current_query(fcinfo));
1885 : }
1886 :
1887 : /*
1888 : * Retrieve async notifications for a connection.
1889 : *
1890 : * Returns a setof record of notifications, or an empty set if none received.
1891 : * Can optionally take a named connection as parameter, but uses the unnamed
1892 : * connection per default.
1893 : *
1894 : */
1895 : #define DBLINK_NOTIFY_COLS 3
1896 :
1897 10 : PG_FUNCTION_INFO_V1(dblink_get_notify);
1898 : Datum
1899 4 : dblink_get_notify(PG_FUNCTION_ARGS)
1900 : {
1901 : PGconn *conn;
1902 : PGnotify *notify;
1903 4 : ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
1904 :
1905 4 : dblink_init();
1906 4 : if (PG_NARGS() == 1)
1907 0 : conn = dblink_get_named_conn(text_to_cstring(PG_GETARG_TEXT_PP(0)));
1908 : else
1909 4 : conn = pconn->conn;
1910 :
1911 4 : InitMaterializedSRF(fcinfo, 0);
1912 :
1913 4 : PQconsumeInput(conn);
1914 8 : while ((notify = PQnotifies(conn)) != NULL)
1915 : {
1916 : Datum values[DBLINK_NOTIFY_COLS];
1917 : bool nulls[DBLINK_NOTIFY_COLS];
1918 :
1919 4 : memset(values, 0, sizeof(values));
1920 4 : memset(nulls, 0, sizeof(nulls));
1921 :
1922 4 : if (notify->relname != NULL)
1923 4 : values[0] = CStringGetTextDatum(notify->relname);
1924 : else
1925 0 : nulls[0] = true;
1926 :
1927 4 : values[1] = Int32GetDatum(notify->be_pid);
1928 :
1929 4 : if (notify->extra != NULL)
1930 4 : values[2] = CStringGetTextDatum(notify->extra);
1931 : else
1932 0 : nulls[2] = true;
1933 :
1934 4 : tuplestore_putvalues(rsinfo->setResult, rsinfo->setDesc, values, nulls);
1935 :
1936 4 : PQfreemem(notify);
1937 4 : PQconsumeInput(conn);
1938 : }
1939 :
1940 4 : return (Datum) 0;
1941 : }
1942 :
1943 : /*
1944 : * Validate the options given to a dblink foreign server or user mapping.
1945 : * Raise an error if any option is invalid.
1946 : *
1947 : * We just check the names of options here, so semantic errors in options,
1948 : * such as invalid numeric format, will be detected at the attempt to connect.
1949 : */
1950 28 : PG_FUNCTION_INFO_V1(dblink_fdw_validator);
1951 : Datum
1952 38 : dblink_fdw_validator(PG_FUNCTION_ARGS)
1953 : {
1954 38 : List *options_list = untransformRelOptions(PG_GETARG_DATUM(0));
1955 38 : Oid context = PG_GETARG_OID(1);
1956 : ListCell *cell;
1957 :
1958 : static const PQconninfoOption *options = NULL;
1959 :
1960 : /*
1961 : * Get list of valid libpq options.
1962 : *
1963 : * To avoid unnecessary work, we get the list once and use it throughout
1964 : * the lifetime of this backend process. We don't need to care about
1965 : * memory context issues, because PQconndefaults allocates with malloc.
1966 : */
1967 38 : if (!options)
1968 : {
1969 24 : options = PQconndefaults();
1970 24 : if (!options) /* assume reason for failure is OOM */
1971 0 : ereport(ERROR,
1972 : (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
1973 : errmsg("out of memory"),
1974 : errdetail("Could not get libpq's default connection options.")));
1975 : }
1976 :
1977 : /* Validate each supplied option. */
1978 100 : foreach(cell, options_list)
1979 : {
1980 78 : DefElem *def = (DefElem *) lfirst(cell);
1981 :
1982 78 : if (!is_valid_dblink_fdw_option(options, def->defname, context))
1983 : {
1984 : /*
1985 : * Unknown option, or invalid option for the context specified, so
1986 : * complain about it. Provide a hint with a valid option that
1987 : * looks similar, if there is one.
1988 : */
1989 : const PQconninfoOption *opt;
1990 : const char *closest_match;
1991 : ClosestMatchState match_state;
1992 16 : bool has_valid_options = false;
1993 :
1994 16 : initClosestMatch(&match_state, def->defname, 4);
1995 816 : for (opt = options; opt->keyword; opt++)
1996 : {
1997 800 : if (is_valid_dblink_option(options, opt->keyword, context))
1998 : {
1999 182 : has_valid_options = true;
2000 182 : updateClosestMatch(&match_state, opt->keyword);
2001 : }
2002 : }
2003 :
2004 16 : closest_match = getClosestMatch(&match_state);
2005 16 : ereport(ERROR,
2006 : (errcode(ERRCODE_FDW_OPTION_NAME_NOT_FOUND),
2007 : errmsg("invalid option \"%s\"", def->defname),
2008 : has_valid_options ? closest_match ?
2009 : errhint("Perhaps you meant the option \"%s\".",
2010 : closest_match) : 0 :
2011 : errhint("There are no valid options in this context.")));
2012 : }
2013 : }
2014 :
2015 22 : PG_RETURN_VOID();
2016 : }
2017 :
2018 :
2019 : /*************************************************************
2020 : * internal functions
2021 : */
2022 :
2023 :
2024 : /*
2025 : * get_pkey_attnames
2026 : *
2027 : * Get the primary key attnames for the given relation.
2028 : * Return NULL, and set indnkeyatts = 0, if no primary key exists.
2029 : */
2030 : static char **
2031 6 : get_pkey_attnames(Relation rel, int16 *indnkeyatts)
2032 : {
2033 : Relation indexRelation;
2034 : ScanKeyData skey;
2035 : SysScanDesc scan;
2036 : HeapTuple indexTuple;
2037 : int i;
2038 6 : char **result = NULL;
2039 : TupleDesc tupdesc;
2040 :
2041 : /* initialize indnkeyatts to 0 in case no primary key exists */
2042 6 : *indnkeyatts = 0;
2043 :
2044 6 : tupdesc = rel->rd_att;
2045 :
2046 : /* Prepare to scan pg_index for entries having indrelid = this rel. */
2047 6 : indexRelation = table_open(IndexRelationId, AccessShareLock);
2048 6 : ScanKeyInit(&skey,
2049 : Anum_pg_index_indrelid,
2050 : BTEqualStrategyNumber, F_OIDEQ,
2051 : ObjectIdGetDatum(RelationGetRelid(rel)));
2052 :
2053 6 : scan = systable_beginscan(indexRelation, IndexIndrelidIndexId, true,
2054 : NULL, 1, &skey);
2055 :
2056 6 : while (HeapTupleIsValid(indexTuple = systable_getnext(scan)))
2057 : {
2058 6 : Form_pg_index index = (Form_pg_index) GETSTRUCT(indexTuple);
2059 :
2060 : /* we're only interested if it is the primary key */
2061 6 : if (index->indisprimary)
2062 : {
2063 6 : *indnkeyatts = index->indnkeyatts;
2064 6 : if (*indnkeyatts > 0)
2065 : {
2066 6 : result = palloc_array(char *, *indnkeyatts);
2067 :
2068 18 : for (i = 0; i < *indnkeyatts; i++)
2069 12 : result[i] = SPI_fname(tupdesc, index->indkey.values[i]);
2070 : }
2071 6 : break;
2072 : }
2073 : }
2074 :
2075 6 : systable_endscan(scan);
2076 6 : table_close(indexRelation, AccessShareLock);
2077 :
2078 6 : return result;
2079 : }
2080 :
2081 : /*
2082 : * Deconstruct a text[] into C-strings (note any NULL elements will be
2083 : * returned as NULL pointers)
2084 : */
2085 : static char **
2086 40 : get_text_array_contents(ArrayType *array, int *numitems)
2087 : {
2088 40 : int ndim = ARR_NDIM(array);
2089 40 : int *dims = ARR_DIMS(array);
2090 : int nitems;
2091 : int16 typlen;
2092 : bool typbyval;
2093 : char typalign;
2094 : char **values;
2095 : char *ptr;
2096 : bits8 *bitmap;
2097 : int bitmask;
2098 : int i;
2099 :
2100 : Assert(ARR_ELEMTYPE(array) == TEXTOID);
2101 :
2102 40 : *numitems = nitems = ArrayGetNItems(ndim, dims);
2103 :
2104 40 : get_typlenbyvalalign(ARR_ELEMTYPE(array),
2105 : &typlen, &typbyval, &typalign);
2106 :
2107 40 : values = palloc_array(char *, nitems);
2108 :
2109 40 : ptr = ARR_DATA_PTR(array);
2110 40 : bitmap = ARR_NULLBITMAP(array);
2111 40 : bitmask = 1;
2112 :
2113 110 : for (i = 0; i < nitems; i++)
2114 : {
2115 70 : if (bitmap && (*bitmap & bitmask) == 0)
2116 : {
2117 0 : values[i] = NULL;
2118 : }
2119 : else
2120 : {
2121 70 : values[i] = TextDatumGetCString(PointerGetDatum(ptr));
2122 70 : ptr = att_addlength_pointer(ptr, typlen, ptr);
2123 70 : ptr = (char *) att_align_nominal(ptr, typalign);
2124 : }
2125 :
2126 : /* advance bitmap pointer if any */
2127 70 : if (bitmap)
2128 : {
2129 0 : bitmask <<= 1;
2130 0 : if (bitmask == 0x100)
2131 : {
2132 0 : bitmap++;
2133 0 : bitmask = 1;
2134 : }
2135 : }
2136 : }
2137 :
2138 40 : return values;
2139 : }
2140 :
2141 : static char *
2142 8 : get_sql_insert(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
2143 : {
2144 : char *relname;
2145 : HeapTuple tuple;
2146 : TupleDesc tupdesc;
2147 : int natts;
2148 : StringInfoData buf;
2149 : char *val;
2150 : int key;
2151 : int i;
2152 : bool needComma;
2153 :
2154 8 : initStringInfo(&buf);
2155 :
2156 : /* get relation name including any needed schema prefix and quoting */
2157 8 : relname = generate_relation_name(rel);
2158 :
2159 8 : tupdesc = rel->rd_att;
2160 8 : natts = tupdesc->natts;
2161 :
2162 8 : tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
2163 8 : if (!tuple)
2164 0 : ereport(ERROR,
2165 : (errcode(ERRCODE_CARDINALITY_VIOLATION),
2166 : errmsg("source row not found")));
2167 :
2168 8 : appendStringInfo(&buf, "INSERT INTO %s(", relname);
2169 :
2170 8 : needComma = false;
2171 38 : for (i = 0; i < natts; i++)
2172 : {
2173 30 : Form_pg_attribute att = TupleDescAttr(tupdesc, i);
2174 :
2175 30 : if (att->attisdropped)
2176 4 : continue;
2177 :
2178 26 : if (needComma)
2179 18 : appendStringInfoChar(&buf, ',');
2180 :
2181 26 : appendStringInfoString(&buf,
2182 26 : quote_ident_cstr(NameStr(att->attname)));
2183 26 : needComma = true;
2184 : }
2185 :
2186 8 : appendStringInfoString(&buf, ") VALUES(");
2187 :
2188 : /*
2189 : * Note: i is physical column number (counting from 0).
2190 : */
2191 8 : needComma = false;
2192 38 : for (i = 0; i < natts; i++)
2193 : {
2194 30 : if (TupleDescAttr(tupdesc, i)->attisdropped)
2195 4 : continue;
2196 :
2197 26 : if (needComma)
2198 18 : appendStringInfoChar(&buf, ',');
2199 :
2200 26 : key = get_attnum_pk_pos(pkattnums, pknumatts, i);
2201 :
2202 26 : if (key >= 0)
2203 14 : val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
2204 : else
2205 12 : val = SPI_getvalue(tuple, tupdesc, i + 1);
2206 :
2207 26 : if (val != NULL)
2208 : {
2209 26 : appendStringInfoString(&buf, quote_literal_cstr(val));
2210 26 : pfree(val);
2211 : }
2212 : else
2213 0 : appendStringInfoString(&buf, "NULL");
2214 26 : needComma = true;
2215 : }
2216 8 : appendStringInfoChar(&buf, ')');
2217 :
2218 8 : return buf.data;
2219 : }
2220 :
2221 : static char *
2222 8 : get_sql_delete(Relation rel, int *pkattnums, int pknumatts, char **tgt_pkattvals)
2223 : {
2224 : char *relname;
2225 : TupleDesc tupdesc;
2226 : StringInfoData buf;
2227 : int i;
2228 :
2229 8 : initStringInfo(&buf);
2230 :
2231 : /* get relation name including any needed schema prefix and quoting */
2232 8 : relname = generate_relation_name(rel);
2233 :
2234 8 : tupdesc = rel->rd_att;
2235 :
2236 8 : appendStringInfo(&buf, "DELETE FROM %s WHERE ", relname);
2237 22 : for (i = 0; i < pknumatts; i++)
2238 : {
2239 14 : int pkattnum = pkattnums[i];
2240 14 : Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
2241 :
2242 14 : if (i > 0)
2243 6 : appendStringInfoString(&buf, " AND ");
2244 :
2245 14 : appendStringInfoString(&buf,
2246 14 : quote_ident_cstr(NameStr(attr->attname)));
2247 :
2248 14 : if (tgt_pkattvals[i] != NULL)
2249 14 : appendStringInfo(&buf, " = %s",
2250 14 : quote_literal_cstr(tgt_pkattvals[i]));
2251 : else
2252 0 : appendStringInfoString(&buf, " IS NULL");
2253 : }
2254 :
2255 8 : return buf.data;
2256 : }
2257 :
2258 : static char *
2259 8 : get_sql_update(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals, char **tgt_pkattvals)
2260 : {
2261 : char *relname;
2262 : HeapTuple tuple;
2263 : TupleDesc tupdesc;
2264 : int natts;
2265 : StringInfoData buf;
2266 : char *val;
2267 : int key;
2268 : int i;
2269 : bool needComma;
2270 :
2271 8 : initStringInfo(&buf);
2272 :
2273 : /* get relation name including any needed schema prefix and quoting */
2274 8 : relname = generate_relation_name(rel);
2275 :
2276 8 : tupdesc = rel->rd_att;
2277 8 : natts = tupdesc->natts;
2278 :
2279 8 : tuple = get_tuple_of_interest(rel, pkattnums, pknumatts, src_pkattvals);
2280 8 : if (!tuple)
2281 0 : ereport(ERROR,
2282 : (errcode(ERRCODE_CARDINALITY_VIOLATION),
2283 : errmsg("source row not found")));
2284 :
2285 8 : appendStringInfo(&buf, "UPDATE %s SET ", relname);
2286 :
2287 : /*
2288 : * Note: i is physical column number (counting from 0).
2289 : */
2290 8 : needComma = false;
2291 38 : for (i = 0; i < natts; i++)
2292 : {
2293 30 : Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
2294 :
2295 30 : if (attr->attisdropped)
2296 4 : continue;
2297 :
2298 26 : if (needComma)
2299 18 : appendStringInfoString(&buf, ", ");
2300 :
2301 26 : appendStringInfo(&buf, "%s = ",
2302 26 : quote_ident_cstr(NameStr(attr->attname)));
2303 :
2304 26 : key = get_attnum_pk_pos(pkattnums, pknumatts, i);
2305 :
2306 26 : if (key >= 0)
2307 14 : val = tgt_pkattvals[key] ? pstrdup(tgt_pkattvals[key]) : NULL;
2308 : else
2309 12 : val = SPI_getvalue(tuple, tupdesc, i + 1);
2310 :
2311 26 : if (val != NULL)
2312 : {
2313 26 : appendStringInfoString(&buf, quote_literal_cstr(val));
2314 26 : pfree(val);
2315 : }
2316 : else
2317 0 : appendStringInfoString(&buf, "NULL");
2318 26 : needComma = true;
2319 : }
2320 :
2321 8 : appendStringInfoString(&buf, " WHERE ");
2322 :
2323 22 : for (i = 0; i < pknumatts; i++)
2324 : {
2325 14 : int pkattnum = pkattnums[i];
2326 14 : Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
2327 :
2328 14 : if (i > 0)
2329 6 : appendStringInfoString(&buf, " AND ");
2330 :
2331 14 : appendStringInfoString(&buf,
2332 14 : quote_ident_cstr(NameStr(attr->attname)));
2333 :
2334 14 : val = tgt_pkattvals[i];
2335 :
2336 14 : if (val != NULL)
2337 14 : appendStringInfo(&buf, " = %s", quote_literal_cstr(val));
2338 : else
2339 0 : appendStringInfoString(&buf, " IS NULL");
2340 : }
2341 :
2342 8 : return buf.data;
2343 : }
2344 :
2345 : /*
2346 : * Return a properly quoted identifier.
2347 : * Uses quote_ident in quote.c
2348 : */
2349 : static char *
2350 160 : quote_ident_cstr(char *rawstr)
2351 : {
2352 : text *rawstr_text;
2353 : text *result_text;
2354 : char *result;
2355 :
2356 160 : rawstr_text = cstring_to_text(rawstr);
2357 160 : result_text = DatumGetTextPP(DirectFunctionCall1(quote_ident,
2358 : PointerGetDatum(rawstr_text)));
2359 160 : result = text_to_cstring(result_text);
2360 :
2361 160 : return result;
2362 : }
2363 :
2364 : static int
2365 52 : get_attnum_pk_pos(int *pkattnums, int pknumatts, int key)
2366 : {
2367 : int i;
2368 :
2369 : /*
2370 : * Not likely a long list anyway, so just scan for the value
2371 : */
2372 100 : for (i = 0; i < pknumatts; i++)
2373 76 : if (key == pkattnums[i])
2374 28 : return i;
2375 :
2376 24 : return -1;
2377 : }
2378 :
2379 : static HeapTuple
2380 16 : get_tuple_of_interest(Relation rel, int *pkattnums, int pknumatts, char **src_pkattvals)
2381 : {
2382 : char *relname;
2383 : TupleDesc tupdesc;
2384 : int natts;
2385 : StringInfoData buf;
2386 : int ret;
2387 : HeapTuple tuple;
2388 : int i;
2389 :
2390 : /*
2391 : * Connect to SPI manager
2392 : */
2393 16 : SPI_connect();
2394 :
2395 16 : initStringInfo(&buf);
2396 :
2397 : /* get relation name including any needed schema prefix and quoting */
2398 16 : relname = generate_relation_name(rel);
2399 :
2400 16 : tupdesc = rel->rd_att;
2401 16 : natts = tupdesc->natts;
2402 :
2403 : /*
2404 : * Build sql statement to look up tuple of interest, ie, the one matching
2405 : * src_pkattvals. We used to use "SELECT *" here, but it's simpler to
2406 : * generate a result tuple that matches the table's physical structure,
2407 : * with NULLs for any dropped columns. Otherwise we have to deal with two
2408 : * different tupdescs and everything's very confusing.
2409 : */
2410 16 : appendStringInfoString(&buf, "SELECT ");
2411 :
2412 76 : for (i = 0; i < natts; i++)
2413 : {
2414 60 : Form_pg_attribute attr = TupleDescAttr(tupdesc, i);
2415 :
2416 60 : if (i > 0)
2417 44 : appendStringInfoString(&buf, ", ");
2418 :
2419 60 : if (attr->attisdropped)
2420 8 : appendStringInfoString(&buf, "NULL");
2421 : else
2422 52 : appendStringInfoString(&buf,
2423 52 : quote_ident_cstr(NameStr(attr->attname)));
2424 : }
2425 :
2426 16 : appendStringInfo(&buf, " FROM %s WHERE ", relname);
2427 :
2428 44 : for (i = 0; i < pknumatts; i++)
2429 : {
2430 28 : int pkattnum = pkattnums[i];
2431 28 : Form_pg_attribute attr = TupleDescAttr(tupdesc, pkattnum);
2432 :
2433 28 : if (i > 0)
2434 12 : appendStringInfoString(&buf, " AND ");
2435 :
2436 28 : appendStringInfoString(&buf,
2437 28 : quote_ident_cstr(NameStr(attr->attname)));
2438 :
2439 28 : if (src_pkattvals[i] != NULL)
2440 28 : appendStringInfo(&buf, " = %s",
2441 28 : quote_literal_cstr(src_pkattvals[i]));
2442 : else
2443 0 : appendStringInfoString(&buf, " IS NULL");
2444 : }
2445 :
2446 : /*
2447 : * Retrieve the desired tuple
2448 : */
2449 16 : ret = SPI_exec(buf.data, 0);
2450 16 : pfree(buf.data);
2451 :
2452 : /*
2453 : * Only allow one qualifying tuple
2454 : */
2455 16 : if ((ret == SPI_OK_SELECT) && (SPI_processed > 1))
2456 0 : ereport(ERROR,
2457 : (errcode(ERRCODE_CARDINALITY_VIOLATION),
2458 : errmsg("source criteria matched more than one record")));
2459 :
2460 16 : else if (ret == SPI_OK_SELECT && SPI_processed == 1)
2461 : {
2462 16 : SPITupleTable *tuptable = SPI_tuptable;
2463 :
2464 16 : tuple = SPI_copytuple(tuptable->vals[0]);
2465 16 : SPI_finish();
2466 :
2467 16 : return tuple;
2468 : }
2469 : else
2470 : {
2471 : /*
2472 : * no qualifying tuples
2473 : */
2474 0 : SPI_finish();
2475 :
2476 0 : return NULL;
2477 : }
2478 :
2479 : /*
2480 : * never reached, but keep compiler quiet
2481 : */
2482 : return NULL;
2483 : }
2484 :
2485 : /*
2486 : * Open the relation named by relname_text, acquire specified type of lock,
2487 : * verify we have specified permissions.
2488 : * Caller must close rel when done with it.
2489 : */
2490 : static Relation
2491 42 : get_rel_from_relname(text *relname_text, LOCKMODE lockmode, AclMode aclmode)
2492 : {
2493 : RangeVar *relvar;
2494 : Relation rel;
2495 : AclResult aclresult;
2496 :
2497 42 : relvar = makeRangeVarFromNameList(textToQualifiedNameList(relname_text));
2498 42 : rel = table_openrv(relvar, lockmode);
2499 :
2500 42 : aclresult = pg_class_aclcheck(RelationGetRelid(rel), GetUserId(),
2501 : aclmode);
2502 42 : if (aclresult != ACLCHECK_OK)
2503 0 : aclcheck_error(aclresult, get_relkind_objtype(rel->rd_rel->relkind),
2504 0 : RelationGetRelationName(rel));
2505 :
2506 42 : return rel;
2507 : }
2508 :
2509 : /*
2510 : * generate_relation_name - copied from ruleutils.c
2511 : * Compute the name to display for a relation
2512 : *
2513 : * The result includes all necessary quoting and schema-prefixing.
2514 : */
2515 : static char *
2516 40 : generate_relation_name(Relation rel)
2517 : {
2518 : char *nspname;
2519 : char *result;
2520 :
2521 : /* Qualify the name if not visible in search path */
2522 40 : if (RelationIsVisible(RelationGetRelid(rel)))
2523 30 : nspname = NULL;
2524 : else
2525 10 : nspname = get_namespace_name(rel->rd_rel->relnamespace);
2526 :
2527 40 : result = quote_qualified_identifier(nspname, RelationGetRelationName(rel));
2528 :
2529 40 : return result;
2530 : }
2531 :
2532 :
2533 : static remoteConn *
2534 156 : getConnectionByName(const char *name)
2535 : {
2536 : remoteConnHashEnt *hentry;
2537 : char *key;
2538 :
2539 156 : if (!remoteConnHash)
2540 10 : remoteConnHash = createConnHash();
2541 :
2542 156 : key = pstrdup(name);
2543 156 : truncate_identifier(key, strlen(key), false);
2544 156 : hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
2545 : key, HASH_FIND, NULL);
2546 :
2547 156 : if (hentry && hentry->rconn.conn != NULL)
2548 136 : return &hentry->rconn;
2549 :
2550 20 : return NULL;
2551 : }
2552 :
2553 : static HTAB *
2554 12 : createConnHash(void)
2555 : {
2556 : HASHCTL ctl;
2557 :
2558 12 : ctl.keysize = NAMEDATALEN;
2559 12 : ctl.entrysize = sizeof(remoteConnHashEnt);
2560 :
2561 12 : return hash_create("Remote Con hash", NUMCONN, &ctl,
2562 : HASH_ELEM | HASH_STRINGS);
2563 : }
2564 :
2565 : static remoteConn *
2566 20 : createNewConnection(const char *name)
2567 : {
2568 : remoteConnHashEnt *hentry;
2569 : bool found;
2570 : char *key;
2571 :
2572 20 : if (!remoteConnHash)
2573 2 : remoteConnHash = createConnHash();
2574 :
2575 20 : key = pstrdup(name);
2576 20 : truncate_identifier(key, strlen(key), true);
2577 20 : hentry = (remoteConnHashEnt *) hash_search(remoteConnHash, key,
2578 : HASH_ENTER, &found);
2579 :
2580 20 : if (found && hentry->rconn.conn != NULL)
2581 2 : ereport(ERROR,
2582 : (errcode(ERRCODE_DUPLICATE_OBJECT),
2583 : errmsg("duplicate connection name")));
2584 :
2585 : /* New, or reusable, so initialize the rconn struct to zeroes */
2586 18 : memset(&hentry->rconn, 0, sizeof(remoteConn));
2587 :
2588 18 : return &hentry->rconn;
2589 : }
2590 :
2591 : static void
2592 16 : deleteConnection(const char *name)
2593 : {
2594 : remoteConnHashEnt *hentry;
2595 : bool found;
2596 : char *key;
2597 :
2598 16 : if (!remoteConnHash)
2599 0 : remoteConnHash = createConnHash();
2600 :
2601 16 : key = pstrdup(name);
2602 16 : truncate_identifier(key, strlen(key), false);
2603 16 : hentry = (remoteConnHashEnt *) hash_search(remoteConnHash,
2604 : key, HASH_REMOVE, &found);
2605 :
2606 16 : if (!hentry)
2607 0 : ereport(ERROR,
2608 : (errcode(ERRCODE_UNDEFINED_OBJECT),
2609 : errmsg("undefined connection name")));
2610 16 : }
2611 :
2612 : /*
2613 : * Ensure that require_auth and SCRAM keys are correctly set on connstr.
2614 : * SCRAM keys used to pass-through are coming from the initial connection
2615 : * from the client with the server.
2616 : *
2617 : * All required SCRAM options are set by dblink, so we just need to ensure
2618 : * that these options are not overwritten by the user.
2619 : *
2620 : * See appendSCRAMKeysInfo and its usage for more.
2621 : */
2622 : bool
2623 10 : dblink_connstr_has_required_scram_options(const char *connstr)
2624 : {
2625 : PQconninfoOption *options;
2626 10 : bool has_scram_server_key = false;
2627 10 : bool has_scram_client_key = false;
2628 10 : bool has_require_auth = false;
2629 10 : bool has_scram_keys = false;
2630 :
2631 10 : options = PQconninfoParse(connstr, NULL);
2632 10 : if (options)
2633 : {
2634 : /*
2635 : * Continue iterating even if we found the keys that we need to
2636 : * validate to make sure that there is no other declaration of these
2637 : * keys that can overwrite the first.
2638 : */
2639 510 : for (PQconninfoOption *option = options; option->keyword != NULL; option++)
2640 : {
2641 500 : if (strcmp(option->keyword, "require_auth") == 0)
2642 : {
2643 10 : if (option->val != NULL && strcmp(option->val, "scram-sha-256") == 0)
2644 8 : has_require_auth = true;
2645 : else
2646 2 : has_require_auth = false;
2647 : }
2648 :
2649 500 : if (strcmp(option->keyword, "scram_client_key") == 0)
2650 : {
2651 10 : if (option->val != NULL && option->val[0] != '\0')
2652 10 : has_scram_client_key = true;
2653 : else
2654 0 : has_scram_client_key = false;
2655 : }
2656 :
2657 500 : if (strcmp(option->keyword, "scram_server_key") == 0)
2658 : {
2659 10 : if (option->val != NULL && option->val[0] != '\0')
2660 10 : has_scram_server_key = true;
2661 : else
2662 0 : has_scram_server_key = false;
2663 : }
2664 : }
2665 10 : PQconninfoFree(options);
2666 : }
2667 :
2668 10 : has_scram_keys = has_scram_client_key && has_scram_server_key && MyProcPort->has_scram_keys;
2669 :
2670 10 : return (has_scram_keys && has_require_auth);
2671 : }
2672 :
2673 : /*
2674 : * We need to make sure that the connection made used credentials
2675 : * which were provided by the user, so check what credentials were
2676 : * used to connect and then make sure that they came from the user.
2677 : *
2678 : * On failure, we close "conn" and also delete the hashtable entry
2679 : * identified by "connname" (if that's not NULL).
2680 : */
2681 : static void
2682 40 : dblink_security_check(PGconn *conn, const char *connname, const char *connstr)
2683 : {
2684 : /* Superuser bypasses security check */
2685 40 : if (superuser())
2686 36 : return;
2687 :
2688 : /* If password was used to connect, make sure it was one provided */
2689 4 : if (PQconnectionUsedPassword(conn) && dblink_connstr_has_pw(connstr))
2690 0 : return;
2691 :
2692 : /*
2693 : * Password was not used to connect, check if SCRAM pass-through is in
2694 : * use.
2695 : *
2696 : * If dblink_connstr_has_required_scram_options is true we assume that
2697 : * UseScramPassthrough is also true because the required SCRAM keys are
2698 : * only added if UseScramPassthrough is set, and the user is not allowed
2699 : * to add the SCRAM keys on fdw and user mapping options.
2700 : */
2701 4 : if (MyProcPort->has_scram_keys && dblink_connstr_has_required_scram_options(connstr))
2702 4 : return;
2703 :
2704 : #ifdef ENABLE_GSS
2705 : /* If GSSAPI creds used to connect, make sure it was one delegated */
2706 : if (PQconnectionUsedGSSAPI(conn) && be_gssapi_get_delegation(MyProcPort))
2707 : return;
2708 : #endif
2709 :
2710 : /* Otherwise, fail out */
2711 0 : libpqsrv_disconnect(conn);
2712 0 : if (connname)
2713 0 : deleteConnection(connname);
2714 :
2715 0 : ereport(ERROR,
2716 : (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
2717 : errmsg("password or GSSAPI delegated credentials required"),
2718 : errdetail("Non-superusers may only connect using credentials they provide, eg: password in connection string or delegated GSSAPI credentials"),
2719 : errhint("Ensure provided credentials match target server's authentication method.")));
2720 : }
2721 :
2722 : /*
2723 : * Function to check if the connection string includes an explicit
2724 : * password, needed to ensure that non-superuser password-based auth
2725 : * is using a provided password and not one picked up from the
2726 : * environment.
2727 : */
2728 : static bool
2729 12 : dblink_connstr_has_pw(const char *connstr)
2730 : {
2731 : PQconninfoOption *options;
2732 : PQconninfoOption *option;
2733 12 : bool connstr_gives_password = false;
2734 :
2735 12 : options = PQconninfoParse(connstr, NULL);
2736 12 : if (options)
2737 : {
2738 612 : for (option = options; option->keyword != NULL; option++)
2739 : {
2740 600 : if (strcmp(option->keyword, "password") == 0)
2741 : {
2742 12 : if (option->val != NULL && option->val[0] != '\0')
2743 : {
2744 0 : connstr_gives_password = true;
2745 0 : break;
2746 : }
2747 : }
2748 : }
2749 12 : PQconninfoFree(options);
2750 : }
2751 :
2752 12 : return connstr_gives_password;
2753 : }
2754 :
2755 : /*
2756 : * For non-superusers, insist that the connstr specify a password, except if
2757 : * GSSAPI credentials have been delegated (and we check that they are used for
2758 : * the connection in dblink_security_check later) or if SCRAM pass-through is
2759 : * being used. This prevents a password or GSSAPI credentials from being
2760 : * picked up from .pgpass, a service file, the environment, etc. We don't want
2761 : * the postgres user's passwords or Kerberos credentials to be accessible to
2762 : * non-superusers. In case of SCRAM pass-through insist that the connstr
2763 : * has the required SCRAM pass-through options.
2764 : */
2765 : static void
2766 50 : dblink_connstr_check(const char *connstr)
2767 : {
2768 50 : if (superuser())
2769 42 : return;
2770 :
2771 8 : if (dblink_connstr_has_pw(connstr))
2772 0 : return;
2773 :
2774 8 : if (MyProcPort->has_scram_keys && dblink_connstr_has_required_scram_options(connstr))
2775 4 : return;
2776 :
2777 : #ifdef ENABLE_GSS
2778 : if (be_gssapi_get_delegation(MyProcPort))
2779 : return;
2780 : #endif
2781 :
2782 4 : ereport(ERROR,
2783 : (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
2784 : errmsg("password or GSSAPI delegated credentials required"),
2785 : errdetail("Non-superusers must provide a password in the connection string or send delegated GSSAPI credentials.")));
2786 : }
2787 :
2788 : /*
2789 : * Report an error received from the remote server
2790 : *
2791 : * res: the received error result (will be freed)
2792 : * fail: true for ERROR ereport, false for NOTICE
2793 : * fmt and following args: sprintf-style format and values for errcontext;
2794 : * the resulting string should be worded like "while <some action>"
2795 : */
2796 : static void
2797 24 : dblink_res_error(PGconn *conn, const char *conname, PGresult *res,
2798 : bool fail, const char *fmt,...)
2799 : {
2800 : int level;
2801 24 : char *pg_diag_sqlstate = PQresultErrorField(res, PG_DIAG_SQLSTATE);
2802 24 : char *pg_diag_message_primary = PQresultErrorField(res, PG_DIAG_MESSAGE_PRIMARY);
2803 24 : char *pg_diag_message_detail = PQresultErrorField(res, PG_DIAG_MESSAGE_DETAIL);
2804 24 : char *pg_diag_message_hint = PQresultErrorField(res, PG_DIAG_MESSAGE_HINT);
2805 24 : char *pg_diag_context = PQresultErrorField(res, PG_DIAG_CONTEXT);
2806 : int sqlstate;
2807 : char *message_primary;
2808 : char *message_detail;
2809 : char *message_hint;
2810 : char *message_context;
2811 : va_list ap;
2812 : char dblink_context_msg[512];
2813 :
2814 24 : if (fail)
2815 6 : level = ERROR;
2816 : else
2817 18 : level = NOTICE;
2818 :
2819 24 : if (pg_diag_sqlstate)
2820 24 : sqlstate = MAKE_SQLSTATE(pg_diag_sqlstate[0],
2821 : pg_diag_sqlstate[1],
2822 : pg_diag_sqlstate[2],
2823 : pg_diag_sqlstate[3],
2824 : pg_diag_sqlstate[4]);
2825 : else
2826 0 : sqlstate = ERRCODE_CONNECTION_FAILURE;
2827 :
2828 24 : message_primary = xpstrdup(pg_diag_message_primary);
2829 24 : message_detail = xpstrdup(pg_diag_message_detail);
2830 24 : message_hint = xpstrdup(pg_diag_message_hint);
2831 24 : message_context = xpstrdup(pg_diag_context);
2832 :
2833 : /*
2834 : * If we don't get a message from the PGresult, try the PGconn. This is
2835 : * needed because for connection-level failures, PQgetResult may just
2836 : * return NULL, not a PGresult at all.
2837 : */
2838 24 : if (message_primary == NULL)
2839 0 : message_primary = pchomp(PQerrorMessage(conn));
2840 :
2841 : /*
2842 : * Now that we've copied all the data we need out of the PGresult, it's
2843 : * safe to free it. We must do this to avoid PGresult leakage. We're
2844 : * leaking all the strings too, but those are in palloc'd memory that will
2845 : * get cleaned up eventually.
2846 : */
2847 24 : PQclear(res);
2848 :
2849 : /*
2850 : * Format the basic errcontext string. Below, we'll add on something
2851 : * about the connection name. That's a violation of the translatability
2852 : * guidelines about constructing error messages out of parts, but since
2853 : * there's no translation support for dblink, there's no need to worry
2854 : * about that (yet).
2855 : */
2856 24 : va_start(ap, fmt);
2857 24 : vsnprintf(dblink_context_msg, sizeof(dblink_context_msg), fmt, ap);
2858 24 : va_end(ap);
2859 :
2860 24 : ereport(level,
2861 : (errcode(sqlstate),
2862 : (message_primary != NULL && message_primary[0] != '\0') ?
2863 : errmsg_internal("%s", message_primary) :
2864 : errmsg("could not obtain message string for remote error"),
2865 : message_detail ? errdetail_internal("%s", message_detail) : 0,
2866 : message_hint ? errhint("%s", message_hint) : 0,
2867 : message_context ? (errcontext("%s", message_context)) : 0,
2868 : conname ?
2869 : (errcontext("%s on dblink connection named \"%s\"",
2870 : dblink_context_msg, conname)) :
2871 : (errcontext("%s on unnamed dblink connection",
2872 : dblink_context_msg))));
2873 18 : }
2874 :
2875 : /*
2876 : * Obtain connection string for a foreign server
2877 : */
2878 : static char *
2879 50 : get_connect_string(const char *servername)
2880 : {
2881 50 : ForeignServer *foreign_server = NULL;
2882 : UserMapping *user_mapping;
2883 : ListCell *cell;
2884 : StringInfoData buf;
2885 : ForeignDataWrapper *fdw;
2886 : AclResult aclresult;
2887 : char *srvname;
2888 :
2889 : static const PQconninfoOption *options = NULL;
2890 :
2891 50 : initStringInfo(&buf);
2892 :
2893 : /*
2894 : * Get list of valid libpq options.
2895 : *
2896 : * To avoid unnecessary work, we get the list once and use it throughout
2897 : * the lifetime of this backend process. We don't need to care about
2898 : * memory context issues, because PQconndefaults allocates with malloc.
2899 : */
2900 50 : if (!options)
2901 : {
2902 12 : options = PQconndefaults();
2903 12 : if (!options) /* assume reason for failure is OOM */
2904 0 : ereport(ERROR,
2905 : (errcode(ERRCODE_FDW_OUT_OF_MEMORY),
2906 : errmsg("out of memory"),
2907 : errdetail("Could not get libpq's default connection options.")));
2908 : }
2909 :
2910 : /* first gather the server connstr options */
2911 50 : srvname = pstrdup(servername);
2912 50 : truncate_identifier(srvname, strlen(srvname), false);
2913 50 : foreign_server = GetForeignServerByName(srvname, true);
2914 :
2915 50 : if (foreign_server)
2916 : {
2917 10 : Oid serverid = foreign_server->serverid;
2918 10 : Oid fdwid = foreign_server->fdwid;
2919 10 : Oid userid = GetUserId();
2920 :
2921 10 : user_mapping = GetUserMapping(userid, serverid);
2922 10 : fdw = GetForeignDataWrapper(fdwid);
2923 :
2924 : /* Check permissions, user must have usage on the server. */
2925 10 : aclresult = object_aclcheck(ForeignServerRelationId, serverid, userid, ACL_USAGE);
2926 10 : if (aclresult != ACLCHECK_OK)
2927 0 : aclcheck_error(aclresult, OBJECT_FOREIGN_SERVER, foreign_server->servername);
2928 :
2929 : /*
2930 : * First append hardcoded options needed for SCRAM pass-through, so if
2931 : * the user overwrites these options we can ereport on
2932 : * dblink_connstr_check and dblink_security_check.
2933 : */
2934 10 : if (MyProcPort->has_scram_keys && UseScramPassthrough(foreign_server, user_mapping))
2935 6 : appendSCRAMKeysInfo(&buf);
2936 :
2937 10 : foreach(cell, fdw->options)
2938 : {
2939 0 : DefElem *def = lfirst(cell);
2940 :
2941 0 : if (is_valid_dblink_option(options, def->defname, ForeignDataWrapperRelationId))
2942 0 : appendStringInfo(&buf, "%s='%s' ", def->defname,
2943 0 : escape_param_str(strVal(def->arg)));
2944 : }
2945 :
2946 44 : foreach(cell, foreign_server->options)
2947 : {
2948 34 : DefElem *def = lfirst(cell);
2949 :
2950 34 : if (is_valid_dblink_option(options, def->defname, ForeignServerRelationId))
2951 28 : appendStringInfo(&buf, "%s='%s' ", def->defname,
2952 28 : escape_param_str(strVal(def->arg)));
2953 : }
2954 :
2955 20 : foreach(cell, user_mapping->options)
2956 : {
2957 :
2958 10 : DefElem *def = lfirst(cell);
2959 :
2960 10 : if (is_valid_dblink_option(options, def->defname, UserMappingRelationId))
2961 10 : appendStringInfo(&buf, "%s='%s' ", def->defname,
2962 10 : escape_param_str(strVal(def->arg)));
2963 : }
2964 :
2965 10 : return buf.data;
2966 : }
2967 : else
2968 40 : return NULL;
2969 : }
2970 :
2971 : /*
2972 : * Escaping libpq connect parameter strings.
2973 : *
2974 : * Replaces "'" with "\'" and "\" with "\\".
2975 : */
2976 : static char *
2977 38 : escape_param_str(const char *str)
2978 : {
2979 : const char *cp;
2980 : StringInfoData buf;
2981 :
2982 38 : initStringInfo(&buf);
2983 :
2984 344 : for (cp = str; *cp; cp++)
2985 : {
2986 306 : if (*cp == '\\' || *cp == '\'')
2987 0 : appendStringInfoChar(&buf, '\\');
2988 306 : appendStringInfoChar(&buf, *cp);
2989 : }
2990 :
2991 38 : return buf.data;
2992 : }
2993 :
2994 : /*
2995 : * Validate the PK-attnums argument for dblink_build_sql_insert() and related
2996 : * functions, and translate to the internal representation.
2997 : *
2998 : * The user supplies an int2vector of 1-based logical attnums, plus a count
2999 : * argument (the need for the separate count argument is historical, but we
3000 : * still check it). We check that each attnum corresponds to a valid,
3001 : * non-dropped attribute of the rel. We do *not* prevent attnums from being
3002 : * listed twice, though the actual use-case for such things is dubious.
3003 : * Note that before Postgres 9.0, the user's attnums were interpreted as
3004 : * physical not logical column numbers; this was changed for future-proofing.
3005 : *
3006 : * The internal representation is a palloc'd int array of 0-based physical
3007 : * attnums.
3008 : */
3009 : static void
3010 36 : validate_pkattnums(Relation rel,
3011 : int2vector *pkattnums_arg, int32 pknumatts_arg,
3012 : int **pkattnums, int *pknumatts)
3013 : {
3014 36 : TupleDesc tupdesc = rel->rd_att;
3015 36 : int natts = tupdesc->natts;
3016 : int i;
3017 :
3018 : /* Don't take more array elements than there are */
3019 36 : pknumatts_arg = Min(pknumatts_arg, pkattnums_arg->dim1);
3020 :
3021 : /* Must have at least one pk attnum selected */
3022 36 : if (pknumatts_arg <= 0)
3023 0 : ereport(ERROR,
3024 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3025 : errmsg("number of key attributes must be > 0")));
3026 :
3027 : /* Allocate output array */
3028 36 : *pkattnums = palloc_array(int, pknumatts_arg);
3029 36 : *pknumatts = pknumatts_arg;
3030 :
3031 : /* Validate attnums and convert to internal form */
3032 114 : for (i = 0; i < pknumatts_arg; i++)
3033 : {
3034 90 : int pkattnum = pkattnums_arg->values[i];
3035 : int lnum;
3036 : int j;
3037 :
3038 : /* Can throw error immediately if out of range */
3039 90 : if (pkattnum <= 0 || pkattnum > natts)
3040 12 : ereport(ERROR,
3041 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3042 : errmsg("invalid attribute number %d", pkattnum)));
3043 :
3044 : /* Identify which physical column has this logical number */
3045 78 : lnum = 0;
3046 138 : for (j = 0; j < natts; j++)
3047 : {
3048 : /* dropped columns don't count */
3049 138 : if (TupleDescAttr(tupdesc, j)->attisdropped)
3050 6 : continue;
3051 :
3052 132 : if (++lnum == pkattnum)
3053 78 : break;
3054 : }
3055 :
3056 78 : if (j < natts)
3057 78 : (*pkattnums)[i] = j;
3058 : else
3059 0 : ereport(ERROR,
3060 : (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
3061 : errmsg("invalid attribute number %d", pkattnum)));
3062 : }
3063 24 : }
3064 :
3065 : /*
3066 : * Check if the specified connection option is valid.
3067 : *
3068 : * We basically allow whatever libpq thinks is an option, with these
3069 : * restrictions:
3070 : * debug options: disallowed
3071 : * "client_encoding": disallowed
3072 : * "user": valid only in USER MAPPING options
3073 : * secure options (eg password): valid only in USER MAPPING options
3074 : * others: valid only in FOREIGN SERVER options
3075 : *
3076 : * We disallow client_encoding because it would be overridden anyway via
3077 : * PQclientEncoding; allowing it to be specified would merely promote
3078 : * confusion.
3079 : */
3080 : static bool
3081 914 : is_valid_dblink_option(const PQconninfoOption *options, const char *option,
3082 : Oid context)
3083 : {
3084 : const PQconninfoOption *opt;
3085 :
3086 : /* Look up the option in libpq result */
3087 22160 : for (opt = options; opt->keyword; opt++)
3088 : {
3089 22150 : if (strcmp(opt->keyword, option) == 0)
3090 904 : break;
3091 : }
3092 914 : if (opt->keyword == NULL)
3093 10 : return false;
3094 :
3095 : /* Disallow debug options (particularly "replication") */
3096 904 : if (strchr(opt->dispchar, 'D'))
3097 68 : return false;
3098 :
3099 : /* Disallow "client_encoding" */
3100 836 : if (strcmp(opt->keyword, "client_encoding") == 0)
3101 16 : return false;
3102 :
3103 : /*
3104 : * Disallow OAuth options for now, since the builtin flow communicates on
3105 : * stderr by default and can't cache tokens yet.
3106 : */
3107 820 : if (strncmp(opt->keyword, "oauth_", strlen("oauth_")) == 0)
3108 72 : return false;
3109 :
3110 : /*
3111 : * If the option is "user" or marked secure, it should be specified only
3112 : * in USER MAPPING. Others should be specified only in SERVER.
3113 : */
3114 748 : if (strcmp(opt->keyword, "user") == 0 || strchr(opt->dispchar, '*'))
3115 : {
3116 74 : if (context != UserMappingRelationId)
3117 18 : return false;
3118 : }
3119 : else
3120 : {
3121 674 : if (context != ForeignServerRelationId)
3122 456 : return false;
3123 : }
3124 :
3125 274 : return true;
3126 : }
3127 :
3128 : /*
3129 : * Same as is_valid_dblink_option but also check for only dblink_fdw specific
3130 : * options.
3131 : */
3132 : static bool
3133 78 : is_valid_dblink_fdw_option(const PQconninfoOption *options, const char *option,
3134 : Oid context)
3135 : {
3136 78 : if (strcmp(option, "use_scram_passthrough") == 0)
3137 8 : return true;
3138 :
3139 70 : return is_valid_dblink_option(options, option, context);
3140 : }
3141 :
3142 : /*
3143 : * Copy the remote session's values of GUCs that affect datatype I/O
3144 : * and apply them locally in a new GUC nesting level. Returns the new
3145 : * nestlevel (which is needed by restoreLocalGucs to undo the settings),
3146 : * or -1 if no new nestlevel was needed.
3147 : *
3148 : * We use the equivalent of a function SET option to allow the settings to
3149 : * persist only until the caller calls restoreLocalGucs. If an error is
3150 : * thrown in between, guc.c will take care of undoing the settings.
3151 : */
3152 : static int
3153 68 : applyRemoteGucs(PGconn *conn)
3154 : {
3155 : static const char *const GUCsAffectingIO[] = {
3156 : "DateStyle",
3157 : "IntervalStyle"
3158 : };
3159 :
3160 68 : int nestlevel = -1;
3161 : int i;
3162 :
3163 204 : for (i = 0; i < lengthof(GUCsAffectingIO); i++)
3164 : {
3165 136 : const char *gucName = GUCsAffectingIO[i];
3166 136 : const char *remoteVal = PQparameterStatus(conn, gucName);
3167 : const char *localVal;
3168 :
3169 : /*
3170 : * If the remote server is pre-8.4, it won't have IntervalStyle, but
3171 : * that's okay because its output format won't be ambiguous. So just
3172 : * skip the GUC if we don't get a value for it. (We might eventually
3173 : * need more complicated logic with remote-version checks here.)
3174 : */
3175 136 : if (remoteVal == NULL)
3176 0 : continue;
3177 :
3178 : /*
3179 : * Avoid GUC-setting overhead if the remote and local GUCs already
3180 : * have the same value.
3181 : */
3182 136 : localVal = GetConfigOption(gucName, false, false);
3183 : Assert(localVal != NULL);
3184 :
3185 136 : if (strcmp(remoteVal, localVal) == 0)
3186 102 : continue;
3187 :
3188 : /* Create new GUC nest level if we didn't already */
3189 34 : if (nestlevel < 0)
3190 18 : nestlevel = NewGUCNestLevel();
3191 :
3192 : /* Apply the option (this will throw error on failure) */
3193 34 : (void) set_config_option(gucName, remoteVal,
3194 : PGC_USERSET, PGC_S_SESSION,
3195 : GUC_ACTION_SAVE, true, 0, false);
3196 : }
3197 :
3198 68 : return nestlevel;
3199 : }
3200 :
3201 : /*
3202 : * Restore local GUCs after they have been overlaid with remote settings.
3203 : */
3204 : static void
3205 70 : restoreLocalGucs(int nestlevel)
3206 : {
3207 : /* Do nothing if no new nestlevel was created */
3208 70 : if (nestlevel > 0)
3209 16 : AtEOXact_GUC(true, nestlevel);
3210 70 : }
3211 :
3212 : /*
3213 : * Append SCRAM client key and server key information from the global
3214 : * MyProcPort into the given StringInfo buffer.
3215 : */
3216 : static void
3217 6 : appendSCRAMKeysInfo(StringInfo buf)
3218 : {
3219 : int len;
3220 : int encoded_len;
3221 : char *client_key;
3222 : char *server_key;
3223 :
3224 6 : len = pg_b64_enc_len(sizeof(MyProcPort->scram_ClientKey));
3225 : /* don't forget the zero-terminator */
3226 6 : client_key = palloc0(len + 1);
3227 6 : encoded_len = pg_b64_encode(MyProcPort->scram_ClientKey,
3228 : sizeof(MyProcPort->scram_ClientKey),
3229 : client_key, len);
3230 6 : if (encoded_len < 0)
3231 0 : elog(ERROR, "could not encode SCRAM client key");
3232 :
3233 6 : len = pg_b64_enc_len(sizeof(MyProcPort->scram_ServerKey));
3234 : /* don't forget the zero-terminator */
3235 6 : server_key = palloc0(len + 1);
3236 6 : encoded_len = pg_b64_encode(MyProcPort->scram_ServerKey,
3237 : sizeof(MyProcPort->scram_ServerKey),
3238 : server_key, len);
3239 6 : if (encoded_len < 0)
3240 0 : elog(ERROR, "could not encode SCRAM server key");
3241 :
3242 6 : appendStringInfo(buf, "scram_client_key='%s' ", client_key);
3243 6 : appendStringInfo(buf, "scram_server_key='%s' ", server_key);
3244 6 : appendStringInfoString(buf, "require_auth='scram-sha-256' ");
3245 :
3246 6 : pfree(client_key);
3247 6 : pfree(server_key);
3248 6 : }
3249 :
3250 :
3251 : static bool
3252 6 : UseScramPassthrough(ForeignServer *foreign_server, UserMapping *user)
3253 : {
3254 : ListCell *cell;
3255 :
3256 24 : foreach(cell, foreign_server->options)
3257 : {
3258 24 : DefElem *def = lfirst(cell);
3259 :
3260 24 : if (strcmp(def->defname, "use_scram_passthrough") == 0)
3261 6 : return defGetBoolean(def);
3262 : }
3263 :
3264 0 : foreach(cell, user->options)
3265 : {
3266 0 : DefElem *def = (DefElem *) lfirst(cell);
3267 :
3268 0 : if (strcmp(def->defname, "use_scram_passthrough") == 0)
3269 0 : return defGetBoolean(def);
3270 : }
3271 :
3272 0 : return false;
3273 : }
|