Age Owner Branch data TLA Line data Source code
1 : : /*-------------------------------------------------------------------------
2 : : *
3 : : * pg_recvlogical.c - receive data from a logical decoding slot in a streaming
4 : : * fashion and write it to a local file.
5 : : *
6 : : * Portions Copyright (c) 1996-2026, PostgreSQL Global Development Group
7 : : *
8 : : * IDENTIFICATION
9 : : * src/bin/pg_basebackup/pg_recvlogical.c
10 : : *-------------------------------------------------------------------------
11 : : */
12 : :
13 : : #include "postgres_fe.h"
14 : :
15 : : #include <dirent.h>
16 : : #include <limits.h>
17 : : #include <sys/select.h>
18 : : #include <sys/stat.h>
19 : : #include <unistd.h>
20 : :
21 : : #include "common/file_perm.h"
22 : : #include "common/logging.h"
23 : : #include "fe_utils/option_utils.h"
24 : : #include "getopt_long.h"
25 : : #include "libpq-fe.h"
26 : : #include "libpq/pqsignal.h"
27 : : #include "libpq/protocol.h"
28 : : #include "pqexpbuffer.h"
29 : : #include "streamutil.h"
30 : :
31 : : /* Time to sleep between reconnection attempts */
32 : : #define RECONNECT_SLEEP_TIME 5
33 : :
34 : : typedef enum
35 : : {
36 : : STREAM_STOP_NONE,
37 : : STREAM_STOP_END_OF_WAL,
38 : : STREAM_STOP_KEEPALIVE,
39 : : STREAM_STOP_SIGNAL
40 : : } StreamStopReason;
41 : :
42 : : /* Global Options */
43 : : static char *outfile = NULL;
44 : : static int verbose = 0;
45 : : static bool two_phase = false; /* enable-two-phase option */
46 : : static bool failover = false; /* enable-failover option */
47 : : static int noloop = 0;
48 : : static int standby_message_timeout = 10 * 1000; /* 10 sec = default */
49 : : static int fsync_interval = 10 * 1000; /* 10 sec = default */
50 : : static XLogRecPtr startpos = InvalidXLogRecPtr;
51 : : static XLogRecPtr endpos = InvalidXLogRecPtr;
52 : : static bool do_create_slot = false;
53 : : static bool slot_exists_ok = false;
54 : : static bool do_start_slot = false;
55 : : static bool do_drop_slot = false;
56 : : static char *replication_slot = NULL;
57 : :
58 : : /* filled pairwise with option, value. value may be NULL */
59 : : static char **options;
60 : : static size_t noptions = 0;
61 : : static const char *plugin = "test_decoding";
62 : :
63 : : /* Global State */
64 : : static int outfd = -1;
65 : : static volatile sig_atomic_t time_to_abort = false;
66 : : static volatile sig_atomic_t stop_reason = STREAM_STOP_NONE;
67 : : static volatile sig_atomic_t output_reopen = false;
68 : : static bool output_isfile;
69 : : static TimestampTz output_last_fsync = -1;
70 : : static bool output_needs_fsync = false;
71 : : static XLogRecPtr output_written_lsn = InvalidXLogRecPtr;
72 : : static XLogRecPtr output_fsync_lsn = InvalidXLogRecPtr;
73 : :
74 : : static void usage(void);
75 : : static void StreamLogicalLog(void);
76 : : static bool flushAndSendFeedback(PGconn *conn, TimestampTz *now);
77 : : static void prepareToTerminate(PGconn *conn, XLogRecPtr endpos,
78 : : StreamStopReason reason,
79 : : XLogRecPtr lsn);
80 : :
81 : : static void
4487 rhaas@postgresql.org 82 :CBC 1 : usage(void)
83 : : {
4120 bruce@momjian.us 84 : 1 : printf(_("%s controls PostgreSQL logical decoding streams.\n\n"),
85 : : progname);
4487 rhaas@postgresql.org 86 : 1 : printf(_("Usage:\n"));
87 : 1 : printf(_(" %s [OPTION]...\n"), progname);
4279 peter_e@gmx.net 88 : 1 : printf(_("\nAction to be performed:\n"));
89 : 1 : printf(_(" --create-slot create a new replication slot (for the slot's name see --slot)\n"));
90 : 1 : printf(_(" --drop-slot drop the replication slot (for the slot's name see --slot)\n"));
91 : 1 : printf(_(" --start start streaming in a replication slot (for the slot's name see --slot)\n"));
4487 rhaas@postgresql.org 92 : 1 : printf(_("\nOptions:\n"));
366 peter@eisentraut.org 93 : 1 : printf(_(" --enable-failover enable replication slot synchronization to standby servers when\n"
94 : : " creating a replication slot\n"));
95 : 1 : printf(_(" -E, --endpos=LSN exit after receiving the specified LSN\n"));
4279 peter_e@gmx.net 96 : 1 : printf(_(" -f, --file=FILE receive log into this file, - for stdout\n"));
4419 andres@anarazel.de 97 : 1 : printf(_(" -F --fsync-interval=SECS\n"
98 : : " time between fsyncs to the output file (default: %d)\n"), (fsync_interval / 1000));
3940 peter_e@gmx.net 99 : 1 : printf(_(" --if-not-exists do not error if slot already exists when creating a slot\n"));
4279 100 : 1 : printf(_(" -I, --startpos=LSN where in an existing slot should the streaming start\n"));
4487 rhaas@postgresql.org 101 : 1 : printf(_(" -n, --no-loop do not loop on connection lost\n"));
4279 peter_e@gmx.net 102 : 1 : printf(_(" -o, --option=NAME[=VALUE]\n"
103 : : " pass option NAME with optional value VALUE to the\n"
104 : : " output plugin\n"));
105 : 1 : printf(_(" -P, --plugin=PLUGIN use output plugin PLUGIN (default: %s)\n"), plugin);
106 : 1 : printf(_(" -s, --status-interval=SECS\n"
107 : : " time between status packets sent to server (default: %d)\n"), (standby_message_timeout / 1000));
108 : 1 : printf(_(" -S, --slot=SLOTNAME name of the logical replication slot\n"));
366 peter@eisentraut.org 109 : 1 : printf(_(" -t, --enable-two-phase enable decoding of prepared transactions when creating a slot\n"));
110 : 1 : printf(_(" --two-phase (same as --enable-two-phase, deprecated)\n"));
4487 rhaas@postgresql.org 111 : 1 : printf(_(" -v, --verbose output verbose messages\n"));
112 : 1 : printf(_(" -V, --version output version information, then exit\n"));
113 : 1 : printf(_(" -?, --help show this help, then exit\n"));
114 : 1 : printf(_("\nConnection options:\n"));
115 : 1 : printf(_(" -d, --dbname=DBNAME database to connect to\n"));
116 : 1 : printf(_(" -h, --host=HOSTNAME database server host or socket directory\n"));
117 : 1 : printf(_(" -p, --port=PORT database server port number\n"));
118 : 1 : printf(_(" -U, --username=NAME connect as specified database user\n"));
119 : 1 : printf(_(" -w, --no-password never prompt for password\n"));
120 : 1 : printf(_(" -W, --password force password prompt (should happen automatically)\n"));
2314 peter@eisentraut.org 121 : 1 : printf(_("\nReport bugs to <%s>.\n"), PACKAGE_BUGREPORT);
122 : 1 : printf(_("%s home page: <%s>\n"), PACKAGE_NAME, PACKAGE_URL);
4487 rhaas@postgresql.org 123 : 1 : }
124 : :
125 : : /*
126 : : * Send a Standby Status Update message to server.
127 : : */
128 : : static bool
3414 tgl@sss.pgh.pa.us 129 : 29 : sendFeedback(PGconn *conn, TimestampTz now, bool force, bool replyRequested)
130 : : {
131 : : static XLogRecPtr last_written_lsn = InvalidXLogRecPtr;
132 : : static XLogRecPtr last_fsync_lsn = InvalidXLogRecPtr;
133 : :
134 : : char replybuf[1 + 8 + 8 + 8 + 8 + 1];
4487 rhaas@postgresql.org 135 : 29 : int len = 0;
136 : :
137 : : /*
138 : : * we normally don't want to send superfluous feedback, but if it's
139 : : * because of a timeout we need to, otherwise wal_sender_timeout will kill
140 : : * us.
141 : : */
142 [ - + ]: 29 : if (!force &&
4487 rhaas@postgresql.org 143 [ # # ]:UBC 0 : last_written_lsn == output_written_lsn &&
2239 noah@leadboat.com 144 [ # # ]: 0 : last_fsync_lsn == output_fsync_lsn)
4487 rhaas@postgresql.org 145 : 0 : return true;
146 : :
4487 rhaas@postgresql.org 147 [ + + ]:CBC 29 : if (verbose)
358 alvherre@kurilemu.de 148 :GNC 2 : pg_log_info("confirming write up to %X/%08X, flush to %X/%08X (slot %s)",
149 : : LSN_FORMAT_ARGS(output_written_lsn),
150 : : LSN_FORMAT_ARGS(output_fsync_lsn),
151 : : replication_slot);
152 : :
328 nathan@postgresql.or 153 : 29 : replybuf[len] = PqReplMsg_StandbyStatusUpdate;
4487 rhaas@postgresql.org 154 :CBC 29 : len += 1;
4438 bruce@momjian.us 155 : 29 : fe_sendint64(output_written_lsn, &replybuf[len]); /* write */
4487 rhaas@postgresql.org 156 : 29 : len += 8;
3296 tgl@sss.pgh.pa.us 157 : 29 : fe_sendint64(output_fsync_lsn, &replybuf[len]); /* flush */
4487 rhaas@postgresql.org 158 : 29 : len += 8;
4438 bruce@momjian.us 159 : 29 : fe_sendint64(InvalidXLogRecPtr, &replybuf[len]); /* apply */
4487 rhaas@postgresql.org 160 : 29 : len += 8;
4438 bruce@momjian.us 161 : 29 : fe_sendint64(now, &replybuf[len]); /* sendTime */
4487 rhaas@postgresql.org 162 : 29 : len += 8;
3296 tgl@sss.pgh.pa.us 163 : 29 : replybuf[len] = replyRequested ? 1 : 0; /* replyRequested */
4487 rhaas@postgresql.org 164 : 29 : len += 1;
165 : :
166 : 29 : startpos = output_written_lsn;
167 : 29 : last_written_lsn = output_written_lsn;
168 : 29 : last_fsync_lsn = output_fsync_lsn;
169 : :
170 [ + - - + ]: 29 : if (PQputCopyData(conn, replybuf, len) <= 0 || PQflush(conn))
171 : : {
2647 peter@eisentraut.org 172 :UBC 0 : pg_log_error("could not send feedback packet: %s",
173 : : PQerrorMessage(conn));
4487 rhaas@postgresql.org 174 : 0 : return false;
175 : : }
176 : :
4487 rhaas@postgresql.org 177 :CBC 29 : return true;
178 : : }
179 : :
180 : : static void
2740 peter@eisentraut.org 181 : 60 : disconnect_atexit(void)
182 : : {
4487 rhaas@postgresql.org 183 [ + + ]: 60 : if (conn != NULL)
184 : 34 : PQfinish(conn);
185 : 60 : }
186 : :
187 : : static void
3414 tgl@sss.pgh.pa.us 188 : 41 : OutputFsync(TimestampTz now)
189 : : {
4487 rhaas@postgresql.org 190 : 41 : output_last_fsync = now;
191 : :
192 : 41 : output_fsync_lsn = output_written_lsn;
193 : :
194 : : /*
195 : : * Save the last flushed position as the replication start point. On
196 : : * reconnect, replication resumes from there to avoid re-sending flushed
197 : : * data.
198 : : */
165 fujii@postgresql.org 199 :GNC 41 : startpos = output_fsync_lsn;
200 : :
4487 rhaas@postgresql.org 201 [ - + ]:CBC 41 : if (fsync_interval <= 0)
165 fujii@postgresql.org 202 :UNC 0 : return;
203 : :
4429 heikki.linnakangas@i 204 [ + + ]:CBC 41 : if (!output_needs_fsync)
165 fujii@postgresql.org 205 :GNC 27 : return;
206 : :
4429 heikki.linnakangas@i 207 :CBC 14 : output_needs_fsync = false;
208 : :
209 : : /* can only fsync if it's a regular file */
4011 andres@anarazel.de 210 [ + + ]: 14 : if (!output_isfile)
165 fujii@postgresql.org 211 :GNC 11 : return;
212 : :
4011 andres@anarazel.de 213 [ - + ]:CBC 3 : if (fsync(outfd) != 0)
1544 tgl@sss.pgh.pa.us 214 :UBC 0 : pg_fatal("could not fsync file \"%s\": %m", outfile);
215 : : }
216 : :
217 : : /*
218 : : * Start the log streaming
219 : : */
220 : : static void
4292 andres@anarazel.de 221 :CBC 27 : StreamLogicalLog(void)
222 : : {
223 : : PGresult *res;
4487 rhaas@postgresql.org 224 : 27 : char *copybuf = NULL;
3414 tgl@sss.pgh.pa.us 225 : 27 : TimestampTz last_status = -1;
226 : : int i;
227 : : PQExpBuffer query;
228 : : XLogRecPtr cur_record_lsn;
229 : :
1076 michael@paquier.xyz 230 : 27 : cur_record_lsn = InvalidXLogRecPtr;
231 : :
232 : : /*
233 : : * Connect in replication mode to the server
234 : : */
4487 rhaas@postgresql.org 235 [ + + ]: 27 : if (!conn)
4487 rhaas@postgresql.org 236 :GBC 1 : conn = GetConnection();
4487 rhaas@postgresql.org 237 [ - + ]:CBC 27 : if (!conn)
238 : : /* Error message already written in GetConnection() */
4487 rhaas@postgresql.org 239 :UBC 0 : return;
240 : :
241 : : /*
242 : : * Start the replication
243 : : */
4487 rhaas@postgresql.org 244 [ + + ]:CBC 27 : if (verbose)
358 alvherre@kurilemu.de 245 :GNC 2 : pg_log_info("starting log streaming at %X/%08X (slot %s)",
246 : : LSN_FORMAT_ARGS(startpos),
247 : : replication_slot);
248 : :
249 : : /* Initiate the replication stream at specified location */
1682 tgl@sss.pgh.pa.us 250 :CBC 27 : query = createPQExpBuffer();
15 251 : 27 : appendPQExpBufferStr(query, "START_REPLICATION SLOT ");
252 : 27 : AppendQuotedIdentifier(query, replication_slot);
15 tgl@sss.pgh.pa.us 253 :GNC 27 : appendPQExpBuffer(query, " LOGICAL %X/%08X", LSN_FORMAT_ARGS(startpos));
254 : :
255 : : /* print options if there are any */
4487 rhaas@postgresql.org 256 [ + + ]:CBC 27 : if (noptions)
257 : 21 : appendPQExpBufferStr(query, " (");
258 : :
259 [ + + ]: 69 : for (i = 0; i < noptions; i++)
260 : : {
261 : : /* separator */
262 [ + + ]: 42 : if (i > 0)
263 : 21 : appendPQExpBufferStr(query, ", ");
264 : :
265 : : /* write option name */
15 tgl@sss.pgh.pa.us 266 : 42 : AppendQuotedIdentifier(query, options[i * 2]);
267 : :
268 : : /* write option value if specified */
269 [ + - ]: 42 : if (options[i * 2 + 1] != NULL)
270 : : {
271 : 42 : appendPQExpBufferChar(query, ' ');
272 : 42 : AppendQuotedLiteral(query, options[i * 2 + 1]);
273 : : }
274 : : }
275 : :
4487 rhaas@postgresql.org 276 [ + + ]: 27 : if (noptions)
277 : 21 : appendPQExpBufferChar(query, ')');
278 : :
279 : 27 : res = PQexec(conn, query->data);
280 [ + + ]: 27 : if (PQresultStatus(res) != PGRES_COPY_BOTH)
281 : : {
2647 peter@eisentraut.org 282 : 6 : pg_log_error("could not send replication command \"%s\": %s",
283 : : query->data, PQresultErrorMessage(res));
4487 rhaas@postgresql.org 284 : 6 : PQclear(res);
285 : 6 : goto error;
286 : : }
287 : 21 : PQclear(res);
288 : 21 : resetPQExpBuffer(query);
289 : :
290 [ + + ]: 21 : if (verbose)
2647 peter@eisentraut.org 291 :GBC 2 : pg_log_info("streaming initiated");
292 : :
4487 rhaas@postgresql.org 293 [ + + ]:CBC 552 : while (!time_to_abort)
294 : : {
295 : : int r;
296 : : int bytes_left;
297 : : int bytes_written;
298 : : TimestampTz now;
299 : : int hdr_len;
300 : :
1076 michael@paquier.xyz 301 : 549 : cur_record_lsn = InvalidXLogRecPtr;
302 : :
4487 rhaas@postgresql.org 303 [ + + ]: 549 : if (copybuf != NULL)
304 : : {
305 : 332 : PQfreemem(copybuf);
306 : 332 : copybuf = NULL;
307 : : }
308 : :
309 : : /*
310 : : * Potentially send a status message to the primary.
311 : : */
312 : 549 : now = feGetCurrentTimestamp();
313 : :
314 [ + + + + ]: 1078 : if (outfd != -1 &&
315 : 529 : feTimestampDifferenceExceeds(output_last_fsync, now,
316 : : fsync_interval))
165 fujii@postgresql.org 317 :GNC 21 : OutputFsync(now);
318 : :
4487 rhaas@postgresql.org 319 [ + - + + ]:CBC 1098 : if (standby_message_timeout > 0 &&
320 : 549 : feTimestampDifferenceExceeds(last_status, now,
321 : : standby_message_timeout))
322 : : {
323 : : /* Time to send feedback! */
324 [ - + ]: 21 : if (!sendFeedback(conn, now, true, false))
4487 rhaas@postgresql.org 325 :GBC 3 : goto error;
326 : :
4487 rhaas@postgresql.org 327 :CBC 21 : last_status = now;
328 : : }
329 : :
330 : : /* got SIGHUP, close output file */
4429 heikki.linnakangas@i 331 [ + + - + : 549 : if (outfd != -1 && output_reopen && strcmp(outfile, "-") != 0)
- - ]
332 : : {
4429 heikki.linnakangas@i 333 :UBC 0 : now = feGetCurrentTimestamp();
165 fujii@postgresql.org 334 :UNC 0 : OutputFsync(now);
4429 heikki.linnakangas@i 335 :UBC 0 : close(outfd);
336 : 0 : outfd = -1;
337 : : }
4429 heikki.linnakangas@i 338 :CBC 549 : output_reopen = false;
339 : :
340 : : /* open the output file, if not open yet */
4428 341 [ + + ]: 549 : if (outfd == -1)
342 : : {
343 : : struct stat statbuf;
344 : :
345 [ + + ]: 20 : if (strcmp(outfile, "-") == 0)
346 : 18 : outfd = fileno(stdout);
347 : : else
4428 heikki.linnakangas@i 348 :GBC 2 : outfd = open(outfile, O_CREAT | O_APPEND | O_WRONLY | PG_BINARY,
349 : : pg_file_create_mode);
4428 heikki.linnakangas@i 350 [ - + ]:CBC 20 : if (outfd == -1)
351 : : {
2647 peter@eisentraut.org 352 :UBC 0 : pg_log_error("could not open log file \"%s\": %m", outfile);
4428 heikki.linnakangas@i 353 : 0 : goto error;
354 : : }
355 : :
4011 andres@anarazel.de 356 [ - + ]:CBC 20 : if (fstat(outfd, &statbuf) != 0)
357 : : {
2647 peter@eisentraut.org 358 :UBC 0 : pg_log_error("could not stat file \"%s\": %m", outfile);
1797 michael@paquier.xyz 359 : 0 : goto error;
360 : : }
361 : :
4011 andres@anarazel.de 362 [ + + + - ]:CBC 20 : output_isfile = S_ISREG(statbuf.st_mode) && !isatty(outfd);
363 : : }
364 : :
4487 rhaas@postgresql.org 365 : 549 : r = PQgetCopyData(conn, ©buf, 1);
366 [ + + ]: 549 : if (r == 0)
367 : 196 : {
368 : : /*
369 : : * In async mode, and no data available. We block on reading but
370 : : * not more than the specified timeout, so that we can send a
371 : : * response back to the client.
372 : : */
373 : : fd_set input_mask;
3414 tgl@sss.pgh.pa.us 374 : 202 : TimestampTz message_target = 0;
375 : 202 : TimestampTz fsync_target = 0;
376 : : struct timeval timeout;
4487 rhaas@postgresql.org 377 : 202 : struct timeval *timeoutptr = NULL;
378 : :
3766 peter_e@gmx.net 379 [ - + ]: 202 : if (PQsocket(conn) < 0)
380 : : {
2647 peter@eisentraut.org 381 :UBC 0 : pg_log_error("invalid socket: %s", PQerrorMessage(conn));
3766 peter_e@gmx.net 382 :CBC 3 : goto error;
383 : : }
384 : :
4487 rhaas@postgresql.org 385 [ + + ]: 3434 : FD_ZERO(&input_mask);
386 : 202 : FD_SET(PQsocket(conn), &input_mask);
387 : :
388 : : /* Compute when we need to wakeup to send a keepalive message. */
389 [ + - ]: 202 : if (standby_message_timeout)
390 : 202 : message_target = last_status + (standby_message_timeout - 1) *
391 : : ((int64) 1000);
392 : :
393 : : /* Compute when we need to wakeup to fsync the output file. */
4429 heikki.linnakangas@i 394 [ + - + + ]: 202 : if (fsync_interval > 0 && output_needs_fsync)
4487 rhaas@postgresql.org 395 : 93 : fsync_target = output_last_fsync + (fsync_interval - 1) *
396 : : ((int64) 1000);
397 : :
398 : : /* Now compute when to wakeup. */
399 [ - + - - ]: 202 : if (message_target > 0 || fsync_target > 0)
400 : : {
401 : : TimestampTz targettime;
402 : : long secs;
403 : : int usecs;
404 : :
405 : 202 : targettime = message_target;
406 : :
407 [ + + + + ]: 202 : if (fsync_target > 0 && fsync_target < targettime)
4487 rhaas@postgresql.org 408 :GBC 11 : targettime = fsync_target;
409 : :
4487 rhaas@postgresql.org 410 :CBC 202 : feTimestampDifference(now,
411 : : targettime,
412 : : &secs,
413 : : &usecs);
414 [ + + ]: 202 : if (secs <= 0)
4487 rhaas@postgresql.org 415 :GBC 11 : timeout.tv_sec = 1; /* Always sleep at least 1 sec */
416 : : else
4487 rhaas@postgresql.org 417 :CBC 191 : timeout.tv_sec = secs;
418 : 202 : timeout.tv_usec = usecs;
419 : 202 : timeoutptr = &timeout;
420 : : }
421 : :
422 : 202 : r = select(PQsocket(conn) + 1, &input_mask, NULL, NULL, timeoutptr);
423 [ + - + + : 202 : if (r == 0 || (r < 0 && errno == EINTR))
+ - ]
424 : : {
425 : : /*
426 : : * Got a timeout or signal. Continue the loop and either
427 : : * deliver a status packet to the server or just go back into
428 : : * blocking.
429 : : */
430 : 199 : continue;
431 : : }
432 [ - + ]: 199 : else if (r < 0)
433 : : {
1894 peter@eisentraut.org 434 :UBC 0 : pg_log_error("%s() failed: %m", "select");
4487 rhaas@postgresql.org 435 : 0 : goto error;
436 : : }
437 : :
438 : : /* Else there is actually data on the socket */
4487 rhaas@postgresql.org 439 [ + + ]:CBC 199 : if (PQconsumeInput(conn) == 0)
440 : : {
2647 peter@eisentraut.org 441 : 3 : pg_log_error("could not receive data from WAL stream: %s",
442 : : PQerrorMessage(conn));
4487 rhaas@postgresql.org 443 : 3 : goto error;
444 : : }
445 : 196 : continue;
446 : : }
447 : :
448 : : /* End of copy stream */
449 [ + + ]: 347 : if (r == -1)
450 : 15 : break;
451 : :
452 : : /* Failure while reading the copy stream */
453 [ - + ]: 339 : if (r == -2)
454 : : {
2647 peter@eisentraut.org 455 :UBC 0 : pg_log_error("could not read COPY data: %s",
456 : : PQerrorMessage(conn));
4487 rhaas@postgresql.org 457 : 0 : goto error;
458 : : }
459 : :
460 : : /* Check the message type. */
328 nathan@postgresql.or 461 [ + + ]:GNC 339 : if (copybuf[0] == PqReplMsg_Keepalive)
4487 rhaas@postgresql.org 462 :CBC 216 : {
463 : : int pos;
464 : : bool replyRequested;
465 : : XLogRecPtr walEnd;
3464 simon@2ndQuadrant.co 466 : 218 : bool endposReached = false;
467 : :
468 : : /*
469 : : * Parse the keepalive message, enclosed in the CopyData message.
470 : : * We just check if the server requested a reply, and ignore the
471 : : * rest.
472 : : */
328 nathan@postgresql.or 473 :GNC 218 : pos = 1; /* skip msgtype PqReplMsg_Keepalive */
4487 rhaas@postgresql.org 474 :CBC 218 : walEnd = fe_recvint64(©buf[pos]);
475 : 218 : output_written_lsn = Max(walEnd, output_written_lsn);
476 : :
477 : 218 : pos += 8; /* read walEnd */
478 : :
479 : 218 : pos += 8; /* skip sendTime */
480 : :
481 [ - + ]: 218 : if (r < pos + 1)
482 : : {
2647 peter@eisentraut.org 483 :UBC 0 : pg_log_error("streaming header too small: %d", r);
4487 rhaas@postgresql.org 484 : 0 : goto error;
485 : : }
4487 rhaas@postgresql.org 486 :CBC 218 : replyRequested = copybuf[pos];
487 : :
236 alvherre@kurilemu.de 488 [ + + + + ]:GNC 218 : if (XLogRecPtrIsValid(endpos) && walEnd >= endpos)
489 : : {
490 : : /*
491 : : * If there's nothing to read on the socket until a keepalive
492 : : * we know that the server has nothing to send us; and if
493 : : * walEnd has passed endpos, we know nothing else can have
494 : : * committed before endpos. So we can bail out now.
495 : : */
3464 simon@2ndQuadrant.co 496 :CBC 2 : endposReached = true;
497 : : }
498 : :
499 : : /* Send a reply, if necessary */
500 [ + + + + ]: 218 : if (replyRequested || endposReached)
501 : : {
502 [ - + ]: 3 : if (!flushAndSendFeedback(conn, &now))
4487 rhaas@postgresql.org 503 :UBC 0 : goto error;
4487 rhaas@postgresql.org 504 :CBC 3 : last_status = now;
505 : : }
506 : :
3464 simon@2ndQuadrant.co 507 [ + + ]: 218 : if (endposReached)
508 : : {
1076 michael@paquier.xyz 509 : 2 : stop_reason = STREAM_STOP_KEEPALIVE;
3464 simon@2ndQuadrant.co 510 : 2 : time_to_abort = true;
511 : 2 : break;
512 : : }
513 : :
4487 rhaas@postgresql.org 514 : 216 : continue;
515 : : }
328 nathan@postgresql.or 516 [ - + ]:GNC 121 : else if (copybuf[0] != PqReplMsg_WALData)
517 : : {
2647 peter@eisentraut.org 518 :UBC 0 : pg_log_error("unrecognized streaming header: \"%c\"",
519 : : copybuf[0]);
4487 rhaas@postgresql.org 520 : 0 : goto error;
521 : : }
522 : :
523 : : /*
524 : : * Read the header of the WALData message, enclosed in the CopyData
525 : : * message. We only need the WAL location field (dataStart), the rest
526 : : * of the header is ignored.
527 : : */
328 nathan@postgresql.or 528 :GNC 121 : hdr_len = 1; /* msgtype PqReplMsg_WALData */
4487 rhaas@postgresql.org 529 :CBC 121 : hdr_len += 8; /* dataStart */
530 : 121 : hdr_len += 8; /* walEnd */
531 : 121 : hdr_len += 8; /* sendTime */
532 [ - + ]: 121 : if (r < hdr_len + 1)
533 : : {
2647 peter@eisentraut.org 534 :UBC 0 : pg_log_error("streaming header too small: %d", r);
4487 rhaas@postgresql.org 535 : 0 : goto error;
536 : : }
537 : :
538 : : /* Extract WAL location for this block */
3464 simon@2ndQuadrant.co 539 :CBC 121 : cur_record_lsn = fe_recvint64(©buf[1]);
540 : :
236 alvherre@kurilemu.de 541 [ + + - + ]:GNC 121 : if (XLogRecPtrIsValid(endpos) && cur_record_lsn > endpos)
542 : : {
543 : : /*
544 : : * We've read past our endpoint, so prepare to go away being
545 : : * cautious about what happens to our output data.
546 : : */
3464 simon@2ndQuadrant.co 547 [ # # ]:UBC 0 : if (!flushAndSendFeedback(conn, &now))
548 : 0 : goto error;
1076 michael@paquier.xyz 549 : 0 : stop_reason = STREAM_STOP_END_OF_WAL;
3464 simon@2ndQuadrant.co 550 : 0 : time_to_abort = true;
551 : 0 : break;
552 : : }
553 : :
3464 simon@2ndQuadrant.co 554 :CBC 121 : output_written_lsn = Max(cur_record_lsn, output_written_lsn);
555 : :
4487 rhaas@postgresql.org 556 : 121 : bytes_left = r - hdr_len;
557 : 121 : bytes_written = 0;
558 : :
559 : : /* signal that a fsync is needed */
4429 heikki.linnakangas@i 560 : 121 : output_needs_fsync = true;
561 : :
4487 rhaas@postgresql.org 562 [ + + ]: 242 : while (bytes_left)
563 : : {
564 : : int ret;
565 : :
566 : 242 : ret = write(outfd,
567 : 121 : copybuf + hdr_len + bytes_written,
568 : : bytes_left);
569 : :
570 [ - + ]: 121 : if (ret < 0)
571 : : {
1686 peter@eisentraut.org 572 :UBC 0 : pg_log_error("could not write %d bytes to log file \"%s\": %m",
573 : : bytes_left, outfile);
4487 rhaas@postgresql.org 574 : 0 : goto error;
575 : : }
576 : :
577 : : /* Write was successful, advance our position */
4487 rhaas@postgresql.org 578 :CBC 121 : bytes_written += ret;
579 : 121 : bytes_left -= ret;
580 : : }
581 : :
582 [ - + ]: 121 : if (write(outfd, "\n", 1) != 1)
583 : : {
1686 peter@eisentraut.org 584 :UBC 0 : pg_log_error("could not write %d bytes to log file \"%s\": %m",
585 : : 1, outfile);
4487 rhaas@postgresql.org 586 : 0 : goto error;
587 : : }
588 : :
236 alvherre@kurilemu.de 589 [ + + + + ]:GNC 121 : if (XLogRecPtrIsValid(endpos) && cur_record_lsn == endpos)
590 : : {
591 : : /* endpos was exactly the record we just processed, we're done */
3464 simon@2ndQuadrant.co 592 [ - + ]:CBC 5 : if (!flushAndSendFeedback(conn, &now))
3464 simon@2ndQuadrant.co 593 :UBC 0 : goto error;
1076 michael@paquier.xyz 594 :CBC 5 : stop_reason = STREAM_STOP_END_OF_WAL;
3464 simon@2ndQuadrant.co 595 : 5 : time_to_abort = true;
596 : 5 : break;
597 : : }
598 : : }
599 : :
600 : : /* Clean up connection state if stream has been aborted */
1076 michael@paquier.xyz 601 [ + + ]: 18 : if (time_to_abort)
602 : 10 : prepareToTerminate(conn, endpos, stop_reason, cur_record_lsn);
603 : :
4487 rhaas@postgresql.org 604 : 18 : res = PQgetResult(conn);
3464 simon@2ndQuadrant.co 605 [ + + ]: 18 : if (PQresultStatus(res) == PGRES_COPY_OUT)
606 : : {
2239 noah@leadboat.com 607 : 10 : PQclear(res);
608 : :
609 : : /*
610 : : * We're doing a client-initiated clean exit and have sent CopyDone to
611 : : * the server. Drain any messages, so we don't miss a last-minute
612 : : * ErrorResponse. The walsender stops generating WALData records once
613 : : * it sees CopyDone, so expect this to finish quickly. After CopyDone,
614 : : * it's too late for sendFeedback(), even if this were to take a long
615 : : * time. Hence, use synchronous-mode PQgetCopyData().
616 : : */
617 : : while (1)
618 : 52 : {
619 : : int r;
620 : :
621 [ + + ]: 62 : if (copybuf != NULL)
622 : : {
623 : 59 : PQfreemem(copybuf);
624 : 59 : copybuf = NULL;
625 : : }
626 : 62 : r = PQgetCopyData(conn, ©buf, 0);
627 [ + + ]: 62 : if (r == -1)
628 : 10 : break;
629 [ - + ]: 52 : if (r == -2)
630 : : {
2239 noah@leadboat.com 631 :UBC 0 : pg_log_error("could not read COPY data: %s",
632 : : PQerrorMessage(conn));
633 : 0 : time_to_abort = false; /* unclean exit */
634 : 0 : goto error;
635 : : }
636 : : }
637 : :
2239 noah@leadboat.com 638 :CBC 10 : res = PQgetResult(conn);
639 : : }
640 [ + + ]: 18 : if (PQresultStatus(res) != PGRES_COMMAND_OK)
641 : : {
2647 peter@eisentraut.org 642 : 7 : pg_log_error("unexpected termination of replication stream: %s",
643 : : PQresultErrorMessage(res));
462 dgustafsson@postgres 644 : 7 : PQclear(res);
4487 rhaas@postgresql.org 645 : 7 : goto error;
646 : : }
647 : 11 : PQclear(res);
648 : :
649 [ + - + + ]: 11 : if (outfd != -1 && strcmp(outfile, "-") != 0)
650 : : {
3414 tgl@sss.pgh.pa.us 651 :GBC 2 : TimestampTz t = feGetCurrentTimestamp();
652 : :
4487 rhaas@postgresql.org 653 : 2 : OutputFsync(t);
654 [ - + ]: 2 : if (close(outfd) != 0)
2647 peter@eisentraut.org 655 :UBC 0 : pg_log_error("could not close file \"%s\": %m", outfile);
656 : : }
4487 rhaas@postgresql.org 657 :CBC 11 : outfd = -1;
658 : 27 : error:
4439 heikki.linnakangas@i 659 [ - + ]: 27 : if (copybuf != NULL)
660 : : {
4439 heikki.linnakangas@i 661 :UBC 0 : PQfreemem(copybuf);
662 : 0 : copybuf = NULL;
663 : : }
4487 rhaas@postgresql.org 664 :CBC 27 : destroyPQExpBuffer(query);
665 : 27 : PQfinish(conn);
666 : 27 : conn = NULL;
667 : : }
668 : :
669 : : /*
670 : : * Unfortunately we can't do sensible signal handling on windows...
671 : : */
672 : : #ifndef WIN32
673 : :
674 : : /*
675 : : * When SIGINT/SIGTERM are caught, just tell the system to exit at the next
676 : : * possible moment.
677 : : */
678 : : static void
1385 tgl@sss.pgh.pa.us 679 : 3 : sigexit_handler(SIGNAL_ARGS)
680 : : {
1076 michael@paquier.xyz 681 : 3 : stop_reason = STREAM_STOP_SIGNAL;
4487 rhaas@postgresql.org 682 : 3 : time_to_abort = true;
683 : 3 : }
684 : :
685 : : /*
686 : : * Trigger the output file to be reopened.
687 : : */
688 : : static void
1385 tgl@sss.pgh.pa.us 689 :UBC 0 : sighup_handler(SIGNAL_ARGS)
690 : : {
4487 rhaas@postgresql.org 691 : 0 : output_reopen = true;
692 : 0 : }
693 : : #endif
694 : :
695 : :
696 : : int
4487 rhaas@postgresql.org 697 :CBC 68 : main(int argc, char **argv)
698 : : {
699 : : static struct option long_options[] = {
700 : : /* general options */
701 : : {"file", required_argument, NULL, 'f'},
702 : : {"fsync-interval", required_argument, NULL, 'F'},
703 : : {"no-loop", no_argument, NULL, 'n'},
704 : : {"enable-failover", no_argument, NULL, 5},
705 : : {"enable-two-phase", no_argument, NULL, 't'},
706 : : {"two-phase", no_argument, NULL, 't'}, /* deprecated */
707 : : {"verbose", no_argument, NULL, 'v'},
708 : : {"version", no_argument, NULL, 'V'},
709 : : {"help", no_argument, NULL, '?'},
710 : : /* connection options */
711 : : {"dbname", required_argument, NULL, 'd'},
712 : : {"host", required_argument, NULL, 'h'},
713 : : {"port", required_argument, NULL, 'p'},
714 : : {"username", required_argument, NULL, 'U'},
715 : : {"no-password", no_argument, NULL, 'w'},
716 : : {"password", no_argument, NULL, 'W'},
717 : : /* replication options */
718 : : {"startpos", required_argument, NULL, 'I'},
719 : : {"endpos", required_argument, NULL, 'E'},
720 : : {"option", required_argument, NULL, 'o'},
721 : : {"plugin", required_argument, NULL, 'P'},
722 : : {"status-interval", required_argument, NULL, 's'},
723 : : {"slot", required_argument, NULL, 'S'},
724 : : /* action */
725 : : {"create-slot", no_argument, NULL, 1},
726 : : {"start", no_argument, NULL, 2},
727 : : {"drop-slot", no_argument, NULL, 3},
728 : : {"if-not-exists", no_argument, NULL, 4},
729 : : {NULL, 0, NULL, 0}
730 : : };
731 : : int c;
732 : : int option_index;
733 : : uint32 hi,
734 : : lo;
735 : : char *db_name;
736 : :
2647 peter@eisentraut.org 737 : 68 : pg_logging_init(argv[0]);
4487 rhaas@postgresql.org 738 : 68 : progname = get_progname(argv[0]);
3837 alvherre@alvh.no-ip. 739 : 68 : set_pglocale_pgservice(argv[0], PG_TEXTDOMAIN("pg_basebackup"));
740 : :
4487 rhaas@postgresql.org 741 [ + + ]: 68 : if (argc > 1)
742 : : {
743 [ + + - + ]: 67 : if (strcmp(argv[1], "--help") == 0 || strcmp(argv[1], "-?") == 0)
744 : : {
745 : 1 : usage();
746 : 1 : exit(0);
747 : : }
748 [ + - ]: 66 : else if (strcmp(argv[1], "-V") == 0 ||
749 [ + + ]: 66 : strcmp(argv[1], "--version") == 0)
750 : : {
751 : 1 : puts("pg_recvlogical (PostgreSQL) " PG_VERSION);
752 : 1 : exit(0);
753 : : }
754 : : }
755 : :
1296 peter@eisentraut.org 756 : 389 : while ((c = getopt_long(argc, argv, "E:f:F:ntvd:h:p:U:wWI:o:P:s:S:",
4487 rhaas@postgresql.org 757 [ + + ]: 389 : long_options, &option_index)) != -1)
758 : : {
759 [ + + + + : 324 : switch (c)
+ + + - -
- - - - +
+ + + + +
+ + - + ]
760 : : {
761 : : /* general options */
762 : 27 : case 'f':
763 : 27 : outfile = pg_strdup(optarg);
764 : 27 : break;
4419 andres@anarazel.de 765 :GBC 2 : case 'F':
1802 michael@paquier.xyz 766 [ - + ]: 2 : if (!option_parse_int(optarg, "-F/--fsync-interval", 0,
767 : : INT_MAX / 1000,
768 : : &fsync_interval))
4419 andres@anarazel.de 769 :UBC 0 : exit(1);
1802 michael@paquier.xyz 770 :GBC 2 : fsync_interval *= 1000;
4419 andres@anarazel.de 771 : 2 : break;
4487 rhaas@postgresql.org 772 :CBC 24 : case 'n':
773 : 24 : noloop = 1;
774 : 24 : break;
1826 akapila@postgresql.o 775 : 2 : case 't':
776 : 2 : two_phase = true;
777 : 2 : break;
1296 peter@eisentraut.org 778 :GBC 1 : case 'v':
779 : 1 : verbose++;
780 : 1 : break;
452 msawada@postgresql.o 781 :CBC 1 : case 5:
782 : 1 : failover = true;
783 : 1 : break;
784 : : /* connection options */
4487 rhaas@postgresql.org 785 : 62 : case 'd':
786 : 62 : dbname = pg_strdup(optarg);
787 : 62 : break;
4487 rhaas@postgresql.org 788 :UBC 0 : case 'h':
789 : 0 : dbhost = pg_strdup(optarg);
790 : 0 : break;
791 : 0 : case 'p':
792 : 0 : dbport = pg_strdup(optarg);
793 : 0 : break;
794 : 0 : case 'U':
795 : 0 : dbuser = pg_strdup(optarg);
796 : 0 : break;
797 : 0 : case 'w':
798 : 0 : dbgetpassword = -1;
799 : 0 : break;
800 : 0 : case 'W':
801 : 0 : dbgetpassword = 1;
802 : 0 : break;
803 : : /* replication options */
4419 andres@anarazel.de 804 : 0 : case 'I':
358 alvherre@kurilemu.de 805 [ # # ]:UNC 0 : if (sscanf(optarg, "%X/%08X", &hi, &lo) != 2)
1544 tgl@sss.pgh.pa.us 806 :UBC 0 : pg_fatal("could not parse start position \"%s\"", optarg);
4419 andres@anarazel.de 807 : 0 : startpos = ((uint64) hi) << 32 | lo;
808 : 0 : break;
3464 simon@2ndQuadrant.co 809 :CBC 8 : case 'E':
358 alvherre@kurilemu.de 810 [ - + ]:GNC 8 : if (sscanf(optarg, "%X/%08X", &hi, &lo) != 2)
1544 tgl@sss.pgh.pa.us 811 :UBC 0 : pg_fatal("could not parse end position \"%s\"", optarg);
3464 simon@2ndQuadrant.co 812 :CBC 8 : endpos = ((uint64) hi) << 32 | lo;
813 : 8 : break;
4487 rhaas@postgresql.org 814 : 42 : case 'o':
815 : : {
4438 bruce@momjian.us 816 : 42 : char *data = pg_strdup(optarg);
817 : 42 : char *val = strchr(data, '=');
818 : :
4487 rhaas@postgresql.org 819 [ + - ]: 42 : if (val != NULL)
820 : : {
821 : : /* remove =; separate data from val */
822 : 42 : *val = '\0';
823 : 42 : val++;
824 : : }
825 : :
826 : 42 : noptions += 1;
123 michael@paquier.xyz 827 :GNC 42 : options = pg_realloc_array(options, char *, noptions * 2);
828 : :
4487 rhaas@postgresql.org 829 :CBC 42 : options[(noptions - 1) * 2] = data;
830 : 42 : options[(noptions - 1) * 2 + 1] = val;
831 : : }
832 : :
833 : 42 : break;
834 : 27 : case 'P':
835 : 27 : plugin = pg_strdup(optarg);
836 : 27 : break;
4487 rhaas@postgresql.org 837 :GBC 1 : case 's':
1802 michael@paquier.xyz 838 [ - + ]: 1 : if (!option_parse_int(optarg, "-s/--status-interval", 0,
839 : : INT_MAX / 1000,
840 : : &standby_message_timeout))
4487 rhaas@postgresql.org 841 :UBC 0 : exit(1);
1802 michael@paquier.xyz 842 :GBC 1 : standby_message_timeout *= 1000;
4487 rhaas@postgresql.org 843 : 1 : break;
4487 rhaas@postgresql.org 844 :CBC 64 : case 'S':
845 : 64 : replication_slot = pg_strdup(optarg);
846 : 64 : break;
847 : : /* action */
848 : 31 : case 1:
849 : 31 : do_create_slot = true;
850 : 31 : break;
851 : 28 : case 2:
852 : 28 : do_start_slot = true;
853 : 28 : break;
854 : 3 : case 3:
855 : 3 : do_drop_slot = true;
856 : 3 : break;
4006 andres@anarazel.de 857 :UBC 0 : case 4:
858 : 0 : slot_exists_ok = true;
859 : 0 : break;
860 : :
4487 rhaas@postgresql.org 861 :CBC 1 : default:
862 : : /* getopt_long already emitted a complaint */
1544 tgl@sss.pgh.pa.us 863 : 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
4487 rhaas@postgresql.org 864 : 1 : exit(1);
865 : : }
866 : : }
867 : :
868 : : /*
869 : : * Any non-option arguments?
870 : : */
871 [ - + ]: 65 : if (optind < argc)
872 : : {
2647 peter@eisentraut.org 873 :UBC 0 : pg_log_error("too many command-line arguments (first is \"%s\")",
874 : : argv[optind]);
1544 tgl@sss.pgh.pa.us 875 : 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
4487 rhaas@postgresql.org 876 : 0 : exit(1);
877 : : }
878 : :
879 : : /*
880 : : * Required arguments
881 : : */
4487 rhaas@postgresql.org 882 [ + + ]:CBC 65 : if (replication_slot == NULL)
883 : : {
2647 peter@eisentraut.org 884 : 1 : pg_log_error("no slot specified");
1544 tgl@sss.pgh.pa.us 885 : 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
4487 rhaas@postgresql.org 886 : 1 : exit(1);
887 : : }
888 : :
889 [ + + + + ]: 64 : if (do_start_slot && outfile == NULL)
890 : : {
2647 peter@eisentraut.org 891 : 1 : pg_log_error("no target file specified");
1544 tgl@sss.pgh.pa.us 892 : 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
4487 rhaas@postgresql.org 893 : 1 : exit(1);
894 : : }
895 : :
896 [ + + + + ]: 63 : if (!do_drop_slot && dbname == NULL)
897 : : {
2647 peter@eisentraut.org 898 : 1 : pg_log_error("no database specified");
1544 tgl@sss.pgh.pa.us 899 : 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
4487 rhaas@postgresql.org 900 : 1 : exit(1);
901 : : }
902 : :
903 [ + + + + : 62 : if (!do_drop_slot && !do_create_slot && !do_start_slot)
+ + ]
904 : : {
2647 peter@eisentraut.org 905 : 1 : pg_log_error("at least one action needs to be specified");
1544 tgl@sss.pgh.pa.us 906 : 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
4487 rhaas@postgresql.org 907 : 1 : exit(1);
908 : : }
909 : :
910 [ + + + - : 61 : if (do_drop_slot && (do_create_slot || do_start_slot))
- + ]
911 : : {
2647 peter@eisentraut.org 912 :UBC 0 : pg_log_error("cannot use --create-slot or --start together with --drop-slot");
1544 tgl@sss.pgh.pa.us 913 : 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
4487 rhaas@postgresql.org 914 : 0 : exit(1);
915 : : }
916 : :
236 alvherre@kurilemu.de 917 [ - + - - :GNC 61 : if (XLogRecPtrIsValid(startpos) && (do_create_slot || do_drop_slot))
- - ]
918 : : {
2647 peter@eisentraut.org 919 :UBC 0 : pg_log_error("cannot use --create-slot or --drop-slot together with --startpos");
1544 tgl@sss.pgh.pa.us 920 : 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
4487 rhaas@postgresql.org 921 : 0 : exit(1);
922 : : }
923 : :
236 alvherre@kurilemu.de 924 [ + + - + ]:GNC 61 : if (XLogRecPtrIsValid(endpos) && !do_start_slot)
925 : : {
2647 peter@eisentraut.org 926 :UBC 0 : pg_log_error("--endpos may only be specified with --start");
1544 tgl@sss.pgh.pa.us 927 : 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
3464 simon@2ndQuadrant.co 928 : 0 : exit(1);
929 : : }
930 : :
452 msawada@postgresql.o 931 [ + + ]:CBC 61 : if (!do_create_slot)
932 : : {
933 [ + + ]: 30 : if (two_phase)
934 : : {
366 peter@eisentraut.org 935 : 1 : pg_log_error("%s may only be specified with --create-slot", "--enable-two-phase");
452 msawada@postgresql.o 936 : 1 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
937 : 1 : exit(1);
938 : : }
939 : :
940 [ - + ]: 29 : if (failover)
941 : : {
366 peter@eisentraut.org 942 :UBC 0 : pg_log_error("%s may only be specified with --create-slot", "--enable-failover");
452 msawada@postgresql.o 943 : 0 : pg_log_error_hint("Try \"%s --help\" for more information.", progname);
944 : 0 : exit(1);
945 : : }
946 : : }
947 : :
948 : : /*
949 : : * Obtain a connection to server. Notably, if we need a password, we want
950 : : * to collect it from the user immediately.
951 : : */
4290 andres@anarazel.de 952 :CBC 60 : conn = GetConnection();
953 [ - + ]: 60 : if (!conn)
954 : : /* Error message already written in GetConnection() */
4290 andres@anarazel.de 955 :UBC 0 : exit(1);
2740 peter@eisentraut.org 956 :CBC 60 : atexit(disconnect_atexit);
957 : :
958 : : /*
959 : : * Trap signals. (Don't do this until after the initial password prompt,
960 : : * if one is needed, in GetConnection.)
961 : : */
962 : : #ifndef WIN32
1385 dgustafsson@postgres 963 : 60 : pqsignal(SIGINT, sigexit_handler);
964 : 60 : pqsignal(SIGTERM, sigexit_handler);
1682 tgl@sss.pgh.pa.us 965 : 60 : pqsignal(SIGHUP, sighup_handler);
966 : : #endif
967 : :
968 : : /*
969 : : * Run IDENTIFY_SYSTEM to check the connection type for each action.
970 : : * --create-slot and --start actions require a database-specific
971 : : * replication connection because they handle logical replication slots.
972 : : * --drop-slot can remove replication slots from any replication
973 : : * connection without this restriction.
974 : : */
4290 andres@anarazel.de 975 [ - + ]: 60 : if (!RunIdentifySystem(conn, NULL, NULL, NULL, &db_name))
2740 peter@eisentraut.org 976 :UBC 0 : exit(1);
977 : :
462 fujii@postgresql.org 978 [ + + - + ]:CBC 60 : if (!do_drop_slot && db_name == NULL)
1544 tgl@sss.pgh.pa.us 979 :UBC 0 : pg_fatal("could not establish database-specific replication connection");
980 : :
981 : : /*
982 : : * Set umask so that directories/files are created with the same
983 : : * permissions as directories/files in the source data directory.
984 : : *
985 : : * pg_mode_mask is set to owner-only by default and then updated in
986 : : * GetConnection() where we get the mode from the server-side with
987 : : * RetrieveDataDirCreatePerm() and then call SetDataDirectoryCreatePerm().
988 : : */
3006 sfrost@snowman.net 989 :CBC 60 : umask(pg_mode_mask);
990 : :
991 : : /* Drop a replication slot. */
4487 rhaas@postgresql.org 992 [ + + ]: 60 : if (do_drop_slot)
993 : : {
994 [ - + ]: 3 : if (verbose)
2647 peter@eisentraut.org 995 :UBC 0 : pg_log_info("dropping replication slot \"%s\"", replication_slot);
996 : :
4290 andres@anarazel.de 997 [ - + ]:CBC 3 : if (!DropReplicationSlot(conn, replication_slot))
2740 peter@eisentraut.org 998 :UBC 0 : exit(1);
999 : : }
1000 : :
1001 : : /* Create a replication slot. */
4487 rhaas@postgresql.org 1002 [ + + ]:CBC 60 : if (do_create_slot)
1003 : : {
1004 [ - + ]: 31 : if (verbose)
2647 peter@eisentraut.org 1005 :UBC 0 : pg_log_info("creating replication slot \"%s\"", replication_slot);
1006 : :
3199 peter_e@gmx.net 1007 [ - + ]:CBC 31 : if (!CreateReplicationSlot(conn, replication_slot, plugin, false,
1008 : : false, false, slot_exists_ok, two_phase,
1009 : : failover))
2740 peter@eisentraut.org 1010 :UBC 0 : exit(1);
4006 andres@anarazel.de 1011 :CBC 31 : startpos = InvalidXLogRecPtr;
1012 : : }
1013 : :
4487 rhaas@postgresql.org 1014 [ + + ]: 60 : if (!do_start_slot)
2740 peter@eisentraut.org 1015 : 34 : exit(0);
1016 : :
1017 : : /* Stream loop */
1018 : : while (true)
1019 : : {
4290 andres@anarazel.de 1020 : 27 : StreamLogicalLog();
4487 rhaas@postgresql.org 1021 [ + + ]: 27 : if (time_to_abort)
1022 : : {
1023 : : /*
1024 : : * We've been Ctrl-C'ed or reached an exit limit condition. That's
1025 : : * not an error, so exit without an errorcode.
1026 : : */
2740 peter@eisentraut.org 1027 : 10 : exit(0);
1028 : : }
1029 : :
1030 : : /*
1031 : : * Ensure all written data is flushed to disk before exiting or
1032 : : * starting a new replication.
1033 : : */
165 fujii@postgresql.org 1034 [ + + ]:GNC 17 : if (outfd != -1)
1035 : 10 : OutputFsync(feGetCurrentTimestamp());
1036 : :
1037 [ + + ]: 17 : if (noloop)
1038 : : {
1544 tgl@sss.pgh.pa.us 1039 :CBC 16 : pg_fatal("disconnected");
1040 : : }
1041 : : else
1042 : : {
1043 : : /* translator: check source for value for %d */
2647 peter@eisentraut.org 1044 :GBC 1 : pg_log_info("disconnected; waiting %d seconds to try again",
1045 : : RECONNECT_SLEEP_TIME);
4487 rhaas@postgresql.org 1046 : 1 : pg_usleep(RECONNECT_SLEEP_TIME * 1000000);
1047 : : }
1048 : : }
1049 : : }
1050 : :
1051 : : /*
1052 : : * Fsync our output data, and send a feedback message to the server. Returns
1053 : : * true if successful, false otherwise.
1054 : : *
1055 : : * If successful, *now is updated to the current timestamp just before sending
1056 : : * feedback.
1057 : : */
1058 : : static bool
3464 simon@2ndQuadrant.co 1059 :CBC 8 : flushAndSendFeedback(PGconn *conn, TimestampTz *now)
1060 : : {
1061 : : /* flush data to disk, so that we send a recent flush pointer */
165 fujii@postgresql.org 1062 :GNC 8 : OutputFsync(*now);
3464 simon@2ndQuadrant.co 1063 :CBC 8 : *now = feGetCurrentTimestamp();
1064 [ - + ]: 8 : if (!sendFeedback(conn, *now, true, false))
3464 simon@2ndQuadrant.co 1065 :UBC 0 : return false;
1066 : :
3464 simon@2ndQuadrant.co 1067 :CBC 8 : return true;
1068 : : }
1069 : :
1070 : : /*
1071 : : * Try to inform the server about our upcoming demise, but don't wait around or
1072 : : * retry on failure.
1073 : : */
1074 : : static void
1076 michael@paquier.xyz 1075 : 10 : prepareToTerminate(PGconn *conn, XLogRecPtr endpos, StreamStopReason reason,
1076 : : XLogRecPtr lsn)
1077 : : {
3464 simon@2ndQuadrant.co 1078 : 10 : (void) PQputCopyEnd(conn, NULL);
1079 : 10 : (void) PQflush(conn);
1080 : :
1081 [ + + ]: 10 : if (verbose)
1082 : : {
1076 michael@paquier.xyz 1083 [ + - - - :GBC 1 : switch (reason)
- ]
1084 : : {
1085 : 1 : case STREAM_STOP_SIGNAL:
1086 : 1 : pg_log_info("received interrupt signal, exiting");
1087 : 1 : break;
1076 michael@paquier.xyz 1088 :UBC 0 : case STREAM_STOP_KEEPALIVE:
358 alvherre@kurilemu.de 1089 :UNC 0 : pg_log_info("end position %X/%08X reached by keepalive",
1090 : : LSN_FORMAT_ARGS(endpos));
1076 michael@paquier.xyz 1091 :UBC 0 : break;
1092 : 0 : case STREAM_STOP_END_OF_WAL:
236 alvherre@kurilemu.de 1093 [ # # ]:UNC 0 : Assert(XLogRecPtrIsValid(lsn));
358 1094 : 0 : pg_log_info("end position %X/%08X reached by WAL record at %X/%08X",
1095 : : LSN_FORMAT_ARGS(endpos), LSN_FORMAT_ARGS(lsn));
1076 michael@paquier.xyz 1096 :UBC 0 : break;
1097 : 0 : case STREAM_STOP_NONE:
1098 : 0 : Assert(false);
1099 : : break;
1100 : : }
1101 : : }
3464 simon@2ndQuadrant.co 1102 :CBC 10 : }
|