Line data Source code
1 : /*-------------------------------------------------------------------------
2 : *
3 : * pg_recvlogical.c - receive data from a logical decoding slot in a streaming
4 : * fashion and write it to a local file.
5 : *
6 : * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
7 : *
8 : * IDENTIFICATION
9 : * src/bin/pg_basebackup/pg_recvlogical.c
10 : *-------------------------------------------------------------------------
11 : */
12 :
13 : #include "postgres_fe.h"
14 :
15 : #include <dirent.h>
16 : #include <sys/stat.h>
17 : #include <unistd.h>
18 : #ifdef HAVE_SYS_SELECT_H
19 : #include <sys/select.h>
20 : #endif
21 :
22 : #include "access/xlog_internal.h"
23 : #include "common/fe_memutils.h"
24 : #include "common/file_perm.h"
25 : #include "common/logging.h"
26 : #include "getopt_long.h"
27 : #include "libpq-fe.h"
28 : #include "libpq/pqsignal.h"
29 : #include "pqexpbuffer.h"
30 : #include "streamutil.h"
31 :
32 : /* Time to sleep between reconnection attempts */
33 : #define RECONNECT_SLEEP_TIME 5
34 :
35 : /* Global Options */
36 : static char *outfile = NULL;
37 : static int verbose = 0;
38 : static int noloop = 0;
39 : static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
40 : static int fsync_interval = 10 * 1000; /* 10 sec = default */
41 : static XLogRecPtr startpos = InvalidXLogRecPtr;
42 : static XLogRecPtr endpos = InvalidXLogRecPtr;
43 : static bool do_create_slot = false;
44 : static bool slot_exists_ok = false;
45 : static bool do_start_slot = false;
46 : static bool do_drop_slot = false;
47 : static char *replication_slot = NULL;
48 :
49 : /* filled pairwise with option, value. value may be NULL */
50 : static char **options;
51 : static size_t noptions = 0;
52 : static const char *plugin = "test_decoding";
53 :
54 : /* Global State */
55 : static int outfd = -1;
56 : static volatile sig_atomic_t time_to_abort = false;
57 : static volatile sig_atomic_t output_reopen = false;
58 : static bool output_isfile;
59 : static TimestampTz output_last_fsync = -1;
60 : static bool output_needs_fsync = false;
61 : static XLogRecPtr output_written_lsn = InvalidXLogRecPtr;
62 : static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr;
63 :
64 : static void usage(void);
65 : static void StreamLogicalLog(void);
66 : static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now);
67 : static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos,
68 : bool keepalive, XLogRecPtr lsn);
69 :
70 : static void
71 2 : usage(void)
72 : {
73 2 : printf(_("%s controls PostgreSQL logical decoding streams.\n\n"),
74 : progname);
75 2 : printf(_("Usage:\n"));
76 2 : printf(_(" %s [OPTION]...\n"), progname);
77 2 : printf(_("\nAction to be performed:\n"));
78 2 : printf(_(" --create-slot create a new replication slot (for the slot's name see --slot)\n"));
79 2 : printf(_(" --drop-slot drop the replication slot (for the slot's name see --slot)\n"));
80 2 : printf(_(" --start start streaming in a replication slot (for the slot's name see --slot)\n"));
81 2 : printf(_("\nOptions:\n"));
82 2 : printf(_(" -E, --endpos=LSN exit after receiving the specified LSN\n"));
83 2 : printf(_(" -f, --file=FILE receive log into this file, - for stdout\n"));
84 2 : printf(_(" -F --fsync-interval=SECS\n"
85 : " time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000));
86 2 : printf(_(" --if-not-exists do not error if slot already exists when creating a slot\n"));
87 2 : printf(_(" -I, --startpos=LSN where in an existing slot should the streaming start\n"));
88 2 : printf(_(" -n, --no-loop do not loop on connection lost\n"));
89 2 : printf(_(" -o, --option=NAME[=VALUE]\n"
90 : " pass option NAME with optional value VALUE to the\n"
91 : " output plugin\n"));
92 2 : printf(_(" -P, --plugin=PLUGIN use output plugin PLUGIN (default: %s)\n"), plugin);
93 2 : printf(_(" -s, --status-interval=SECS\n"
94 : " time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
95 2 : printf(_(" -S, --slot=SLOTNAME name of the logical replication slot\n"));
96 2 : printf(_(" -v, --verbose output verbose messages\n"));
97 2 : printf(_(" -V, --version output version information, then exit\n"));
98 2 : printf(_(" -?, --help show this help, then exit\n"));
99 2 : printf(_("\nConnection options:\n"));
100 2 : printf(_(" -d, --dbname=DBNAME database to connect to\n"));
101 2 : printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
102 2 : printf(_(" -p, --port=PORT database server port number\n"));
103 2 : printf(_(" -U, --username=NAME connect as specified database user\n"));
104 2 : printf(_(" -w, --no-password never prompt for password\n"));
105 2 : printf(_(" -W, --password force password prompt (should happen automatically)\n"));
106 2 : printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
107 2 : printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
108 2 : }
109 :
110 : /*
111 : * Send a Standby Status Update message to server.
112 : */
113 : static bool
114 16 : sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested)
115 : {
116 : static XLogRecPtr last_written_lsn = InvalidXLogRecPtr;
117 : static XLogRecPtr last_fsync_lsn = InvalidXLogRecPtr;
118 :
119 : char replybuf[1 + 8 + 8 + 8 + 8 + 1];
120 16 : int len = 0;
121 :
122 : /*
123 : * we normally don't want to send superfluous feedback, but if it's
124 : * because of a timeout we need to, otherwise wal_sender_timeout will kill
125 : * us.
126 : */
127 16 : if (!force &&
128 0 : last_written_lsn == output_written_lsn &&
129 0 : last_fsync_lsn == output_fsync_lsn)
130 0 : return true;
131 :
132 16 : if (verbose)
133 0 : pg_log_info("confirming write up to %X/%X, flush to %X/%X (slot %s)",
134 : LSN_FORMAT_ARGS(output_written_lsn),
135 : LSN_FORMAT_ARGS(output_fsync_lsn),
136 : replication_slot);
137 :
138 16 : replybuf[len] = 'r';
139 16 : len += 1;
140 16 : fe_sendint64(output_written_lsn, &replybuf[len]); /* write */
141 16 : len += 8;
142 16 : fe_sendint64(output_fsync_lsn, &replybuf[len]); /* flush */
143 16 : len += 8;
144 16 : fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */
145 16 : len += 8;
146 16 : fe_sendint64(now, &replybuf[len]); /* sendTime */
147 16 : len += 8;
148 16 : replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
149 16 : len += 1;
150 :
151 16 : startpos = output_written_lsn;
152 16 : last_written_lsn = output_written_lsn;
153 16 : last_fsync_lsn = output_fsync_lsn;
154 :
155 16 : if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
156 : {
157 0 : pg_log_error("could not send feedback packet: %s",
158 : PQerrorMessage(conn));
159 0 : return false;
160 : }
161 :
162 16 : return true;
163 : }
164 :
165 : static void
166 10 : disconnect_atexit(void)
167 : {
168 10 : if (conn != NULL)
169 2 : PQfinish(conn);
170 10 : }
171 :
172 : static bool
173 16 : OutputFsync(TimestampTz now)
174 : {
175 16 : output_last_fsync = now;
176 :
177 16 : output_fsync_lsn = output_written_lsn;
178 :
179 16 : if (fsync_interval <= 0)
180 0 : return true;
181 :
182 16 : if (!output_needs_fsync)
183 10 : return true;
184 :
185 6 : output_needs_fsync = false;
186 :
187 : /* can only fsync if it's a regular file */
188 6 : if (!output_isfile)
189 4 : return true;
190 :
191 2 : if (fsync(outfd) != 0)
192 : {
193 0 : pg_log_fatal("could not fsync file \"%s\": %m", outfile);
194 0 : exit(1);
195 : }
196 :
197 2 : return true;
198 : }
199 :
200 : /*
201 : * Start the log streaming
202 : */
203 : static void
204 8 : StreamLogicalLog(void)
205 : {
206 : PGresult *res;
207 8 : char *copybuf = NULL;
208 8 : TimestampTz last_status = -1;
209 : int i;
210 : PQExpBuffer query;
211 :
212 8 : output_written_lsn = InvalidXLogRecPtr;
213 8 : output_fsync_lsn = InvalidXLogRecPtr;
214 :
215 8 : query = createPQExpBuffer();
216 :
217 : /*
218 : * Connect in replication mode to the server
219 : */
220 8 : if (!conn)
221 0 : conn = GetConnection();
222 8 : if (!conn)
223 : /* Error message already written in GetConnection() */
224 0 : return;
225 :
226 : /*
227 : * Start the replication
228 : */
229 8 : if (verbose)
230 0 : pg_log_info("starting log streaming at %X/%X (slot %s)",
231 : LSN_FORMAT_ARGS(startpos),
232 : replication_slot);
233 :
234 : /* Initiate the replication stream at specified location */
235 16 : appendPQExpBuffer(query, "START_REPLICATION SLOT \"%s\" LOGICAL %X/%X",
236 8 : replication_slot, LSN_FORMAT_ARGS(startpos));
237 :
238 : /* print options if there are any */
239 8 : if (noptions)
240 6 : appendPQExpBufferStr(query, " (");
241 :
242 20 : for (i = 0; i < noptions; i++)
243 : {
244 : /* separator */
245 12 : if (i > 0)
246 6 : appendPQExpBufferStr(query, ", ");
247 :
248 : /* write option name */
249 12 : appendPQExpBuffer(query, "\"%s\"", options[(i * 2)]);
250 :
251 : /* write option value if specified */
252 12 : if (options[(i * 2) + 1] != NULL)
253 12 : appendPQExpBuffer(query, " '%s'", options[(i * 2) + 1]);
254 : }
255 :
256 8 : if (noptions)
257 6 : appendPQExpBufferChar(query, ')');
258 :
259 8 : res = PQexec(conn, query->data);
260 8 : if (PQresultStatus(res) != PGRES_COPY_BOTH)
261 : {
262 0 : pg_log_error("could not send replication command \"%s\": %s",
263 : query->data, PQresultErrorMessage(res));
264 0 : PQclear(res);
265 0 : goto error;
266 : }
267 8 : PQclear(res);
268 8 : resetPQExpBuffer(query);
269 :
270 8 : if (verbose)
271 0 : pg_log_info("streaming initiated");
272 :
273 122 : while (!time_to_abort)
274 : {
275 : int r;
276 : int bytes_left;
277 : int bytes_written;
278 : TimestampTz now;
279 : int hdr_len;
280 122 : XLogRecPtr cur_record_lsn = InvalidXLogRecPtr;
281 :
282 122 : if (copybuf != NULL)
283 : {
284 58 : PQfreemem(copybuf);
285 58 : copybuf = NULL;
286 : }
287 :
288 : /*
289 : * Potentially send a status message to the primary.
290 : */
291 122 : now = feGetCurrentTimestamp();
292 :
293 236 : if (outfd != -1 &&
294 114 : feTimestampDifferenceExceeds(output_last_fsync, now,
295 : fsync_interval))
296 : {
297 8 : if (!OutputFsync(now))
298 0 : goto error;
299 : }
300 :
301 244 : if (standby_message_timeout > 0 &&
302 122 : feTimestampDifferenceExceeds(last_status, now,
303 : standby_message_timeout))
304 : {
305 : /* Time to send feedback! */
306 8 : if (!sendFeedback(conn, now, true, false))
307 0 : goto error;
308 :
309 8 : last_status = now;
310 : }
311 :
312 : /* got SIGHUP, close output file */
313 122 : if (outfd != -1 && output_reopen && strcmp(outfile, "-") != 0)
314 : {
315 0 : now = feGetCurrentTimestamp();
316 0 : if (!OutputFsync(now))
317 0 : goto error;
318 0 : close(outfd);
319 0 : outfd = -1;
320 : }
321 122 : output_reopen = false;
322 :
323 : /* open the output file, if not open yet */
324 122 : if (outfd == -1)
325 : {
326 : struct stat statbuf;
327 :
328 8 : if (strcmp(outfile, "-") == 0)
329 8 : outfd = fileno(stdout);
330 : else
331 0 : outfd = open(outfile, O_CREAT | O_APPEND | O_WRONLY | PG_BINARY,
332 : S_IRUSR | S_IWUSR);
333 8 : if (outfd == -1)
334 : {
335 0 : pg_log_error("could not open log file \"%s\": %m", outfile);
336 0 : goto error;
337 : }
338 :
339 8 : if (fstat(outfd, &statbuf) != 0)
340 0 : pg_log_error("could not stat file \"%s\": %m", outfile);
341 :
342 8 : output_isfile = S_ISREG(statbuf.st_mode) && !isatty(outfd);
343 : }
344 :
345 122 : r = PQgetCopyData(conn, ©buf, 1);
346 122 : if (r == 0)
347 : {
348 : /*
349 : * In async mode, and no data available. We block on reading but
350 : * not more than the specified timeout, so that we can send a
351 : * response back to the client.
352 : */
353 : fd_set input_mask;
354 56 : TimestampTz message_target = 0;
355 56 : TimestampTz fsync_target = 0;
356 : struct timeval timeout;
357 56 : struct timeval *timeoutptr = NULL;
358 :
359 56 : if (PQsocket(conn) < 0)
360 : {
361 0 : pg_log_error("invalid socket: %s", PQerrorMessage(conn));
362 0 : goto error;
363 : }
364 :
365 56 : FD_ZERO(&input_mask);
366 56 : FD_SET(PQsocket(conn), &input_mask);
367 :
368 : /* Compute when we need to wakeup to send a keepalive message. */
369 56 : if (standby_message_timeout)
370 56 : message_target = last_status + (standby_message_timeout - 1) *
371 : ((int64) 1000);
372 :
373 : /* Compute when we need to wakeup to fsync the output file. */
374 56 : if (fsync_interval > 0 && output_needs_fsync)
375 42 : fsync_target = output_last_fsync + (fsync_interval - 1) *
376 : ((int64) 1000);
377 :
378 : /* Now compute when to wakeup. */
379 56 : if (message_target > 0 || fsync_target > 0)
380 : {
381 : TimestampTz targettime;
382 : long secs;
383 : int usecs;
384 :
385 56 : targettime = message_target;
386 :
387 56 : if (fsync_target > 0 && fsync_target < targettime)
388 0 : targettime = fsync_target;
389 :
390 56 : feTimestampDifference(now,
391 : targettime,
392 : &secs,
393 : &usecs);
394 56 : if (secs <= 0)
395 0 : timeout.tv_sec = 1; /* Always sleep at least 1 sec */
396 : else
397 56 : timeout.tv_sec = secs;
398 56 : timeout.tv_usec = usecs;
399 56 : timeoutptr = &timeout;
400 : }
401 :
402 56 : r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
403 56 : if (r == 0 || (r < 0 && errno == EINTR))
404 : {
405 : /*
406 : * Got a timeout or signal. Continue the loop and either
407 : * deliver a status packet to the server or just go back into
408 : * blocking.
409 : */
410 56 : continue;
411 : }
412 56 : else if (r < 0)
413 : {
414 0 : pg_log_error("select() failed: %m");
415 0 : goto error;
416 : }
417 :
418 : /* Else there is actually data on the socket */
419 56 : if (PQconsumeInput(conn) == 0)
420 : {
421 0 : pg_log_error("could not receive data from WAL stream: %s",
422 : PQerrorMessage(conn));
423 0 : goto error;
424 : }
425 56 : continue;
426 : }
427 :
428 : /* End of copy stream */
429 66 : if (r == -1)
430 8 : break;
431 :
432 : /* Failure while reading the copy stream */
433 66 : if (r == -2)
434 : {
435 0 : pg_log_error("could not read COPY data: %s",
436 : PQerrorMessage(conn));
437 0 : goto error;
438 : }
439 :
440 : /* Check the message type. */
441 66 : if (copybuf[0] == 'k')
442 : {
443 : int pos;
444 : bool replyRequested;
445 : XLogRecPtr walEnd;
446 8 : bool endposReached = false;
447 :
448 : /*
449 : * Parse the keepalive message, enclosed in the CopyData message.
450 : * We just check if the server requested a reply, and ignore the
451 : * rest.
452 : */
453 8 : pos = 1; /* skip msgtype 'k' */
454 8 : walEnd = fe_recvint64(©buf[pos]);
455 8 : output_written_lsn = Max(walEnd, output_written_lsn);
456 :
457 8 : pos += 8; /* read walEnd */
458 :
459 8 : pos += 8; /* skip sendTime */
460 :
461 8 : if (r < pos + 1)
462 : {
463 0 : pg_log_error("streaming header too small: %d", r);
464 0 : goto error;
465 : }
466 8 : replyRequested = copybuf[pos];
467 :
468 8 : if (endpos != InvalidXLogRecPtr && walEnd >= endpos)
469 : {
470 : /*
471 : * If there's nothing to read on the socket until a keepalive
472 : * we know that the server has nothing to send us; and if
473 : * walEnd has passed endpos, we know nothing else can have
474 : * committed before endpos. So we can bail out now.
475 : */
476 2 : endposReached = true;
477 : }
478 :
479 : /* Send a reply, if necessary */
480 8 : if (replyRequested || endposReached)
481 : {
482 2 : if (!flushAndSendFeedback(conn, &now))
483 0 : goto error;
484 2 : last_status = now;
485 : }
486 :
487 8 : if (endposReached)
488 : {
489 2 : prepareToTerminate(conn, endpos, true, InvalidXLogRecPtr);
490 2 : time_to_abort = true;
491 2 : break;
492 : }
493 :
494 6 : continue;
495 : }
496 58 : else if (copybuf[0] != 'w')
497 : {
498 0 : pg_log_error("unrecognized streaming header: \"%c\"",
499 : copybuf[0]);
500 0 : goto error;
501 : }
502 :
503 : /*
504 : * Read the header of the XLogData message, enclosed in the CopyData
505 : * message. We only need the WAL location field (dataStart), the rest
506 : * of the header is ignored.
507 : */
508 58 : hdr_len = 1; /* msgtype 'w' */
509 58 : hdr_len += 8; /* dataStart */
510 58 : hdr_len += 8; /* walEnd */
511 58 : hdr_len += 8; /* sendTime */
512 58 : if (r < hdr_len + 1)
513 : {
514 0 : pg_log_error("streaming header too small: %d", r);
515 0 : goto error;
516 : }
517 :
518 : /* Extract WAL location for this block */
519 58 : cur_record_lsn = fe_recvint64(©buf[1]);
520 :
521 58 : if (endpos != InvalidXLogRecPtr && cur_record_lsn > endpos)
522 : {
523 : /*
524 : * We've read past our endpoint, so prepare to go away being
525 : * cautious about what happens to our output data.
526 : */
527 0 : if (!flushAndSendFeedback(conn, &now))
528 0 : goto error;
529 0 : prepareToTerminate(conn, endpos, false, cur_record_lsn);
530 0 : time_to_abort = true;
531 0 : break;
532 : }
533 :
534 58 : output_written_lsn = Max(cur_record_lsn, output_written_lsn);
535 :
536 58 : bytes_left = r - hdr_len;
537 58 : bytes_written = 0;
538 :
539 : /* signal that a fsync is needed */
540 58 : output_needs_fsync = true;
541 :
542 116 : while (bytes_left)
543 : {
544 : int ret;
545 :
546 174 : ret = write(outfd,
547 58 : copybuf + hdr_len + bytes_written,
548 : bytes_left);
549 :
550 58 : if (ret < 0)
551 : {
552 0 : pg_log_error("could not write %u bytes to log file \"%s\": %m",
553 : bytes_left, outfile);
554 0 : goto error;
555 : }
556 :
557 : /* Write was successful, advance our position */
558 58 : bytes_written += ret;
559 58 : bytes_left -= ret;
560 : }
561 :
562 58 : if (write(outfd, "\n", 1) != 1)
563 : {
564 0 : pg_log_error("could not write %u bytes to log file \"%s\": %m",
565 : 1, outfile);
566 0 : goto error;
567 : }
568 :
569 58 : if (endpos != InvalidXLogRecPtr && cur_record_lsn == endpos)
570 : {
571 : /* endpos was exactly the record we just processed, we're done */
572 6 : if (!flushAndSendFeedback(conn, &now))
573 0 : goto error;
574 6 : prepareToTerminate(conn, endpos, false, cur_record_lsn);
575 6 : time_to_abort = true;
576 6 : break;
577 : }
578 : }
579 :
580 8 : res = PQgetResult(conn);
581 8 : if (PQresultStatus(res) == PGRES_COPY_OUT)
582 : {
583 8 : PQclear(res);
584 :
585 : /*
586 : * We're doing a client-initiated clean exit and have sent CopyDone to
587 : * the server. Drain any messages, so we don't miss a last-minute
588 : * ErrorResponse. The walsender stops generating XLogData records once
589 : * it sees CopyDone, so expect this to finish quickly. After CopyDone,
590 : * it's too late for sendFeedback(), even if this were to take a long
591 : * time. Hence, use synchronous-mode PQgetCopyData().
592 : */
593 : while (1)
594 2 : {
595 : int r;
596 :
597 10 : if (copybuf != NULL)
598 : {
599 10 : PQfreemem(copybuf);
600 10 : copybuf = NULL;
601 : }
602 10 : r = PQgetCopyData(conn, ©buf, 0);
603 10 : if (r == -1)
604 8 : break;
605 2 : if (r == -2)
606 : {
607 0 : pg_log_error("could not read COPY data: %s",
608 : PQerrorMessage(conn));
609 0 : time_to_abort = false; /* unclean exit */
610 0 : goto error;
611 : }
612 : }
613 :
614 8 : res = PQgetResult(conn);
615 : }
616 8 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
617 : {
618 0 : pg_log_error("unexpected termination of replication stream: %s",
619 : PQresultErrorMessage(res));
620 0 : goto error;
621 : }
622 8 : PQclear(res);
623 :
624 8 : if (outfd != -1 && strcmp(outfile, "-") != 0)
625 : {
626 0 : TimestampTz t = feGetCurrentTimestamp();
627 :
628 : /* no need to jump to error on failure here, we're finishing anyway */
629 0 : OutputFsync(t);
630 :
631 0 : if (close(outfd) != 0)
632 0 : pg_log_error("could not close file \"%s\": %m", outfile);
633 : }
634 8 : outfd = -1;
635 8 : error:
636 8 : if (copybuf != NULL)
637 : {
638 0 : PQfreemem(copybuf);
639 0 : copybuf = NULL;
640 : }
641 8 : destroyPQExpBuffer(query);
642 8 : PQfinish(conn);
643 8 : conn = NULL;
644 : }
645 :
646 : /*
647 : * Unfortunately we can't do sensible signal handling on windows...
648 : */
649 : #ifndef WIN32
650 :
651 : /*
652 : * When sigint is called, just tell the system to exit at the next possible
653 : * moment.
654 : */
655 : static void
656 0 : sigint_handler(int signum)
657 : {
658 0 : time_to_abort = true;
659 0 : }
660 :
661 : /*
662 : * Trigger the output file to be reopened.
663 : */
664 : static void
665 0 : sighup_handler(int signum)
666 : {
667 0 : output_reopen = true;
668 0 : }
669 : #endif
670 :
671 :
672 : int
673 24 : main(int argc, char **argv)
674 : {
675 : static struct option long_options[] = {
676 : /* general options */
677 : {"file", required_argument, NULL, 'f'},
678 : {"fsync-interval", required_argument, NULL, 'F'},
679 : {"no-loop", no_argument, NULL, 'n'},
680 : {"verbose", no_argument, NULL, 'v'},
681 : {"version", no_argument, NULL, 'V'},
682 : {"help", no_argument, NULL, '?'},
683 : /* connection options */
684 : {"dbname", required_argument, NULL, 'd'},
685 : {"host", required_argument, NULL, 'h'},
686 : {"port", required_argument, NULL, 'p'},
687 : {"username", required_argument, NULL, 'U'},
688 : {"no-password", no_argument, NULL, 'w'},
689 : {"password", no_argument, NULL, 'W'},
690 : /* replication options */
691 : {"startpos", required_argument, NULL, 'I'},
692 : {"endpos", required_argument, NULL, 'E'},
693 : {"option", required_argument, NULL, 'o'},
694 : {"plugin", required_argument, NULL, 'P'},
695 : {"status-interval", required_argument, NULL, 's'},
696 : {"slot", required_argument, NULL, 'S'},
697 : /* action */
698 : {"create-slot", no_argument, NULL, 1},
699 : {"start", no_argument, NULL, 2},
700 : {"drop-slot", no_argument, NULL, 3},
701 : {"if-not-exists", no_argument, NULL, 4},
702 : {NULL, 0, NULL, 0}
703 : };
704 : int c;
705 : int option_index;
706 : uint32 hi,
707 : lo;
708 : char *db_name;
709 :
710 24 : pg_logging_init(argv[0]);
711 24 : progname = get_progname(argv[0]);
712 24 : set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
713 :
714 24 : if (argc > 1)
715 : {
716 22 : if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
717 : {
718 2 : usage();
719 2 : exit(0);
720 : }
721 20 : else if (strcmp(argv[1], "-V") == 0 ||
722 20 : strcmp(argv[1], "--version") == 0)
723 : {
724 2 : puts("pg_recvlogical (PostgreSQL) " PG_VERSION);
725 2 : exit(0);
726 : }
727 : }
728 :
729 98 : while ((c = getopt_long(argc, argv, "E:f:F:nvd:h:p:U:wWI:o:P:s:S:",
730 : long_options, &option_index)) != -1)
731 : {
732 80 : switch (c)
733 : {
734 : /* general options */
735 8 : case 'f':
736 8 : outfile = pg_strdup(optarg);
737 8 : break;
738 0 : case 'F':
739 0 : fsync_interval = atoi(optarg) * 1000;
740 0 : if (fsync_interval < 0)
741 : {
742 0 : pg_log_error("invalid fsync interval \"%s\"", optarg);
743 0 : exit(1);
744 : }
745 0 : break;
746 8 : case 'n':
747 8 : noloop = 1;
748 8 : break;
749 0 : case 'v':
750 0 : verbose++;
751 0 : break;
752 : /* connection options */
753 14 : case 'd':
754 14 : dbname = pg_strdup(optarg);
755 14 : break;
756 0 : case 'h':
757 0 : dbhost = pg_strdup(optarg);
758 0 : break;
759 0 : case 'p':
760 0 : if (atoi(optarg) <= 0)
761 : {
762 0 : pg_log_error("invalid port number \"%s\"", optarg);
763 0 : exit(1);
764 : }
765 0 : dbport = pg_strdup(optarg);
766 0 : break;
767 0 : case 'U':
768 0 : dbuser = pg_strdup(optarg);
769 0 : break;
770 0 : case 'w':
771 0 : dbgetpassword = -1;
772 0 : break;
773 0 : case 'W':
774 0 : dbgetpassword = 1;
775 0 : break;
776 : /* replication options */
777 0 : case 'I':
778 0 : if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
779 : {
780 0 : pg_log_error("could not parse start position \"%s\"", optarg);
781 0 : exit(1);
782 : }
783 0 : startpos = ((uint64) hi) << 32 | lo;
784 0 : break;
785 8 : case 'E':
786 8 : if (sscanf(optarg, "%X/%X", &hi, &lo) != 2)
787 : {
788 0 : pg_log_error("could not parse end position \"%s\"", optarg);
789 0 : exit(1);
790 : }
791 8 : endpos = ((uint64) hi) << 32 | lo;
792 8 : break;
793 12 : case 'o':
794 : {
795 12 : char *data = pg_strdup(optarg);
796 12 : char *val = strchr(data, '=');
797 :
798 12 : if (val != NULL)
799 : {
800 : /* remove =; separate data from val */
801 12 : *val = '\0';
802 12 : val++;
803 : }
804 :
805 12 : noptions += 1;
806 12 : options = pg_realloc(options, sizeof(char *) * noptions * 2);
807 :
808 12 : options[(noptions - 1) * 2] = data;
809 12 : options[(noptions - 1) * 2 + 1] = val;
810 : }
811 :
812 12 : break;
813 0 : case 'P':
814 0 : plugin = pg_strdup(optarg);
815 0 : break;
816 0 : case 's':
817 0 : standby_message_timeout = atoi(optarg) * 1000;
818 0 : if (standby_message_timeout < 0)
819 : {
820 0 : pg_log_error("invalid status interval \"%s\"", optarg);
821 0 : exit(1);
822 : }
823 0 : break;
824 16 : case 'S':
825 16 : replication_slot = pg_strdup(optarg);
826 16 : break;
827 : /* action */
828 2 : case 1:
829 2 : do_create_slot = true;
830 2 : break;
831 10 : case 2:
832 10 : do_start_slot = true;
833 10 : break;
834 0 : case 3:
835 0 : do_drop_slot = true;
836 0 : break;
837 0 : case 4:
838 0 : slot_exists_ok = true;
839 0 : break;
840 :
841 2 : default:
842 :
843 : /*
844 : * getopt_long already emitted a complaint
845 : */
846 2 : fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
847 : progname);
848 2 : exit(1);
849 : }
850 : }
851 :
852 : /*
853 : * Any non-option arguments?
854 : */
855 18 : if (optind < argc)
856 : {
857 0 : pg_log_error("too many command-line arguments (first is \"%s\")",
858 : argv[optind]);
859 0 : fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
860 : progname);
861 0 : exit(1);
862 : }
863 :
864 : /*
865 : * Required arguments
866 : */
867 18 : if (replication_slot == NULL)
868 : {
869 2 : pg_log_error("no slot specified");
870 2 : fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
871 : progname);
872 2 : exit(1);
873 : }
874 :
875 16 : if (do_start_slot && outfile == NULL)
876 : {
877 2 : pg_log_error("no target file specified");
878 2 : fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
879 : progname);
880 2 : exit(1);
881 : }
882 :
883 14 : if (!do_drop_slot && dbname == NULL)
884 : {
885 2 : pg_log_error("no database specified");
886 2 : fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
887 : progname);
888 2 : exit(1);
889 : }
890 :
891 12 : if (!do_drop_slot && !do_create_slot && !do_start_slot)
892 : {
893 2 : pg_log_error("at least one action needs to be specified");
894 2 : fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
895 : progname);
896 2 : exit(1);
897 : }
898 :
899 10 : if (do_drop_slot && (do_create_slot || do_start_slot))
900 : {
901 0 : pg_log_error("cannot use --create-slot or --start together with --drop-slot");
902 0 : fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
903 : progname);
904 0 : exit(1);
905 : }
906 :
907 10 : if (startpos != InvalidXLogRecPtr && (do_create_slot || do_drop_slot))
908 : {
909 0 : pg_log_error("cannot use --create-slot or --drop-slot together with --startpos");
910 0 : fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
911 : progname);
912 0 : exit(1);
913 : }
914 :
915 10 : if (endpos != InvalidXLogRecPtr && !do_start_slot)
916 : {
917 0 : pg_log_error("--endpos may only be specified with --start");
918 0 : fprintf(stderr, _("Try \"%s --help\" for more information.\n"),
919 : progname);
920 0 : exit(1);
921 : }
922 :
923 : #ifndef WIN32
924 10 : pqsignal(SIGINT, sigint_handler);
925 10 : pqsignal(SIGHUP, sighup_handler);
926 : #endif
927 :
928 : /*
929 : * Obtain a connection to server. This is not really necessary but it
930 : * helps to get more precise error messages about authentication, required
931 : * GUC parameters and such.
932 : */
933 10 : conn = GetConnection();
934 10 : if (!conn)
935 : /* Error message already written in GetConnection() */
936 0 : exit(1);
937 10 : atexit(disconnect_atexit);
938 :
939 : /*
940 : * Run IDENTIFY_SYSTEM to make sure we connected using a database specific
941 : * replication connection.
942 : */
943 10 : if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name))
944 0 : exit(1);
945 :
946 10 : if (db_name == NULL)
947 : {
948 0 : pg_log_error("could not establish database-specific replication connection");
949 0 : exit(1);
950 : }
951 :
952 : /*
953 : * Set umask so that directories/files are created with the same
954 : * permissions as directories/files in the source data directory.
955 : *
956 : * pg_mode_mask is set to owner-only by default and then updated in
957 : * GetConnection() where we get the mode from the server-side with
958 : * RetrieveDataDirCreatePerm() and then call SetDataDirectoryCreatePerm().
959 : */
960 10 : umask(pg_mode_mask);
961 :
962 : /* Drop a replication slot. */
963 10 : if (do_drop_slot)
964 : {
965 0 : if (verbose)
966 0 : pg_log_info("dropping replication slot \"%s\"", replication_slot);
967 :
968 0 : if (!DropReplicationSlot(conn, replication_slot))
969 0 : exit(1);
970 : }
971 :
972 : /* Create a replication slot. */
973 10 : if (do_create_slot)
974 : {
975 2 : if (verbose)
976 0 : pg_log_info("creating replication slot \"%s\"", replication_slot);
977 :
978 2 : if (!CreateReplicationSlot(conn, replication_slot, plugin, false,
979 : false, false, slot_exists_ok))
980 0 : exit(1);
981 2 : startpos = InvalidXLogRecPtr;
982 : }
983 :
984 10 : if (!do_start_slot)
985 2 : exit(0);
986 :
987 : /* Stream loop */
988 : while (true)
989 : {
990 8 : StreamLogicalLog();
991 8 : if (time_to_abort)
992 : {
993 : /*
994 : * We've been Ctrl-C'ed or reached an exit limit condition. That's
995 : * not an error, so exit without an errorcode.
996 : */
997 8 : exit(0);
998 : }
999 0 : else if (noloop)
1000 : {
1001 0 : pg_log_error("disconnected");
1002 0 : exit(1);
1003 : }
1004 : else
1005 : {
1006 : /* translator: check source for value for %d */
1007 0 : pg_log_info("disconnected; waiting %d seconds to try again",
1008 : RECONNECT_SLEEP_TIME);
1009 0 : pg_usleep(RECONNECT_SLEEP_TIME * 1000000);
1010 : }
1011 : }
1012 : }
1013 :
1014 : /*
1015 : * Fsync our output data, and send a feedback message to the server. Returns
1016 : * true if successful, false otherwise.
1017 : *
1018 : * If successful, *now is updated to the current timestamp just before sending
1019 : * feedback.
1020 : */
1021 : static bool
1022 8 : flushAndSendFeedback(PGconn *conn, TimestampTz *now)
1023 : {
1024 : /* flush data to disk, so that we send a recent flush pointer */
1025 8 : if (!OutputFsync(*now))
1026 0 : return false;
1027 8 : *now = feGetCurrentTimestamp();
1028 8 : if (!sendFeedback(conn, *now, true, false))
1029 0 : return false;
1030 :
1031 8 : return true;
1032 : }
1033 :
1034 : /*
1035 : * Try to inform the server about our upcoming demise, but don't wait around or
1036 : * retry on failure.
1037 : */
1038 : static void
1039 8 : prepareToTerminate(PGconn *conn, XLogRecPtr endpos, bool keepalive, XLogRecPtr lsn)
1040 : {
1041 8 : (void) PQputCopyEnd(conn, NULL);
1042 8 : (void) PQflush(conn);
1043 :
1044 8 : if (verbose)
1045 : {
1046 0 : if (keepalive)
1047 0 : pg_log_info("end position %X/%X reached by keepalive",
1048 : LSN_FORMAT_ARGS(endpos));
1049 : else
1050 0 : pg_log_info("end position %X/%X reached by WAL record at %X/%X",
1051 : LSN_FORMAT_ARGS(endpos), LSN_FORMAT_ARGS(lsn));
1052 : }
1053 8 : }
|