Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pg_recvlogical.c - receive data from a logical decoding slot in a streaming
4 : * fashion and write it to a local file.
5 : *
6 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/bin/pg_basebackup/pg_recvlogical.c
10 : *-------------------------------------------------------------------------
11 : */
12 :
13 : #include "postgres_fe.h"
14 :
15 : #include <dirent.h>
16 : #include <limits.h>
17 : #include <sys/select.h>
18 : #include <sys/stat.h>
19 : #include <unistd.h>
20 :
21 : #include "common/file_perm.h"
22 : #include "common/logging.h"
23 : #include "fe_utils/option_utils.h"
24 : #include "getopt_long.h"
25 : #include "libpq-fe.h"
26 : #include "libpq/pqsignal.h"
27 : #include "libpq/protocol.h"
28 : #include "pqexpbuffer.h"
29 : #include "streamutil.h"
30 :
31 : /* Time to sleep between reconnection attempts */
32 : #define RECONNECT_SLEEP_TIME 5
33 :
34 : typedef enum
35 : {
36 : STREAM_STOP_NONE,
37 : STREAM_STOP_END_OF_WAL,
38 : STREAM_STOP_KEEPALIVE,
39 : STREAM_STOP_SIGNAL
40 : } StreamStopReason;
41 :
42 : /* Global Options */
43 : static char *outfile = NULL;
44 : static int verbose = 0;
45 : static bool two_phase = false; /* enable-two-phase option */
46 : static bool failover = false; /* enable-failover option */
47 : static int noloop = 0;
48 : static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
49 : static int fsync_interval = 10 * 1000; /* 10 sec = default */
50 : static XLogRecPtr startpos = InvalidXLogRecPtr;
51 : static XLogRecPtr endpos = InvalidXLogRecPtr;
52 : static bool do_create_slot = false;
53 : static bool slot_exists_ok = false;
54 : static bool do_start_slot = false;
55 : static bool do_drop_slot = false;
56 : static char *replication_slot = NULL;
57 :
58 : /* filled pairwise with option, value. value may be NULL */
59 : static char **options;
60 : static size_t noptions = 0;
61 : static const char *plugin = "test_decoding";
62 :
63 : /* Global State */
64 : static int outfd = -1;
65 : static volatile sig_atomic_t time_to_abort = false;
66 : static volatile sig_atomic_t stop_reason = STREAM_STOP_NONE;
67 : static volatile sig_atomic_t output_reopen = false;
68 : static bool output_isfile;
69 : static TimestampTz output_last_fsync = -1;
70 : static bool output_needs_fsync = false;
71 : static XLogRecPtr output_written_lsn = InvalidXLogRecPtr;
72 : static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr;
73 :
74 : static void usage(void);
75 : static void StreamLogicalLog(void);
76 : static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now);
77 : static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos,
78 : StreamStopReason reason,
79 : XLogRecPtr lsn);
80 :
81 : static void
82 2 : usage(void)
83 : {
84 2 : printf(_("%s controls PostgreSQL logical decoding streams.\n\n"),
85 : progname);
86 2 : printf(_("Usage:\n"));
87 2 : printf(_(" %s [OPTION]...\n"), progname);
88 2 : printf(_("\nAction to be performed:\n"));
89 2 : printf(_(" --create-slot create a new replication slot (for the slot's name see --slot)\n"));
90 2 : printf(_(" --drop-slot drop the replication slot (for the slot's name see --slot)\n"));
91 2 : printf(_(" --start start streaming in a replication slot (for the slot's name see --slot)\n"));
92 2 : printf(_("\nOptions:\n"));
93 2 : printf(_(" --enable-failover enable replication slot synchronization to standby servers when\n"
94 : " creating a replication slot\n"));
95 2 : printf(_(" -E, --endpos=LSN exit after receiving the specified LSN\n"));
96 2 : printf(_(" -f, --file=FILE receive log into this file, - for stdout\n"));
97 2 : printf(_(" -F --fsync-interval=SECS\n"
98 : " time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000));
99 2 : printf(_(" --if-not-exists do not error if slot already exists when creating a slot\n"));
100 2 : printf(_(" -I, --startpos=LSN where in an existing slot should the streaming start\n"));
101 2 : printf(_(" -n, --no-loop do not loop on connection lost\n"));
102 2 : printf(_(" -o, --option=NAME[=VALUE]\n"
103 : " pass option NAME with optional value VALUE to the\n"
104 : " output plugin\n"));
105 2 : printf(_(" -P, --plugin=PLUGIN use output plugin PLUGIN (default: %s)\n"), plugin);
106 2 : printf(_(" -s, --status-interval=SECS\n"
107 : " time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
108 2 : printf(_(" -S, --slot=SLOTNAME name of the logical replication slot\n"));
109 2 : printf(_(" -t, --enable-two-phase enable decoding of prepared transactions when creating a slot\n"));
110 2 : printf(_(" --two-phase (same as --enable-two-phase, deprecated)\n"));
111 2 : printf(_(" -v, --verbose output verbose messages\n"));
112 2 : printf(_(" -V, --version output version information, then exit\n"));
113 2 : printf(_(" -?, --help show this help, then exit\n"));
114 2 : printf(_("\nConnection options:\n"));
115 2 : printf(_(" -d, --dbname=DBNAME database to connect to\n"));
116 2 : printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
117 2 : printf(_(" -p, --port=PORT database server port number\n"));
118 2 : printf(_(" -U, --username=NAME connect as specified database user\n"));
119 2 : printf(_(" -w, --no-password never prompt for password\n"));
120 2 : printf(_(" -W, --password force password prompt (should happen automatically)\n"));
121 2 : printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
122 2 : printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
123 2 : }
124 :
125 : /*
126 : * Send a Standby Status Update message to server.
127 : */
128 : static bool
129 50 : sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested)
130 : {
131 : static XLogRecPtr last_written_lsn = InvalidXLogRecPtr;
132 : static XLogRecPtr last_fsync_lsn = InvalidXLogRecPtr;
133 :
134 : char replybuf[1 + 8 + 8 + 8 + 8 + 1];
135 50 : int len = 0;
136 :
137 : /*
138 : * we normally don't want to send superfluous feedback, but if it's
139 : * because of a timeout we need to, otherwise wal_sender_timeout will kill
140 : * us.
141 : */
142 50 : if (!force &&
143 0 : last_written_lsn == output_written_lsn &&
144 0 : last_fsync_lsn == output_fsync_lsn)
145 0 : return true;
146 :
147 50 : if (verbose)
148 0 : pg_log_info("confirming write up to %X/%08X, flush to %X/%08X (slot %s)",
149 : LSN_FORMAT_ARGS(output_written_lsn),
150 : LSN_FORMAT_ARGS(output_fsync_lsn),
151 : replication_slot);
152 :
153 50 : replybuf[len] = PqReplMsg_StandbyStatusUpdate;
154 50 : len += 1;
155 50 : fe_sendint64(output_written_lsn, &replybuf[len]); /* write */
156 50 : len += 8;
157 50 : fe_sendint64(output_fsync_lsn, &replybuf[len]); /* flush */
158 50 : len += 8;
159 50 : fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */
160 50 : len += 8;
161 50 : fe_sendint64(now, &replybuf[len]); /* sendTime */
162 50 : len += 8;
163 50 : replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
164 50 : len += 1;
165 :
166 50 : startpos = output_written_lsn;
167 50 : last_written_lsn = output_written_lsn;
168 50 : last_fsync_lsn = output_fsync_lsn;
169 :
170 50 : if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
171 : {
172 0 : pg_log_error("could not send feedback packet: %s",
173 : PQerrorMessage(conn));
174 0 : return false;
175 : }
176 :
177 50 : return true;
178 : }
179 :
180 : static void
181 100 : disconnect_atexit(void)
182 : {
183 100 : if (conn != NULL)
184 54 : PQfinish(conn);
185 100 : }
186 :
187 : static bool
188 50 : OutputFsync(TimestampTz now)
189 : {
190 50 : output_last_fsync = now;
191 :
192 50 : output_fsync_lsn = output_written_lsn;
193 :
194 50 : if (fsync_interval <= 0)
195 0 : return true;
196 :
197 50 : if (!output_needs_fsync)
198 38 : return true;
199 :
200 12 : output_needs_fsync = false;
201 :
202 : /* can only fsync if it's a regular file */
203 12 : if (!output_isfile)
204 8 : return true;
205 :
206 4 : if (fsync(outfd) != 0)
207 0 : pg_fatal("could not fsync file \"%s\": %m", outfile);
208 :
209 4 : return true;
210 : }
211 :
212 : /*
213 : * Start the log streaming
214 : */
215 : static void
216 46 : StreamLogicalLog(void)
217 : {
218 : PGresult *res;
219 46 : char *copybuf = NULL;
220 46 : TimestampTz last_status = -1;
221 : int i;
222 : PQExpBuffer query;
223 : XLogRecPtr cur_record_lsn;
224 :
225 46 : output_written_lsn = InvalidXLogRecPtr;
226 46 : output_fsync_lsn = InvalidXLogRecPtr;
227 46 : cur_record_lsn = InvalidXLogRecPtr;
228 :
229 : /*
230 : * Connect in replication mode to the server
231 : */
232 46 : if (!conn)
233 0 : conn = GetConnection();
234 46 : if (!conn)
235 : /* Error message already written in GetConnection() */
236 0 : return;
237 :
238 : /*
239 : * Start the replication
240 : */
241 46 : if (verbose)
242 0 : pg_log_info("starting log streaming at %X/%08X (slot %s)",
243 : LSN_FORMAT_ARGS(startpos),
244 : replication_slot);
245 :
246 : /* Initiate the replication stream at specified location */
247 46 : query = createPQExpBuffer();
248 46 : appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%08X",
249 46 : replication_slot, LSN_FORMAT_ARGS(startpos));
250 :
251 : /* print options if there are any */
252 46 : if (noptions)
253 40 : appendPQExpBufferStr(query, " (");
254 :
255 126 : for (i = 0; i < noptions; i++)
256 : {
257 : /* separator */
258 80 : if (i > 0)
259 40 : appendPQExpBufferStr(query, ", ");
260 :
261 : /* write option name */
262 80 : appendPQExpBuffer(query, "\"%s\"", options[(i * 2)]);
263 :
264 : /* write option value if specified */
265 80 : if (options[(i * 2) + 1] != NULL)
266 80 : appendPQExpBuffer(query, " '%s'", options[(i * 2) + 1]);
267 : }
268 :
269 46 : if (noptions)
270 40 : appendPQExpBufferChar(query, ')');
271 :
272 46 : res = PQexec(conn, query->data);
273 46 : if (PQresultStatus(res) != PGRES_COPY_BOTH)
274 : {
275 12 : pg_log_error("could not send replication command \"%s\": %s",
276 : query->data, PQresultErrorMessage(res));
277 12 : PQclear(res);
278 12 : goto error;
279 : }
280 34 : PQclear(res);
281 34 : resetPQExpBuffer(query);
282 :
283 34 : if (verbose)
284 0 : pg_log_info("streaming initiated");
285 :
286 600 : while (!time_to_abort)
287 : {
288 : int r;
289 : int bytes_left;
290 : int bytes_written;
291 : TimestampTz now;
292 : int hdr_len;
293 :
294 598 : cur_record_lsn = InvalidXLogRecPtr;
295 :
296 598 : if (copybuf != NULL)
297 : {
298 356 : PQfreemem(copybuf);
299 356 : copybuf = NULL;
300 : }
301 :
302 : /*
303 : * Potentially send a status message to the primary.
304 : */
305 598 : now = feGetCurrentTimestamp();
306 :
307 1162 : if (outfd != -1 &&
308 564 : feTimestampDifferenceExceeds(output_last_fsync, now,
309 : fsync_interval))
310 : {
311 34 : if (!OutputFsync(now))
312 4 : goto error;
313 : }
314 :
315 1196 : if (standby_message_timeout > 0 &&
316 598 : feTimestampDifferenceExceeds(last_status, now,
317 : standby_message_timeout))
318 : {
319 : /* Time to send feedback! */
320 34 : if (!sendFeedback(conn, now, true, false))
321 0 : goto error;
322 :
323 34 : last_status = now;
324 : }
325 :
326 : /* got SIGHUP, close output file */
327 598 : if (outfd != -1 && output_reopen && strcmp(outfile, "-") != 0)
328 : {
329 0 : now = feGetCurrentTimestamp();
330 0 : if (!OutputFsync(now))
331 0 : goto error;
332 0 : close(outfd);
333 0 : outfd = -1;
334 : }
335 598 : output_reopen = false;
336 :
337 : /* open the output file, if not open yet */
338 598 : if (outfd == -1)
339 : {
340 : struct stat statbuf;
341 :
342 34 : if (strcmp(outfile, "-") == 0)
343 34 : outfd = fileno(stdout);
344 : else
345 0 : outfd = open(outfile, O_CREAT | O_APPEND | O_WRONLY | PG_BINARY,
346 : S_IRUSR | S_IWUSR);
347 34 : if (outfd == -1)
348 : {
349 0 : pg_log_error("could not open log file \"%s\": %m", outfile);
350 0 : goto error;
351 : }
352 :
353 34 : if (fstat(outfd, &statbuf) != 0)
354 : {
355 0 : pg_log_error("could not stat file \"%s\": %m", outfile);
356 0 : goto error;
357 : }
358 :
359 34 : output_isfile = S_ISREG(statbuf.st_mode) && !isatty(outfd);
360 : }
361 :
362 598 : r = PQgetCopyData(conn, ©buf, 1);
363 598 : if (r == 0)
364 208 : {
365 : /*
366 : * In async mode, and no data available. We block on reading but
367 : * not more than the specified timeout, so that we can send a
368 : * response back to the client.
369 : */
370 : fd_set input_mask;
371 214 : TimestampTz message_target = 0;
372 214 : TimestampTz fsync_target = 0;
373 : struct timeval timeout;
374 214 : struct timeval *timeoutptr = NULL;
375 :
376 214 : if (PQsocket(conn) < 0)
377 : {
378 0 : pg_log_error("invalid socket: %s", PQerrorMessage(conn));
379 4 : goto error;
380 : }
381 :
382 3638 : FD_ZERO(&input_mask);
383 214 : FD_SET(PQsocket(conn), &input_mask);
384 :
385 : /* Compute when we need to wakeup to send a keepalive message. */
386 214 : if (standby_message_timeout)
387 214 : message_target = last_status + (standby_message_timeout - 1) *
388 : ((int64) 1000);
389 :
390 : /* Compute when we need to wakeup to fsync the output file. */
391 214 : if (fsync_interval > 0 && output_needs_fsync)
392 98 : fsync_target = output_last_fsync + (fsync_interval - 1) *
393 : ((int64) 1000);
394 :
395 : /* Now compute when to wakeup. */
396 214 : if (message_target > 0 || fsync_target > 0)
397 : {
398 : TimestampTz targettime;
399 : long secs;
400 : int usecs;
401 :
402 214 : targettime = message_target;
403 :
404 214 : if (fsync_target > 0 && fsync_target < targettime)
405 0 : targettime = fsync_target;
406 :
407 214 : feTimestampDifference(now,
408 : targettime,
409 : &secs,
410 : &usecs);
411 214 : if (secs <= 0)
412 0 : timeout.tv_sec = 1; /* Always sleep at least 1 sec */
413 : else
414 214 : timeout.tv_sec = secs;
415 214 : timeout.tv_usec = usecs;
416 214 : timeoutptr = &timeout;
417 : }
418 :
419 214 : r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
420 214 : if (r == 0 || (r < 0 && errno == EINTR))
421 : {
422 : /*
423 : * Got a timeout or signal. Continue the loop and either
424 : * deliver a status packet to the server or just go back into
425 : * blocking.
426 : */
427 210 : continue;
428 : }
429 212 : else if (r < 0)
430 : {
431 0 : pg_log_error("%s() failed: %m", "select");
432 0 : goto error;
433 : }
434 :
435 : /* Else there is actually data on the socket */
436 212 : if (PQconsumeInput(conn) == 0)
437 : {
438 4 : pg_log_error("could not receive data from WAL stream: %s",
439 : PQerrorMessage(conn));
440 4 : goto error;
441 : }
442 208 : continue;
443 : }
444 :
445 : /* End of copy stream */
446 384 : if (r == -1)
447 28 : break;
448 :
449 : /* Failure while reading the copy stream */
450 370 : if (r == -2)
451 : {
452 0 : pg_log_error("could not read COPY data: %s",
453 : PQerrorMessage(conn));
454 0 : goto error;
455 : }
456 :
457 : /* Check the message type. */
458 370 : if (copybuf[0] == PqReplMsg_Keepalive)
459 184 : {
460 : int pos;
461 : bool replyRequested;
462 : XLogRecPtr walEnd;
463 188 : bool endposReached = false;
464 :
465 : /*
466 : * Parse the keepalive message, enclosed in the CopyData message.
467 : * We just check if the server requested a reply, and ignore the
468 : * rest.
469 : */
470 188 : pos = 1; /* skip msgtype PqReplMsg_Keepalive */
471 188 : walEnd = fe_recvint64(©buf[pos]);
472 188 : output_written_lsn = Max(walEnd, output_written_lsn);
473 :
474 188 : pos += 8; /* read walEnd */
475 :
476 188 : pos += 8; /* skip sendTime */
477 :
478 188 : if (r < pos + 1)
479 : {
480 0 : pg_log_error("streaming header too small: %d", r);
481 0 : goto error;
482 : }
483 188 : replyRequested = copybuf[pos];
484 :
485 188 : if (endpos != InvalidXLogRecPtr && walEnd >= endpos)
486 : {
487 : /*
488 : * If there's nothing to read on the socket until a keepalive
489 : * we know that the server has nothing to send us; and if
490 : * walEnd has passed endpos, we know nothing else can have
491 : * committed before endpos. So we can bail out now.
492 : */
493 4 : endposReached = true;
494 : }
495 :
496 : /* Send a reply, if necessary */
497 188 : if (replyRequested || endposReached)
498 : {
499 6 : if (!flushAndSendFeedback(conn, &now))
500 0 : goto error;
501 6 : last_status = now;
502 : }
503 :
504 188 : if (endposReached)
505 : {
506 4 : stop_reason = STREAM_STOP_KEEPALIVE;
507 4 : time_to_abort = true;
508 4 : break;
509 : }
510 :
511 184 : continue;
512 : }
513 182 : else if (copybuf[0] != PqReplMsg_WALData)
514 : {
515 0 : pg_log_error("unrecognized streaming header: \"%c\"",
516 : copybuf[0]);
517 0 : goto error;
518 : }
519 :
520 : /*
521 : * Read the header of the WALData message, enclosed in the CopyData
522 : * message. We only need the WAL location field (dataStart), the rest
523 : * of the header is ignored.
524 : */
525 182 : hdr_len = 1; /* msgtype PqReplMsg_WALData */
526 182 : hdr_len += 8; /* dataStart */
527 182 : hdr_len += 8; /* walEnd */
528 182 : hdr_len += 8; /* sendTime */
529 182 : if (r < hdr_len + 1)
530 : {
531 0 : pg_log_error("streaming header too small: %d", r);
532 0 : goto error;
533 : }
534 :
535 : /* Extract WAL location for this block */
536 182 : cur_record_lsn = fe_recvint64(©buf[1]);
537 :
538 182 : if (endpos != InvalidXLogRecPtr && cur_record_lsn > endpos)
539 : {
540 : /*
541 : * We've read past our endpoint, so prepare to go away being
542 : * cautious about what happens to our output data.
543 : */
544 0 : if (!flushAndSendFeedback(conn, &now))
545 0 : goto error;
546 0 : stop_reason = STREAM_STOP_END_OF_WAL;
547 0 : time_to_abort = true;
548 0 : break;
549 : }
550 :
551 182 : output_written_lsn = Max(cur_record_lsn, output_written_lsn);
552 :
553 182 : bytes_left = r - hdr_len;
554 182 : bytes_written = 0;
555 :
556 : /* signal that a fsync is needed */
557 182 : output_needs_fsync = true;
558 :
559 364 : while (bytes_left)
560 : {
561 : int ret;
562 :
563 364 : ret = write(outfd,
564 182 : copybuf + hdr_len + bytes_written,
565 : bytes_left);
566 :
567 182 : if (ret < 0)
568 : {
569 0 : pg_log_error("could not write %d bytes to log file \"%s\": %m",
570 : bytes_left, outfile);
571 0 : goto error;
572 : }
573 :
574 : /* Write was successful, advance our position */
575 182 : bytes_written += ret;
576 182 : bytes_left -= ret;
577 : }
578 :
579 182 : if (write(outfd, "\n", 1) != 1)
580 : {
581 0 : pg_log_error("could not write %d bytes to log file \"%s\": %m",
582 : 1, outfile);
583 0 : goto error;
584 : }
585 :
586 182 : if (endpos != InvalidXLogRecPtr && cur_record_lsn == endpos)
587 : {
588 : /* endpos was exactly the record we just processed, we're done */
589 10 : if (!flushAndSendFeedback(conn, &now))
590 0 : goto error;
591 10 : stop_reason = STREAM_STOP_END_OF_WAL;
592 10 : time_to_abort = true;
593 10 : break;
594 : }
595 : }
596 :
597 : /* Clean up connection state if stream has been aborted */
598 30 : if (time_to_abort)
599 16 : prepareToTerminate(conn, endpos, stop_reason, cur_record_lsn);
600 :
601 30 : res = PQgetResult(conn);
602 30 : if (PQresultStatus(res) == PGRES_COPY_OUT)
603 : {
604 16 : PQclear(res);
605 :
606 : /*
607 : * We're doing a client-initiated clean exit and have sent CopyDone to
608 : * the server. Drain any messages, so we don't miss a last-minute
609 : * ErrorResponse. The walsender stops generating WALData records once
610 : * it sees CopyDone, so expect this to finish quickly. After CopyDone,
611 : * it's too late for sendFeedback(), even if this were to take a long
612 : * time. Hence, use synchronous-mode PQgetCopyData().
613 : */
614 : while (1)
615 4 : {
616 : int r;
617 :
618 20 : if (copybuf != NULL)
619 : {
620 18 : PQfreemem(copybuf);
621 18 : copybuf = NULL;
622 : }
623 20 : r = PQgetCopyData(conn, ©buf, 0);
624 20 : if (r == -1)
625 16 : break;
626 4 : if (r == -2)
627 : {
628 0 : pg_log_error("could not read COPY data: %s",
629 : PQerrorMessage(conn));
630 0 : time_to_abort = false; /* unclean exit */
631 0 : goto error;
632 : }
633 : }
634 :
635 16 : res = PQgetResult(conn);
636 : }
637 30 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
638 : {
639 12 : pg_log_error("unexpected termination of replication stream: %s",
640 : PQresultErrorMessage(res));
641 12 : PQclear(res);
642 12 : goto error;
643 : }
644 18 : PQclear(res);
645 :
646 18 : if (outfd != -1 && strcmp(outfile, "-") != 0)
647 : {
648 0 : TimestampTz t = feGetCurrentTimestamp();
649 :
650 : /* no need to jump to error on failure here, we're finishing anyway */
651 0 : OutputFsync(t);
652 :
653 0 : if (close(outfd) != 0)
654 0 : pg_log_error("could not close file \"%s\": %m", outfile);
655 : }
656 18 : outfd = -1;
657 46 : error:
658 46 : if (copybuf != NULL)
659 : {
660 0 : PQfreemem(copybuf);
661 0 : copybuf = NULL;
662 : }
663 46 : destroyPQExpBuffer(query);
664 46 : PQfinish(conn);
665 46 : conn = NULL;
666 : }
667 :
668 : /*
669 : * Unfortunately we can't do sensible signal handling on windows...
670 : */
671 : #ifndef WIN32
672 :
673 : /*
674 : * When SIGINT/SIGTERM are caught, just tell the system to exit at the next
675 : * possible moment.
676 : */
677 : static void
678 2 : sigexit_handler(SIGNAL_ARGS)
679 : {
680 2 : stop_reason = STREAM_STOP_SIGNAL;
681 2 : time_to_abort = true;
682 2 : }
683 :
684 : /*
685 : * Trigger the output file to be reopened.
686 : */
687 : static void
688 0 : sighup_handler(SIGNAL_ARGS)
689 : {
690 0 : output_reopen = true;
691 0 : }
692 : #endif
693 :
694 :
695 : int
696 116 : main(int argc, char **argv)
697 : {
698 : static struct option long_options[] = {
699 : /* general options */
700 : {"file", required_argument, NULL, 'f'},
701 : {"fsync-interval", required_argument, NULL, 'F'},
702 : {"no-loop", no_argument, NULL, 'n'},
703 : {"enable-failover", no_argument, NULL, 5},
704 : {"enable-two-phase", no_argument, NULL, 't'},
705 : {"two-phase", no_argument, NULL, 't'}, /* deprecated */
706 : {"verbose", no_argument, NULL, 'v'},
707 : {"version", no_argument, NULL, 'V'},
708 : {"help", no_argument, NULL, '?'},
709 : /* connection options */
710 : {"dbname", required_argument, NULL, 'd'},
711 : {"host", required_argument, NULL, 'h'},
712 : {"port", required_argument, NULL, 'p'},
713 : {"username", required_argument, NULL, 'U'},
714 : {"no-password", no_argument, NULL, 'w'},
715 : {"password", no_argument, NULL, 'W'},
716 : /* replication options */
717 : {"startpos", required_argument, NULL, 'I'},
718 : {"endpos", required_argument, NULL, 'E'},
719 : {"option", required_argument, NULL, 'o'},
720 : {"plugin", required_argument, NULL, 'P'},
721 : {"status-interval", required_argument, NULL, 's'},
722 : {"slot", required_argument, NULL, 'S'},
723 : /* action */
724 : {"create-slot", no_argument, NULL, 1},
725 : {"start", no_argument, NULL, 2},
726 : {"drop-slot", no_argument, NULL, 3},
727 : {"if-not-exists", no_argument, NULL, 4},
728 : {NULL, 0, NULL, 0}
729 : };
730 : int c;
731 : int option_index;
732 : uint32 hi,
733 : lo;
734 : char *db_name;
735 :
736 116 : pg_logging_init(argv[0]);
737 116 : progname = get_progname(argv[0]);
738 116 : set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
739 :
740 116 : if (argc > 1)
741 : {
742 114 : if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
743 : {
744 2 : usage();
745 2 : exit(0);
746 : }
747 112 : else if (strcmp(argv[1], "-V") == 0 ||
748 112 : strcmp(argv[1], "--version") == 0)
749 : {
750 2 : puts("pg_recvlogical (PostgreSQL) " PG_VERSION);
751 2 : exit(0);
752 : }
753 : }
754 :
755 668 : while ((c = getopt_long(argc, argv, "E:f:F:ntvd:h:p:U:wWI:o:P:s:S:",
756 668 : long_options, &option_index)) != -1)
757 : {
758 558 : switch (c)
759 : {
760 : /* general options */
761 48 : case 'f':
762 48 : outfile = pg_strdup(optarg);
763 48 : break;
764 0 : case 'F':
765 0 : if (!option_parse_int(optarg, "-F/--fsync-interval", 0,
766 : INT_MAX / 1000,
767 : &fsync_interval))
768 0 : exit(1);
769 0 : fsync_interval *= 1000;
770 0 : break;
771 46 : case 'n':
772 46 : noloop = 1;
773 46 : break;
774 4 : case 't':
775 4 : two_phase = true;
776 4 : break;
777 0 : case 'v':
778 0 : verbose++;
779 0 : break;
780 2 : case 5:
781 2 : failover = true;
782 2 : break;
783 : /* connection options */
784 104 : case 'd':
785 104 : dbname = pg_strdup(optarg);
786 104 : break;
787 0 : case 'h':
788 0 : dbhost = pg_strdup(optarg);
789 0 : break;
790 0 : case 'p':
791 0 : dbport = pg_strdup(optarg);
792 0 : break;
793 0 : case 'U':
794 0 : dbuser = pg_strdup(optarg);
795 0 : break;
796 0 : case 'w':
797 0 : dbgetpassword = -1;
798 0 : break;
799 0 : case 'W':
800 0 : dbgetpassword = 1;
801 0 : break;
802 : /* replication options */
803 0 : case 'I':
804 0 : if (sscanf(optarg, "%X/%08X", &hi, &lo) != 2)
805 0 : pg_fatal("could not parse start position \"%s\"", optarg);
806 0 : startpos = ((uint64) hi) << 32 | lo;
807 0 : break;
808 16 : case 'E':
809 16 : if (sscanf(optarg, "%X/%08X", &hi, &lo) != 2)
810 0 : pg_fatal("could not parse end position \"%s\"", optarg);
811 16 : endpos = ((uint64) hi) << 32 | lo;
812 16 : break;
813 80 : case 'o':
814 : {
815 80 : char *data = pg_strdup(optarg);
816 80 : char *val = strchr(data, '=');
817 :
818 80 : if (val != NULL)
819 : {
820 : /* remove =; separate data from val */
821 80 : *val = '\0';
822 80 : val++;
823 : }
824 :
825 80 : noptions += 1;
826 80 : options = pg_realloc(options, sizeof(char *) * noptions * 2);
827 :
828 80 : options[(noptions - 1) * 2] = data;
829 80 : options[(noptions - 1) * 2 + 1] = val;
830 : }
831 :
832 80 : break;
833 44 : case 'P':
834 44 : plugin = pg_strdup(optarg);
835 44 : break;
836 0 : case 's':
837 0 : if (!option_parse_int(optarg, "-s/--status-interval", 0,
838 : INT_MAX / 1000,
839 : &standby_message_timeout))
840 0 : exit(1);
841 0 : standby_message_timeout *= 1000;
842 0 : break;
843 108 : case 'S':
844 108 : replication_slot = pg_strdup(optarg);
845 108 : break;
846 : /* action */
847 50 : case 1:
848 50 : do_create_slot = true;
849 50 : break;
850 50 : case 2:
851 50 : do_start_slot = true;
852 50 : break;
853 4 : case 3:
854 4 : do_drop_slot = true;
855 4 : break;
856 0 : case 4:
857 0 : slot_exists_ok = true;
858 0 : break;
859 :
860 2 : default:
861 : /* getopt_long already emitted a complaint */
862 2 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
863 2 : exit(1);
864 : }
865 : }
866 :
867 : /*
868 : * Any non-option arguments?
869 : */
870 110 : if (optind < argc)
871 : {
872 0 : pg_log_error("too many command-line arguments (first is \"%s\")",
873 : argv[optind]);
874 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
875 0 : exit(1);
876 : }
877 :
878 : /*
879 : * Required arguments
880 : */
881 110 : if (replication_slot == NULL)
882 : {
883 2 : pg_log_error("no slot specified");
884 2 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
885 2 : exit(1);
886 : }
887 :
888 108 : if (do_start_slot && outfile == NULL)
889 : {
890 2 : pg_log_error("no target file specified");
891 2 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
892 2 : exit(1);
893 : }
894 :
895 106 : if (!do_drop_slot && dbname == NULL)
896 : {
897 2 : pg_log_error("no database specified");
898 2 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
899 2 : exit(1);
900 : }
901 :
902 104 : if (!do_drop_slot && !do_create_slot && !do_start_slot)
903 : {
904 2 : pg_log_error("at least one action needs to be specified");
905 2 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
906 2 : exit(1);
907 : }
908 :
909 102 : if (do_drop_slot && (do_create_slot || do_start_slot))
910 : {
911 0 : pg_log_error("cannot use --create-slot or --start together with --drop-slot");
912 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
913 0 : exit(1);
914 : }
915 :
916 102 : if (startpos != InvalidXLogRecPtr && (do_create_slot || do_drop_slot))
917 : {
918 0 : pg_log_error("cannot use --create-slot or --drop-slot together with --startpos");
919 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
920 0 : exit(1);
921 : }
922 :
923 102 : if (endpos != InvalidXLogRecPtr && !do_start_slot)
924 : {
925 0 : pg_log_error("--endpos may only be specified with --start");
926 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
927 0 : exit(1);
928 : }
929 :
930 102 : if (!do_create_slot)
931 : {
932 52 : if (two_phase)
933 : {
934 2 : pg_log_error("%s may only be specified with --create-slot", "--enable-two-phase");
935 2 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
936 2 : exit(1);
937 : }
938 :
939 50 : if (failover)
940 : {
941 0 : pg_log_error("%s may only be specified with --create-slot", "--enable-failover");
942 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
943 0 : exit(1);
944 : }
945 : }
946 :
947 : /*
948 : * Obtain a connection to server. Notably, if we need a password, we want
949 : * to collect it from the user immediately.
950 : */
951 100 : conn = GetConnection();
952 100 : if (!conn)
953 : /* Error message already written in GetConnection() */
954 0 : exit(1);
955 100 : atexit(disconnect_atexit);
956 :
957 : /*
958 : * Trap signals. (Don't do this until after the initial password prompt,
959 : * if one is needed, in GetConnection.)
960 : */
961 : #ifndef WIN32
962 100 : pqsignal(SIGINT, sigexit_handler);
963 100 : pqsignal(SIGTERM, sigexit_handler);
964 100 : pqsignal(SIGHUP, sighup_handler);
965 : #endif
966 :
967 : /*
968 : * Run IDENTIFY_SYSTEM to check the connection type for each action.
969 : * --create-slot and --start actions require a database-specific
970 : * replication connection because they handle logical replication slots.
971 : * --drop-slot can remove replication slots from any replication
972 : * connection without this restriction.
973 : */
974 100 : if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name))
975 0 : exit(1);
976 :
977 100 : if (!do_drop_slot && db_name == NULL)
978 0 : pg_fatal("could not establish database-specific replication connection");
979 :
980 : /*
981 : * Set umask so that directories/files are created with the same
982 : * permissions as directories/files in the source data directory.
983 : *
984 : * pg_mode_mask is set to owner-only by default and then updated in
985 : * GetConnection() where we get the mode from the server-side with
986 : * RetrieveDataDirCreatePerm() and then call SetDataDirectoryCreatePerm().
987 : */
988 100 : umask(pg_mode_mask);
989 :
990 : /* Drop a replication slot. */
991 100 : if (do_drop_slot)
992 : {
993 4 : if (verbose)
994 0 : pg_log_info("dropping replication slot \"%s\"", replication_slot);
995 :
996 4 : if (!DropReplicationSlot(conn, replication_slot))
997 0 : exit(1);
998 : }
999 :
1000 : /* Create a replication slot. */
1001 100 : if (do_create_slot)
1002 : {
1003 50 : if (verbose)
1004 0 : pg_log_info("creating replication slot \"%s\"", replication_slot);
1005 :
1006 50 : if (!CreateReplicationSlot(conn, replication_slot, plugin, false,
1007 : false, false, slot_exists_ok, two_phase,
1008 : failover))
1009 0 : exit(1);
1010 50 : startpos = InvalidXLogRecPtr;
1011 : }
1012 :
1013 100 : if (!do_start_slot)
1014 54 : exit(0);
1015 :
1016 : /* Stream loop */
1017 : while (true)
1018 : {
1019 46 : StreamLogicalLog();
1020 46 : if (time_to_abort)
1021 : {
1022 : /*
1023 : * We've been Ctrl-C'ed or reached an exit limit condition. That's
1024 : * not an error, so exit without an errorcode.
1025 : */
1026 16 : exit(0);
1027 : }
1028 30 : else if (noloop)
1029 30 : pg_fatal("disconnected");
1030 : else
1031 : {
1032 : /* translator: check source for value for %d */
1033 0 : pg_log_info("disconnected; waiting %d seconds to try again",
1034 : RECONNECT_SLEEP_TIME);
1035 0 : pg_usleep(RECONNECT_SLEEP_TIME * 1000000);
1036 : }
1037 : }
1038 : }
1039 :
1040 : /*
1041 : * Fsync our output data, and send a feedback message to the server. Returns
1042 : * true if successful, false otherwise.
1043 : *
1044 : * If successful, *now is updated to the current timestamp just before sending
1045 : * feedback.
1046 : */
1047 : static bool
1048 16 : flushAndSendFeedback(PGconn *conn, TimestampTz *now)
1049 : {
1050 : /* flush data to disk, so that we send a recent flush pointer */
1051 16 : if (!OutputFsync(*now))
1052 0 : return false;
1053 16 : *now = feGetCurrentTimestamp();
1054 16 : if (!sendFeedback(conn, *now, true, false))
1055 0 : return false;
1056 :
1057 16 : return true;
1058 : }
1059 :
1060 : /*
1061 : * Try to inform the server about our upcoming demise, but don't wait around or
1062 : * retry on failure.
1063 : */
1064 : static void
1065 16 : prepareToTerminate(PGconn *conn, XLogRecPtr endpos, StreamStopReason reason,
1066 : XLogRecPtr lsn)
1067 : {
1068 16 : (void) PQputCopyEnd(conn, NULL);
1069 16 : (void) PQflush(conn);
1070 :
1071 16 : if (verbose)
1072 : {
1073 0 : switch (reason)
1074 : {
1075 0 : case STREAM_STOP_SIGNAL:
1076 0 : pg_log_info("received interrupt signal, exiting");
1077 0 : break;
1078 0 : case STREAM_STOP_KEEPALIVE:
1079 0 : pg_log_info("end position %X/%08X reached by keepalive",
1080 : LSN_FORMAT_ARGS(endpos));
1081 0 : break;
1082 0 : case STREAM_STOP_END_OF_WAL:
1083 : Assert(!XLogRecPtrIsInvalid(lsn));
1084 0 : pg_log_info("end position %X/%08X reached by WAL record at %X/%08X",
1085 : LSN_FORMAT_ARGS(endpos), LSN_FORMAT_ARGS(lsn));
1086 0 : break;
1087 0 : case STREAM_STOP_NONE:
1088 : Assert(false);
1089 0 : break;
1090 : }
1091 : }
1092 16 : }
|