Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pg_recvlogical.c - receive data from a logical decoding slot in a streaming
4 : * fashion and write it to a local file.
5 : *
6 : * Portions Copyright (c) 1996-2025, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/bin/pg_basebackup/pg_recvlogical.c
10 : *-------------------------------------------------------------------------
11 : */
12 :
13 : #include "postgres_fe.h"
14 :
15 : #include <dirent.h>
16 : #include <limits.h>
17 : #include <sys/select.h>
18 : #include <sys/stat.h>
19 : #include <unistd.h>
20 :
21 : #include "common/file_perm.h"
22 : #include "common/logging.h"
23 : #include "fe_utils/option_utils.h"
24 : #include "getopt_long.h"
25 : #include "libpq-fe.h"
26 : #include "libpq/pqsignal.h"
27 : #include "pqexpbuffer.h"
28 : #include "streamutil.h"
29 :
/*
 * Time to sleep between reconnection attempts, in seconds.  main() multiplies
 * this by 1000000 before handing it to pg_usleep().
 */
#define RECONNECT_SLEEP_TIME 5
32 :
/*
 * Why the streaming loop in StreamLogicalLog() was asked to stop.  The value
 * is recorded in stop_reason and later passed to prepareToTerminate(), which
 * uses it only to pick the verbose log message.
 */
typedef enum
{
	STREAM_STOP_NONE,			/* initial value; no stop requested */
	STREAM_STOP_END_OF_WAL,		/* an XLogData record reached or passed endpos */
	STREAM_STOP_KEEPALIVE,		/* a keepalive's walEnd reached endpos */
	STREAM_STOP_SIGNAL			/* set by sigexit_handler() */
} StreamStopReason;
40 :
/* Global Options */
static char *outfile = NULL;	/* -f/--file; "-" selects stdout */
static int	verbose = 0;		/* incremented for each -v/--verbose */
static bool two_phase = false;	/* -t/--two-phase, used with --create-slot */
static bool failover = false;	/* --failover, used with --create-slot */
static int	noloop = 0;			/* -n/--no-loop: give up after a disconnect */
/* both intervals are given in seconds on the command line, kept in ms here */
static int	standby_message_timeout = 10 * 1000;	/* 10 sec = default */
static int	fsync_interval = 10 * 1000; /* 10 sec = default */
static XLogRecPtr startpos = InvalidXLogRecPtr; /* -I/--startpos */
static XLogRecPtr endpos = InvalidXLogRecPtr;	/* -E/--endpos */
/* requested actions, set by --create-slot, --if-not-exists, --start, --drop-slot */
static bool do_create_slot = false;
static bool slot_exists_ok = false;
static bool do_start_slot = false;
static bool do_drop_slot = false;
static char *replication_slot = NULL;	/* -S/--slot; required */

/* filled pairwise with option, value. value may be NULL */
static char **options;
static size_t noptions = 0;		/* number of (option, value) pairs in options */
static const char *plugin = "test_decoding";	/* -P/--plugin */

/* Global State */
static int	outfd = -1;			/* output file descriptor, -1 while not open */
/* the three flags below are written from signal handlers */
static volatile sig_atomic_t time_to_abort = false;
static volatile sig_atomic_t stop_reason = STREAM_STOP_NONE;
static volatile sig_atomic_t output_reopen = false;
static bool output_isfile;		/* outfd is a regular file and not a tty */
static TimestampTz output_last_fsync = -1;	/* time of last OutputFsync() call */
static bool output_needs_fsync = false; /* data written since the last fsync */
static XLogRecPtr output_written_lsn = InvalidXLogRecPtr;	/* reported as "write" */
static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr; /* reported as "flush" */
72 :
/* Forward declarations for functions defined further down in this file */
static void usage(void);
static void StreamLogicalLog(void);
static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now);
static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos,
							   StreamStopReason reason,
							   XLogRecPtr lsn);
79 :
80 : static void
81 2 : usage(void)
82 : {
83 2 : printf(_("%s controls PostgreSQL logical decoding streams.\n\n"),
84 : progname);
85 2 : printf(_("Usage:\n"));
86 2 : printf(_(" %s [OPTION]...\n"), progname);
87 2 : printf(_("\nAction to be performed:\n"));
88 2 : printf(_(" --create-slot create a new replication slot (for the slot's name see --slot)\n"));
89 2 : printf(_(" --drop-slot drop the replication slot (for the slot's name see --slot)\n"));
90 2 : printf(_(" --start start streaming in a replication slot (for the slot's name see --slot)\n"));
91 2 : printf(_("\nOptions:\n"));
92 2 : printf(_(" -E, --endpos=LSN exit after receiving the specified LSN\n"));
93 2 : printf(_(" --failover enable replication slot synchronization to standby servers when\n"
94 : " creating a slot\n"));
95 2 : printf(_(" -f, --file=FILE receive log into this file, - for stdout\n"));
96 2 : printf(_(" -F --fsync-interval=SECS\n"
97 : " time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000));
98 2 : printf(_(" --if-not-exists do not error if slot already exists when creating a slot\n"));
99 2 : printf(_(" -I, --startpos=LSN where in an existing slot should the streaming start\n"));
100 2 : printf(_(" -n, --no-loop do not loop on connection lost\n"));
101 2 : printf(_(" -o, --option=NAME[=VALUE]\n"
102 : " pass option NAME with optional value VALUE to the\n"
103 : " output plugin\n"));
104 2 : printf(_(" -P, --plugin=PLUGIN use output plugin PLUGIN (default: %s)\n"), plugin);
105 2 : printf(_(" -s, --status-interval=SECS\n"
106 : " time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
107 2 : printf(_(" -S, --slot=SLOTNAME name of the logical replication slot\n"));
108 2 : printf(_(" -t, --two-phase enable decoding of prepared transactions when creating a slot\n"));
109 2 : printf(_(" -v, --verbose output verbose messages\n"));
110 2 : printf(_(" -V, --version output version information, then exit\n"));
111 2 : printf(_(" -?, --help show this help, then exit\n"));
112 2 : printf(_("\nConnection options:\n"));
113 2 : printf(_(" -d, --dbname=DBNAME database to connect to\n"));
114 2 : printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
115 2 : printf(_(" -p, --port=PORT database server port number\n"));
116 2 : printf(_(" -U, --username=NAME connect as specified database user\n"));
117 2 : printf(_(" -w, --no-password never prompt for password\n"));
118 2 : printf(_(" -W, --password force password prompt (should happen automatically)\n"));
119 2 : printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
120 2 : printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
121 2 : }
122 :
123 : /*
124 : * Send a Standby Status Update message to server.
125 : */
126 : static bool
127 50 : sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested)
128 : {
129 : static XLogRecPtr last_written_lsn = InvalidXLogRecPtr;
130 : static XLogRecPtr last_fsync_lsn = InvalidXLogRecPtr;
131 :
132 : char replybuf[1 + 8 + 8 + 8 + 8 + 1];
133 50 : int len = 0;
134 :
135 : /*
136 : * we normally don't want to send superfluous feedback, but if it's
137 : * because of a timeout we need to, otherwise wal_sender_timeout will kill
138 : * us.
139 : */
140 50 : if (!force &&
141 0 : last_written_lsn == output_written_lsn &&
142 0 : last_fsync_lsn == output_fsync_lsn)
143 0 : return true;
144 :
145 50 : if (verbose)
146 0 : pg_log_info("confirming write up to %X/%X, flush to %X/%X (slot %s)",
147 : LSN_FORMAT_ARGS(output_written_lsn),
148 : LSN_FORMAT_ARGS(output_fsync_lsn),
149 : replication_slot);
150 :
151 50 : replybuf[len] = 'r';
152 50 : len += 1;
153 50 : fe_sendint64(output_written_lsn, &replybuf[len]); /* write */
154 50 : len += 8;
155 50 : fe_sendint64(output_fsync_lsn, &replybuf[len]); /* flush */
156 50 : len += 8;
157 50 : fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */
158 50 : len += 8;
159 50 : fe_sendint64(now, &replybuf[len]); /* sendTime */
160 50 : len += 8;
161 50 : replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
162 50 : len += 1;
163 :
164 50 : startpos = output_written_lsn;
165 50 : last_written_lsn = output_written_lsn;
166 50 : last_fsync_lsn = output_fsync_lsn;
167 :
168 50 : if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
169 : {
170 0 : pg_log_error("could not send feedback packet: %s",
171 : PQerrorMessage(conn));
172 0 : return false;
173 : }
174 :
175 50 : return true;
176 : }
177 :
178 : static void
179 100 : disconnect_atexit(void)
180 : {
181 100 : if (conn != NULL)
182 54 : PQfinish(conn);
183 100 : }
184 :
185 : static bool
186 50 : OutputFsync(TimestampTz now)
187 : {
188 50 : output_last_fsync = now;
189 :
190 50 : output_fsync_lsn = output_written_lsn;
191 :
192 50 : if (fsync_interval <= 0)
193 0 : return true;
194 :
195 50 : if (!output_needs_fsync)
196 38 : return true;
197 :
198 12 : output_needs_fsync = false;
199 :
200 : /* can only fsync if it's a regular file */
201 12 : if (!output_isfile)
202 8 : return true;
203 :
204 4 : if (fsync(outfd) != 0)
205 0 : pg_fatal("could not fsync file \"%s\": %m", outfile);
206 :
207 4 : return true;
208 : }
209 :
210 : /*
211 : * Start the log streaming
212 : */
213 : static void
214 46 : StreamLogicalLog(void)
215 : {
216 : PGresult *res;
217 46 : char *copybuf = NULL;
218 46 : TimestampTz last_status = -1;
219 : int i;
220 : PQExpBuffer query;
221 : XLogRecPtr cur_record_lsn;
222 :
223 46 : output_written_lsn = InvalidXLogRecPtr;
224 46 : output_fsync_lsn = InvalidXLogRecPtr;
225 46 : cur_record_lsn = InvalidXLogRecPtr;
226 :
227 : /*
228 : * Connect in replication mode to the server
229 : */
230 46 : if (!conn)
231 0 : conn = GetConnection();
232 46 : if (!conn)
233 : /* Error message already written in GetConnection() */
234 0 : return;
235 :
236 : /*
237 : * Start the replication
238 : */
239 46 : if (verbose)
240 0 : pg_log_info("starting log streaming at %X/%X (slot %s)",
241 : LSN_FORMAT_ARGS(startpos),
242 : replication_slot);
243 :
244 : /* Initiate the replication stream at specified location */
245 46 : query = createPQExpBuffer();
246 46 : appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%X",
247 46 : replication_slot, LSN_FORMAT_ARGS(startpos));
248 :
249 : /* print options if there are any */
250 46 : if (noptions)
251 40 : appendPQExpBufferStr(query, " (");
252 :
253 126 : for (i = 0; i < noptions; i++)
254 : {
255 : /* separator */
256 80 : if (i > 0)
257 40 : appendPQExpBufferStr(query, ", ");
258 :
259 : /* write option name */
260 80 : appendPQExpBuffer(query, "\"%s\"", options[(i * 2)]);
261 :
262 : /* write option value if specified */
263 80 : if (options[(i * 2) + 1] != NULL)
264 80 : appendPQExpBuffer(query, " '%s'", options[(i * 2) + 1]);
265 : }
266 :
267 46 : if (noptions)
268 40 : appendPQExpBufferChar(query, ')');
269 :
270 46 : res = PQexec(conn, query->data);
271 46 : if (PQresultStatus(res) != PGRES_COPY_BOTH)
272 : {
273 12 : pg_log_error("could not send replication command \"%s\": %s",
274 : query->data, PQresultErrorMessage(res));
275 12 : PQclear(res);
276 12 : goto error;
277 : }
278 34 : PQclear(res);
279 34 : resetPQExpBuffer(query);
280 :
281 34 : if (verbose)
282 0 : pg_log_info("streaming initiated");
283 :
284 944 : while (!time_to_abort)
285 : {
286 : int r;
287 : int bytes_left;
288 : int bytes_written;
289 : TimestampTz now;
290 : int hdr_len;
291 :
292 942 : cur_record_lsn = InvalidXLogRecPtr;
293 :
294 942 : if (copybuf != NULL)
295 : {
296 636 : PQfreemem(copybuf);
297 636 : copybuf = NULL;
298 : }
299 :
300 : /*
301 : * Potentially send a status message to the primary.
302 : */
303 942 : now = feGetCurrentTimestamp();
304 :
305 1850 : if (outfd != -1 &&
306 908 : feTimestampDifferenceExceeds(output_last_fsync, now,
307 : fsync_interval))
308 : {
309 34 : if (!OutputFsync(now))
310 4 : goto error;
311 : }
312 :
313 1884 : if (standby_message_timeout > 0 &&
314 942 : feTimestampDifferenceExceeds(last_status, now,
315 : standby_message_timeout))
316 : {
317 : /* Time to send feedback! */
318 34 : if (!sendFeedback(conn, now, true, false))
319 0 : goto error;
320 :
321 34 : last_status = now;
322 : }
323 :
324 : /* got SIGHUP, close output file */
325 942 : if (outfd != -1 && output_reopen && strcmp(outfile, "-") != 0)
326 : {
327 0 : now = feGetCurrentTimestamp();
328 0 : if (!OutputFsync(now))
329 0 : goto error;
330 0 : close(outfd);
331 0 : outfd = -1;
332 : }
333 942 : output_reopen = false;
334 :
335 : /* open the output file, if not open yet */
336 942 : if (outfd == -1)
337 : {
338 : struct stat statbuf;
339 :
340 34 : if (strcmp(outfile, "-") == 0)
341 34 : outfd = fileno(stdout);
342 : else
343 0 : outfd = open(outfile, O_CREAT | O_APPEND | O_WRONLY | PG_BINARY,
344 : S_IRUSR | S_IWUSR);
345 34 : if (outfd == -1)
346 : {
347 0 : pg_log_error("could not open log file \"%s\": %m", outfile);
348 0 : goto error;
349 : }
350 :
351 34 : if (fstat(outfd, &statbuf) != 0)
352 : {
353 0 : pg_log_error("could not stat file \"%s\": %m", outfile);
354 0 : goto error;
355 : }
356 :
357 34 : output_isfile = S_ISREG(statbuf.st_mode) && !isatty(outfd);
358 : }
359 :
360 942 : r = PQgetCopyData(conn, ©buf, 1);
361 942 : if (r == 0)
362 272 : {
363 : /*
364 : * In async mode, and no data available. We block on reading but
365 : * not more than the specified timeout, so that we can send a
366 : * response back to the client.
367 : */
368 : fd_set input_mask;
369 278 : TimestampTz message_target = 0;
370 278 : TimestampTz fsync_target = 0;
371 : struct timeval timeout;
372 278 : struct timeval *timeoutptr = NULL;
373 :
374 278 : if (PQsocket(conn) < 0)
375 : {
376 0 : pg_log_error("invalid socket: %s", PQerrorMessage(conn));
377 4 : goto error;
378 : }
379 :
380 4726 : FD_ZERO(&input_mask);
381 278 : FD_SET(PQsocket(conn), &input_mask);
382 :
383 : /* Compute when we need to wakeup to send a keepalive message. */
384 278 : if (standby_message_timeout)
385 278 : message_target = last_status + (standby_message_timeout - 1) *
386 : ((int64) 1000);
387 :
388 : /* Compute when we need to wakeup to fsync the output file. */
389 278 : if (fsync_interval > 0 && output_needs_fsync)
390 108 : fsync_target = output_last_fsync + (fsync_interval - 1) *
391 : ((int64) 1000);
392 :
393 : /* Now compute when to wakeup. */
394 278 : if (message_target > 0 || fsync_target > 0)
395 : {
396 : TimestampTz targettime;
397 : long secs;
398 : int usecs;
399 :
400 278 : targettime = message_target;
401 :
402 278 : if (fsync_target > 0 && fsync_target < targettime)
403 0 : targettime = fsync_target;
404 :
405 278 : feTimestampDifference(now,
406 : targettime,
407 : &secs,
408 : &usecs);
409 278 : if (secs <= 0)
410 0 : timeout.tv_sec = 1; /* Always sleep at least 1 sec */
411 : else
412 278 : timeout.tv_sec = secs;
413 278 : timeout.tv_usec = usecs;
414 278 : timeoutptr = &timeout;
415 : }
416 :
417 278 : r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
418 278 : if (r == 0 || (r < 0 && errno == EINTR))
419 : {
420 : /*
421 : * Got a timeout or signal. Continue the loop and either
422 : * deliver a status packet to the server or just go back into
423 : * blocking.
424 : */
425 274 : continue;
426 : }
427 276 : else if (r < 0)
428 : {
429 0 : pg_log_error("%s() failed: %m", "select");
430 0 : goto error;
431 : }
432 :
433 : /* Else there is actually data on the socket */
434 276 : if (PQconsumeInput(conn) == 0)
435 : {
436 4 : pg_log_error("could not receive data from WAL stream: %s",
437 : PQerrorMessage(conn));
438 4 : goto error;
439 : }
440 272 : continue;
441 : }
442 :
443 : /* End of copy stream */
444 664 : if (r == -1)
445 28 : break;
446 :
447 : /* Failure while reading the copy stream */
448 650 : if (r == -2)
449 : {
450 0 : pg_log_error("could not read COPY data: %s",
451 : PQerrorMessage(conn));
452 0 : goto error;
453 : }
454 :
455 : /* Check the message type. */
456 650 : if (copybuf[0] == 'k')
457 464 : {
458 : int pos;
459 : bool replyRequested;
460 : XLogRecPtr walEnd;
461 468 : bool endposReached = false;
462 :
463 : /*
464 : * Parse the keepalive message, enclosed in the CopyData message.
465 : * We just check if the server requested a reply, and ignore the
466 : * rest.
467 : */
468 468 : pos = 1; /* skip msgtype 'k' */
469 468 : walEnd = fe_recvint64(©buf[pos]);
470 468 : output_written_lsn = Max(walEnd, output_written_lsn);
471 :
472 468 : pos += 8; /* read walEnd */
473 :
474 468 : pos += 8; /* skip sendTime */
475 :
476 468 : if (r < pos + 1)
477 : {
478 0 : pg_log_error("streaming header too small: %d", r);
479 0 : goto error;
480 : }
481 468 : replyRequested = copybuf[pos];
482 :
483 468 : if (endpos != InvalidXLogRecPtr && walEnd >= endpos)
484 : {
485 : /*
486 : * If there's nothing to read on the socket until a keepalive
487 : * we know that the server has nothing to send us; and if
488 : * walEnd has passed endpos, we know nothing else can have
489 : * committed before endpos. So we can bail out now.
490 : */
491 4 : endposReached = true;
492 : }
493 :
494 : /* Send a reply, if necessary */
495 468 : if (replyRequested || endposReached)
496 : {
497 6 : if (!flushAndSendFeedback(conn, &now))
498 0 : goto error;
499 6 : last_status = now;
500 : }
501 :
502 468 : if (endposReached)
503 : {
504 4 : stop_reason = STREAM_STOP_KEEPALIVE;
505 4 : time_to_abort = true;
506 4 : break;
507 : }
508 :
509 464 : continue;
510 : }
511 182 : else if (copybuf[0] != 'w')
512 : {
513 0 : pg_log_error("unrecognized streaming header: \"%c\"",
514 : copybuf[0]);
515 0 : goto error;
516 : }
517 :
518 : /*
519 : * Read the header of the XLogData message, enclosed in the CopyData
520 : * message. We only need the WAL location field (dataStart), the rest
521 : * of the header is ignored.
522 : */
523 182 : hdr_len = 1; /* msgtype 'w' */
524 182 : hdr_len += 8; /* dataStart */
525 182 : hdr_len += 8; /* walEnd */
526 182 : hdr_len += 8; /* sendTime */
527 182 : if (r < hdr_len + 1)
528 : {
529 0 : pg_log_error("streaming header too small: %d", r);
530 0 : goto error;
531 : }
532 :
533 : /* Extract WAL location for this block */
534 182 : cur_record_lsn = fe_recvint64(©buf[1]);
535 :
536 182 : if (endpos != InvalidXLogRecPtr && cur_record_lsn > endpos)
537 : {
538 : /*
539 : * We've read past our endpoint, so prepare to go away being
540 : * cautious about what happens to our output data.
541 : */
542 0 : if (!flushAndSendFeedback(conn, &now))
543 0 : goto error;
544 0 : stop_reason = STREAM_STOP_END_OF_WAL;
545 0 : time_to_abort = true;
546 0 : break;
547 : }
548 :
549 182 : output_written_lsn = Max(cur_record_lsn, output_written_lsn);
550 :
551 182 : bytes_left = r - hdr_len;
552 182 : bytes_written = 0;
553 :
554 : /* signal that a fsync is needed */
555 182 : output_needs_fsync = true;
556 :
557 364 : while (bytes_left)
558 : {
559 : int ret;
560 :
561 364 : ret = write(outfd,
562 182 : copybuf + hdr_len + bytes_written,
563 : bytes_left);
564 :
565 182 : if (ret < 0)
566 : {
567 0 : pg_log_error("could not write %d bytes to log file \"%s\": %m",
568 : bytes_left, outfile);
569 0 : goto error;
570 : }
571 :
572 : /* Write was successful, advance our position */
573 182 : bytes_written += ret;
574 182 : bytes_left -= ret;
575 : }
576 :
577 182 : if (write(outfd, "\n", 1) != 1)
578 : {
579 0 : pg_log_error("could not write %d bytes to log file \"%s\": %m",
580 : 1, outfile);
581 0 : goto error;
582 : }
583 :
584 182 : if (endpos != InvalidXLogRecPtr && cur_record_lsn == endpos)
585 : {
586 : /* endpos was exactly the record we just processed, we're done */
587 10 : if (!flushAndSendFeedback(conn, &now))
588 0 : goto error;
589 10 : stop_reason = STREAM_STOP_END_OF_WAL;
590 10 : time_to_abort = true;
591 10 : break;
592 : }
593 : }
594 :
595 : /* Clean up connection state if stream has been aborted */
596 30 : if (time_to_abort)
597 16 : prepareToTerminate(conn, endpos, stop_reason, cur_record_lsn);
598 :
599 30 : res = PQgetResult(conn);
600 30 : if (PQresultStatus(res) == PGRES_COPY_OUT)
601 : {
602 16 : PQclear(res);
603 :
604 : /*
605 : * We're doing a client-initiated clean exit and have sent CopyDone to
606 : * the server. Drain any messages, so we don't miss a last-minute
607 : * ErrorResponse. The walsender stops generating XLogData records once
608 : * it sees CopyDone, so expect this to finish quickly. After CopyDone,
609 : * it's too late for sendFeedback(), even if this were to take a long
610 : * time. Hence, use synchronous-mode PQgetCopyData().
611 : */
612 : while (1)
613 4 : {
614 : int r;
615 :
616 20 : if (copybuf != NULL)
617 : {
618 18 : PQfreemem(copybuf);
619 18 : copybuf = NULL;
620 : }
621 20 : r = PQgetCopyData(conn, ©buf, 0);
622 20 : if (r == -1)
623 16 : break;
624 4 : if (r == -2)
625 : {
626 0 : pg_log_error("could not read COPY data: %s",
627 : PQerrorMessage(conn));
628 0 : time_to_abort = false; /* unclean exit */
629 0 : goto error;
630 : }
631 : }
632 :
633 16 : res = PQgetResult(conn);
634 : }
635 30 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
636 : {
637 12 : pg_log_error("unexpected termination of replication stream: %s",
638 : PQresultErrorMessage(res));
639 12 : PQclear(res);
640 12 : goto error;
641 : }
642 18 : PQclear(res);
643 :
644 18 : if (outfd != -1 && strcmp(outfile, "-") != 0)
645 : {
646 0 : TimestampTz t = feGetCurrentTimestamp();
647 :
648 : /* no need to jump to error on failure here, we're finishing anyway */
649 0 : OutputFsync(t);
650 :
651 0 : if (close(outfd) != 0)
652 0 : pg_log_error("could not close file \"%s\": %m", outfile);
653 : }
654 18 : outfd = -1;
655 46 : error:
656 46 : if (copybuf != NULL)
657 : {
658 0 : PQfreemem(copybuf);
659 0 : copybuf = NULL;
660 : }
661 46 : destroyPQExpBuffer(query);
662 46 : PQfinish(conn);
663 46 : conn = NULL;
664 : }
665 :
/*
 * Unfortunately we can't do sensible signal handling on Windows...
 */
669 : #ifndef WIN32
670 :
/*
 * When SIGINT/SIGTERM are caught, just tell the system to exit at the next
 * possible moment.
 *
 * Only sets flags: the streaming loop tests time_to_abort at the top of each
 * iteration, and stop_reason is later passed to prepareToTerminate().
 */
static void
sigexit_handler(SIGNAL_ARGS)
{
	/* record the reason before raising the flag the loop tests */
	stop_reason = STREAM_STOP_SIGNAL;
	time_to_abort = true;
}
681 :
/*
 * Trigger the output file to be reopened.
 *
 * Only sets output_reopen; the streaming loop notices the flag, closes the
 * current output file (unless the output is "-"), and opens it again.
 */
static void
sighup_handler(SIGNAL_ARGS)
{
	output_reopen = true;
}
690 : #endif
691 :
692 :
693 : int
694 116 : main(int argc, char **argv)
695 : {
696 : static struct option long_options[] = {
697 : /* general options */
698 : {"file", required_argument, NULL, 'f'},
699 : {"fsync-interval", required_argument, NULL, 'F'},
700 : {"no-loop", no_argument, NULL, 'n'},
701 : {"failover", no_argument, NULL, 5},
702 : {"verbose", no_argument, NULL, 'v'},
703 : {"two-phase", no_argument, NULL, 't'},
704 : {"version", no_argument, NULL, 'V'},
705 : {"help", no_argument, NULL, '?'},
706 : /* connection options */
707 : {"dbname", required_argument, NULL, 'd'},
708 : {"host", required_argument, NULL, 'h'},
709 : {"port", required_argument, NULL, 'p'},
710 : {"username", required_argument, NULL, 'U'},
711 : {"no-password", no_argument, NULL, 'w'},
712 : {"password", no_argument, NULL, 'W'},
713 : /* replication options */
714 : {"startpos", required_argument, NULL, 'I'},
715 : {"endpos", required_argument, NULL, 'E'},
716 : {"option", required_argument, NULL, 'o'},
717 : {"plugin", required_argument, NULL, 'P'},
718 : {"status-interval", required_argument, NULL, 's'},
719 : {"slot", required_argument, NULL, 'S'},
720 : /* action */
721 : {"create-slot", no_argument, NULL, 1},
722 : {"start", no_argument, NULL, 2},
723 : {"drop-slot", no_argument, NULL, 3},
724 : {"if-not-exists", no_argument, NULL, 4},
725 : {NULL, 0, NULL, 0}
726 : };
727 : int c;
728 : int option_index;
729 : uint32 hi,
730 : lo;
731 : char *db_name;
732 :
733 116 : pg_logging_init(argv[0]);
734 116 : progname = get_progname(argv[0]);
735 116 : set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
736 :
737 116 : if (argc > 1)
738 : {
739 114 : if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
740 : {
741 2 : usage();
742 2 : exit(0);
743 : }
744 112 : else if (strcmp(argv[1], "-V") == 0 ||
745 112 : strcmp(argv[1], "--version") == 0)
746 : {
747 2 : puts("pg_recvlogical (PostgreSQL) " PG_VERSION);
748 2 : exit(0);
749 : }
750 : }
751 :
752 668 : while ((c = getopt_long(argc, argv, "E:f:F:ntvd:h:p:U:wWI:o:P:s:S:",
753 668 : long_options, &option_index)) != -1)
754 : {
755 558 : switch (c)
756 : {
757 : /* general options */
758 48 : case 'f':
759 48 : outfile = pg_strdup(optarg);
760 48 : break;
761 0 : case 'F':
762 0 : if (!option_parse_int(optarg, "-F/--fsync-interval", 0,
763 : INT_MAX / 1000,
764 : &fsync_interval))
765 0 : exit(1);
766 0 : fsync_interval *= 1000;
767 0 : break;
768 46 : case 'n':
769 46 : noloop = 1;
770 46 : break;
771 4 : case 't':
772 4 : two_phase = true;
773 4 : break;
774 0 : case 'v':
775 0 : verbose++;
776 0 : break;
777 2 : case 5:
778 2 : failover = true;
779 2 : break;
780 : /* connection options */
781 104 : case 'd':
782 104 : dbname = pg_strdup(optarg);
783 104 : break;
784 0 : case 'h':
785 0 : dbhost = pg_strdup(optarg);
786 0 : break;
787 0 : case 'p':
788 0 : dbport = pg_strdup(optarg);
789 0 : break;
790 0 : case 'U':
791 0 : dbuser = pg_strdup(optarg);
792 0 : break;
793 0 : case 'w':
794 0 : dbgetpassword = -1;
795 0 : break;
796 0 : case 'W':
797 0 : dbgetpassword = 1;
798 0 : break;
799 : /* replication options */
800 0 : case 'I':
801 0 : if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
802 0 : pg_fatal("could not parse start position \"%s\"", optarg);
803 0 : startpos = ((uint64) hi) << 32 | lo;
804 0 : break;
805 16 : case 'E':
806 16 : if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
807 0 : pg_fatal("could not parse end position \"%s\"", optarg);
808 16 : endpos = ((uint64) hi) << 32 | lo;
809 16 : break;
810 80 : case 'o':
811 : {
812 80 : char *data = pg_strdup(optarg);
813 80 : char *val = strchr(data, '=');
814 :
815 80 : if (val != NULL)
816 : {
817 : /* remove =; separate data from val */
818 80 : *val = '\0';
819 80 : val++;
820 : }
821 :
822 80 : noptions += 1;
823 80 : options = pg_realloc(options, sizeof(char *) * noptions * 2);
824 :
825 80 : options[(noptions - 1) * 2] = data;
826 80 : options[(noptions - 1) * 2 + 1] = val;
827 : }
828 :
829 80 : break;
830 44 : case 'P':
831 44 : plugin = pg_strdup(optarg);
832 44 : break;
833 0 : case 's':
834 0 : if (!option_parse_int(optarg, "-s/--status-interval", 0,
835 : INT_MAX / 1000,
836 : &standby_message_timeout))
837 0 : exit(1);
838 0 : standby_message_timeout *= 1000;
839 0 : break;
840 108 : case 'S':
841 108 : replication_slot = pg_strdup(optarg);
842 108 : break;
843 : /* action */
844 50 : case 1:
845 50 : do_create_slot = true;
846 50 : break;
847 50 : case 2:
848 50 : do_start_slot = true;
849 50 : break;
850 4 : case 3:
851 4 : do_drop_slot = true;
852 4 : break;
853 0 : case 4:
854 0 : slot_exists_ok = true;
855 0 : break;
856 :
857 2 : default:
858 : /* getopt_long already emitted a complaint */
859 2 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
860 2 : exit(1);
861 : }
862 : }
863 :
864 : /*
865 : * Any non-option arguments?
866 : */
867 110 : if (optind < argc)
868 : {
869 0 : pg_log_error("too many command-line arguments (first is \"%s\")",
870 : argv[optind]);
871 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
872 0 : exit(1);
873 : }
874 :
875 : /*
876 : * Required arguments
877 : */
878 110 : if (replication_slot == NULL)
879 : {
880 2 : pg_log_error("no slot specified");
881 2 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
882 2 : exit(1);
883 : }
884 :
885 108 : if (do_start_slot && outfile == NULL)
886 : {
887 2 : pg_log_error("no target file specified");
888 2 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
889 2 : exit(1);
890 : }
891 :
892 106 : if (!do_drop_slot && dbname == NULL)
893 : {
894 2 : pg_log_error("no database specified");
895 2 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
896 2 : exit(1);
897 : }
898 :
899 104 : if (!do_drop_slot && !do_create_slot && !do_start_slot)
900 : {
901 2 : pg_log_error("at least one action needs to be specified");
902 2 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
903 2 : exit(1);
904 : }
905 :
906 102 : if (do_drop_slot && (do_create_slot || do_start_slot))
907 : {
908 0 : pg_log_error("cannot use --create-slot or --start together with --drop-slot");
909 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
910 0 : exit(1);
911 : }
912 :
913 102 : if (startpos != InvalidXLogRecPtr && (do_create_slot || do_drop_slot))
914 : {
915 0 : pg_log_error("cannot use --create-slot or --drop-slot together with --startpos");
916 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
917 0 : exit(1);
918 : }
919 :
920 102 : if (endpos != InvalidXLogRecPtr && !do_start_slot)
921 : {
922 0 : pg_log_error("--endpos may only be specified with --start");
923 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
924 0 : exit(1);
925 : }
926 :
927 102 : if (!do_create_slot)
928 : {
929 52 : if (two_phase)
930 : {
931 2 : pg_log_error("--two-phase may only be specified with --create-slot");
932 2 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
933 2 : exit(1);
934 : }
935 :
936 50 : if (failover)
937 : {
938 0 : pg_log_error("--failover may only be specified with --create-slot");
939 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
940 0 : exit(1);
941 : }
942 : }
943 :
944 : /*
945 : * Obtain a connection to server. Notably, if we need a password, we want
946 : * to collect it from the user immediately.
947 : */
948 100 : conn = GetConnection();
949 100 : if (!conn)
950 : /* Error message already written in GetConnection() */
951 0 : exit(1);
952 100 : atexit(disconnect_atexit);
953 :
954 : /*
955 : * Trap signals. (Don't do this until after the initial password prompt,
956 : * if one is needed, in GetConnection.)
957 : */
958 : #ifndef WIN32
959 100 : pqsignal(SIGINT, sigexit_handler);
960 100 : pqsignal(SIGTERM, sigexit_handler);
961 100 : pqsignal(SIGHUP, sighup_handler);
962 : #endif
963 :
964 : /*
965 : * Run IDENTIFY_SYSTEM to check the connection type for each action.
966 : * --create-slot and --start actions require a database-specific
967 : * replication connection because they handle logical replication slots.
968 : * --drop-slot can remove replication slots from any replication
969 : * connection without this restriction.
970 : */
971 100 : if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name))
972 0 : exit(1);
973 :
974 100 : if (!do_drop_slot && db_name == NULL)
975 0 : pg_fatal("could not establish database-specific replication connection");
976 :
977 : /*
978 : * Set umask so that directories/files are created with the same
979 : * permissions as directories/files in the source data directory.
980 : *
981 : * pg_mode_mask is set to owner-only by default and then updated in
982 : * GetConnection() where we get the mode from the server-side with
983 : * RetrieveDataDirCreatePerm() and then call SetDataDirectoryCreatePerm().
984 : */
985 100 : umask(pg_mode_mask);
986 :
987 : /* Drop a replication slot. */
988 100 : if (do_drop_slot)
989 : {
990 4 : if (verbose)
991 0 : pg_log_info("dropping replication slot \"%s\"", replication_slot);
992 :
993 4 : if (!DropReplicationSlot(conn, replication_slot))
994 0 : exit(1);
995 : }
996 :
997 : /* Create a replication slot. */
998 100 : if (do_create_slot)
999 : {
1000 50 : if (verbose)
1001 0 : pg_log_info("creating replication slot \"%s\"", replication_slot);
1002 :
1003 50 : if (!CreateReplicationSlot(conn, replication_slot, plugin, false,
1004 : false, false, slot_exists_ok, two_phase,
1005 : failover))
1006 0 : exit(1);
1007 50 : startpos = InvalidXLogRecPtr;
1008 : }
1009 :
1010 100 : if (!do_start_slot)
1011 54 : exit(0);
1012 :
1013 : /* Stream loop */
1014 : while (true)
1015 : {
1016 46 : StreamLogicalLog();
1017 46 : if (time_to_abort)
1018 : {
1019 : /*
1020 : * We've been Ctrl-C'ed or reached an exit limit condition. That's
1021 : * not an error, so exit without an errorcode.
1022 : */
1023 16 : exit(0);
1024 : }
1025 30 : else if (noloop)
1026 30 : pg_fatal("disconnected");
1027 : else
1028 : {
1029 : /* translator: check source for value for %d */
1030 0 : pg_log_info("disconnected; waiting %d seconds to try again",
1031 : RECONNECT_SLEEP_TIME);
1032 0 : pg_usleep(RECONNECT_SLEEP_TIME * 1000000);
1033 : }
1034 : }
1035 : }
1036 :
1037 : /*
1038 : * Fsync our output data, and send a feedback message to the server. Returns
1039 : * true if successful, false otherwise.
1040 : *
1041 : * If successful, *now is updated to the current timestamp just before sending
1042 : * feedback.
1043 : */
1044 : static bool
1045 16 : flushAndSendFeedback(PGconn *conn, TimestampTz *now)
1046 : {
1047 : /* flush data to disk, so that we send a recent flush pointer */
1048 16 : if (!OutputFsync(*now))
1049 0 : return false;
1050 16 : *now = feGetCurrentTimestamp();
1051 16 : if (!sendFeedback(conn, *now, true, false))
1052 0 : return false;
1053 :
1054 16 : return true;
1055 : }
1056 :
1057 : /*
1058 : * Try to inform the server about our upcoming demise, but don't wait around or
1059 : * retry on failure.
1060 : */
1061 : static void
1062 16 : prepareToTerminate(PGconn *conn, XLogRecPtr endpos, StreamStopReason reason,
1063 : XLogRecPtr lsn)
1064 : {
1065 16 : (void) PQputCopyEnd(conn, NULL);
1066 16 : (void) PQflush(conn);
1067 :
1068 16 : if (verbose)
1069 : {
1070 0 : switch (reason)
1071 : {
1072 0 : case STREAM_STOP_SIGNAL:
1073 0 : pg_log_info("received interrupt signal, exiting");
1074 0 : break;
1075 0 : case STREAM_STOP_KEEPALIVE:
1076 0 : pg_log_info("end position %X/%X reached by keepalive",
1077 : LSN_FORMAT_ARGS(endpos));
1078 0 : break;
1079 0 : case STREAM_STOP_END_OF_WAL:
1080 : Assert(!XLogRecPtrIsInvalid(lsn));
1081 0 : pg_log_info("end position %X/%X reached by WAL record at %X/%X",
1082 : LSN_FORMAT_ARGS(endpos), LSN_FORMAT_ARGS(lsn));
1083 0 : break;
1084 0 : case STREAM_STOP_NONE:
1085 : Assert(false);
1086 0 : break;
1087 : }
1088 : }
1089 16 : }
|