Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pg_recvlogical.c - receive data from a logical decoding slot in a streaming
4 : * fashion and write it to a local file.
5 : *
6 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/bin/pg_basebackup/pg_recvlogical.c
10 : *-------------------------------------------------------------------------
11 : */
12 :
13 : #include "postgres_fe.h"
14 :
15 : #include <dirent.h>
16 : #include <limits.h>
17 : #include <sys/select.h>
18 : #include <sys/stat.h>
19 : #include <unistd.h>
20 :
21 : #include "common/file_perm.h"
22 : #include "common/logging.h"
23 : #include "fe_utils/option_utils.h"
24 : #include "getopt_long.h"
25 : #include "libpq-fe.h"
26 : #include "libpq/pqsignal.h"
27 : #include "pqexpbuffer.h"
28 : #include "streamutil.h"
29 :
30 : /* Time to sleep between reconnection attempts */
31 : #define RECONNECT_SLEEP_TIME 5
32 :
33 : typedef enum
34 : {
35 : STREAM_STOP_NONE,
36 : STREAM_STOP_END_OF_WAL,
37 : STREAM_STOP_KEEPALIVE,
38 : STREAM_STOP_SIGNAL
39 : } StreamStopReason;
40 :
41 : /* Global Options */
42 : static char *outfile = NULL;
43 : static int verbose = 0;
44 : static bool two_phase = false;
45 : static int noloop = 0;
46 : static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
47 : static int fsync_interval = 10 * 1000; /* 10 sec = default */
48 : static XLogRecPtr startpos = InvalidXLogRecPtr;
49 : static XLogRecPtr endpos = InvalidXLogRecPtr;
50 : static bool do_create_slot = false;
51 : static bool slot_exists_ok = false;
52 : static bool do_start_slot = false;
53 : static bool do_drop_slot = false;
54 : static char *replication_slot = NULL;
55 :
56 : /* filled pairwise with option, value. value may be NULL */
57 : static char **options;
58 : static size_t noptions = 0;
59 : static const char *plugin = "test_decoding";
60 :
61 : /* Global State */
62 : static int outfd = -1;
63 : static volatile sig_atomic_t time_to_abort = false;
64 : static volatile sig_atomic_t stop_reason = STREAM_STOP_NONE;
65 : static volatile sig_atomic_t output_reopen = false;
66 : static bool output_isfile;
67 : static TimestampTz output_last_fsync = -1;
68 : static bool output_needs_fsync = false;
69 : static XLogRecPtr output_written_lsn = InvalidXLogRecPtr;
70 : static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr;
71 :
72 : static void usage(void);
73 : static void StreamLogicalLog(void);
74 : static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now);
75 : static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos,
76 : StreamStopReason reason,
77 : XLogRecPtr lsn);
78 :
79 : static void
80 2 : usage(void)
81 : {
82 2 : printf(_("%s controls PostgreSQL logical decoding streams.\n\n"),
83 : progname);
84 2 : printf(_("Usage:\n"));
85 2 : printf(_(" %s [OPTION]...\n"), progname);
86 2 : printf(_("\nAction to be performed:\n"));
87 2 : printf(_(" --create-slot create a new replication slot (for the slot's name see --slot)\n"));
88 2 : printf(_(" --drop-slot drop the replication slot (for the slot's name see --slot)\n"));
89 2 : printf(_(" --start start streaming in a replication slot (for the slot's name see --slot)\n"));
90 2 : printf(_("\nOptions:\n"));
91 2 : printf(_(" -E, --endpos=LSN exit after receiving the specified LSN\n"));
92 2 : printf(_(" -f, --file=FILE receive log into this file, - for stdout\n"));
93 2 : printf(_(" -F --fsync-interval=SECS\n"
94 : " time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000));
95 2 : printf(_(" --if-not-exists do not error if slot already exists when creating a slot\n"));
96 2 : printf(_(" -I, --startpos=LSN where in an existing slot should the streaming start\n"));
97 2 : printf(_(" -n, --no-loop do not loop on connection lost\n"));
98 2 : printf(_(" -o, --option=NAME[=VALUE]\n"
99 : " pass option NAME with optional value VALUE to the\n"
100 : " output plugin\n"));
101 2 : printf(_(" -P, --plugin=PLUGIN use output plugin PLUGIN (default: %s)\n"), plugin);
102 2 : printf(_(" -s, --status-interval=SECS\n"
103 : " time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
104 2 : printf(_(" -S, --slot=SLOTNAME name of the logical replication slot\n"));
105 2 : printf(_(" -t, --two-phase enable decoding of prepared transactions when creating a slot\n"));
106 2 : printf(_(" -v, --verbose output verbose messages\n"));
107 2 : printf(_(" -V, --version output version information, then exit\n"));
108 2 : printf(_(" -?, --help show this help, then exit\n"));
109 2 : printf(_("\nConnection options:\n"));
110 2 : printf(_(" -d, --dbname=DBNAME database to connect to\n"));
111 2 : printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
112 2 : printf(_(" -p, --port=PORT database server port number\n"));
113 2 : printf(_(" -U, --username=NAME connect as specified database user\n"));
114 2 : printf(_(" -w, --no-password never prompt for password\n"));
115 2 : printf(_(" -W, --password force password prompt (should happen automatically)\n"));
116 2 : printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
117 2 : printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
118 2 : }
119 :
120 : /*
121 : * Send a Standby Status Update message to server.
122 : */
123 : static bool
124 50 : sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested)
125 : {
126 : static XLogRecPtr last_written_lsn = InvalidXLogRecPtr;
127 : static XLogRecPtr last_fsync_lsn = InvalidXLogRecPtr;
128 :
129 : char replybuf[1 + 8 + 8 + 8 + 8 + 1];
130 50 : int len = 0;
131 :
132 : /*
133 : * we normally don't want to send superfluous feedback, but if it's
134 : * because of a timeout we need to, otherwise wal_sender_timeout will kill
135 : * us.
136 : */
137 50 : if (!force &&
138 0 : last_written_lsn == output_written_lsn &&
139 0 : last_fsync_lsn == output_fsync_lsn)
140 0 : return true;
141 :
142 50 : if (verbose)
143 0 : pg_log_info("confirming write up to %X/%X, flush to %X/%X (slot %s)",
144 : LSN_FORMAT_ARGS(output_written_lsn),
145 : LSN_FORMAT_ARGS(output_fsync_lsn),
146 : replication_slot);
147 :
148 50 : replybuf[len] = 'r';
149 50 : len += 1;
150 50 : fe_sendint64(output_written_lsn, &replybuf[len]); /* write */
151 50 : len += 8;
152 50 : fe_sendint64(output_fsync_lsn, &replybuf[len]); /* flush */
153 50 : len += 8;
154 50 : fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */
155 50 : len += 8;
156 50 : fe_sendint64(now, &replybuf[len]); /* sendTime */
157 50 : len += 8;
158 50 : replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
159 50 : len += 1;
160 :
161 50 : startpos = output_written_lsn;
162 50 : last_written_lsn = output_written_lsn;
163 50 : last_fsync_lsn = output_fsync_lsn;
164 :
165 50 : if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
166 : {
167 0 : pg_log_error("could not send feedback packet: %s",
168 : PQerrorMessage(conn));
169 0 : return false;
170 : }
171 :
172 50 : return true;
173 : }
174 :
175 : static void
176 98 : disconnect_atexit(void)
177 : {
178 98 : if (conn != NULL)
179 52 : PQfinish(conn);
180 98 : }
181 :
182 : static bool
183 50 : OutputFsync(TimestampTz now)
184 : {
185 50 : output_last_fsync = now;
186 :
187 50 : output_fsync_lsn = output_written_lsn;
188 :
189 50 : if (fsync_interval <= 0)
190 0 : return true;
191 :
192 50 : if (!output_needs_fsync)
193 38 : return true;
194 :
195 12 : output_needs_fsync = false;
196 :
197 : /* can only fsync if it's a regular file */
198 12 : if (!output_isfile)
199 8 : return true;
200 :
201 4 : if (fsync(outfd) != 0)
202 0 : pg_fatal("could not fsync file \"%s\": %m", outfile);
203 :
204 4 : return true;
205 : }
206 :
207 : /*
208 : * Start the log streaming
209 : */
210 : static void
211 46 : StreamLogicalLog(void)
212 : {
213 : PGresult *res;
214 46 : char *copybuf = NULL;
215 46 : TimestampTz last_status = -1;
216 : int i;
217 : PQExpBuffer query;
218 : XLogRecPtr cur_record_lsn;
219 :
220 46 : output_written_lsn = InvalidXLogRecPtr;
221 46 : output_fsync_lsn = InvalidXLogRecPtr;
222 46 : cur_record_lsn = InvalidXLogRecPtr;
223 :
224 : /*
225 : * Connect in replication mode to the server
226 : */
227 46 : if (!conn)
228 0 : conn = GetConnection();
229 46 : if (!conn)
230 : /* Error message already written in GetConnection() */
231 0 : return;
232 :
233 : /*
234 : * Start the replication
235 : */
236 46 : if (verbose)
237 0 : pg_log_info("starting log streaming at %X/%X (slot %s)",
238 : LSN_FORMAT_ARGS(startpos),
239 : replication_slot);
240 :
241 : /* Initiate the replication stream at specified location */
242 46 : query = createPQExpBuffer();
243 46 : appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%X",
244 46 : replication_slot, LSN_FORMAT_ARGS(startpos));
245 :
246 : /* print options if there are any */
247 46 : if (noptions)
248 40 : appendPQExpBufferStr(query, " (");
249 :
250 126 : for (i = 0; i < noptions; i++)
251 : {
252 : /* separator */
253 80 : if (i > 0)
254 40 : appendPQExpBufferStr(query, ", ");
255 :
256 : /* write option name */
257 80 : appendPQExpBuffer(query, "\"%s\"", options[(i * 2)]);
258 :
259 : /* write option value if specified */
260 80 : if (options[(i * 2) + 1] != NULL)
261 80 : appendPQExpBuffer(query, " '%s'", options[(i * 2) + 1]);
262 : }
263 :
264 46 : if (noptions)
265 40 : appendPQExpBufferChar(query, ')');
266 :
267 46 : res = PQexec(conn, query->data);
268 46 : if (PQresultStatus(res) != PGRES_COPY_BOTH)
269 : {
270 12 : pg_log_error("could not send replication command \"%s\": %s",
271 : query->data, PQresultErrorMessage(res));
272 12 : PQclear(res);
273 12 : goto error;
274 : }
275 34 : PQclear(res);
276 34 : resetPQExpBuffer(query);
277 :
278 34 : if (verbose)
279 0 : pg_log_info("streaming initiated");
280 :
281 1578 : while (!time_to_abort)
282 : {
283 : int r;
284 : int bytes_left;
285 : int bytes_written;
286 : TimestampTz now;
287 : int hdr_len;
288 :
289 1576 : cur_record_lsn = InvalidXLogRecPtr;
290 :
291 1576 : if (copybuf != NULL)
292 : {
293 854 : PQfreemem(copybuf);
294 854 : copybuf = NULL;
295 : }
296 :
297 : /*
298 : * Potentially send a status message to the primary.
299 : */
300 1576 : now = feGetCurrentTimestamp();
301 :
302 3118 : if (outfd != -1 &&
303 1542 : feTimestampDifferenceExceeds(output_last_fsync, now,
304 : fsync_interval))
305 : {
306 34 : if (!OutputFsync(now))
307 4 : goto error;
308 : }
309 :
310 3152 : if (standby_message_timeout > 0 &&
311 1576 : feTimestampDifferenceExceeds(last_status, now,
312 : standby_message_timeout))
313 : {
314 : /* Time to send feedback! */
315 34 : if (!sendFeedback(conn, now, true, false))
316 0 : goto error;
317 :
318 34 : last_status = now;
319 : }
320 :
321 : /* got SIGHUP, close output file */
322 1576 : if (outfd != -1 && output_reopen && strcmp(outfile, "-") != 0)
323 : {
324 0 : now = feGetCurrentTimestamp();
325 0 : if (!OutputFsync(now))
326 0 : goto error;
327 0 : close(outfd);
328 0 : outfd = -1;
329 : }
330 1576 : output_reopen = false;
331 :
332 : /* open the output file, if not open yet */
333 1576 : if (outfd == -1)
334 : {
335 : struct stat statbuf;
336 :
337 34 : if (strcmp(outfile, "-") == 0)
338 34 : outfd = fileno(stdout);
339 : else
340 0 : outfd = open(outfile, O_CREAT | O_APPEND | O_WRONLY | PG_BINARY,
341 : S_IRUSR | S_IWUSR);
342 34 : if (outfd == -1)
343 : {
344 0 : pg_log_error("could not open log file \"%s\": %m", outfile);
345 0 : goto error;
346 : }
347 :
348 34 : if (fstat(outfd, &statbuf) != 0)
349 : {
350 0 : pg_log_error("could not stat file \"%s\": %m", outfile);
351 0 : goto error;
352 : }
353 :
354 34 : output_isfile = S_ISREG(statbuf.st_mode) && !isatty(outfd);
355 : }
356 :
357 1576 : r = PQgetCopyData(conn, ©buf, 1);
358 1576 : if (r == 0)
359 : {
360 : /*
361 : * In async mode, and no data available. We block on reading but
362 : * not more than the specified timeout, so that we can send a
363 : * response back to the client.
364 : */
365 : fd_set input_mask;
366 694 : TimestampTz message_target = 0;
367 694 : TimestampTz fsync_target = 0;
368 : struct timeval timeout;
369 694 : struct timeval *timeoutptr = NULL;
370 :
371 694 : if (PQsocket(conn) < 0)
372 : {
373 0 : pg_log_error("invalid socket: %s", PQerrorMessage(conn));
374 4 : goto error;
375 : }
376 :
377 694 : FD_ZERO(&input_mask);
378 694 : FD_SET(PQsocket(conn), &input_mask);
379 :
380 : /* Compute when we need to wakeup to send a keepalive message. */
381 694 : if (standby_message_timeout)
382 694 : message_target = last_status + (standby_message_timeout - 1) *
383 : ((int64) 1000);
384 :
385 : /* Compute when we need to wakeup to fsync the output file. */
386 694 : if (fsync_interval > 0 && output_needs_fsync)
387 324 : fsync_target = output_last_fsync + (fsync_interval - 1) *
388 : ((int64) 1000);
389 :
390 : /* Now compute when to wakeup. */
391 694 : if (message_target > 0 || fsync_target > 0)
392 : {
393 : TimestampTz targettime;
394 : long secs;
395 : int usecs;
396 :
397 694 : targettime = message_target;
398 :
399 694 : if (fsync_target > 0 && fsync_target < targettime)
400 0 : targettime = fsync_target;
401 :
402 694 : feTimestampDifference(now,
403 : targettime,
404 : &secs,
405 : &usecs);
406 694 : if (secs <= 0)
407 0 : timeout.tv_sec = 1; /* Always sleep at least 1 sec */
408 : else
409 694 : timeout.tv_sec = secs;
410 694 : timeout.tv_usec = usecs;
411 694 : timeoutptr = &timeout;
412 : }
413 :
414 694 : r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
415 694 : if (r == 0 || (r < 0 && errno == EINTR))
416 : {
417 : /*
418 : * Got a timeout or signal. Continue the loop and either
419 : * deliver a status packet to the server or just go back into
420 : * blocking.
421 : */
422 690 : continue;
423 : }
424 692 : else if (r < 0)
425 : {
426 0 : pg_log_error("%s() failed: %m", "select");
427 0 : goto error;
428 : }
429 :
430 : /* Else there is actually data on the socket */
431 692 : if (PQconsumeInput(conn) == 0)
432 : {
433 4 : pg_log_error("could not receive data from WAL stream: %s",
434 : PQerrorMessage(conn));
435 4 : goto error;
436 : }
437 688 : continue;
438 : }
439 :
440 : /* End of copy stream */
441 882 : if (r == -1)
442 28 : break;
443 :
444 : /* Failure while reading the copy stream */
445 868 : if (r == -2)
446 : {
447 0 : pg_log_error("could not read COPY data: %s",
448 : PQerrorMessage(conn));
449 0 : goto error;
450 : }
451 :
452 : /* Check the message type. */
453 868 : if (copybuf[0] == 'k')
454 : {
455 : int pos;
456 : bool replyRequested;
457 : XLogRecPtr walEnd;
458 686 : bool endposReached = false;
459 :
460 : /*
461 : * Parse the keepalive message, enclosed in the CopyData message.
462 : * We just check if the server requested a reply, and ignore the
463 : * rest.
464 : */
465 686 : pos = 1; /* skip msgtype 'k' */
466 686 : walEnd = fe_recvint64(©buf[pos]);
467 686 : output_written_lsn = Max(walEnd, output_written_lsn);
468 :
469 686 : pos += 8; /* read walEnd */
470 :
471 686 : pos += 8; /* skip sendTime */
472 :
473 686 : if (r < pos + 1)
474 : {
475 0 : pg_log_error("streaming header too small: %d", r);
476 0 : goto error;
477 : }
478 686 : replyRequested = copybuf[pos];
479 :
480 686 : if (endpos != InvalidXLogRecPtr && walEnd >= endpos)
481 : {
482 : /*
483 : * If there's nothing to read on the socket until a keepalive
484 : * we know that the server has nothing to send us; and if
485 : * walEnd has passed endpos, we know nothing else can have
486 : * committed before endpos. So we can bail out now.
487 : */
488 4 : endposReached = true;
489 : }
490 :
491 : /* Send a reply, if necessary */
492 686 : if (replyRequested || endposReached)
493 : {
494 6 : if (!flushAndSendFeedback(conn, &now))
495 0 : goto error;
496 6 : last_status = now;
497 : }
498 :
499 686 : if (endposReached)
500 : {
501 4 : stop_reason = STREAM_STOP_KEEPALIVE;
502 4 : time_to_abort = true;
503 4 : break;
504 : }
505 :
506 682 : continue;
507 : }
508 182 : else if (copybuf[0] != 'w')
509 : {
510 0 : pg_log_error("unrecognized streaming header: \"%c\"",
511 : copybuf[0]);
512 0 : goto error;
513 : }
514 :
515 : /*
516 : * Read the header of the XLogData message, enclosed in the CopyData
517 : * message. We only need the WAL location field (dataStart), the rest
518 : * of the header is ignored.
519 : */
520 182 : hdr_len = 1; /* msgtype 'w' */
521 182 : hdr_len += 8; /* dataStart */
522 182 : hdr_len += 8; /* walEnd */
523 182 : hdr_len += 8; /* sendTime */
524 182 : if (r < hdr_len + 1)
525 : {
526 0 : pg_log_error("streaming header too small: %d", r);
527 0 : goto error;
528 : }
529 :
530 : /* Extract WAL location for this block */
531 182 : cur_record_lsn = fe_recvint64(©buf[1]);
532 :
533 182 : if (endpos != InvalidXLogRecPtr && cur_record_lsn > endpos)
534 : {
535 : /*
536 : * We've read past our endpoint, so prepare to go away being
537 : * cautious about what happens to our output data.
538 : */
539 0 : if (!flushAndSendFeedback(conn, &now))
540 0 : goto error;
541 0 : stop_reason = STREAM_STOP_END_OF_WAL;
542 0 : time_to_abort = true;
543 0 : break;
544 : }
545 :
546 182 : output_written_lsn = Max(cur_record_lsn, output_written_lsn);
547 :
548 182 : bytes_left = r - hdr_len;
549 182 : bytes_written = 0;
550 :
551 : /* signal that a fsync is needed */
552 182 : output_needs_fsync = true;
553 :
554 364 : while (bytes_left)
555 : {
556 : int ret;
557 :
558 364 : ret = write(outfd,
559 182 : copybuf + hdr_len + bytes_written,
560 : bytes_left);
561 :
562 182 : if (ret < 0)
563 : {
564 0 : pg_log_error("could not write %d bytes to log file \"%s\": %m",
565 : bytes_left, outfile);
566 0 : goto error;
567 : }
568 :
569 : /* Write was successful, advance our position */
570 182 : bytes_written += ret;
571 182 : bytes_left -= ret;
572 : }
573 :
574 182 : if (write(outfd, "\n", 1) != 1)
575 : {
576 0 : pg_log_error("could not write %d bytes to log file \"%s\": %m",
577 : 1, outfile);
578 0 : goto error;
579 : }
580 :
581 182 : if (endpos != InvalidXLogRecPtr && cur_record_lsn == endpos)
582 : {
583 : /* endpos was exactly the record we just processed, we're done */
584 10 : if (!flushAndSendFeedback(conn, &now))
585 0 : goto error;
586 10 : stop_reason = STREAM_STOP_END_OF_WAL;
587 10 : time_to_abort = true;
588 10 : break;
589 : }
590 : }
591 :
592 : /* Clean up connection state if stream has been aborted */
593 30 : if (time_to_abort)
594 16 : prepareToTerminate(conn, endpos, stop_reason, cur_record_lsn);
595 :
596 30 : res = PQgetResult(conn);
597 30 : if (PQresultStatus(res) == PGRES_COPY_OUT)
598 : {
599 16 : PQclear(res);
600 :
601 : /*
602 : * We're doing a client-initiated clean exit and have sent CopyDone to
603 : * the server. Drain any messages, so we don't miss a last-minute
604 : * ErrorResponse. The walsender stops generating XLogData records once
605 : * it sees CopyDone, so expect this to finish quickly. After CopyDone,
606 : * it's too late for sendFeedback(), even if this were to take a long
607 : * time. Hence, use synchronous-mode PQgetCopyData().
608 : */
609 : while (1)
610 4 : {
611 : int r;
612 :
613 20 : if (copybuf != NULL)
614 : {
615 18 : PQfreemem(copybuf);
616 18 : copybuf = NULL;
617 : }
618 20 : r = PQgetCopyData(conn, ©buf, 0);
619 20 : if (r == -1)
620 16 : break;
621 4 : if (r == -2)
622 : {
623 0 : pg_log_error("could not read COPY data: %s",
624 : PQerrorMessage(conn));
625 0 : time_to_abort = false; /* unclean exit */
626 0 : goto error;
627 : }
628 : }
629 :
630 16 : res = PQgetResult(conn);
631 : }
632 30 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
633 : {
634 12 : pg_log_error("unexpected termination of replication stream: %s",
635 : PQresultErrorMessage(res));
636 12 : PQclear(res);
637 12 : goto error;
638 : }
639 18 : PQclear(res);
640 :
641 18 : if (outfd != -1 && strcmp(outfile, "-") != 0)
642 : {
643 0 : TimestampTz t = feGetCurrentTimestamp();
644 :
645 : /* no need to jump to error on failure here, we're finishing anyway */
646 0 : OutputFsync(t);
647 :
648 0 : if (close(outfd) != 0)
649 0 : pg_log_error("could not close file \"%s\": %m", outfile);
650 : }
651 18 : outfd = -1;
652 46 : error:
653 46 : if (copybuf != NULL)
654 : {
655 0 : PQfreemem(copybuf);
656 0 : copybuf = NULL;
657 : }
658 46 : destroyPQExpBuffer(query);
659 46 : PQfinish(conn);
660 46 : conn = NULL;
661 : }
662 :
663 : /*
664 : * Unfortunately we can't do sensible signal handling on windows...
665 : */
666 : #ifndef WIN32
667 :
668 : /*
669 : * When SIGINT/SIGTERM are caught, just tell the system to exit at the next
670 : * possible moment.
671 : */
672 : static void
673 2 : sigexit_handler(SIGNAL_ARGS)
674 : {
675 2 : stop_reason = STREAM_STOP_SIGNAL;
676 2 : time_to_abort = true;
677 2 : }
678 :
679 : /*
680 : * Trigger the output file to be reopened.
681 : */
682 : static void
683 0 : sighup_handler(SIGNAL_ARGS)
684 : {
685 0 : output_reopen = true;
686 0 : }
687 : #endif
688 :
689 :
690 : int
691 114 : main(int argc, char **argv)
692 : {
693 : static struct option long_options[] = {
694 : /* general options */
695 : {"file", required_argument, NULL, 'f'},
696 : {"fsync-interval", required_argument, NULL, 'F'},
697 : {"no-loop", no_argument, NULL, 'n'},
698 : {"verbose", no_argument, NULL, 'v'},
699 : {"two-phase", no_argument, NULL, 't'},
700 : {"version", no_argument, NULL, 'V'},
701 : {"help", no_argument, NULL, '?'},
702 : /* connection options */
703 : {"dbname", required_argument, NULL, 'd'},
704 : {"host", required_argument, NULL, 'h'},
705 : {"port", required_argument, NULL, 'p'},
706 : {"username", required_argument, NULL, 'U'},
707 : {"no-password", no_argument, NULL, 'w'},
708 : {"password", no_argument, NULL, 'W'},
709 : /* replication options */
710 : {"startpos", required_argument, NULL, 'I'},
711 : {"endpos", required_argument, NULL, 'E'},
712 : {"option", required_argument, NULL, 'o'},
713 : {"plugin", required_argument, NULL, 'P'},
714 : {"status-interval", required_argument, NULL, 's'},
715 : {"slot", required_argument, NULL, 'S'},
716 : /* action */
717 : {"create-slot", no_argument, NULL, 1},
718 : {"start", no_argument, NULL, 2},
719 : {"drop-slot", no_argument, NULL, 3},
720 : {"if-not-exists", no_argument, NULL, 4},
721 : {NULL, 0, NULL, 0}
722 : };
723 : int c;
724 : int option_index;
725 : uint32 hi,
726 : lo;
727 : char *db_name;
728 :
729 114 : pg_logging_init(argv[0]);
730 114 : progname = get_progname(argv[0]);
731 114 : set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
732 :
733 114 : if (argc > 1)
734 : {
735 112 : if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
736 : {
737 2 : usage();
738 2 : exit(0);
739 : }
740 110 : else if (strcmp(argv[1], "-V") == 0 ||
741 110 : strcmp(argv[1], "--version") == 0)
742 : {
743 2 : puts("pg_recvlogical (PostgreSQL) " PG_VERSION);
744 2 : exit(0);
745 : }
746 : }
747 :
748 658 : while ((c = getopt_long(argc, argv, "E:f:F:ntvd:h:p:U:wWI:o:P:s:S:",
749 : long_options, &option_index)) != -1)
750 : {
751 550 : switch (c)
752 : {
753 : /* general options */
754 48 : case 'f':
755 48 : outfile = pg_strdup(optarg);
756 48 : break;
757 0 : case 'F':
758 0 : if (!option_parse_int(optarg, "-F/--fsync-interval", 0,
759 : INT_MAX / 1000,
760 : &fsync_interval))
761 0 : exit(1);
762 0 : fsync_interval *= 1000;
763 0 : break;
764 46 : case 'n':
765 46 : noloop = 1;
766 46 : break;
767 4 : case 't':
768 4 : two_phase = true;
769 4 : break;
770 0 : case 'v':
771 0 : verbose++;
772 0 : break;
773 : /* connection options */
774 102 : case 'd':
775 102 : dbname = pg_strdup(optarg);
776 102 : break;
777 0 : case 'h':
778 0 : dbhost = pg_strdup(optarg);
779 0 : break;
780 0 : case 'p':
781 0 : dbport = pg_strdup(optarg);
782 0 : break;
783 0 : case 'U':
784 0 : dbuser = pg_strdup(optarg);
785 0 : break;
786 0 : case 'w':
787 0 : dbgetpassword = -1;
788 0 : break;
789 0 : case 'W':
790 0 : dbgetpassword = 1;
791 0 : break;
792 : /* replication options */
793 0 : case 'I':
794 0 : if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
795 0 : pg_fatal("could not parse start position \"%s\"", optarg);
796 0 : startpos = ((uint64) hi) << 32 | lo;
797 0 : break;
798 16 : case 'E':
799 16 : if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
800 0 : pg_fatal("could not parse end position \"%s\"", optarg);
801 16 : endpos = ((uint64) hi) << 32 | lo;
802 16 : break;
803 80 : case 'o':
804 : {
805 80 : char *data = pg_strdup(optarg);
806 80 : char *val = strchr(data, '=');
807 :
808 80 : if (val != NULL)
809 : {
810 : /* remove =; separate data from val */
811 80 : *val = '\0';
812 80 : val++;
813 : }
814 :
815 80 : noptions += 1;
816 80 : options = pg_realloc(options, sizeof(char *) * noptions * 2);
817 :
818 80 : options[(noptions - 1) * 2] = data;
819 80 : options[(noptions - 1) * 2 + 1] = val;
820 : }
821 :
822 80 : break;
823 44 : case 'P':
824 44 : plugin = pg_strdup(optarg);
825 44 : break;
826 0 : case 's':
827 0 : if (!option_parse_int(optarg, "-s/--status-interval", 0,
828 : INT_MAX / 1000,
829 : &standby_message_timeout))
830 0 : exit(1);
831 0 : standby_message_timeout *= 1000;
832 0 : break;
833 106 : case 'S':
834 106 : replication_slot = pg_strdup(optarg);
835 106 : break;
836 : /* action */
837 48 : case 1:
838 48 : do_create_slot = true;
839 48 : break;
840 50 : case 2:
841 50 : do_start_slot = true;
842 50 : break;
843 4 : case 3:
844 4 : do_drop_slot = true;
845 4 : break;
846 0 : case 4:
847 0 : slot_exists_ok = true;
848 0 : break;
849 :
850 2 : default:
851 : /* getopt_long already emitted a complaint */
852 2 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
853 2 : exit(1);
854 : }
855 : }
856 :
857 : /*
858 : * Any non-option arguments?
859 : */
860 108 : if (optind < argc)
861 : {
862 0 : pg_log_error("too many command-line arguments (first is \"%s\")",
863 : argv[optind]);
864 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
865 0 : exit(1);
866 : }
867 :
868 : /*
869 : * Required arguments
870 : */
871 108 : if (replication_slot == NULL)
872 : {
873 2 : pg_log_error("no slot specified");
874 2 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
875 2 : exit(1);
876 : }
877 :
878 106 : if (do_start_slot && outfile == NULL)
879 : {
880 2 : pg_log_error("no target file specified");
881 2 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
882 2 : exit(1);
883 : }
884 :
885 104 : if (!do_drop_slot && dbname == NULL)
886 : {
887 2 : pg_log_error("no database specified");
888 2 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
889 2 : exit(1);
890 : }
891 :
892 102 : if (!do_drop_slot && !do_create_slot && !do_start_slot)
893 : {
894 2 : pg_log_error("at least one action needs to be specified");
895 2 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
896 2 : exit(1);
897 : }
898 :
899 100 : if (do_drop_slot && (do_create_slot || do_start_slot))
900 : {
901 0 : pg_log_error("cannot use --create-slot or --start together with --drop-slot");
902 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
903 0 : exit(1);
904 : }
905 :
906 100 : if (startpos != InvalidXLogRecPtr && (do_create_slot || do_drop_slot))
907 : {
908 0 : pg_log_error("cannot use --create-slot or --drop-slot together with --startpos");
909 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
910 0 : exit(1);
911 : }
912 :
913 100 : if (endpos != InvalidXLogRecPtr && !do_start_slot)
914 : {
915 0 : pg_log_error("--endpos may only be specified with --start");
916 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
917 0 : exit(1);
918 : }
919 :
920 100 : if (two_phase && !do_create_slot)
921 : {
922 2 : pg_log_error("--two-phase may only be specified with --create-slot");
923 2 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
924 2 : exit(1);
925 : }
926 :
927 : /*
928 : * Obtain a connection to server. Notably, if we need a password, we want
929 : * to collect it from the user immediately.
930 : */
931 98 : conn = GetConnection();
932 98 : if (!conn)
933 : /* Error message already written in GetConnection() */
934 0 : exit(1);
935 98 : atexit(disconnect_atexit);
936 :
937 : /*
938 : * Trap signals. (Don't do this until after the initial password prompt,
939 : * if one is needed, in GetConnection.)
940 : */
941 : #ifndef WIN32
942 98 : pqsignal(SIGINT, sigexit_handler);
943 98 : pqsignal(SIGTERM, sigexit_handler);
944 98 : pqsignal(SIGHUP, sighup_handler);
945 : #endif
946 :
947 : /*
948 : * Run IDENTIFY_SYSTEM to check the connection type for each action.
949 : * --create-slot and --start actions require a database-specific
950 : * replication connection because they handle logical replication slots.
951 : * --drop-slot can remove replication slots from any replication
952 : * connection without this restriction.
953 : */
954 98 : if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name))
955 0 : exit(1);
956 :
957 98 : if (!do_drop_slot && db_name == NULL)
958 0 : pg_fatal("could not establish database-specific replication connection");
959 :
960 : /*
961 : * Set umask so that directories/files are created with the same
962 : * permissions as directories/files in the source data directory.
963 : *
964 : * pg_mode_mask is set to owner-only by default and then updated in
965 : * GetConnection() where we get the mode from the server-side with
966 : * RetrieveDataDirCreatePerm() and then call SetDataDirectoryCreatePerm().
967 : */
968 98 : umask(pg_mode_mask);
969 :
970 : /* Drop a replication slot. */
971 98 : if (do_drop_slot)
972 : {
973 4 : if (verbose)
974 0 : pg_log_info("dropping replication slot \"%s\"", replication_slot);
975 :
976 4 : if (!DropReplicationSlot(conn, replication_slot))
977 0 : exit(1);
978 : }
979 :
980 : /* Create a replication slot. */
981 98 : if (do_create_slot)
982 : {
983 48 : if (verbose)
984 0 : pg_log_info("creating replication slot \"%s\"", replication_slot);
985 :
986 48 : if (!CreateReplicationSlot(conn, replication_slot, plugin, false,
987 : false, false, slot_exists_ok, two_phase))
988 0 : exit(1);
989 48 : startpos = InvalidXLogRecPtr;
990 : }
991 :
992 98 : if (!do_start_slot)
993 52 : exit(0);
994 :
995 : /* Stream loop */
996 : while (true)
997 : {
998 46 : StreamLogicalLog();
999 46 : if (time_to_abort)
1000 : {
1001 : /*
1002 : * We've been Ctrl-C'ed or reached an exit limit condition. That's
1003 : * not an error, so exit without an errorcode.
1004 : */
1005 16 : exit(0);
1006 : }
1007 30 : else if (noloop)
1008 30 : pg_fatal("disconnected");
1009 : else
1010 : {
1011 : /* translator: check source for value for %d */
1012 0 : pg_log_info("disconnected; waiting %d seconds to try again",
1013 : RECONNECT_SLEEP_TIME);
1014 0 : pg_usleep(RECONNECT_SLEEP_TIME * 1000000);
1015 : }
1016 : }
1017 : }
1018 :
1019 : /*
1020 : * Fsync our output data, and send a feedback message to the server. Returns
1021 : * true if successful, false otherwise.
1022 : *
1023 : * If successful, *now is updated to the current timestamp just before sending
1024 : * feedback.
1025 : */
1026 : static bool
1027 16 : flushAndSendFeedback(PGconn *conn, TimestampTz *now)
1028 : {
1029 : /* flush data to disk, so that we send a recent flush pointer */
1030 16 : if (!OutputFsync(*now))
1031 0 : return false;
1032 16 : *now = feGetCurrentTimestamp();
1033 16 : if (!sendFeedback(conn, *now, true, false))
1034 0 : return false;
1035 :
1036 16 : return true;
1037 : }
1038 :
1039 : /*
1040 : * Try to inform the server about our upcoming demise, but don't wait around or
1041 : * retry on failure.
1042 : */
1043 : static void
1044 16 : prepareToTerminate(PGconn *conn, XLogRecPtr endpos, StreamStopReason reason,
1045 : XLogRecPtr lsn)
1046 : {
1047 16 : (void) PQputCopyEnd(conn, NULL);
1048 16 : (void) PQflush(conn);
1049 :
1050 16 : if (verbose)
1051 : {
1052 0 : switch (reason)
1053 : {
1054 0 : case STREAM_STOP_SIGNAL:
1055 0 : pg_log_info("received interrupt signal, exiting");
1056 0 : break;
1057 0 : case STREAM_STOP_KEEPALIVE:
1058 0 : pg_log_info("end position %X/%X reached by keepalive",
1059 : LSN_FORMAT_ARGS(endpos));
1060 0 : break;
1061 0 : case STREAM_STOP_END_OF_WAL:
1062 : Assert(!XLogRecPtrIsInvalid(lsn));
1063 0 : pg_log_info("end position %X/%X reached by WAL record at %X/%X",
1064 : LSN_FORMAT_ARGS(endpos), LSN_FORMAT_ARGS(lsn));
1065 0 : break;
1066 0 : case STREAM_STOP_NONE:
1067 : Assert(false);
1068 0 : break;
1069 : }
1070 16 : }
1071 16 : }
|