Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pg_recvlogical.c - receive data from a logical decoding slot in a streaming
4 : * fashion and write it to a local file.
5 : *
6 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/bin/pg_basebackup/pg_recvlogical.c
10 : *-------------------------------------------------------------------------
11 : */
12 :
13 : #include "postgres_fe.h"
14 :
15 : #include <dirent.h>
16 : #include <limits.h>
17 : #include <sys/select.h>
18 : #include <sys/stat.h>
19 : #include <unistd.h>
20 :
21 : #include "access/xlog_internal.h"
22 : #include "common/fe_memutils.h"
23 : #include "common/file_perm.h"
24 : #include "common/logging.h"
25 : #include "fe_utils/option_utils.h"
26 : #include "getopt_long.h"
27 : #include "libpq-fe.h"
28 : #include "libpq/pqsignal.h"
29 : #include "pqexpbuffer.h"
30 : #include "streamutil.h"
31 :
32 : /* Time to sleep between reconnection attempts */
33 : #define RECONNECT_SLEEP_TIME 5
34 :
35 : /* Global Options */
36 : static char *outfile = NULL;
37 : static int verbose = 0;
38 : static bool two_phase = false;
39 : static int noloop = 0;
40 : static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
41 : static int fsync_interval = 10 * 1000; /* 10 sec = default */
42 : static XLogRecPtr startpos = InvalidXLogRecPtr;
43 : static XLogRecPtr endpos = InvalidXLogRecPtr;
44 : static bool do_create_slot = false;
45 : static bool slot_exists_ok = false;
46 : static bool do_start_slot = false;
47 : static bool do_drop_slot = false;
48 : static char *replication_slot = NULL;
49 :
50 : /* filled pairwise with option, value. value may be NULL */
51 : static char **options;
52 : static size_t noptions = 0;
53 : static const char *plugin = "test_decoding";
54 :
55 : /* Global State */
56 : static int outfd = -1;
57 : static volatile sig_atomic_t time_to_abort = false;
58 : static volatile sig_atomic_t output_reopen = false;
59 : static bool output_isfile;
60 : static TimestampTz output_last_fsync = -1;
61 : static bool output_needs_fsync = false;
62 : static XLogRecPtr output_written_lsn = InvalidXLogRecPtr;
63 : static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr;
64 :
65 : static void usage(void);
66 : static void StreamLogicalLog(void);
67 : static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now);
68 : static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos,
69 : bool keepalive, XLogRecPtr lsn);
70 :
71 : static void
72 2 : usage(void)
73 : {
74 2 : printf(_("%s controls PostgreSQL logical decoding streams.\n\n"),
75 : progname);
76 2 : printf(_("Usage:\n"));
77 2 : printf(_(" %s [OPTION]...\n"), progname);
78 2 : printf(_("\nAction to be performed:\n"));
79 2 : printf(_(" --create-slot create a new replication slot (for the slot's name see --slot)\n"));
80 2 : printf(_(" --drop-slot drop the replication slot (for the slot's name see --slot)\n"));
81 2 : printf(_(" --start start streaming in a replication slot (for the slot's name see --slot)\n"));
82 2 : printf(_("\nOptions:\n"));
83 2 : printf(_(" -E, --endpos=LSN exit after receiving the specified LSN\n"));
84 2 : printf(_(" -f, --file=FILE receive log into this file, - for stdout\n"));
85 2 : printf(_(" -F --fsync-interval=SECS\n"
86 : " time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000));
87 2 : printf(_(" --if-not-exists do not error if slot already exists when creating a slot\n"));
88 2 : printf(_(" -I, --startpos=LSN where in an existing slot should the streaming start\n"));
89 2 : printf(_(" -n, --no-loop do not loop on connection lost\n"));
90 2 : printf(_(" -o, --option=NAME[=VALUE]\n"
91 : " pass option NAME with optional value VALUE to the\n"
92 : " output plugin\n"));
93 2 : printf(_(" -P, --plugin=PLUGIN use output plugin PLUGIN (default: %s)\n"), plugin);
94 2 : printf(_(" -s, --status-interval=SECS\n"
95 : " time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
96 2 : printf(_(" -S, --slot=SLOTNAME name of the logical replication slot\n"));
97 2 : printf(_(" -t, --two-phase enable decoding of prepared transactions when creating a slot\n"));
98 2 : printf(_(" -v, --verbose output verbose messages\n"));
99 2 : printf(_(" -V, --version output version information, then exit\n"));
100 2 : printf(_(" -?, --help show this help, then exit\n"));
101 2 : printf(_("\nConnection options:\n"));
102 2 : printf(_(" -d, --dbname=DBNAME database to connect to\n"));
103 2 : printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
104 2 : printf(_(" -p, --port=PORT database server port number\n"));
105 2 : printf(_(" -U, --username=NAME connect as specified database user\n"));
106 2 : printf(_(" -w, --no-password never prompt for password\n"));
107 2 : printf(_(" -W, --password force password prompt (should happen automatically)\n"));
108 2 : printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
109 2 : printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
110 2 : }
111 :
112 : /*
113 : * Send a Standby Status Update message to server.
114 : */
115 : static bool
116 50 : sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested)
117 : {
118 : static XLogRecPtr last_written_lsn = InvalidXLogRecPtr;
119 : static XLogRecPtr last_fsync_lsn = InvalidXLogRecPtr;
120 :
121 : char replybuf[1 + 8 + 8 + 8 + 8 + 1];
122 50 : int len = 0;
123 :
124 : /*
125 : * we normally don't want to send superfluous feedback, but if it's
126 : * because of a timeout we need to, otherwise wal_sender_timeout will kill
127 : * us.
128 : */
129 50 : if (!force &&
130 0 : last_written_lsn == output_written_lsn &&
131 0 : last_fsync_lsn == output_fsync_lsn)
132 0 : return true;
133 :
134 50 : if (verbose)
135 0 : pg_log_info("confirming write up to %X/%X, flush to %X/%X (slot %s)",
136 : LSN_FORMAT_ARGS(output_written_lsn),
137 : LSN_FORMAT_ARGS(output_fsync_lsn),
138 : replication_slot);
139 :
140 50 : replybuf[len] = 'r';
141 50 : len += 1;
142 50 : fe_sendint64(output_written_lsn, &replybuf[len]); /* write */
143 50 : len += 8;
144 50 : fe_sendint64(output_fsync_lsn, &replybuf[len]); /* flush */
145 50 : len += 8;
146 50 : fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */
147 50 : len += 8;
148 50 : fe_sendint64(now, &replybuf[len]); /* sendTime */
149 50 : len += 8;
150 50 : replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
151 50 : len += 1;
152 :
153 50 : startpos = output_written_lsn;
154 50 : last_written_lsn = output_written_lsn;
155 50 : last_fsync_lsn = output_fsync_lsn;
156 :
157 50 : if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
158 : {
159 0 : pg_log_error("could not send feedback packet: %s",
160 : PQerrorMessage(conn));
161 0 : return false;
162 : }
163 :
164 50 : return true;
165 : }
166 :
167 : static void
168 94 : disconnect_atexit(void)
169 : {
170 94 : if (conn != NULL)
171 48 : PQfinish(conn);
172 94 : }
173 :
174 : static bool
175 50 : OutputFsync(TimestampTz now)
176 : {
177 50 : output_last_fsync = now;
178 :
179 50 : output_fsync_lsn = output_written_lsn;
180 :
181 50 : if (fsync_interval <= 0)
182 0 : return true;
183 :
184 50 : if (!output_needs_fsync)
185 38 : return true;
186 :
187 12 : output_needs_fsync = false;
188 :
189 : /* can only fsync if it's a regular file */
190 12 : if (!output_isfile)
191 8 : return true;
192 :
193 4 : if (fsync(outfd) != 0)
194 0 : pg_fatal("could not fsync file \"%s\": %m", outfile);
195 :
196 4 : return true;
197 : }
198 :
199 : /*
200 : * Start the log streaming
201 : */
202 : static void
203 46 : StreamLogicalLog(void)
204 : {
205 : PGresult *res;
206 46 : char *copybuf = NULL;
207 46 : TimestampTz last_status = -1;
208 : int i;
209 : PQExpBuffer query;
210 :
211 46 : output_written_lsn = InvalidXLogRecPtr;
212 46 : output_fsync_lsn = InvalidXLogRecPtr;
213 :
214 : /*
215 : * Connect in replication mode to the server
216 : */
217 46 : if (!conn)
218 0 : conn = GetConnection();
219 46 : if (!conn)
220 : /* Error message already written in GetConnection() */
221 0 : return;
222 :
223 : /*
224 : * Start the replication
225 : */
226 46 : if (verbose)
227 0 : pg_log_info("starting log streaming at %X/%X (slot %s)",
228 : LSN_FORMAT_ARGS(startpos),
229 : replication_slot);
230 :
231 : /* Initiate the replication stream at specified location */
232 46 : query = createPQExpBuffer();
233 46 : appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%X",
234 46 : replication_slot, LSN_FORMAT_ARGS(startpos));
235 :
236 : /* print options if there are any */
237 46 : if (noptions)
238 40 : appendPQExpBufferStr(query, " (");
239 :
240 126 : for (i = 0; i < noptions; i++)
241 : {
242 : /* separator */
243 80 : if (i > 0)
244 40 : appendPQExpBufferStr(query, ", ");
245 :
246 : /* write option name */
247 80 : appendPQExpBuffer(query, "\"%s\"", options[(i * 2)]);
248 :
249 : /* write option value if specified */
250 80 : if (options[(i * 2) + 1] != NULL)
251 80 : appendPQExpBuffer(query, " '%s'", options[(i * 2) + 1]);
252 : }
253 :
254 46 : if (noptions)
255 40 : appendPQExpBufferChar(query, ')');
256 :
257 46 : res = PQexec(conn, query->data);
258 46 : if (PQresultStatus(res) != PGRES_COPY_BOTH)
259 : {
260 12 : pg_log_error("could not send replication command \"%s\": %s",
261 : query->data, PQresultErrorMessage(res));
262 12 : PQclear(res);
263 12 : goto error;
264 : }
265 34 : PQclear(res);
266 34 : resetPQExpBuffer(query);
267 :
268 34 : if (verbose)
269 0 : pg_log_info("streaming initiated");
270 :
271 1718 : while (!time_to_abort)
272 : {
273 : int r;
274 : int bytes_left;
275 : int bytes_written;
276 : TimestampTz now;
277 : int hdr_len;
278 1716 : XLogRecPtr cur_record_lsn = InvalidXLogRecPtr;
279 :
280 1716 : if (copybuf != NULL)
281 : {
282 892 : PQfreemem(copybuf);
283 892 : copybuf = NULL;
284 : }
285 :
286 : /*
287 : * Potentially send a status message to the primary.
288 : */
289 1716 : now = feGetCurrentTimestamp();
290 :
291 3398 : if (outfd != -1 &&
292 1682 : feTimestampDifferenceExceeds(output_last_fsync, now,
293 : fsync_interval))
294 : {
295 34 : if (!OutputFsync(now))
296 4 : goto error;
297 : }
298 :
299 3432 : if (standby_message_timeout > 0 &&
300 1716 : feTimestampDifferenceExceeds(last_status, now,
301 : standby_message_timeout))
302 : {
303 : /* Time to send feedback! */
304 34 : if (!sendFeedback(conn, now, true, false))
305 0 : goto error;
306 :
307 34 : last_status = now;
308 : }
309 :
310 : /* got SIGHUP, close output file */
311 1716 : if (outfd != -1 && output_reopen && strcmp(outfile, "-") != 0)
312 : {
313 0 : now = feGetCurrentTimestamp();
314 0 : if (!OutputFsync(now))
315 0 : goto error;
316 0 : close(outfd);
317 0 : outfd = -1;
318 : }
319 1716 : output_reopen = false;
320 :
321 : /* open the output file, if not open yet */
322 1716 : if (outfd == -1)
323 : {
324 : struct stat statbuf;
325 :
326 34 : if (strcmp(outfile, "-") == 0)
327 34 : outfd = fileno(stdout);
328 : else
329 0 : outfd = open(outfile, O_CREAT | O_APPEND | O_WRONLY | PG_BINARY,
330 : S_IRUSR | S_IWUSR);
331 34 : if (outfd == -1)
332 : {
333 0 : pg_log_error("could not open log file \"%s\": %m", outfile);
334 0 : goto error;
335 : }
336 :
337 34 : if (fstat(outfd, &statbuf) != 0)
338 : {
339 0 : pg_log_error("could not stat file \"%s\": %m", outfile);
340 0 : goto error;
341 : }
342 :
343 34 : output_isfile = S_ISREG(statbuf.st_mode) && !isatty(outfd);
344 : }
345 :
346 1716 : r = PQgetCopyData(conn, ©buf, 1);
347 1716 : if (r == 0)
348 : {
349 : /*
350 : * In async mode, and no data available. We block on reading but
351 : * not more than the specified timeout, so that we can send a
352 : * response back to the client.
353 : */
354 : fd_set input_mask;
355 796 : TimestampTz message_target = 0;
356 796 : TimestampTz fsync_target = 0;
357 : struct timeval timeout;
358 796 : struct timeval *timeoutptr = NULL;
359 :
360 796 : if (PQsocket(conn) < 0)
361 : {
362 0 : pg_log_error("invalid socket: %s", PQerrorMessage(conn));
363 4 : goto error;
364 : }
365 :
366 796 : FD_ZERO(&input_mask);
367 796 : FD_SET(PQsocket(conn), &input_mask);
368 :
369 : /* Compute when we need to wakeup to send a keepalive message. */
370 796 : if (standby_message_timeout)
371 796 : message_target = last_status + (standby_message_timeout - 1) *
372 : ((int64) 1000);
373 :
374 : /* Compute when we need to wakeup to fsync the output file. */
375 796 : if (fsync_interval > 0 && output_needs_fsync)
376 182 : fsync_target = output_last_fsync + (fsync_interval - 1) *
377 : ((int64) 1000);
378 :
379 : /* Now compute when to wakeup. */
380 796 : if (message_target > 0 || fsync_target > 0)
381 : {
382 : TimestampTz targettime;
383 : long secs;
384 : int usecs;
385 :
386 796 : targettime = message_target;
387 :
388 796 : if (fsync_target > 0 && fsync_target < targettime)
389 0 : targettime = fsync_target;
390 :
391 796 : feTimestampDifference(now,
392 : targettime,
393 : &secs,
394 : &usecs);
395 796 : if (secs <= 0)
396 0 : timeout.tv_sec = 1; /* Always sleep at least 1 sec */
397 : else
398 796 : timeout.tv_sec = secs;
399 796 : timeout.tv_usec = usecs;
400 796 : timeoutptr = &timeout;
401 : }
402 :
403 796 : r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
404 796 : if (r == 0 || (r < 0 && errno == EINTR))
405 : {
406 : /*
407 : * Got a timeout or signal. Continue the loop and either
408 : * deliver a status packet to the server or just go back into
409 : * blocking.
410 : */
411 792 : continue;
412 : }
413 794 : else if (r < 0)
414 : {
415 0 : pg_log_error("%s() failed: %m", "select");
416 0 : goto error;
417 : }
418 :
419 : /* Else there is actually data on the socket */
420 794 : if (PQconsumeInput(conn) == 0)
421 : {
422 4 : pg_log_error("could not receive data from WAL stream: %s",
423 : PQerrorMessage(conn));
424 4 : goto error;
425 : }
426 790 : continue;
427 : }
428 :
429 : /* End of copy stream */
430 920 : if (r == -1)
431 28 : break;
432 :
433 : /* Failure while reading the copy stream */
434 906 : if (r == -2)
435 : {
436 0 : pg_log_error("could not read COPY data: %s",
437 : PQerrorMessage(conn));
438 0 : goto error;
439 : }
440 :
441 : /* Check the message type. */
442 906 : if (copybuf[0] == 'k')
443 : {
444 : int pos;
445 : bool replyRequested;
446 : XLogRecPtr walEnd;
447 730 : bool endposReached = false;
448 :
449 : /*
450 : * Parse the keepalive message, enclosed in the CopyData message.
451 : * We just check if the server requested a reply, and ignore the
452 : * rest.
453 : */
454 730 : pos = 1; /* skip msgtype 'k' */
455 730 : walEnd = fe_recvint64(©buf[pos]);
456 730 : output_written_lsn = Max(walEnd, output_written_lsn);
457 :
458 730 : pos += 8; /* read walEnd */
459 :
460 730 : pos += 8; /* skip sendTime */
461 :
462 730 : if (r < pos + 1)
463 : {
464 0 : pg_log_error("streaming header too small: %d", r);
465 0 : goto error;
466 : }
467 730 : replyRequested = copybuf[pos];
468 :
469 730 : if (endpos != InvalidXLogRecPtr && walEnd >= endpos)
470 : {
471 : /*
472 : * If there's nothing to read on the socket until a keepalive
473 : * we know that the server has nothing to send us; and if
474 : * walEnd has passed endpos, we know nothing else can have
475 : * committed before endpos. So we can bail out now.
476 : */
477 4 : endposReached = true;
478 : }
479 :
480 : /* Send a reply, if necessary */
481 730 : if (replyRequested || endposReached)
482 : {
483 6 : if (!flushAndSendFeedback(conn, &now))
484 0 : goto error;
485 6 : last_status = now;
486 : }
487 :
488 730 : if (endposReached)
489 : {
490 4 : prepareToTerminate(conn, endpos, true, InvalidXLogRecPtr);
491 4 : time_to_abort = true;
492 4 : break;
493 : }
494 :
495 726 : continue;
496 : }
497 176 : else if (copybuf[0] != 'w')
498 : {
499 0 : pg_log_error("unrecognized streaming header: \"%c\"",
500 : copybuf[0]);
501 0 : goto error;
502 : }
503 :
504 : /*
505 : * Read the header of the XLogData message, enclosed in the CopyData
506 : * message. We only need the WAL location field (dataStart), the rest
507 : * of the header is ignored.
508 : */
509 176 : hdr_len = 1; /* msgtype 'w' */
510 176 : hdr_len += 8; /* dataStart */
511 176 : hdr_len += 8; /* walEnd */
512 176 : hdr_len += 8; /* sendTime */
513 176 : if (r < hdr_len + 1)
514 : {
515 0 : pg_log_error("streaming header too small: %d", r);
516 0 : goto error;
517 : }
518 :
519 : /* Extract WAL location for this block */
520 176 : cur_record_lsn = fe_recvint64(©buf[1]);
521 :
522 176 : if (endpos != InvalidXLogRecPtr && cur_record_lsn > endpos)
523 : {
524 : /*
525 : * We've read past our endpoint, so prepare to go away being
526 : * cautious about what happens to our output data.
527 : */
528 0 : if (!flushAndSendFeedback(conn, &now))
529 0 : goto error;
530 0 : prepareToTerminate(conn, endpos, false, cur_record_lsn);
531 0 : time_to_abort = true;
532 0 : break;
533 : }
534 :
535 176 : output_written_lsn = Max(cur_record_lsn, output_written_lsn);
536 :
537 176 : bytes_left = r - hdr_len;
538 176 : bytes_written = 0;
539 :
540 : /* signal that a fsync is needed */
541 176 : output_needs_fsync = true;
542 :
543 352 : while (bytes_left)
544 : {
545 : int ret;
546 :
547 352 : ret = write(outfd,
548 176 : copybuf + hdr_len + bytes_written,
549 : bytes_left);
550 :
551 176 : if (ret < 0)
552 : {
553 0 : pg_log_error("could not write %d bytes to log file \"%s\": %m",
554 : bytes_left, outfile);
555 0 : goto error;
556 : }
557 :
558 : /* Write was successful, advance our position */
559 176 : bytes_written += ret;
560 176 : bytes_left -= ret;
561 : }
562 :
563 176 : if (write(outfd, "\n", 1) != 1)
564 : {
565 0 : pg_log_error("could not write %d bytes to log file \"%s\": %m",
566 : 1, outfile);
567 0 : goto error;
568 : }
569 :
570 176 : if (endpos != InvalidXLogRecPtr && cur_record_lsn == endpos)
571 : {
572 : /* endpos was exactly the record we just processed, we're done */
573 10 : if (!flushAndSendFeedback(conn, &now))
574 0 : goto error;
575 10 : prepareToTerminate(conn, endpos, false, cur_record_lsn);
576 10 : time_to_abort = true;
577 10 : break;
578 : }
579 : }
580 :
581 30 : res = PQgetResult(conn);
582 30 : if (PQresultStatus(res) == PGRES_COPY_OUT)
583 : {
584 14 : PQclear(res);
585 :
586 : /*
587 : * We're doing a client-initiated clean exit and have sent CopyDone to
588 : * the server. Drain any messages, so we don't miss a last-minute
589 : * ErrorResponse. The walsender stops generating XLogData records once
590 : * it sees CopyDone, so expect this to finish quickly. After CopyDone,
591 : * it's too late for sendFeedback(), even if this were to take a long
592 : * time. Hence, use synchronous-mode PQgetCopyData().
593 : */
594 : while (1)
595 102 : {
596 : int r;
597 :
598 116 : if (copybuf != NULL)
599 : {
600 116 : PQfreemem(copybuf);
601 116 : copybuf = NULL;
602 : }
603 116 : r = PQgetCopyData(conn, ©buf, 0);
604 116 : if (r == -1)
605 14 : break;
606 102 : if (r == -2)
607 : {
608 0 : pg_log_error("could not read COPY data: %s",
609 : PQerrorMessage(conn));
610 0 : time_to_abort = false; /* unclean exit */
611 0 : goto error;
612 : }
613 : }
614 :
615 14 : res = PQgetResult(conn);
616 : }
617 30 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
618 : {
619 14 : pg_log_error("unexpected termination of replication stream: %s",
620 : PQresultErrorMessage(res));
621 14 : goto error;
622 : }
623 16 : PQclear(res);
624 :
625 16 : if (outfd != -1 && strcmp(outfile, "-") != 0)
626 : {
627 0 : TimestampTz t = feGetCurrentTimestamp();
628 :
629 : /* no need to jump to error on failure here, we're finishing anyway */
630 0 : OutputFsync(t);
631 :
632 0 : if (close(outfd) != 0)
633 0 : pg_log_error("could not close file \"%s\": %m", outfile);
634 : }
635 16 : outfd = -1;
636 46 : error:
637 46 : if (copybuf != NULL)
638 : {
639 0 : PQfreemem(copybuf);
640 0 : copybuf = NULL;
641 : }
642 46 : destroyPQExpBuffer(query);
643 46 : PQfinish(conn);
644 46 : conn = NULL;
645 : }
646 :
647 : /*
648 : * Unfortunately we can't do sensible signal handling on windows...
649 : */
650 : #ifndef WIN32
651 :
652 : /*
653 : * When SIGINT/SIGTERM are caught, just tell the system to exit at the next
654 : * possible moment.
655 : */
656 : static void
657 2 : sigexit_handler(SIGNAL_ARGS)
658 : {
659 2 : time_to_abort = true;
660 2 : }
661 :
662 : /*
663 : * Trigger the output file to be reopened.
664 : */
665 : static void
666 0 : sighup_handler(SIGNAL_ARGS)
667 : {
668 0 : output_reopen = true;
669 0 : }
670 : #endif
671 :
672 :
673 : int
674 110 : main(int argc, char **argv)
675 : {
676 : static struct option long_options[] = {
677 : /* general options */
678 : {"file", required_argument, NULL, 'f'},
679 : {"fsync-interval", required_argument, NULL, 'F'},
680 : {"no-loop", no_argument, NULL, 'n'},
681 : {"verbose", no_argument, NULL, 'v'},
682 : {"two-phase", no_argument, NULL, 't'},
683 : {"version", no_argument, NULL, 'V'},
684 : {"help", no_argument, NULL, '?'},
685 : /* connection options */
686 : {"dbname", required_argument, NULL, 'd'},
687 : {"host", required_argument, NULL, 'h'},
688 : {"port", required_argument, NULL, 'p'},
689 : {"username", required_argument, NULL, 'U'},
690 : {"no-password", no_argument, NULL, 'w'},
691 : {"password", no_argument, NULL, 'W'},
692 : /* replication options */
693 : {"startpos", required_argument, NULL, 'I'},
694 : {"endpos", required_argument, NULL, 'E'},
695 : {"option", required_argument, NULL, 'o'},
696 : {"plugin", required_argument, NULL, 'P'},
697 : {"status-interval", required_argument, NULL, 's'},
698 : {"slot", required_argument, NULL, 'S'},
699 : /* action */
700 : {"create-slot", no_argument, NULL, 1},
701 : {"start", no_argument, NULL, 2},
702 : {"drop-slot", no_argument, NULL, 3},
703 : {"if-not-exists", no_argument, NULL, 4},
704 : {NULL, 0, NULL, 0}
705 : };
706 : int c;
707 : int option_index;
708 : uint32 hi,
709 : lo;
710 : char *db_name;
711 :
712 110 : pg_logging_init(argv[0]);
713 110 : progname = get_progname(argv[0]);
714 110 : set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
715 :
716 110 : if (argc > 1)
717 : {
718 108 : if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
719 : {
720 2 : usage();
721 2 : exit(0);
722 : }
723 106 : else if (strcmp(argv[1], "-V") == 0 ||
724 106 : strcmp(argv[1], "--version") == 0)
725 : {
726 2 : puts("pg_recvlogical (PostgreSQL) " PG_VERSION);
727 2 : exit(0);
728 : }
729 : }
730 :
731 642 : while ((c = getopt_long(argc, argv, "E:f:F:ntvd:h:p:U:wWI:o:P:s:S:",
732 : long_options, &option_index)) != -1)
733 : {
734 538 : switch (c)
735 : {
736 : /* general options */
737 48 : case 'f':
738 48 : outfile = pg_strdup(optarg);
739 48 : break;
740 0 : case 'F':
741 0 : if (!option_parse_int(optarg, "-F/--fsync-interval", 0,
742 : INT_MAX / 1000,
743 : &fsync_interval))
744 0 : exit(1);
745 0 : fsync_interval *= 1000;
746 0 : break;
747 46 : case 'n':
748 46 : noloop = 1;
749 46 : break;
750 4 : case 't':
751 4 : two_phase = true;
752 4 : break;
753 0 : case 'v':
754 0 : verbose++;
755 0 : break;
756 : /* connection options */
757 100 : case 'd':
758 100 : dbname = pg_strdup(optarg);
759 100 : break;
760 0 : case 'h':
761 0 : dbhost = pg_strdup(optarg);
762 0 : break;
763 0 : case 'p':
764 0 : dbport = pg_strdup(optarg);
765 0 : break;
766 0 : case 'U':
767 0 : dbuser = pg_strdup(optarg);
768 0 : break;
769 0 : case 'w':
770 0 : dbgetpassword = -1;
771 0 : break;
772 0 : case 'W':
773 0 : dbgetpassword = 1;
774 0 : break;
775 : /* replication options */
776 0 : case 'I':
777 0 : if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
778 0 : pg_fatal("could not parse start position \"%s\"", optarg);
779 0 : startpos = ((uint64) hi) << 32 | lo;
780 0 : break;
781 16 : case 'E':
782 16 : if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
783 0 : pg_fatal("could not parse end position \"%s\"", optarg);
784 16 : endpos = ((uint64) hi) << 32 | lo;
785 16 : break;
786 80 : case 'o':
787 : {
788 80 : char *data = pg_strdup(optarg);
789 80 : char *val = strchr(data, '=');
790 :
791 80 : if (val != NULL)
792 : {
793 : /* remove =; separate data from val */
794 80 : *val = '\0';
795 80 : val++;
796 : }
797 :
798 80 : noptions += 1;
799 80 : options = pg_realloc(options, sizeof(char *) * noptions * 2);
800 :
801 80 : options[(noptions - 1) * 2] = data;
802 80 : options[(noptions - 1) * 2 + 1] = val;
803 : }
804 :
805 80 : break;
806 42 : case 'P':
807 42 : plugin = pg_strdup(optarg);
808 42 : break;
809 0 : case 's':
810 0 : if (!option_parse_int(optarg, "-s/--status-interval", 0,
811 : INT_MAX / 1000,
812 : &standby_message_timeout))
813 0 : exit(1);
814 0 : standby_message_timeout *= 1000;
815 0 : break;
816 102 : case 'S':
817 102 : replication_slot = pg_strdup(optarg);
818 102 : break;
819 : /* action */
820 46 : case 1:
821 46 : do_create_slot = true;
822 46 : break;
823 50 : case 2:
824 50 : do_start_slot = true;
825 50 : break;
826 2 : case 3:
827 2 : do_drop_slot = true;
828 2 : break;
829 0 : case 4:
830 0 : slot_exists_ok = true;
831 0 : break;
832 :
833 2 : default:
834 : /* getopt_long already emitted a complaint */
835 2 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
836 2 : exit(1);
837 : }
838 : }
839 :
840 : /*
841 : * Any non-option arguments?
842 : */
843 104 : if (optind < argc)
844 : {
845 0 : pg_log_error("too many command-line arguments (first is \"%s\")",
846 : argv[optind]);
847 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
848 0 : exit(1);
849 : }
850 :
851 : /*
852 : * Required arguments
853 : */
854 104 : if (replication_slot == NULL)
855 : {
856 2 : pg_log_error("no slot specified");
857 2 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
858 2 : exit(1);
859 : }
860 :
861 102 : if (do_start_slot && outfile == NULL)
862 : {
863 2 : pg_log_error("no target file specified");
864 2 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
865 2 : exit(1);
866 : }
867 :
868 100 : if (!do_drop_slot && dbname == NULL)
869 : {
870 2 : pg_log_error("no database specified");
871 2 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
872 2 : exit(1);
873 : }
874 :
875 98 : if (!do_drop_slot && !do_create_slot && !do_start_slot)
876 : {
877 2 : pg_log_error("at least one action needs to be specified");
878 2 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
879 2 : exit(1);
880 : }
881 :
882 96 : if (do_drop_slot && (do_create_slot || do_start_slot))
883 : {
884 0 : pg_log_error("cannot use --create-slot or --start together with --drop-slot");
885 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
886 0 : exit(1);
887 : }
888 :
889 96 : if (startpos != InvalidXLogRecPtr && (do_create_slot || do_drop_slot))
890 : {
891 0 : pg_log_error("cannot use --create-slot or --drop-slot together with --startpos");
892 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
893 0 : exit(1);
894 : }
895 :
896 96 : if (endpos != InvalidXLogRecPtr && !do_start_slot)
897 : {
898 0 : pg_log_error("--endpos may only be specified with --start");
899 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
900 0 : exit(1);
901 : }
902 :
903 96 : if (two_phase && !do_create_slot)
904 : {
905 2 : pg_log_error("--two-phase may only be specified with --create-slot");
906 2 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
907 2 : exit(1);
908 : }
909 :
910 : /*
911 : * Obtain a connection to server. Notably, if we need a password, we want
912 : * to collect it from the user immediately.
913 : */
914 94 : conn = GetConnection();
915 94 : if (!conn)
916 : /* Error message already written in GetConnection() */
917 0 : exit(1);
918 94 : atexit(disconnect_atexit);
919 :
920 : /*
921 : * Trap signals. (Don't do this until after the initial password prompt,
922 : * if one is needed, in GetConnection.)
923 : */
924 : #ifndef WIN32
925 94 : pqsignal(SIGINT, sigexit_handler);
926 94 : pqsignal(SIGTERM, sigexit_handler);
927 94 : pqsignal(SIGHUP, sighup_handler);
928 : #endif
929 :
930 : /*
931 : * Run IDENTIFY_SYSTEM to make sure we connected using a database specific
932 : * replication connection.
933 : */
934 94 : if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name))
935 0 : exit(1);
936 :
937 94 : if (db_name == NULL)
938 0 : pg_fatal("could not establish database-specific replication connection");
939 :
940 : /*
941 : * Set umask so that directories/files are created with the same
942 : * permissions as directories/files in the source data directory.
943 : *
944 : * pg_mode_mask is set to owner-only by default and then updated in
945 : * GetConnection() where we get the mode from the server-side with
946 : * RetrieveDataDirCreatePerm() and then call SetDataDirectoryCreatePerm().
947 : */
948 94 : umask(pg_mode_mask);
949 :
950 : /* Drop a replication slot. */
951 94 : if (do_drop_slot)
952 : {
953 2 : if (verbose)
954 0 : pg_log_info("dropping replication slot \"%s\"", replication_slot);
955 :
956 2 : if (!DropReplicationSlot(conn, replication_slot))
957 0 : exit(1);
958 : }
959 :
960 : /* Create a replication slot. */
961 94 : if (do_create_slot)
962 : {
963 46 : if (verbose)
964 0 : pg_log_info("creating replication slot \"%s\"", replication_slot);
965 :
966 46 : if (!CreateReplicationSlot(conn, replication_slot, plugin, false,
967 : false, false, slot_exists_ok, two_phase))
968 0 : exit(1);
969 46 : startpos = InvalidXLogRecPtr;
970 : }
971 :
972 94 : if (!do_start_slot)
973 48 : exit(0);
974 :
975 : /* Stream loop */
976 : while (true)
977 : {
978 46 : StreamLogicalLog();
979 46 : if (time_to_abort)
980 : {
981 : /*
982 : * We've been Ctrl-C'ed or reached an exit limit condition. That's
983 : * not an error, so exit without an errorcode.
984 : */
985 16 : exit(0);
986 : }
987 30 : else if (noloop)
988 30 : pg_fatal("disconnected");
989 : else
990 : {
991 : /* translator: check source for value for %d */
992 0 : pg_log_info("disconnected; waiting %d seconds to try again",
993 : RECONNECT_SLEEP_TIME);
994 0 : pg_usleep(RECONNECT_SLEEP_TIME * 1000000);
995 : }
996 : }
997 : }
998 :
999 : /*
1000 : * Fsync our output data, and send a feedback message to the server. Returns
1001 : * true if successful, false otherwise.
1002 : *
1003 : * If successful, *now is updated to the current timestamp just before sending
1004 : * feedback.
1005 : */
1006 : static bool
1007 16 : flushAndSendFeedback(PGconn *conn, TimestampTz *now)
1008 : {
1009 : /* flush data to disk, so that we send a recent flush pointer */
1010 16 : if (!OutputFsync(*now))
1011 0 : return false;
1012 16 : *now = feGetCurrentTimestamp();
1013 16 : if (!sendFeedback(conn, *now, true, false))
1014 0 : return false;
1015 :
1016 16 : return true;
1017 : }
1018 :
1019 : /*
1020 : * Try to inform the server about our upcoming demise, but don't wait around or
1021 : * retry on failure.
1022 : */
1023 : static void
1024 14 : prepareToTerminate(PGconn *conn, XLogRecPtr endpos, bool keepalive, XLogRecPtr lsn)
1025 : {
1026 14 : (void) PQputCopyEnd(conn, NULL);
1027 14 : (void) PQflush(conn);
1028 :
1029 14 : if (verbose)
1030 : {
1031 0 : if (keepalive)
1032 0 : pg_log_info("end position %X/%X reached by keepalive",
1033 : LSN_FORMAT_ARGS(endpos));
1034 : else
1035 0 : pg_log_info("end position %X/%X reached by WAL record at %X/%X",
1036 : LSN_FORMAT_ARGS(endpos), LSN_FORMAT_ARGS(lsn));
1037 : }
1038 14 : }
|