Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * libpqwalreceiver.c
4 : *
5 : * This file contains the libpq-specific parts of walreceiver. It's
6 : * loaded as a dynamic module to avoid linking the main server binary with
7 : * libpq.
8 : *
9 : * Apart from walreceiver, the libpq-specific routines are now being used by
10 : * logical replication workers and slot synchronization.
11 : *
12 : * Portions Copyright (c) 2010-2024, PostgreSQL Global Development Group
13 : *
14 : *
15 : * IDENTIFICATION
16 : * src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
17 : *
18 : *-------------------------------------------------------------------------
19 : */
20 : #include "postgres.h"
21 :
22 : #include <unistd.h>
23 : #include <sys/time.h>
24 :
25 : #include "common/connect.h"
26 : #include "funcapi.h"
27 : #include "libpq-fe.h"
28 : #include "mb/pg_wchar.h"
29 : #include "miscadmin.h"
30 : #include "pgstat.h"
31 : #include "pqexpbuffer.h"
32 : #include "replication/walreceiver.h"
33 : #include "utils/builtins.h"
34 : #include "utils/memutils.h"
35 : #include "utils/pg_lsn.h"
36 : #include "utils/tuplestore.h"
37 :
38 1482 : PG_MODULE_MAGIC;
39 :
40 : struct WalReceiverConn
41 : {
42 : /* Current connection to the primary, if any */
43 : PGconn *streamConn;
44 : /* Used to remember if the connection is logical or physical */
45 : bool logical;
46 : /* Buffer for currently read records */
47 : char *recvBuf;
48 : };
49 :
50 : /* Prototypes for interface functions */
51 : static WalReceiverConn *libpqrcv_connect(const char *conninfo,
52 : bool replication, bool logical,
53 : bool must_use_password,
54 : const char *appname, char **err);
55 : static void libpqrcv_check_conninfo(const char *conninfo,
56 : bool must_use_password);
57 : static char *libpqrcv_get_conninfo(WalReceiverConn *conn);
58 : static void libpqrcv_get_senderinfo(WalReceiverConn *conn,
59 : char **sender_host, int *sender_port);
60 : static char *libpqrcv_identify_system(WalReceiverConn *conn,
61 : TimeLineID *primary_tli);
62 : static char *libpqrcv_get_dbname_from_conninfo(const char *conninfo);
63 : static int libpqrcv_server_version(WalReceiverConn *conn);
64 : static void libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
65 : TimeLineID tli, char **filename,
66 : char **content, int *len);
67 : static bool libpqrcv_startstreaming(WalReceiverConn *conn,
68 : const WalRcvStreamOptions *options);
69 : static void libpqrcv_endstreaming(WalReceiverConn *conn,
70 : TimeLineID *next_tli);
71 : static int libpqrcv_receive(WalReceiverConn *conn, char **buffer,
72 : pgsocket *wait_fd);
73 : static void libpqrcv_send(WalReceiverConn *conn, const char *buffer,
74 : int nbytes);
75 : static char *libpqrcv_create_slot(WalReceiverConn *conn,
76 : const char *slotname,
77 : bool temporary,
78 : bool two_phase,
79 : bool failover,
80 : CRSSnapshotAction snapshot_action,
81 : XLogRecPtr *lsn);
82 : static void libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
83 : bool failover);
84 : static pid_t libpqrcv_get_backend_pid(WalReceiverConn *conn);
85 : static WalRcvExecResult *libpqrcv_exec(WalReceiverConn *conn,
86 : const char *query,
87 : const int nRetTypes,
88 : const Oid *retTypes);
89 : static void libpqrcv_disconnect(WalReceiverConn *conn);
90 :
91 : static WalReceiverFunctionsType PQWalReceiverFunctions = {
92 : .walrcv_connect = libpqrcv_connect,
93 : .walrcv_check_conninfo = libpqrcv_check_conninfo,
94 : .walrcv_get_conninfo = libpqrcv_get_conninfo,
95 : .walrcv_get_senderinfo = libpqrcv_get_senderinfo,
96 : .walrcv_identify_system = libpqrcv_identify_system,
97 : .walrcv_server_version = libpqrcv_server_version,
98 : .walrcv_readtimelinehistoryfile = libpqrcv_readtimelinehistoryfile,
99 : .walrcv_startstreaming = libpqrcv_startstreaming,
100 : .walrcv_endstreaming = libpqrcv_endstreaming,
101 : .walrcv_receive = libpqrcv_receive,
102 : .walrcv_send = libpqrcv_send,
103 : .walrcv_create_slot = libpqrcv_create_slot,
104 : .walrcv_alter_slot = libpqrcv_alter_slot,
105 : .walrcv_get_dbname_from_conninfo = libpqrcv_get_dbname_from_conninfo,
106 : .walrcv_get_backend_pid = libpqrcv_get_backend_pid,
107 : .walrcv_exec = libpqrcv_exec,
108 : .walrcv_disconnect = libpqrcv_disconnect
109 : };
110 :
111 : /* Prototypes for private functions */
112 : static PGresult *libpqrcv_PQexec(PGconn *streamConn, const char *query);
113 : static PGresult *libpqrcv_PQgetResult(PGconn *streamConn);
114 : static char *stringlist_to_identifierstr(PGconn *conn, List *strings);
115 :
116 : /*
117 : * Module initialization function
118 : */
119 : void
120 1482 : _PG_init(void)
121 : {
122 1482 : if (WalReceiverFunctions != NULL)
123 0 : elog(ERROR, "libpqwalreceiver already loaded");
124 1482 : WalReceiverFunctions = &PQWalReceiverFunctions;
125 1482 : }
126 :
127 : /*
128 : * Establish the connection to the primary server.
129 : *
130 : * This function can be used for both replication and regular connections.
131 : * If it is a replication connection, it could be either logical or physical
132 : * based on input argument 'logical'.
133 : *
134 : * If an error occurs, this function will normally return NULL and set *err
135 : * to a palloc'ed error message. However, if must_use_password is true and
136 : * the connection fails to use the password, this function will ereport(ERROR).
137 : * We do this because in that case the error includes a detail and a hint for
138 : * consistency with other parts of the system, and it's not worth adding the
139 : * machinery to pass all of those back to the caller just to cover this one
140 : * case.
141 : */
142 : static WalReceiverConn *
143 1476 : libpqrcv_connect(const char *conninfo, bool replication, bool logical,
144 : bool must_use_password, const char *appname, char **err)
145 : {
146 : WalReceiverConn *conn;
147 : PostgresPollingStatusType status;
148 : const char *keys[6];
149 : const char *vals[6];
150 1476 : int i = 0;
151 :
152 : /*
153 : * Re-validate connection string. The validation already happened at DDL
154 : * time, but the subscription owner may have changed. If we don't recheck
155 : * with the correct must_use_password, it's possible that the connection
156 : * will obtain the password from a different source, such as PGPASSFILE or
157 : * PGPASSWORD.
158 : */
159 1476 : libpqrcv_check_conninfo(conninfo, must_use_password);
160 :
161 : /*
162 : * We use the expand_dbname parameter to process the connection string (or
163 : * URI), and pass some extra options.
164 : */
165 1466 : keys[i] = "dbname";
166 1466 : vals[i] = conninfo;
167 :
168 : /* We can not have logical without replication */
169 : Assert(replication || !logical);
170 :
171 1466 : if (replication)
172 : {
173 1444 : keys[++i] = "replication";
174 1444 : vals[i] = logical ? "database" : "true";
175 :
176 1444 : if (logical)
177 : {
178 : /* Tell the publisher to translate to our encoding */
179 1064 : keys[++i] = "client_encoding";
180 1064 : vals[i] = GetDatabaseEncodingName();
181 :
182 : /*
183 : * Force assorted GUC parameters to settings that ensure that the
184 : * publisher will output data values in a form that is unambiguous
185 : * to the subscriber. (We don't want to modify the subscriber's
186 : * GUC settings, since that might surprise user-defined code
187 : * running in the subscriber, such as triggers.) This should
188 : * match what pg_dump does.
189 : */
190 1064 : keys[++i] = "options";
191 1064 : vals[i] = "-c datestyle=ISO -c intervalstyle=postgres -c extra_float_digits=3";
192 : }
193 : else
194 : {
195 : /*
196 : * The database name is ignored by the server in replication mode,
197 : * but specify "replication" for .pgpass lookup.
198 : */
199 380 : keys[++i] = "dbname";
200 380 : vals[i] = "replication";
201 : }
202 : }
203 :
204 1466 : keys[++i] = "fallback_application_name";
205 1466 : vals[i] = appname;
206 :
207 1466 : keys[++i] = NULL;
208 1466 : vals[i] = NULL;
209 :
210 : Assert(i < sizeof(keys));
211 :
212 1466 : conn = palloc0(sizeof(WalReceiverConn));
213 1466 : conn->streamConn = PQconnectStartParams(keys, vals,
214 : /* expand_dbname = */ true);
215 1466 : if (PQstatus(conn->streamConn) == CONNECTION_BAD)
216 142 : goto bad_connection_errmsg;
217 :
218 : /*
219 : * Poll connection until we have OK or FAILED status.
220 : *
221 : * Per spec for PQconnectPoll, first wait till socket is write-ready.
222 : */
223 1324 : status = PGRES_POLLING_WRITING;
224 : do
225 : {
226 : int io_flag;
227 : int rc;
228 :
229 3632 : if (status == PGRES_POLLING_READING)
230 1328 : io_flag = WL_SOCKET_READABLE;
231 : #ifdef WIN32
232 : /* Windows needs a different test while waiting for connection-made */
233 : else if (PQstatus(conn->streamConn) == CONNECTION_STARTED)
234 : io_flag = WL_SOCKET_CONNECTED;
235 : #endif
236 : else
237 2304 : io_flag = WL_SOCKET_WRITEABLE;
238 :
239 3632 : rc = WaitLatchOrSocket(MyLatch,
240 : WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag,
241 3632 : PQsocket(conn->streamConn),
242 : 0,
243 : WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
244 :
245 : /* Interrupted? */
246 3632 : if (rc & WL_LATCH_SET)
247 : {
248 986 : ResetLatch(MyLatch);
249 986 : ProcessWalRcvInterrupts();
250 : }
251 :
252 : /* If socket is ready, advance the libpq state machine */
253 3628 : if (rc & io_flag)
254 2646 : status = PQconnectPoll(conn->streamConn);
255 3628 : } while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED);
256 :
257 1320 : if (PQstatus(conn->streamConn) != CONNECTION_OK)
258 20 : goto bad_connection_errmsg;
259 :
260 1300 : if (must_use_password && !PQconnectionUsedPassword(conn->streamConn))
261 : {
262 0 : PQfinish(conn->streamConn);
263 0 : pfree(conn);
264 :
265 0 : ereport(ERROR,
266 : (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
267 : errmsg("password is required"),
268 : errdetail("Non-superuser cannot connect if the server does not request a password."),
269 : errhint("Target server's authentication method must be changed, or set password_required=false in the subscription parameters.")));
270 : }
271 :
272 : /*
273 : * Set always-secure search path for the cases where the connection is
274 : * used to run SQL queries, so malicious users can't get control.
275 : */
276 1300 : if (!replication || logical)
277 : {
278 : PGresult *res;
279 :
280 1048 : res = libpqrcv_PQexec(conn->streamConn,
281 : ALWAYS_SECURE_SEARCH_PATH_SQL);
282 1048 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
283 : {
284 0 : PQclear(res);
285 0 : *err = psprintf(_("could not clear search path: %s"),
286 0 : pchomp(PQerrorMessage(conn->streamConn)));
287 0 : goto bad_connection;
288 : }
289 1048 : PQclear(res);
290 : }
291 :
292 1300 : conn->logical = logical;
293 :
294 1300 : return conn;
295 :
296 : /* error path, using libpq's error message */
297 162 : bad_connection_errmsg:
298 162 : *err = pchomp(PQerrorMessage(conn->streamConn));
299 :
300 : /* error path, error already set */
301 162 : bad_connection:
302 162 : PQfinish(conn->streamConn);
303 162 : pfree(conn);
304 162 : return NULL;
305 : }
306 :
307 : /*
308 : * Validate connection info string, and determine whether it might cause
309 : * local filesystem access to be attempted.
310 : *
311 : * If the connection string can't be parsed, this function will raise
312 : * an error and will not return. If it can, it will return true if this
313 : * connection string specifies a password and false otherwise.
314 : */
315 : static void
316 1804 : libpqrcv_check_conninfo(const char *conninfo, bool must_use_password)
317 : {
318 1804 : PQconninfoOption *opts = NULL;
319 : PQconninfoOption *opt;
320 1804 : char *err = NULL;
321 :
322 1804 : opts = PQconninfoParse(conninfo, &err);
323 1804 : if (opts == NULL)
324 : {
325 : /* The error string is malloc'd, so we must free it explicitly */
326 18 : char *errcopy = err ? pstrdup(err) : "out of memory";
327 :
328 18 : PQfreemem(err);
329 18 : ereport(ERROR,
330 : (errcode(ERRCODE_SYNTAX_ERROR),
331 : errmsg("invalid connection string syntax: %s", errcopy)));
332 : }
333 :
334 1786 : if (must_use_password)
335 : {
336 26 : bool uses_password = false;
337 :
338 702 : for (opt = opts; opt->keyword != NULL; ++opt)
339 : {
340 : /* Ignore connection options that are not present. */
341 686 : if (opt->val == NULL)
342 632 : continue;
343 :
344 54 : if (strcmp(opt->keyword, "password") == 0 && opt->val[0] != '\0')
345 : {
346 10 : uses_password = true;
347 10 : break;
348 : }
349 : }
350 :
351 26 : if (!uses_password)
352 : {
353 : /* malloc'd, so we must free it explicitly */
354 16 : PQconninfoFree(opts);
355 :
356 16 : ereport(ERROR,
357 : (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
358 : errmsg("password is required"),
359 : errdetail("Non-superusers must provide a password in the connection string.")));
360 : }
361 : }
362 :
363 1770 : PQconninfoFree(opts);
364 1770 : }
365 :
366 : /*
367 : * Return a user-displayable conninfo string. Any security-sensitive fields
368 : * are obfuscated.
369 : */
370 : static char *
371 252 : libpqrcv_get_conninfo(WalReceiverConn *conn)
372 : {
373 : PQconninfoOption *conn_opts;
374 : PQconninfoOption *conn_opt;
375 : PQExpBufferData buf;
376 : char *retval;
377 :
378 : Assert(conn->streamConn != NULL);
379 :
380 252 : initPQExpBuffer(&buf);
381 252 : conn_opts = PQconninfo(conn->streamConn);
382 :
383 252 : if (conn_opts == NULL)
384 0 : ereport(ERROR,
385 : (errcode(ERRCODE_OUT_OF_MEMORY),
386 : errmsg("could not parse connection string: %s",
387 : _("out of memory"))));
388 :
389 : /* build a clean connection string from pieces */
390 10584 : for (conn_opt = conn_opts; conn_opt->keyword != NULL; conn_opt++)
391 : {
392 : bool obfuscate;
393 :
394 : /* Skip debug and empty options */
395 10332 : if (strchr(conn_opt->dispchar, 'D') ||
396 10080 : conn_opt->val == NULL ||
397 4806 : conn_opt->val[0] == '\0')
398 5778 : continue;
399 :
400 : /* Obfuscate security-sensitive options */
401 4554 : obfuscate = strchr(conn_opt->dispchar, '*') != NULL;
402 :
403 9108 : appendPQExpBuffer(&buf, "%s%s=%s",
404 4554 : buf.len == 0 ? "" : " ",
405 : conn_opt->keyword,
406 : obfuscate ? "********" : conn_opt->val);
407 : }
408 :
409 252 : PQconninfoFree(conn_opts);
410 :
411 252 : retval = PQExpBufferDataBroken(buf) ? NULL : pstrdup(buf.data);
412 252 : termPQExpBuffer(&buf);
413 252 : return retval;
414 : }
415 :
416 : /*
417 : * Provides information of sender this WAL receiver is connected to.
418 : */
419 : static void
420 252 : libpqrcv_get_senderinfo(WalReceiverConn *conn, char **sender_host,
421 : int *sender_port)
422 : {
423 252 : char *ret = NULL;
424 :
425 252 : *sender_host = NULL;
426 252 : *sender_port = 0;
427 :
428 : Assert(conn->streamConn != NULL);
429 :
430 252 : ret = PQhost(conn->streamConn);
431 252 : if (ret && strlen(ret) != 0)
432 252 : *sender_host = pstrdup(ret);
433 :
434 252 : ret = PQport(conn->streamConn);
435 252 : if (ret && strlen(ret) != 0)
436 252 : *sender_port = atoi(ret);
437 252 : }
438 :
439 : /*
440 : * Check that primary's system identifier matches ours, and fetch the current
441 : * timeline ID of the primary.
442 : */
443 : static char *
444 576 : libpqrcv_identify_system(WalReceiverConn *conn, TimeLineID *primary_tli)
445 : {
446 : PGresult *res;
447 : char *primary_sysid;
448 :
449 : /*
450 : * Get the system identifier and timeline ID as a DataRow message from the
451 : * primary server.
452 : */
453 576 : res = libpqrcv_PQexec(conn->streamConn, "IDENTIFY_SYSTEM");
454 576 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
455 : {
456 0 : PQclear(res);
457 0 : ereport(ERROR,
458 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
459 : errmsg("could not receive database system identifier and timeline ID from "
460 : "the primary server: %s",
461 : pchomp(PQerrorMessage(conn->streamConn)))));
462 : }
463 :
464 : /*
465 : * IDENTIFY_SYSTEM returns 3 columns in 9.3 and earlier, and 4 columns in
466 : * 9.4 and onwards.
467 : */
468 576 : if (PQnfields(res) < 3 || PQntuples(res) != 1)
469 : {
470 0 : int ntuples = PQntuples(res);
471 0 : int nfields = PQnfields(res);
472 :
473 0 : PQclear(res);
474 0 : ereport(ERROR,
475 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
476 : errmsg("invalid response from primary server"),
477 : errdetail("Could not identify system: got %d rows and %d fields, expected %d rows and %d or more fields.",
478 : ntuples, nfields, 1, 3)));
479 : }
480 576 : primary_sysid = pstrdup(PQgetvalue(res, 0, 0));
481 576 : *primary_tli = pg_strtoint32(PQgetvalue(res, 0, 1));
482 576 : PQclear(res);
483 :
484 576 : return primary_sysid;
485 : }
486 :
487 : /*
488 : * Thin wrapper around libpq to obtain server version.
489 : */
490 : static int
491 2252 : libpqrcv_server_version(WalReceiverConn *conn)
492 : {
493 2252 : return PQserverVersion(conn->streamConn);
494 : }
495 :
496 : /*
497 : * Get database name from the primary server's conninfo.
498 : *
499 : * If dbname is not found in connInfo, return NULL value.
500 : */
501 : static char *
502 24 : libpqrcv_get_dbname_from_conninfo(const char *connInfo)
503 : {
504 : PQconninfoOption *opts;
505 24 : char *dbname = NULL;
506 24 : char *err = NULL;
507 :
508 24 : opts = PQconninfoParse(connInfo, &err);
509 24 : if (opts == NULL)
510 : {
511 : /* The error string is malloc'd, so we must free it explicitly */
512 0 : char *errcopy = err ? pstrdup(err) : "out of memory";
513 :
514 0 : PQfreemem(err);
515 0 : ereport(ERROR,
516 : (errcode(ERRCODE_SYNTAX_ERROR),
517 : errmsg("invalid connection string syntax: %s", errcopy)));
518 : }
519 :
520 1008 : for (PQconninfoOption *opt = opts; opt->keyword != NULL; ++opt)
521 : {
522 : /*
523 : * If multiple dbnames are specified, then the last one will be
524 : * returned
525 : */
526 984 : if (strcmp(opt->keyword, "dbname") == 0 && opt->val &&
527 22 : *opt->val)
528 : {
529 22 : if (dbname)
530 0 : pfree(dbname);
531 :
532 22 : dbname = pstrdup(opt->val);
533 : }
534 : }
535 :
536 24 : PQconninfoFree(opts);
537 24 : return dbname;
538 : }
539 :
540 : /*
541 : * Start streaming WAL data from given streaming options.
542 : *
543 : * Returns true if we switched successfully to copy-both mode. False
544 : * means the server received the command and executed it successfully, but
545 : * didn't switch to copy-mode. That means that there was no WAL on the
546 : * requested timeline and starting point, because the server switched to
547 : * another timeline at or before the requested starting point. On failure,
548 : * throws an ERROR.
549 : */
550 : static bool
551 896 : libpqrcv_startstreaming(WalReceiverConn *conn,
552 : const WalRcvStreamOptions *options)
553 : {
554 : StringInfoData cmd;
555 : PGresult *res;
556 :
557 : Assert(options->logical == conn->logical);
558 : Assert(options->slotname || !options->logical);
559 :
560 896 : initStringInfo(&cmd);
561 :
562 : /* Build the command. */
563 896 : appendStringInfoString(&cmd, "START_REPLICATION");
564 896 : if (options->slotname != NULL)
565 722 : appendStringInfo(&cmd, " SLOT \"%s\"",
566 : options->slotname);
567 :
568 896 : if (options->logical)
569 644 : appendStringInfoString(&cmd, " LOGICAL");
570 :
571 896 : appendStringInfo(&cmd, " %X/%X", LSN_FORMAT_ARGS(options->startpoint));
572 :
573 : /*
574 : * Additional options are different depending on if we are doing logical
575 : * or physical replication.
576 : */
577 896 : if (options->logical)
578 : {
579 : char *pubnames_str;
580 : List *pubnames;
581 : char *pubnames_literal;
582 :
583 644 : appendStringInfoString(&cmd, " (");
584 :
585 644 : appendStringInfo(&cmd, "proto_version '%u'",
586 : options->proto.logical.proto_version);
587 :
588 644 : if (options->proto.logical.streaming_str)
589 66 : appendStringInfo(&cmd, ", streaming '%s'",
590 : options->proto.logical.streaming_str);
591 :
592 652 : if (options->proto.logical.twophase &&
593 8 : PQserverVersion(conn->streamConn) >= 150000)
594 8 : appendStringInfoString(&cmd, ", two_phase 'on'");
595 :
596 1288 : if (options->proto.logical.origin &&
597 644 : PQserverVersion(conn->streamConn) >= 160000)
598 644 : appendStringInfo(&cmd, ", origin '%s'",
599 : options->proto.logical.origin);
600 :
601 644 : pubnames = options->proto.logical.publication_names;
602 644 : pubnames_str = stringlist_to_identifierstr(conn->streamConn, pubnames);
603 644 : if (!pubnames_str)
604 0 : ereport(ERROR,
605 : (errcode(ERRCODE_OUT_OF_MEMORY), /* likely guess */
606 : errmsg("could not start WAL streaming: %s",
607 : pchomp(PQerrorMessage(conn->streamConn)))));
608 644 : pubnames_literal = PQescapeLiteral(conn->streamConn, pubnames_str,
609 : strlen(pubnames_str));
610 644 : if (!pubnames_literal)
611 0 : ereport(ERROR,
612 : (errcode(ERRCODE_OUT_OF_MEMORY), /* likely guess */
613 : errmsg("could not start WAL streaming: %s",
614 : pchomp(PQerrorMessage(conn->streamConn)))));
615 644 : appendStringInfo(&cmd, ", publication_names %s", pubnames_literal);
616 644 : PQfreemem(pubnames_literal);
617 644 : pfree(pubnames_str);
618 :
619 664 : if (options->proto.logical.binary &&
620 20 : PQserverVersion(conn->streamConn) >= 140000)
621 20 : appendStringInfoString(&cmd, ", binary 'true'");
622 :
623 644 : appendStringInfoChar(&cmd, ')');
624 : }
625 : else
626 252 : appendStringInfo(&cmd, " TIMELINE %u",
627 : options->proto.physical.startpointTLI);
628 :
629 : /* Start streaming. */
630 896 : res = libpqrcv_PQexec(conn->streamConn, cmd.data);
631 894 : pfree(cmd.data);
632 :
633 894 : if (PQresultStatus(res) == PGRES_COMMAND_OK)
634 : {
635 0 : PQclear(res);
636 0 : return false;
637 : }
638 894 : else if (PQresultStatus(res) != PGRES_COPY_BOTH)
639 : {
640 0 : PQclear(res);
641 0 : ereport(ERROR,
642 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
643 : errmsg("could not start WAL streaming: %s",
644 : pchomp(PQerrorMessage(conn->streamConn)))));
645 : }
646 894 : PQclear(res);
647 894 : return true;
648 : }
649 :
650 : /*
651 : * Stop streaming WAL data. Returns the next timeline's ID in *next_tli, as
652 : * reported by the server, or 0 if it did not report it.
653 : */
654 : static void
655 400 : libpqrcv_endstreaming(WalReceiverConn *conn, TimeLineID *next_tli)
656 : {
657 : PGresult *res;
658 :
659 : /*
660 : * Send copy-end message. As in libpqrcv_PQexec, this could theoretically
661 : * block, but the risk seems small.
662 : */
663 744 : if (PQputCopyEnd(conn->streamConn, NULL) <= 0 ||
664 344 : PQflush(conn->streamConn))
665 56 : ereport(ERROR,
666 : (errcode(ERRCODE_CONNECTION_FAILURE),
667 : errmsg("could not send end-of-streaming message to primary: %s",
668 : pchomp(PQerrorMessage(conn->streamConn)))));
669 :
670 344 : *next_tli = 0;
671 :
672 : /*
673 : * After COPY is finished, we should receive a result set indicating the
674 : * next timeline's ID, or just CommandComplete if the server was shut
675 : * down.
676 : *
677 : * If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is
678 : * also possible in case we aborted the copy in mid-stream.
679 : */
680 344 : res = libpqrcv_PQgetResult(conn->streamConn);
681 344 : if (PQresultStatus(res) == PGRES_TUPLES_OK)
682 : {
683 : /*
684 : * Read the next timeline's ID. The server also sends the timeline's
685 : * starting point, but it is ignored.
686 : */
687 24 : if (PQnfields(res) < 2 || PQntuples(res) != 1)
688 0 : ereport(ERROR,
689 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
690 : errmsg("unexpected result set after end-of-streaming")));
691 24 : *next_tli = pg_strtoint32(PQgetvalue(res, 0, 0));
692 24 : PQclear(res);
693 :
694 : /* the result set should be followed by CommandComplete */
695 24 : res = libpqrcv_PQgetResult(conn->streamConn);
696 : }
697 320 : else if (PQresultStatus(res) == PGRES_COPY_OUT)
698 : {
699 320 : PQclear(res);
700 :
701 : /* End the copy */
702 320 : if (PQendcopy(conn->streamConn))
703 0 : ereport(ERROR,
704 : (errcode(ERRCODE_CONNECTION_FAILURE),
705 : errmsg("error while shutting down streaming COPY: %s",
706 : pchomp(PQerrorMessage(conn->streamConn)))));
707 :
708 : /* CommandComplete should follow */
709 320 : res = libpqrcv_PQgetResult(conn->streamConn);
710 : }
711 :
712 344 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
713 0 : ereport(ERROR,
714 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
715 : errmsg("error reading result of streaming command: %s",
716 : pchomp(PQerrorMessage(conn->streamConn)))));
717 344 : PQclear(res);
718 :
719 : /* Verify that there are no more results */
720 344 : res = libpqrcv_PQgetResult(conn->streamConn);
721 344 : if (res != NULL)
722 0 : ereport(ERROR,
723 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
724 : errmsg("unexpected result after CommandComplete: %s",
725 : pchomp(PQerrorMessage(conn->streamConn)))));
726 344 : }
727 :
728 : /*
729 : * Fetch the timeline history file for 'tli' from primary.
730 : */
731 : static void
732 22 : libpqrcv_readtimelinehistoryfile(WalReceiverConn *conn,
733 : TimeLineID tli, char **filename,
734 : char **content, int *len)
735 : {
736 : PGresult *res;
737 : char cmd[64];
738 :
739 : Assert(!conn->logical);
740 :
741 : /*
742 : * Request the primary to send over the history file for given timeline.
743 : */
744 22 : snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli);
745 22 : res = libpqrcv_PQexec(conn->streamConn, cmd);
746 22 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
747 : {
748 0 : PQclear(res);
749 0 : ereport(ERROR,
750 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
751 : errmsg("could not receive timeline history file from "
752 : "the primary server: %s",
753 : pchomp(PQerrorMessage(conn->streamConn)))));
754 : }
755 22 : if (PQnfields(res) != 2 || PQntuples(res) != 1)
756 : {
757 0 : int ntuples = PQntuples(res);
758 0 : int nfields = PQnfields(res);
759 :
760 0 : PQclear(res);
761 0 : ereport(ERROR,
762 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
763 : errmsg("invalid response from primary server"),
764 : errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.",
765 : ntuples, nfields)));
766 : }
767 22 : *filename = pstrdup(PQgetvalue(res, 0, 0));
768 :
769 22 : *len = PQgetlength(res, 0, 1);
770 22 : *content = palloc(*len);
771 22 : memcpy(*content, PQgetvalue(res, 0, 1), *len);
772 22 : PQclear(res);
773 22 : }
774 :
775 : /*
776 : * Send a query and wait for the results by using the asynchronous libpq
777 : * functions and socket readiness events.
778 : *
779 : * The function is modeled on libpqsrv_exec(), with the behavior difference
780 : * being that it calls ProcessWalRcvInterrupts(). As an optimization, it
781 : * skips try/catch, since all errors terminate the process.
782 : *
783 : * May return NULL, rather than an error result, on failure.
784 : */
785 : static PGresult *
786 6394 : libpqrcv_PQexec(PGconn *streamConn, const char *query)
787 : {
788 6394 : PGresult *lastResult = NULL;
789 :
790 : /*
791 : * PQexec() silently discards any prior query results on the connection.
792 : * This is not required for this function as it's expected that the caller
793 : * (which is this library in all cases) will behave correctly and we don't
794 : * have to be backwards compatible with old libpq.
795 : */
796 :
797 : /*
798 : * Submit the query. Since we don't use non-blocking mode, this could
799 : * theoretically block. In practice, since we don't send very long query
800 : * strings, the risk seems negligible.
801 : */
802 6394 : if (!PQsendQuery(streamConn, query))
803 0 : return NULL;
804 :
805 : for (;;)
806 5160 : {
807 : /* Wait for, and collect, the next PGresult. */
808 : PGresult *result;
809 :
810 11554 : result = libpqrcv_PQgetResult(streamConn);
811 11552 : if (result == NULL)
812 5160 : break; /* query is complete, or failure */
813 :
814 : /*
815 : * Emulate PQexec()'s behavior of returning the last result when there
816 : * are many. We are fine with returning just last error message.
817 : */
818 6392 : PQclear(lastResult);
819 6392 : lastResult = result;
820 :
821 12784 : if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
822 12446 : PQresultStatus(lastResult) == PGRES_COPY_OUT ||
823 11214 : PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
824 5160 : PQstatus(streamConn) == CONNECTION_BAD)
825 : break;
826 : }
827 :
828 6392 : return lastResult;
829 : }
830 :
831 : /*
832 : * Perform the equivalent of PQgetResult(), but watch for interrupts.
833 : */
834 : static PGresult *
835 13400 : libpqrcv_PQgetResult(PGconn *streamConn)
836 : {
837 : /*
838 : * Collect data until PQgetResult is ready to get the result without
839 : * blocking.
840 : */
841 19818 : while (PQisBusy(streamConn))
842 : {
843 : int rc;
844 :
845 : /*
846 : * We don't need to break down the sleep into smaller increments,
847 : * since we'll get interrupted by signals and can handle any
848 : * interrupts here.
849 : */
850 6476 : rc = WaitLatchOrSocket(MyLatch,
851 : WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
852 : WL_LATCH_SET,
853 : PQsocket(streamConn),
854 : 0,
855 : WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
856 :
857 : /* Interrupted? */
858 6476 : if (rc & WL_LATCH_SET)
859 : {
860 6 : ResetLatch(MyLatch);
861 6 : ProcessWalRcvInterrupts();
862 : }
863 :
864 : /* Consume whatever data is available from the socket */
865 6474 : if (PQconsumeInput(streamConn) == 0)
866 : {
867 : /* trouble; return NULL */
868 56 : return NULL;
869 : }
870 : }
871 :
872 : /* Now we can collect and return the next PGresult */
873 13342 : return PQgetResult(streamConn);
874 : }
875 :
876 : /*
877 : * Disconnect connection to primary, if any.
878 : */
879 : static void
880 1300 : libpqrcv_disconnect(WalReceiverConn *conn)
881 : {
882 1300 : PQfinish(conn->streamConn);
883 1300 : PQfreemem(conn->recvBuf);
884 1300 : pfree(conn);
885 1300 : }
886 :
887 : /*
888 : * Receive a message available from XLOG stream.
889 : *
890 : * Returns:
891 : *
892 : * If data was received, returns the length of the data. *buffer is set to
893 : * point to a buffer holding the received message. The buffer is only valid
894 : * until the next libpqrcv_* call.
895 : *
896 : * If no data was available immediately, returns 0, and *wait_fd is set to a
897 : * socket descriptor which can be waited on before trying again.
898 : *
899 : * -1 if the server ended the COPY.
900 : *
901 : * ereports on error.
902 : */
903 : static int
904 591398 : libpqrcv_receive(WalReceiverConn *conn, char **buffer,
905 : pgsocket *wait_fd)
906 : {
907 : int rawlen;
908 :
909 591398 : PQfreemem(conn->recvBuf);
910 591398 : conn->recvBuf = NULL;
911 :
912 : /* Try to receive a CopyData message */
913 591398 : rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
914 591398 : if (rawlen == 0)
915 : {
916 : /* Try consuming some data. */
917 346216 : if (PQconsumeInput(conn->streamConn) == 0)
918 90 : ereport(ERROR,
919 : (errcode(ERRCODE_CONNECTION_FAILURE),
920 : errmsg("could not receive data from WAL stream: %s",
921 : pchomp(PQerrorMessage(conn->streamConn)))));
922 :
923 : /* Now that we've consumed some input, try again */
924 346126 : rawlen = PQgetCopyData(conn->streamConn, &conn->recvBuf, 1);
925 346126 : if (rawlen == 0)
926 : {
927 : /* Tell caller to try again when our socket is ready. */
928 144160 : *wait_fd = PQsocket(conn->streamConn);
929 144160 : return 0;
930 : }
931 : }
932 447148 : if (rawlen == -1) /* end-of-streaming or error */
933 : {
934 : PGresult *res;
935 :
936 424 : res = libpqrcv_PQgetResult(conn->streamConn);
937 424 : if (PQresultStatus(res) == PGRES_COMMAND_OK)
938 : {
939 390 : PQclear(res);
940 :
941 : /* Verify that there are no more results. */
942 390 : res = libpqrcv_PQgetResult(conn->streamConn);
943 390 : if (res != NULL)
944 : {
945 0 : PQclear(res);
946 :
947 : /*
948 : * If the other side closed the connection orderly (otherwise
949 : * we'd seen an error, or PGRES_COPY_IN) don't report an error
950 : * here, but let callers deal with it.
951 : */
952 0 : if (PQstatus(conn->streamConn) == CONNECTION_BAD)
953 0 : return -1;
954 :
955 0 : ereport(ERROR,
956 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
957 : errmsg("unexpected result after CommandComplete: %s",
958 : PQerrorMessage(conn->streamConn))));
959 : }
960 :
961 390 : return -1;
962 : }
963 34 : else if (PQresultStatus(res) == PGRES_COPY_IN)
964 : {
965 24 : PQclear(res);
966 24 : return -1;
967 : }
968 : else
969 : {
970 10 : PQclear(res);
971 10 : ereport(ERROR,
972 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
973 : errmsg("could not receive data from WAL stream: %s",
974 : pchomp(PQerrorMessage(conn->streamConn)))));
975 : }
976 : }
977 446724 : if (rawlen < -1)
978 0 : ereport(ERROR,
979 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
980 : errmsg("could not receive data from WAL stream: %s",
981 : pchomp(PQerrorMessage(conn->streamConn)))));
982 :
983 : /* Return received messages to caller */
984 446724 : *buffer = conn->recvBuf;
985 446724 : return rawlen;
986 : }
987 :
988 : /*
989 : * Send a message to XLOG stream.
990 : *
991 : * ereports on error.
992 : */
993 : static void
994 158056 : libpqrcv_send(WalReceiverConn *conn, const char *buffer, int nbytes)
995 : {
996 316112 : if (PQputCopyData(conn->streamConn, buffer, nbytes) <= 0 ||
997 158056 : PQflush(conn->streamConn))
998 2 : ereport(ERROR,
999 : (errcode(ERRCODE_CONNECTION_FAILURE),
1000 : errmsg("could not send data to WAL stream: %s",
1001 : pchomp(PQerrorMessage(conn->streamConn)))));
1002 158054 : }
1003 :
1004 : /*
1005 : * Create new replication slot.
1006 : * Returns the name of the exported snapshot for logical slot or NULL for
1007 : * physical slot.
1008 : */
1009 : static char *
1010 526 : libpqrcv_create_slot(WalReceiverConn *conn, const char *slotname,
1011 : bool temporary, bool two_phase, bool failover,
1012 : CRSSnapshotAction snapshot_action, XLogRecPtr *lsn)
1013 : {
1014 : PGresult *res;
1015 : StringInfoData cmd;
1016 : char *snapshot;
1017 : int use_new_options_syntax;
1018 :
1019 526 : use_new_options_syntax = (PQserverVersion(conn->streamConn) >= 150000);
1020 :
1021 526 : initStringInfo(&cmd);
1022 :
1023 526 : appendStringInfo(&cmd, "CREATE_REPLICATION_SLOT \"%s\"", slotname);
1024 :
1025 526 : if (temporary)
1026 0 : appendStringInfoString(&cmd, " TEMPORARY");
1027 :
1028 526 : if (conn->logical)
1029 : {
1030 526 : appendStringInfoString(&cmd, " LOGICAL pgoutput ");
1031 526 : if (use_new_options_syntax)
1032 526 : appendStringInfoChar(&cmd, '(');
1033 526 : if (two_phase)
1034 : {
1035 2 : appendStringInfoString(&cmd, "TWO_PHASE");
1036 2 : if (use_new_options_syntax)
1037 2 : appendStringInfoString(&cmd, ", ");
1038 : else
1039 0 : appendStringInfoChar(&cmd, ' ');
1040 : }
1041 :
1042 526 : if (failover)
1043 : {
1044 12 : appendStringInfoString(&cmd, "FAILOVER");
1045 12 : if (use_new_options_syntax)
1046 12 : appendStringInfoString(&cmd, ", ");
1047 : else
1048 0 : appendStringInfoChar(&cmd, ' ');
1049 : }
1050 :
1051 526 : if (use_new_options_syntax)
1052 : {
1053 526 : switch (snapshot_action)
1054 : {
1055 0 : case CRS_EXPORT_SNAPSHOT:
1056 0 : appendStringInfoString(&cmd, "SNAPSHOT 'export'");
1057 0 : break;
1058 188 : case CRS_NOEXPORT_SNAPSHOT:
1059 188 : appendStringInfoString(&cmd, "SNAPSHOT 'nothing'");
1060 188 : break;
1061 338 : case CRS_USE_SNAPSHOT:
1062 338 : appendStringInfoString(&cmd, "SNAPSHOT 'use'");
1063 338 : break;
1064 : }
1065 526 : }
1066 : else
1067 : {
1068 0 : switch (snapshot_action)
1069 : {
1070 0 : case CRS_EXPORT_SNAPSHOT:
1071 0 : appendStringInfoString(&cmd, "EXPORT_SNAPSHOT");
1072 0 : break;
1073 0 : case CRS_NOEXPORT_SNAPSHOT:
1074 0 : appendStringInfoString(&cmd, "NOEXPORT_SNAPSHOT");
1075 0 : break;
1076 0 : case CRS_USE_SNAPSHOT:
1077 0 : appendStringInfoString(&cmd, "USE_SNAPSHOT");
1078 0 : break;
1079 : }
1080 526 : }
1081 :
1082 526 : if (use_new_options_syntax)
1083 526 : appendStringInfoChar(&cmd, ')');
1084 : }
1085 : else
1086 : {
1087 0 : if (use_new_options_syntax)
1088 0 : appendStringInfoString(&cmd, " PHYSICAL (RESERVE_WAL)");
1089 : else
1090 0 : appendStringInfoString(&cmd, " PHYSICAL RESERVE_WAL");
1091 : }
1092 :
1093 526 : res = libpqrcv_PQexec(conn->streamConn, cmd.data);
1094 526 : pfree(cmd.data);
1095 :
1096 526 : if (PQresultStatus(res) != PGRES_TUPLES_OK)
1097 : {
1098 0 : PQclear(res);
1099 0 : ereport(ERROR,
1100 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1101 : errmsg("could not create replication slot \"%s\": %s",
1102 : slotname, pchomp(PQerrorMessage(conn->streamConn)))));
1103 : }
1104 :
1105 526 : if (lsn)
1106 338 : *lsn = DatumGetLSN(DirectFunctionCall1Coll(pg_lsn_in, InvalidOid,
1107 338 : CStringGetDatum(PQgetvalue(res, 0, 1))));
1108 :
1109 526 : if (!PQgetisnull(res, 0, 2))
1110 0 : snapshot = pstrdup(PQgetvalue(res, 0, 2));
1111 : else
1112 526 : snapshot = NULL;
1113 :
1114 526 : PQclear(res);
1115 :
1116 526 : return snapshot;
1117 : }
1118 :
1119 : /*
1120 : * Change the definition of the replication slot.
1121 : */
1122 : static void
1123 4 : libpqrcv_alter_slot(WalReceiverConn *conn, const char *slotname,
1124 : bool failover)
1125 : {
1126 : StringInfoData cmd;
1127 : PGresult *res;
1128 :
1129 4 : initStringInfo(&cmd);
1130 4 : appendStringInfo(&cmd, "ALTER_REPLICATION_SLOT %s ( FAILOVER %s )",
1131 : quote_identifier(slotname),
1132 : failover ? "true" : "false");
1133 :
1134 4 : res = libpqrcv_PQexec(conn->streamConn, cmd.data);
1135 4 : pfree(cmd.data);
1136 :
1137 4 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
1138 0 : ereport(ERROR,
1139 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1140 : errmsg("could not alter replication slot \"%s\": %s",
1141 : slotname, pchomp(PQerrorMessage(conn->streamConn)))));
1142 :
1143 4 : PQclear(res);
1144 4 : }
1145 :
1146 : /*
1147 : * Return PID of remote backend process.
1148 : */
1149 : static pid_t
1150 0 : libpqrcv_get_backend_pid(WalReceiverConn *conn)
1151 : {
1152 0 : return PQbackendPID(conn->streamConn);
1153 : }
1154 :
1155 : /*
1156 : * Convert tuple query result to tuplestore.
1157 : */
1158 : static void
1159 1886 : libpqrcv_processTuples(PGresult *pgres, WalRcvExecResult *walres,
1160 : const int nRetTypes, const Oid *retTypes)
1161 : {
1162 : int tupn;
1163 : int coln;
1164 1886 : int nfields = PQnfields(pgres);
1165 : HeapTuple tuple;
1166 : AttInMetadata *attinmeta;
1167 : MemoryContext rowcontext;
1168 : MemoryContext oldcontext;
1169 :
1170 : /* Make sure we got expected number of fields. */
1171 1886 : if (nfields != nRetTypes)
1172 0 : ereport(ERROR,
1173 : (errcode(ERRCODE_PROTOCOL_VIOLATION),
1174 : errmsg("invalid query response"),
1175 : errdetail("Expected %d fields, got %d fields.",
1176 : nRetTypes, nfields)));
1177 :
1178 1886 : walres->tuplestore = tuplestore_begin_heap(true, false, work_mem);
1179 :
1180 : /* Create tuple descriptor corresponding to expected result. */
1181 1886 : walres->tupledesc = CreateTemplateTupleDesc(nRetTypes);
1182 6236 : for (coln = 0; coln < nRetTypes; coln++)
1183 4350 : TupleDescInitEntry(walres->tupledesc, (AttrNumber) coln + 1,
1184 4350 : PQfname(pgres, coln), retTypes[coln], -1, 0);
1185 1886 : attinmeta = TupleDescGetAttInMetadata(walres->tupledesc);
1186 :
1187 : /* No point in doing more here if there were no tuples returned. */
1188 1886 : if (PQntuples(pgres) == 0)
1189 30 : return;
1190 :
1191 : /* Create temporary context for local allocations. */
1192 1856 : rowcontext = AllocSetContextCreate(CurrentMemoryContext,
1193 : "libpqrcv query result context",
1194 : ALLOCSET_DEFAULT_SIZES);
1195 :
1196 : /* Process returned rows. */
1197 4288 : for (tupn = 0; tupn < PQntuples(pgres); tupn++)
1198 : {
1199 : char *cstrs[MaxTupleAttributeNumber];
1200 :
1201 2432 : ProcessWalRcvInterrupts();
1202 :
1203 : /* Do the allocations in temporary context. */
1204 2432 : oldcontext = MemoryContextSwitchTo(rowcontext);
1205 :
1206 : /*
1207 : * Fill cstrs with null-terminated strings of column values.
1208 : */
1209 8714 : for (coln = 0; coln < nfields; coln++)
1210 : {
1211 6282 : if (PQgetisnull(pgres, tupn, coln))
1212 886 : cstrs[coln] = NULL;
1213 : else
1214 5396 : cstrs[coln] = PQgetvalue(pgres, tupn, coln);
1215 : }
1216 :
1217 : /* Convert row to a tuple, and add it to the tuplestore */
1218 2432 : tuple = BuildTupleFromCStrings(attinmeta, cstrs);
1219 2432 : tuplestore_puttuple(walres->tuplestore, tuple);
1220 :
1221 : /* Clean up */
1222 2432 : MemoryContextSwitchTo(oldcontext);
1223 2432 : MemoryContextReset(rowcontext);
1224 : }
1225 :
1226 1856 : MemoryContextDelete(rowcontext);
1227 : }
1228 :
1229 : /*
1230 : * Public interface for sending generic queries (and commands).
1231 : *
1232 : * This can only be called from process connected to database.
1233 : */
1234 : static WalRcvExecResult *
1235 3322 : libpqrcv_exec(WalReceiverConn *conn, const char *query,
1236 : const int nRetTypes, const Oid *retTypes)
1237 : {
1238 3322 : PGresult *pgres = NULL;
1239 3322 : WalRcvExecResult *walres = palloc0(sizeof(WalRcvExecResult));
1240 : char *diag_sqlstate;
1241 :
1242 3322 : if (MyDatabaseId == InvalidOid)
1243 0 : ereport(ERROR,
1244 : (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
1245 : errmsg("the query interface requires a database connection")));
1246 :
1247 3322 : pgres = libpqrcv_PQexec(conn->streamConn, query);
1248 :
1249 3322 : switch (PQresultStatus(pgres))
1250 : {
1251 1886 : case PGRES_TUPLES_OK:
1252 : case PGRES_SINGLE_TUPLE:
1253 : case PGRES_TUPLES_CHUNK:
1254 1886 : walres->status = WALRCV_OK_TUPLES;
1255 1886 : libpqrcv_processTuples(pgres, walres, nRetTypes, retTypes);
1256 1886 : break;
1257 :
1258 0 : case PGRES_COPY_IN:
1259 0 : walres->status = WALRCV_OK_COPY_IN;
1260 0 : break;
1261 :
1262 338 : case PGRES_COPY_OUT:
1263 338 : walres->status = WALRCV_OK_COPY_OUT;
1264 338 : break;
1265 :
1266 0 : case PGRES_COPY_BOTH:
1267 0 : walres->status = WALRCV_OK_COPY_BOTH;
1268 0 : break;
1269 :
1270 1098 : case PGRES_COMMAND_OK:
1271 1098 : walres->status = WALRCV_OK_COMMAND;
1272 1098 : break;
1273 :
1274 : /* Empty query is considered error. */
1275 0 : case PGRES_EMPTY_QUERY:
1276 0 : walres->status = WALRCV_ERROR;
1277 0 : walres->err = _("empty query");
1278 0 : break;
1279 :
1280 0 : case PGRES_PIPELINE_SYNC:
1281 : case PGRES_PIPELINE_ABORTED:
1282 0 : walres->status = WALRCV_ERROR;
1283 0 : walres->err = _("unexpected pipeline mode");
1284 0 : break;
1285 :
1286 0 : case PGRES_NONFATAL_ERROR:
1287 : case PGRES_FATAL_ERROR:
1288 : case PGRES_BAD_RESPONSE:
1289 0 : walres->status = WALRCV_ERROR;
1290 0 : walres->err = pchomp(PQerrorMessage(conn->streamConn));
1291 0 : diag_sqlstate = PQresultErrorField(pgres, PG_DIAG_SQLSTATE);
1292 0 : if (diag_sqlstate)
1293 0 : walres->sqlstate = MAKE_SQLSTATE(diag_sqlstate[0],
1294 : diag_sqlstate[1],
1295 : diag_sqlstate[2],
1296 : diag_sqlstate[3],
1297 : diag_sqlstate[4]);
1298 0 : break;
1299 : }
1300 :
1301 3322 : PQclear(pgres);
1302 :
1303 3322 : return walres;
1304 : }
1305 :
1306 : /*
1307 : * Given a List of strings, return it as single comma separated
1308 : * string, quoting identifiers as needed.
1309 : *
1310 : * This is essentially the reverse of SplitIdentifierString.
1311 : *
1312 : * The caller should free the result.
1313 : */
1314 : static char *
1315 644 : stringlist_to_identifierstr(PGconn *conn, List *strings)
1316 : {
1317 : ListCell *lc;
1318 : StringInfoData res;
1319 644 : bool first = true;
1320 :
1321 644 : initStringInfo(&res);
1322 :
1323 1726 : foreach(lc, strings)
1324 : {
1325 1082 : char *val = strVal(lfirst(lc));
1326 : char *val_escaped;
1327 :
1328 1082 : if (first)
1329 644 : first = false;
1330 : else
1331 438 : appendStringInfoChar(&res, ',');
1332 :
1333 1082 : val_escaped = PQescapeIdentifier(conn, val, strlen(val));
1334 1082 : if (!val_escaped)
1335 : {
1336 0 : free(res.data);
1337 0 : return NULL;
1338 : }
1339 1082 : appendStringInfoString(&res, val_escaped);
1340 1082 : PQfreemem(val_escaped);
1341 : }
1342 :
1343 644 : return res.data;
1344 : }
|