Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pg_recvlogical.c - receive data from a logical decoding slot in a streaming
4 : * fashion and write it to a local file.
5 : *
6 : * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/bin/pg_basebackup/pg_recvlogical.c
10 : *-------------------------------------------------------------------------
11 : */
12 :
13 : #include "postgres_fe.h"
14 :
15 : #include <dirent.h>
16 : #include <limits.h>
17 : #include <sys/select.h>
18 : #include <sys/stat.h>
19 : #include <unistd.h>
20 :
21 : #include "access/xlog_internal.h"
22 : #include "common/fe_memutils.h"
23 : #include "common/file_perm.h"
24 : #include "common/logging.h"
25 : #include "fe_utils/option_utils.h"
26 : #include "getopt_long.h"
27 : #include "libpq-fe.h"
28 : #include "libpq/pqsignal.h"
29 : #include "pqexpbuffer.h"
30 : #include "streamutil.h"
31 :
/* Seconds to wait before trying to reconnect after a lost connection */
#define RECONNECT_SLEEP_TIME 5

/*
 * Why the streaming loop was told to stop.  The value is stored in
 * stop_reason and only used to choose the verbose message emitted by
 * prepareToTerminate().
 */
typedef enum
{
	STREAM_STOP_NONE = 0,		/* no stop has been requested */
	STREAM_STOP_END_OF_WAL,		/* a WAL record at or past endpos was seen */
	STREAM_STOP_KEEPALIVE,		/* a keepalive's walEnd reached endpos */
	STREAM_STOP_SIGNAL			/* SIGINT or SIGTERM was delivered */
} StreamStopReason;
42 :
43 : /* Global Options */
44 : static char *outfile = NULL;
45 : static int verbose = 0;
46 : static bool two_phase = false;
47 : static int noloop = 0;
48 : static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
49 : static int fsync_interval = 10 * 1000; /* 10 sec = default */
50 : static XLogRecPtr startpos = InvalidXLogRecPtr;
51 : static XLogRecPtr endpos = InvalidXLogRecPtr;
52 : static bool do_create_slot = false;
53 : static bool slot_exists_ok = false;
54 : static bool do_start_slot = false;
55 : static bool do_drop_slot = false;
56 : static char *replication_slot = NULL;
57 :
58 : /* filled pairwise with option, value. value may be NULL */
59 : static char **options;
60 : static size_t noptions = 0;
61 : static const char *plugin = "test_decoding";
62 :
63 : /* Global State */
64 : static int outfd = -1;
65 : static volatile sig_atomic_t time_to_abort = false;
66 : static volatile sig_atomic_t stop_reason = STREAM_STOP_NONE;
67 : static volatile sig_atomic_t output_reopen = false;
68 : static bool output_isfile;
69 : static TimestampTz output_last_fsync = -1;
70 : static bool output_needs_fsync = false;
71 : static XLogRecPtr output_written_lsn = InvalidXLogRecPtr;
72 : static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr;
73 :
74 : static void usage(void);
75 : static void StreamLogicalLog(void);
76 : static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now);
77 : static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos,
78 : StreamStopReason reason,
79 : XLogRecPtr lsn);
80 :
81 : static void
82 2 : usage(void)
83 : {
84 2 : printf(_("%s controls PostgreSQL logical decoding streams.\n\n"),
85 : progname);
86 2 : printf(_("Usage:\n"));
87 2 : printf(_(" %s [OPTION]...\n"), progname);
88 2 : printf(_("\nAction to be performed:\n"));
89 2 : printf(_(" --create-slot create a new replication slot (for the slot's name see --slot)\n"));
90 2 : printf(_(" --drop-slot drop the replication slot (for the slot's name see --slot)\n"));
91 2 : printf(_(" --start start streaming in a replication slot (for the slot's name see --slot)\n"));
92 2 : printf(_("\nOptions:\n"));
93 2 : printf(_(" -E, --endpos=LSN exit after receiving the specified LSN\n"));
94 2 : printf(_(" -f, --file=FILE receive log into this file, - for stdout\n"));
95 2 : printf(_(" -F --fsync-interval=SECS\n"
96 : " time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000));
97 2 : printf(_(" --if-not-exists do not error if slot already exists when creating a slot\n"));
98 2 : printf(_(" -I, --startpos=LSN where in an existing slot should the streaming start\n"));
99 2 : printf(_(" -n, --no-loop do not loop on connection lost\n"));
100 2 : printf(_(" -o, --option=NAME[=VALUE]\n"
101 : " pass option NAME with optional value VALUE to the\n"
102 : " output plugin\n"));
103 2 : printf(_(" -P, --plugin=PLUGIN use output plugin PLUGIN (default: %s)\n"), plugin);
104 2 : printf(_(" -s, --status-interval=SECS\n"
105 : " time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
106 2 : printf(_(" -S, --slot=SLOTNAME name of the logical replication slot\n"));
107 2 : printf(_(" -t, --two-phase enable decoding of prepared transactions when creating a slot\n"));
108 2 : printf(_(" -v, --verbose output verbose messages\n"));
109 2 : printf(_(" -V, --version output version information, then exit\n"));
110 2 : printf(_(" -?, --help show this help, then exit\n"));
111 2 : printf(_("\nConnection options:\n"));
112 2 : printf(_(" -d, --dbname=DBNAME database to connect to\n"));
113 2 : printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
114 2 : printf(_(" -p, --port=PORT database server port number\n"));
115 2 : printf(_(" -U, --username=NAME connect as specified database user\n"));
116 2 : printf(_(" -w, --no-password never prompt for password\n"));
117 2 : printf(_(" -W, --password force password prompt (should happen automatically)\n"));
118 2 : printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
119 2 : printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
120 2 : }
121 :
122 : /*
123 : * Send a Standby Status Update message to server.
124 : */
125 : static bool
126 50 : sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested)
127 : {
128 : static XLogRecPtr last_written_lsn = InvalidXLogRecPtr;
129 : static XLogRecPtr last_fsync_lsn = InvalidXLogRecPtr;
130 :
131 : char replybuf[1 + 8 + 8 + 8 + 8 + 1];
132 50 : int len = 0;
133 :
134 : /*
135 : * we normally don't want to send superfluous feedback, but if it's
136 : * because of a timeout we need to, otherwise wal_sender_timeout will kill
137 : * us.
138 : */
139 50 : if (!force &&
140 0 : last_written_lsn == output_written_lsn &&
141 0 : last_fsync_lsn == output_fsync_lsn)
142 0 : return true;
143 :
144 50 : if (verbose)
145 0 : pg_log_info("confirming write up to %X/%X, flush to %X/%X (slot %s)",
146 : LSN_FORMAT_ARGS(output_written_lsn),
147 : LSN_FORMAT_ARGS(output_fsync_lsn),
148 : replication_slot);
149 :
150 50 : replybuf[len] = 'r';
151 50 : len += 1;
152 50 : fe_sendint64(output_written_lsn, &replybuf[len]); /* write */
153 50 : len += 8;
154 50 : fe_sendint64(output_fsync_lsn, &replybuf[len]); /* flush */
155 50 : len += 8;
156 50 : fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */
157 50 : len += 8;
158 50 : fe_sendint64(now, &replybuf[len]); /* sendTime */
159 50 : len += 8;
160 50 : replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
161 50 : len += 1;
162 :
163 50 : startpos = output_written_lsn;
164 50 : last_written_lsn = output_written_lsn;
165 50 : last_fsync_lsn = output_fsync_lsn;
166 :
167 50 : if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
168 : {
169 0 : pg_log_error("could not send feedback packet: %s",
170 : PQerrorMessage(conn));
171 0 : return false;
172 : }
173 :
174 50 : return true;
175 : }
176 :
177 : static void
178 94 : disconnect_atexit(void)
179 : {
180 94 : if (conn != NULL)
181 48 : PQfinish(conn);
182 94 : }
183 :
184 : static bool
185 50 : OutputFsync(TimestampTz now)
186 : {
187 50 : output_last_fsync = now;
188 :
189 50 : output_fsync_lsn = output_written_lsn;
190 :
191 50 : if (fsync_interval <= 0)
192 0 : return true;
193 :
194 50 : if (!output_needs_fsync)
195 38 : return true;
196 :
197 12 : output_needs_fsync = false;
198 :
199 : /* can only fsync if it's a regular file */
200 12 : if (!output_isfile)
201 8 : return true;
202 :
203 4 : if (fsync(outfd) != 0)
204 0 : pg_fatal("could not fsync file \"%s\": %m", outfile);
205 :
206 4 : return true;
207 : }
208 :
209 : /*
210 : * Start the log streaming
211 : */
212 : static void
213 46 : StreamLogicalLog(void)
214 : {
215 : PGresult *res;
216 46 : char *copybuf = NULL;
217 46 : TimestampTz last_status = -1;
218 : int i;
219 : PQExpBuffer query;
220 : XLogRecPtr cur_record_lsn;
221 :
222 46 : output_written_lsn = InvalidXLogRecPtr;
223 46 : output_fsync_lsn = InvalidXLogRecPtr;
224 46 : cur_record_lsn = InvalidXLogRecPtr;
225 :
226 : /*
227 : * Connect in replication mode to the server
228 : */
229 46 : if (!conn)
230 0 : conn = GetConnection();
231 46 : if (!conn)
232 : /* Error message already written in GetConnection() */
233 0 : return;
234 :
235 : /*
236 : * Start the replication
237 : */
238 46 : if (verbose)
239 0 : pg_log_info("starting log streaming at %X/%X (slot %s)",
240 : LSN_FORMAT_ARGS(startpos),
241 : replication_slot);
242 :
243 : /* Initiate the replication stream at specified location */
244 46 : query = createPQExpBuffer();
245 46 : appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%X",
246 46 : replication_slot, LSN_FORMAT_ARGS(startpos));
247 :
248 : /* print options if there are any */
249 46 : if (noptions)
250 40 : appendPQExpBufferStr(query, " (");
251 :
252 126 : for (i = 0; i < noptions; i++)
253 : {
254 : /* separator */
255 80 : if (i > 0)
256 40 : appendPQExpBufferStr(query, ", ");
257 :
258 : /* write option name */
259 80 : appendPQExpBuffer(query, "\"%s\"", options[(i * 2)]);
260 :
261 : /* write option value if specified */
262 80 : if (options[(i * 2) + 1] != NULL)
263 80 : appendPQExpBuffer(query, " '%s'", options[(i * 2) + 1]);
264 : }
265 :
266 46 : if (noptions)
267 40 : appendPQExpBufferChar(query, ')');
268 :
269 46 : res = PQexec(conn, query->data);
270 46 : if (PQresultStatus(res) != PGRES_COPY_BOTH)
271 : {
272 12 : pg_log_error("could not send replication command \"%s\": %s",
273 : query->data, PQresultErrorMessage(res));
274 12 : PQclear(res);
275 12 : goto error;
276 : }
277 34 : PQclear(res);
278 34 : resetPQExpBuffer(query);
279 :
280 34 : if (verbose)
281 0 : pg_log_info("streaming initiated");
282 :
283 1628 : while (!time_to_abort)
284 : {
285 : int r;
286 : int bytes_left;
287 : int bytes_written;
288 : TimestampTz now;
289 : int hdr_len;
290 :
291 1626 : cur_record_lsn = InvalidXLogRecPtr;
292 :
293 1626 : if (copybuf != NULL)
294 : {
295 854 : PQfreemem(copybuf);
296 854 : copybuf = NULL;
297 : }
298 :
299 : /*
300 : * Potentially send a status message to the primary.
301 : */
302 1626 : now = feGetCurrentTimestamp();
303 :
304 3218 : if (outfd != -1 &&
305 1592 : feTimestampDifferenceExceeds(output_last_fsync, now,
306 : fsync_interval))
307 : {
308 34 : if (!OutputFsync(now))
309 4 : goto error;
310 : }
311 :
312 3252 : if (standby_message_timeout > 0 &&
313 1626 : feTimestampDifferenceExceeds(last_status, now,
314 : standby_message_timeout))
315 : {
316 : /* Time to send feedback! */
317 34 : if (!sendFeedback(conn, now, true, false))
318 0 : goto error;
319 :
320 34 : last_status = now;
321 : }
322 :
323 : /* got SIGHUP, close output file */
324 1626 : if (outfd != -1 && output_reopen && strcmp(outfile, "-") != 0)
325 : {
326 0 : now = feGetCurrentTimestamp();
327 0 : if (!OutputFsync(now))
328 0 : goto error;
329 0 : close(outfd);
330 0 : outfd = -1;
331 : }
332 1626 : output_reopen = false;
333 :
334 : /* open the output file, if not open yet */
335 1626 : if (outfd == -1)
336 : {
337 : struct stat statbuf;
338 :
339 34 : if (strcmp(outfile, "-") == 0)
340 34 : outfd = fileno(stdout);
341 : else
342 0 : outfd = open(outfile, O_CREAT | O_APPEND | O_WRONLY | PG_BINARY,
343 : S_IRUSR | S_IWUSR);
344 34 : if (outfd == -1)
345 : {
346 0 : pg_log_error("could not open log file \"%s\": %m", outfile);
347 0 : goto error;
348 : }
349 :
350 34 : if (fstat(outfd, &statbuf) != 0)
351 : {
352 0 : pg_log_error("could not stat file \"%s\": %m", outfile);
353 0 : goto error;
354 : }
355 :
356 34 : output_isfile = S_ISREG(statbuf.st_mode) && !isatty(outfd);
357 : }
358 :
359 1626 : r = PQgetCopyData(conn, ©buf, 1);
360 1626 : if (r == 0)
361 : {
362 : /*
363 : * In async mode, and no data available. We block on reading but
364 : * not more than the specified timeout, so that we can send a
365 : * response back to the client.
366 : */
367 : fd_set input_mask;
368 744 : TimestampTz message_target = 0;
369 744 : TimestampTz fsync_target = 0;
370 : struct timeval timeout;
371 744 : struct timeval *timeoutptr = NULL;
372 :
373 744 : if (PQsocket(conn) < 0)
374 : {
375 0 : pg_log_error("invalid socket: %s", PQerrorMessage(conn));
376 4 : goto error;
377 : }
378 :
379 744 : FD_ZERO(&input_mask);
380 744 : FD_SET(PQsocket(conn), &input_mask);
381 :
382 : /* Compute when we need to wakeup to send a keepalive message. */
383 744 : if (standby_message_timeout)
384 744 : message_target = last_status + (standby_message_timeout - 1) *
385 : ((int64) 1000);
386 :
387 : /* Compute when we need to wakeup to fsync the output file. */
388 744 : if (fsync_interval > 0 && output_needs_fsync)
389 186 : fsync_target = output_last_fsync + (fsync_interval - 1) *
390 : ((int64) 1000);
391 :
392 : /* Now compute when to wakeup. */
393 744 : if (message_target > 0 || fsync_target > 0)
394 : {
395 : TimestampTz targettime;
396 : long secs;
397 : int usecs;
398 :
399 744 : targettime = message_target;
400 :
401 744 : if (fsync_target > 0 && fsync_target < targettime)
402 0 : targettime = fsync_target;
403 :
404 744 : feTimestampDifference(now,
405 : targettime,
406 : &secs,
407 : &usecs);
408 744 : if (secs <= 0)
409 0 : timeout.tv_sec = 1; /* Always sleep at least 1 sec */
410 : else
411 744 : timeout.tv_sec = secs;
412 744 : timeout.tv_usec = usecs;
413 744 : timeoutptr = &timeout;
414 : }
415 :
416 744 : r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
417 744 : if (r == 0 || (r < 0 && errno == EINTR))
418 : {
419 : /*
420 : * Got a timeout or signal. Continue the loop and either
421 : * deliver a status packet to the server or just go back into
422 : * blocking.
423 : */
424 740 : continue;
425 : }
426 742 : else if (r < 0)
427 : {
428 0 : pg_log_error("%s() failed: %m", "select");
429 0 : goto error;
430 : }
431 :
432 : /* Else there is actually data on the socket */
433 742 : if (PQconsumeInput(conn) == 0)
434 : {
435 4 : pg_log_error("could not receive data from WAL stream: %s",
436 : PQerrorMessage(conn));
437 4 : goto error;
438 : }
439 738 : continue;
440 : }
441 :
442 : /* End of copy stream */
443 882 : if (r == -1)
444 28 : break;
445 :
446 : /* Failure while reading the copy stream */
447 868 : if (r == -2)
448 : {
449 0 : pg_log_error("could not read COPY data: %s",
450 : PQerrorMessage(conn));
451 0 : goto error;
452 : }
453 :
454 : /* Check the message type. */
455 868 : if (copybuf[0] == 'k')
456 : {
457 : int pos;
458 : bool replyRequested;
459 : XLogRecPtr walEnd;
460 692 : bool endposReached = false;
461 :
462 : /*
463 : * Parse the keepalive message, enclosed in the CopyData message.
464 : * We just check if the server requested a reply, and ignore the
465 : * rest.
466 : */
467 692 : pos = 1; /* skip msgtype 'k' */
468 692 : walEnd = fe_recvint64(©buf[pos]);
469 692 : output_written_lsn = Max(walEnd, output_written_lsn);
470 :
471 692 : pos += 8; /* read walEnd */
472 :
473 692 : pos += 8; /* skip sendTime */
474 :
475 692 : if (r < pos + 1)
476 : {
477 0 : pg_log_error("streaming header too small: %d", r);
478 0 : goto error;
479 : }
480 692 : replyRequested = copybuf[pos];
481 :
482 692 : if (endpos != InvalidXLogRecPtr && walEnd >= endpos)
483 : {
484 : /*
485 : * If there's nothing to read on the socket until a keepalive
486 : * we know that the server has nothing to send us; and if
487 : * walEnd has passed endpos, we know nothing else can have
488 : * committed before endpos. So we can bail out now.
489 : */
490 4 : endposReached = true;
491 : }
492 :
493 : /* Send a reply, if necessary */
494 692 : if (replyRequested || endposReached)
495 : {
496 6 : if (!flushAndSendFeedback(conn, &now))
497 0 : goto error;
498 6 : last_status = now;
499 : }
500 :
501 692 : if (endposReached)
502 : {
503 4 : stop_reason = STREAM_STOP_KEEPALIVE;
504 4 : time_to_abort = true;
505 4 : break;
506 : }
507 :
508 688 : continue;
509 : }
510 176 : else if (copybuf[0] != 'w')
511 : {
512 0 : pg_log_error("unrecognized streaming header: \"%c\"",
513 : copybuf[0]);
514 0 : goto error;
515 : }
516 :
517 : /*
518 : * Read the header of the XLogData message, enclosed in the CopyData
519 : * message. We only need the WAL location field (dataStart), the rest
520 : * of the header is ignored.
521 : */
522 176 : hdr_len = 1; /* msgtype 'w' */
523 176 : hdr_len += 8; /* dataStart */
524 176 : hdr_len += 8; /* walEnd */
525 176 : hdr_len += 8; /* sendTime */
526 176 : if (r < hdr_len + 1)
527 : {
528 0 : pg_log_error("streaming header too small: %d", r);
529 0 : goto error;
530 : }
531 :
532 : /* Extract WAL location for this block */
533 176 : cur_record_lsn = fe_recvint64(©buf[1]);
534 :
535 176 : if (endpos != InvalidXLogRecPtr && cur_record_lsn > endpos)
536 : {
537 : /*
538 : * We've read past our endpoint, so prepare to go away being
539 : * cautious about what happens to our output data.
540 : */
541 0 : if (!flushAndSendFeedback(conn, &now))
542 0 : goto error;
543 0 : stop_reason = STREAM_STOP_END_OF_WAL;
544 0 : time_to_abort = true;
545 0 : break;
546 : }
547 :
548 176 : output_written_lsn = Max(cur_record_lsn, output_written_lsn);
549 :
550 176 : bytes_left = r - hdr_len;
551 176 : bytes_written = 0;
552 :
553 : /* signal that a fsync is needed */
554 176 : output_needs_fsync = true;
555 :
556 352 : while (bytes_left)
557 : {
558 : int ret;
559 :
560 352 : ret = write(outfd,
561 176 : copybuf + hdr_len + bytes_written,
562 : bytes_left);
563 :
564 176 : if (ret < 0)
565 : {
566 0 : pg_log_error("could not write %d bytes to log file \"%s\": %m",
567 : bytes_left, outfile);
568 0 : goto error;
569 : }
570 :
571 : /* Write was successful, advance our position */
572 176 : bytes_written += ret;
573 176 : bytes_left -= ret;
574 : }
575 :
576 176 : if (write(outfd, "\n", 1) != 1)
577 : {
578 0 : pg_log_error("could not write %d bytes to log file \"%s\": %m",
579 : 1, outfile);
580 0 : goto error;
581 : }
582 :
583 176 : if (endpos != InvalidXLogRecPtr && cur_record_lsn == endpos)
584 : {
585 : /* endpos was exactly the record we just processed, we're done */
586 10 : if (!flushAndSendFeedback(conn, &now))
587 0 : goto error;
588 10 : stop_reason = STREAM_STOP_END_OF_WAL;
589 10 : time_to_abort = true;
590 10 : break;
591 : }
592 : }
593 :
594 : /* Clean up connection state if stream has been aborted */
595 30 : if (time_to_abort)
596 16 : prepareToTerminate(conn, endpos, stop_reason, cur_record_lsn);
597 :
598 30 : res = PQgetResult(conn);
599 30 : if (PQresultStatus(res) == PGRES_COPY_OUT)
600 : {
601 16 : PQclear(res);
602 :
603 : /*
604 : * We're doing a client-initiated clean exit and have sent CopyDone to
605 : * the server. Drain any messages, so we don't miss a last-minute
606 : * ErrorResponse. The walsender stops generating XLogData records once
607 : * it sees CopyDone, so expect this to finish quickly. After CopyDone,
608 : * it's too late for sendFeedback(), even if this were to take a long
609 : * time. Hence, use synchronous-mode PQgetCopyData().
610 : */
611 : while (1)
612 4 : {
613 : int r;
614 :
615 20 : if (copybuf != NULL)
616 : {
617 18 : PQfreemem(copybuf);
618 18 : copybuf = NULL;
619 : }
620 20 : r = PQgetCopyData(conn, ©buf, 0);
621 20 : if (r == -1)
622 16 : break;
623 4 : if (r == -2)
624 : {
625 0 : pg_log_error("could not read COPY data: %s",
626 : PQerrorMessage(conn));
627 0 : time_to_abort = false; /* unclean exit */
628 0 : goto error;
629 : }
630 : }
631 :
632 16 : res = PQgetResult(conn);
633 : }
634 30 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
635 : {
636 12 : pg_log_error("unexpected termination of replication stream: %s",
637 : PQresultErrorMessage(res));
638 12 : goto error;
639 : }
640 18 : PQclear(res);
641 :
642 18 : if (outfd != -1 && strcmp(outfile, "-") != 0)
643 : {
644 0 : TimestampTz t = feGetCurrentTimestamp();
645 :
646 : /* no need to jump to error on failure here, we're finishing anyway */
647 0 : OutputFsync(t);
648 :
649 0 : if (close(outfd) != 0)
650 0 : pg_log_error("could not close file \"%s\": %m", outfile);
651 : }
652 18 : outfd = -1;
653 46 : error:
654 46 : if (copybuf != NULL)
655 : {
656 0 : PQfreemem(copybuf);
657 0 : copybuf = NULL;
658 : }
659 46 : destroyPQExpBuffer(query);
660 46 : PQfinish(conn);
661 46 : conn = NULL;
662 : }
663 :
664 : /*
665 : * Unfortunately we can't do sensible signal handling on windows...
666 : */
667 : #ifndef WIN32
668 :
669 : /*
670 : * When SIGINT/SIGTERM are caught, just tell the system to exit at the next
671 : * possible moment.
672 : */
673 : static void
674 2 : sigexit_handler(SIGNAL_ARGS)
675 : {
676 2 : stop_reason = STREAM_STOP_SIGNAL;
677 2 : time_to_abort = true;
678 2 : }
679 :
680 : /*
681 : * Trigger the output file to be reopened.
682 : */
683 : static void
684 0 : sighup_handler(SIGNAL_ARGS)
685 : {
686 0 : output_reopen = true;
687 0 : }
688 : #endif
689 :
690 :
691 : int
692 110 : main(int argc, char **argv)
693 : {
694 : static struct option long_options[] = {
695 : /* general options */
696 : {"file", required_argument, NULL, 'f'},
697 : {"fsync-interval", required_argument, NULL, 'F'},
698 : {"no-loop", no_argument, NULL, 'n'},
699 : {"verbose", no_argument, NULL, 'v'},
700 : {"two-phase", no_argument, NULL, 't'},
701 : {"version", no_argument, NULL, 'V'},
702 : {"help", no_argument, NULL, '?'},
703 : /* connection options */
704 : {"dbname", required_argument, NULL, 'd'},
705 : {"host", required_argument, NULL, 'h'},
706 : {"port", required_argument, NULL, 'p'},
707 : {"username", required_argument, NULL, 'U'},
708 : {"no-password", no_argument, NULL, 'w'},
709 : {"password", no_argument, NULL, 'W'},
710 : /* replication options */
711 : {"startpos", required_argument, NULL, 'I'},
712 : {"endpos", required_argument, NULL, 'E'},
713 : {"option", required_argument, NULL, 'o'},
714 : {"plugin", required_argument, NULL, 'P'},
715 : {"status-interval", required_argument, NULL, 's'},
716 : {"slot", required_argument, NULL, 'S'},
717 : /* action */
718 : {"create-slot", no_argument, NULL, 1},
719 : {"start", no_argument, NULL, 2},
720 : {"drop-slot", no_argument, NULL, 3},
721 : {"if-not-exists", no_argument, NULL, 4},
722 : {NULL, 0, NULL, 0}
723 : };
724 : int c;
725 : int option_index;
726 : uint32 hi,
727 : lo;
728 : char *db_name;
729 :
730 110 : pg_logging_init(argv[0]);
731 110 : progname = get_progname(argv[0]);
732 110 : set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
733 :
734 110 : if (argc > 1)
735 : {
736 108 : if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
737 : {
738 2 : usage();
739 2 : exit(0);
740 : }
741 106 : else if (strcmp(argv[1], "-V") == 0 ||
742 106 : strcmp(argv[1], "--version") == 0)
743 : {
744 2 : puts("pg_recvlogical (PostgreSQL) " PG_VERSION);
745 2 : exit(0);
746 : }
747 : }
748 :
749 642 : while ((c = getopt_long(argc, argv, "E:f:F:ntvd:h:p:U:wWI:o:P:s:S:",
750 : long_options, &option_index)) != -1)
751 : {
752 538 : switch (c)
753 : {
754 : /* general options */
755 48 : case 'f':
756 48 : outfile = pg_strdup(optarg);
757 48 : break;
758 0 : case 'F':
759 0 : if (!option_parse_int(optarg, "-F/--fsync-interval", 0,
760 : INT_MAX / 1000,
761 : &fsync_interval))
762 0 : exit(1);
763 0 : fsync_interval *= 1000;
764 0 : break;
765 46 : case 'n':
766 46 : noloop = 1;
767 46 : break;
768 4 : case 't':
769 4 : two_phase = true;
770 4 : break;
771 0 : case 'v':
772 0 : verbose++;
773 0 : break;
774 : /* connection options */
775 100 : case 'd':
776 100 : dbname = pg_strdup(optarg);
777 100 : break;
778 0 : case 'h':
779 0 : dbhost = pg_strdup(optarg);
780 0 : break;
781 0 : case 'p':
782 0 : dbport = pg_strdup(optarg);
783 0 : break;
784 0 : case 'U':
785 0 : dbuser = pg_strdup(optarg);
786 0 : break;
787 0 : case 'w':
788 0 : dbgetpassword = -1;
789 0 : break;
790 0 : case 'W':
791 0 : dbgetpassword = 1;
792 0 : break;
793 : /* replication options */
794 0 : case 'I':
795 0 : if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
796 0 : pg_fatal("could not parse start position \"%s\"", optarg);
797 0 : startpos = ((uint64) hi) << 32 | lo;
798 0 : break;
799 16 : case 'E':
800 16 : if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
801 0 : pg_fatal("could not parse end position \"%s\"", optarg);
802 16 : endpos = ((uint64) hi) << 32 | lo;
803 16 : break;
804 80 : case 'o':
805 : {
806 80 : char *data = pg_strdup(optarg);
807 80 : char *val = strchr(data, '=');
808 :
809 80 : if (val != NULL)
810 : {
811 : /* remove =; separate data from val */
812 80 : *val = '\0';
813 80 : val++;
814 : }
815 :
816 80 : noptions += 1;
817 80 : options = pg_realloc(options, sizeof(char *) * noptions * 2);
818 :
819 80 : options[(noptions - 1) * 2] = data;
820 80 : options[(noptions - 1) * 2 + 1] = val;
821 : }
822 :
823 80 : break;
824 42 : case 'P':
825 42 : plugin = pg_strdup(optarg);
826 42 : break;
827 0 : case 's':
828 0 : if (!option_parse_int(optarg, "-s/--status-interval", 0,
829 : INT_MAX / 1000,
830 : &standby_message_timeout))
831 0 : exit(1);
832 0 : standby_message_timeout *= 1000;
833 0 : break;
834 102 : case 'S':
835 102 : replication_slot = pg_strdup(optarg);
836 102 : break;
837 : /* action */
838 46 : case 1:
839 46 : do_create_slot = true;
840 46 : break;
841 50 : case 2:
842 50 : do_start_slot = true;
843 50 : break;
844 2 : case 3:
845 2 : do_drop_slot = true;
846 2 : break;
847 0 : case 4:
848 0 : slot_exists_ok = true;
849 0 : break;
850 :
851 2 : default:
852 : /* getopt_long already emitted a complaint */
853 2 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
854 2 : exit(1);
855 : }
856 : }
857 :
858 : /*
859 : * Any non-option arguments?
860 : */
861 104 : if (optind < argc)
862 : {
863 0 : pg_log_error("too many command-line arguments (first is \"%s\")",
864 : argv[optind]);
865 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
866 0 : exit(1);
867 : }
868 :
869 : /*
870 : * Required arguments
871 : */
872 104 : if (replication_slot == NULL)
873 : {
874 2 : pg_log_error("no slot specified");
875 2 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
876 2 : exit(1);
877 : }
878 :
879 102 : if (do_start_slot && outfile == NULL)
880 : {
881 2 : pg_log_error("no target file specified");
882 2 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
883 2 : exit(1);
884 : }
885 :
886 100 : if (!do_drop_slot && dbname == NULL)
887 : {
888 2 : pg_log_error("no database specified");
889 2 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
890 2 : exit(1);
891 : }
892 :
893 98 : if (!do_drop_slot && !do_create_slot && !do_start_slot)
894 : {
895 2 : pg_log_error("at least one action needs to be specified");
896 2 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
897 2 : exit(1);
898 : }
899 :
900 96 : if (do_drop_slot && (do_create_slot || do_start_slot))
901 : {
902 0 : pg_log_error("cannot use --create-slot or --start together with --drop-slot");
903 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
904 0 : exit(1);
905 : }
906 :
907 96 : if (startpos != InvalidXLogRecPtr && (do_create_slot || do_drop_slot))
908 : {
909 0 : pg_log_error("cannot use --create-slot or --drop-slot together with --startpos");
910 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
911 0 : exit(1);
912 : }
913 :
914 96 : if (endpos != InvalidXLogRecPtr && !do_start_slot)
915 : {
916 0 : pg_log_error("--endpos may only be specified with --start");
917 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
918 0 : exit(1);
919 : }
920 :
921 96 : if (two_phase && !do_create_slot)
922 : {
923 2 : pg_log_error("--two-phase may only be specified with --create-slot");
924 2 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
925 2 : exit(1);
926 : }
927 :
928 : /*
929 : * Obtain a connection to server. Notably, if we need a password, we want
930 : * to collect it from the user immediately.
931 : */
932 94 : conn = GetConnection();
933 94 : if (!conn)
934 : /* Error message already written in GetConnection() */
935 0 : exit(1);
936 94 : atexit(disconnect_atexit);
937 :
938 : /*
939 : * Trap signals. (Don't do this until after the initial password prompt,
940 : * if one is needed, in GetConnection.)
941 : */
942 : #ifndef WIN32
943 94 : pqsignal(SIGINT, sigexit_handler);
944 94 : pqsignal(SIGTERM, sigexit_handler);
945 94 : pqsignal(SIGHUP, sighup_handler);
946 : #endif
947 :
948 : /*
949 : * Run IDENTIFY_SYSTEM to make sure we connected using a database specific
950 : * replication connection.
951 : */
952 94 : if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name))
953 0 : exit(1);
954 :
955 94 : if (db_name == NULL)
956 0 : pg_fatal("could not establish database-specific replication connection");
957 :
958 : /*
959 : * Set umask so that directories/files are created with the same
960 : * permissions as directories/files in the source data directory.
961 : *
962 : * pg_mode_mask is set to owner-only by default and then updated in
963 : * GetConnection() where we get the mode from the server-side with
964 : * RetrieveDataDirCreatePerm() and then call SetDataDirectoryCreatePerm().
965 : */
966 94 : umask(pg_mode_mask);
967 :
968 : /* Drop a replication slot. */
969 94 : if (do_drop_slot)
970 : {
971 2 : if (verbose)
972 0 : pg_log_info("dropping replication slot \"%s\"", replication_slot);
973 :
974 2 : if (!DropReplicationSlot(conn, replication_slot))
975 0 : exit(1);
976 : }
977 :
978 : /* Create a replication slot. */
979 94 : if (do_create_slot)
980 : {
981 46 : if (verbose)
982 0 : pg_log_info("creating replication slot \"%s\"", replication_slot);
983 :
984 46 : if (!CreateReplicationSlot(conn, replication_slot, plugin, false,
985 : false, false, slot_exists_ok, two_phase))
986 0 : exit(1);
987 46 : startpos = InvalidXLogRecPtr;
988 : }
989 :
990 94 : if (!do_start_slot)
991 48 : exit(0);
992 :
993 : /* Stream loop */
994 : while (true)
995 : {
996 46 : StreamLogicalLog();
997 46 : if (time_to_abort)
998 : {
999 : /*
1000 : * We've been Ctrl-C'ed or reached an exit limit condition. That's
1001 : * not an error, so exit without an errorcode.
1002 : */
1003 16 : exit(0);
1004 : }
1005 30 : else if (noloop)
1006 30 : pg_fatal("disconnected");
1007 : else
1008 : {
1009 : /* translator: check source for value for %d */
1010 0 : pg_log_info("disconnected; waiting %d seconds to try again",
1011 : RECONNECT_SLEEP_TIME);
1012 0 : pg_usleep(RECONNECT_SLEEP_TIME * 1000000);
1013 : }
1014 : }
1015 : }
1016 :
1017 : /*
1018 : * Fsync our output data, and send a feedback message to the server. Returns
1019 : * true if successful, false otherwise.
1020 : *
1021 : * If successful, *now is updated to the current timestamp just before sending
1022 : * feedback.
1023 : */
1024 : static bool
1025 16 : flushAndSendFeedback(PGconn *conn, TimestampTz *now)
1026 : {
1027 : /* flush data to disk, so that we send a recent flush pointer */
1028 16 : if (!OutputFsync(*now))
1029 0 : return false;
1030 16 : *now = feGetCurrentTimestamp();
1031 16 : if (!sendFeedback(conn, *now, true, false))
1032 0 : return false;
1033 :
1034 16 : return true;
1035 : }
1036 :
1037 : /*
1038 : * Try to inform the server about our upcoming demise, but don't wait around or
1039 : * retry on failure.
1040 : */
1041 : static void
1042 16 : prepareToTerminate(PGconn *conn, XLogRecPtr endpos, StreamStopReason reason,
1043 : XLogRecPtr lsn)
1044 : {
1045 16 : (void) PQputCopyEnd(conn, NULL);
1046 16 : (void) PQflush(conn);
1047 :
1048 16 : if (verbose)
1049 : {
1050 0 : switch (reason)
1051 : {
1052 0 : case STREAM_STOP_SIGNAL:
1053 0 : pg_log_info("received interrupt signal, exiting");
1054 0 : break;
1055 0 : case STREAM_STOP_KEEPALIVE:
1056 0 : pg_log_info("end position %X/%X reached by keepalive",
1057 : LSN_FORMAT_ARGS(endpos));
1058 0 : break;
1059 0 : case STREAM_STOP_END_OF_WAL:
1060 : Assert(!XLogRecPtrIsInvalid(lsn));
1061 0 : pg_log_info("end position %X/%X reached by WAL record at %X/%X",
1062 : LSN_FORMAT_ARGS(endpos), LSN_FORMAT_ARGS(lsn));
1063 0 : break;
1064 0 : case STREAM_STOP_NONE:
1065 : Assert(false);
1066 0 : break;
1067 : }
1068 16 : }
1069 16 : }
|